From ffb21248cb270475ce9156b5056e591e14cbc20d Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 30 五月 2023 11:20:22 +0800 Subject: [PATCH] 去除多余配置 --- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 251 ++++++++++++++++++++++++++------------------------ 1 files changed, 131 insertions(+), 120 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index a795e77..3a94f79 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -2,11 +2,15 @@ import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.InviteInfo; +import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; @@ -20,10 +24,8 @@ import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -69,6 +71,9 @@ private IRedisCatchStorage redisCatchStorage; @Autowired + private IInviteStreamService inviteStreamService; + + @Autowired private IDeviceService deviceService; @Autowired @@ -104,6 +109,9 @@ @Autowired private AssistRESTfulUtils assistRESTfulUtils; + @Autowired + private SSRCFactory ssrcFactory; + @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; @@ -112,10 +120,11 @@ * 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆� */ @ResponseBody + @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8") public HookResult onServerKeepalive(@RequestBody OnServerKeepaliveHookParam param) { - logger.info("[ZLM HOOK] 鏀跺埌zlm蹇冭烦锛�" + param.getMediaServerId()); +// logger.info("[ZLM HOOK] 鏀跺埌zlm蹇冭烦锛�" + param.getMediaServerId()); taskExecutor.execute(() -> { List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive); @@ -135,6 +144,7 @@ * 鎾斁鍣ㄩ壌鏉冧簨浠讹紝rtsp/rtmp/http-flv/ws-flv/hls鐨勬挱鏀鹃兘灏嗚Е鍙戞閴存潈浜嬩欢銆� */ @ResponseBody + @PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8") public HookResult onPlay(@RequestBody OnPlayHookParam param) { if (logger.isDebugEnabled()) { @@ -181,13 +191,13 @@ if (userSetting.getPushAuthority()) { // 鎺ㄦ祦閴存潈 if (param.getParams() == null) { - logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯涓嶈鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)"); + logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)"); return new HookResultForOnPublish(401, "Unauthorized"); } Map<String, String> paramMap = urlParamToMap(param.getParams()); String sign = paramMap.get("sign"); if (sign == null) { - logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯涓嶈鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)"); + logger.info("鎺ㄦ祦閴存潈澶辫触锛� 缂哄皯蹇呰鍙傛暟锛歴ign=md5(user琛ㄧ殑pushKey)"); return new HookResultForOnPublish(401, "Unauthorized"); } // 鎺ㄦ祦鑷畾涔夋挱鏀鹃壌鏉冪爜 @@ -245,14 +255,33 @@ result.setEnable_audio(deviceChannel.isHasAudio()); } // 濡傛灉鏄綍鍍忎笅杞藉氨璁剧疆瑙嗛闂撮殧鍗佺 - if (ssrcTransactionForAll.get(0).getType() == VideoStreamSessionManager.SessionType.download) { + if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) { result.setMp4_max_second(10); result.setEnable_audio(true); result.setEnable_mp4(true); } } + if (mediaInfo.getRecordAssistPort() > 0 && userSetting.getRecordPath() == null) { + logger.info("鎺ㄦ祦鏃跺彂鐜板皻鏈缃綍鍍忚矾寰勶紝浠巃ssist鏈嶅姟涓鍙�"); + JSONObject info = assistRESTfulUtils.getInfo(mediaInfo, null); + if (info != null && info.getInteger("code") != null && info.getInteger("code") == 0 ) { + JSONObject dataJson = info.getJSONObject("data"); + if (dataJson != null) { + String recordPath = dataJson.getString("record"); + userSetting.setRecordPath(recordPath); + result.setMp4_save_path(recordPath); + // 淇敼zlm涓殑褰曞儚璺緞 + if (mediaInfo.isAutoConfig()) { + taskExecutor.execute(() -> { + mediaServerService.setZLMConfig(mediaInfo, false); + }); + } + } + } + } return result; } + /** * rtsp/rtmp娴佹敞鍐屾垨娉ㄩ攢鏃惰Е鍙戞浜嬩欢锛涙浜嬩欢瀵瑰洖澶嶄笉鏁忔劅銆� @@ -271,19 +300,24 @@ JSONObject json = (JSONObject) JSON.toJSON(param); taskExecutor.execute(() -> { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); - if (subscribe != null) { - MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); - if (mediaInfo != null) { - subscribe.response(mediaInfo, json); - } + MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); + if (mediaInfo == null) { + logger.info("[ZLM HOOK] 娴佸彉鍖栨湭鎵惧埌ZLM, {}", param.getMediaServerId()); + return; } - // 娴佹秷澶辩Щ闄edis play + if (subscribe != null) { + subscribe.response(mediaInfo, json); + } + List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks(); + // TODO 閲嶆瀯姝ゅ閫昏緫 + boolean isPush = false; if (param.isRegist()) { + // 澶勭悊娴佹敞鍐岀殑閴存潈淇℃伅 if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal() || param.getOriginType() == OriginType.RTSP_PUSH.ordinal() || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { - + isPush = true; StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); if (streamAuthorityInfo == null) { streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); @@ -298,73 +332,71 @@ } if ("rtsp".equals(param.getSchema())) { + // 鏇存柊娴佸獟浣撹礋杞戒俊鎭� if (param.isRegist()) { mediaServerService.addCount(param.getMediaServerId()); } else { mediaServerService.removeCount(param.getMediaServerId()); } - if (param.getOriginType() == OriginType.PULL.ordinal() - || param.getOriginType() == OriginType.FFMPEG_PULL.ordinal()) { - // 璁剧疆鎷夋祦浠g悊涓婄嚎/绂荤嚎 - streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); + // 璁剧疆鎷夋祦浠g悊涓婄嚎/绂荤嚎 + int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); + if (updateStatusResult > 0) { + } + if ("rtp".equals(param.getApp()) && !param.isRegist()) { - StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(param.getStream()); - if (streamInfo != null) { - redisCatchStorage.stopPlay(streamInfo); - storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); - } else { - streamInfo = redisCatchStorage.queryPlayback(null, null, param.getStream(), null); - if (streamInfo != null) { - redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(), - streamInfo.getStream(), null); - } + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); + if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) { + inviteStreamService.removeInviteInfo(inviteInfo); + storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); } } else { if (!"rtp".equals(param.getApp())) { String type = OriginType.values()[param.getOriginType()].getType(); - MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); - - if (mediaServerItem != null) { - if (param.isRegist()) { - StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); - String callId = null; - if (streamAuthorityInfo != null) { - callId = streamAuthorityInfo.getCallId(); - } - StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaServerItem, - param.getApp(), param.getStream(), tracks, callId); - param.setStreamInfo(new StreamContent(streamInfoByAppAndStream)); - redisCatchStorage.addStream(mediaServerItem, type, param.getApp(), param.getStream(), param); - if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal() - || param.getOriginType() == OriginType.RTMP_PUSH.ordinal() - || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { - param.setSeverId(userSetting.getServerId()); - zlmMediaListManager.addPush(param); - } - } else { - // 鍏煎娴佹敞閿�鏃剁被鍨嬩粠redis璁板綍鑾峰彇 - OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(param.getApp(), param.getStream(), param.getMediaServerId()); - if (onStreamChangedHookParam != null) { - type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); - redisCatchStorage.removeStream(mediaServerItem.getId(), type, param.getApp(), param.getStream()); - } - GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); - if (gbStream != null) { + if (param.isRegist()) { + StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo( + param.getApp(), param.getStream()); + String callId = null; + if (streamAuthorityInfo != null) { + callId = streamAuthorityInfo.getCallId(); + } + StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaInfo, + param.getApp(), param.getStream(), tracks, callId); + param.setStreamInfo(new StreamContent(streamInfoByAppAndStream)); + redisCatchStorage.addStream(mediaInfo, type, param.getApp(), param.getStream(), param); + if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal() + || param.getOriginType() == OriginType.RTMP_PUSH.ordinal() + || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { + param.setSeverId(userSetting.getServerId()); + zlmMediaListManager.addPush(param); + } + } else { + // 鍏煎娴佹敞閿�鏃剁被鍨嬩粠redis璁板綍鑾峰彇 + OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo( + param.getApp(), param.getStream(), param.getMediaServerId()); + if (onStreamChangedHookParam != null) { + type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); + redisCatchStorage.removeStream(mediaInfo.getId(), type, param.getApp(), param.getStream()); + } + GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); + if (gbStream != null) { // eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); - } - zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); } - if (type != null) { - // 鍙戦�佹祦鍙樺寲redis娑堟伅 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", param.getApp()); - jsonObject.put("stream", param.getStream()); - jsonObject.put("register", param.isRegist()); - jsonObject.put("mediaServerId", param.getMediaServerId()); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); - } + zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); + } + GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); + if (gbStream != null) { + eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF); + } + if (type != null) { + // 鍙戦�佹祦鍙樺寲redis娑堟伅 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", param.getApp()); + jsonObject.put("stream", param.getStream()); + jsonObject.put("register", param.isRegist()); + jsonObject.put("mediaServerId", param.getMediaServerId()); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); } } } @@ -372,7 +404,7 @@ List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); if (sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { - if (sendRtpItem.getApp().equals(param.getApp())) { + if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) { String platformId = sendRtpItem.getPlatformId(); ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); Device device = deviceService.getDevice(platformId); @@ -380,6 +412,8 @@ try { if (platform != null) { commanderFroPlatform.streamByeCmd(platform, sendRtpItem); + redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStreamId()); } else { cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); } @@ -404,19 +438,28 @@ @PostMapping(value = "/on_stream_none_reader", produces = "application/json;charset=UTF-8") public JSONObject onStreamNoneReader(@RequestBody OnStreamNoneReaderHookParam param) { - logger.info("[ZLM HOOK]娴佹棤浜鸿鐪嬶細{]->{}->{}/{}" + param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); + logger.info("[ZLM HOOK]娴佹棤浜鸿鐪嬶細{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), + param.getApp(), param.getStream()); JSONObject ret = new JSONObject(); ret.put("code", 0); // 鍥芥爣绫诲瀷鐨勬祦 if ("rtp".equals(param.getApp())) { ret.put("close", userSetting.getStreamOnDemand()); // 鍥芥爣娴侊紝 鐐规挱/褰曞儚鍥炴斁/褰曞儚涓嬭浇 - StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(param.getStream()); +// StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(param.getStream()); + + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); // 鐐规挱 - if (streamInfoForPlayCatch != null) { + if (inviteInfo != null) { + // 褰曞儚涓嬭浇 + if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) { + ret.put("close", false); + return ret; + } // 鏀跺埌鏃犱汉瑙傜湅璇存槑娴佷篃娌℃湁鍦ㄥ線涓婄骇鎺ㄩ�� - if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) { - List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(streamInfoForPlayCatch.getChannelId()); + if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) { + List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( + inviteInfo.getChannelId()); if (sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); @@ -430,47 +473,22 @@ } } } - Device device = deviceService.getDevice(streamInfoForPlayCatch.getDeviceID()); + Device device = deviceService.getDevice(inviteInfo.getDeviceId()); if (device != null) { try { - cmder.streamByeCmd(device, streamInfoForPlayCatch.getChannelId(), - streamInfoForPlayCatch.getStream(), null); + if (inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()) != null) { + cmder.streamByeCmd(device, inviteInfo.getChannelId(), + inviteInfo.getStream(), null); + } } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { logger.error("[鏃犱汉瑙傜湅]鐐规挱锛� 鍙戦�丅YE澶辫触 {}", e.getMessage()); } } - redisCatchStorage.stopPlay(streamInfoForPlayCatch); - storager.stopPlay(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId()); - return ret; - } - // 褰曞儚鍥炴斁 - StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, param.getStream(), null); - if (streamInfoForPlayBackCatch != null) { - if (streamInfoForPlayBackCatch.isPause()) { - ret.put("close", false); - } else { - Device device = deviceService.getDevice(streamInfoForPlayBackCatch.getDeviceID()); - if (device != null) { - try { - cmder.streamByeCmd(device, streamInfoForPlayBackCatch.getChannelId(), - streamInfoForPlayBackCatch.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { - logger.error("[鏃犱汉瑙傜湅]鍥炴斁锛� 鍙戦�丅YE澶辫触 {}", e.getMessage()); - } - } - redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch.getDeviceID(), - streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream(), null); - } - return ret; - } - // 褰曞儚涓嬭浇 - StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null, param.getStream(), null); - // 杩涜褰曞儚涓嬭浇鏃舵棤浜鸿鐪嬩笉鏂祦 - if (streamInfoForDownload != null) { - ret.put("close", false); + inviteStreamService.removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), + inviteInfo.getChannelId(), inviteInfo.getStream()); + storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); return ret; } } else { @@ -540,6 +558,7 @@ return defaultResult; } logger.info("[ZLM HOOK] 娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); + RequestMessage msg = new RequestMessage(); String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; boolean exist = resultHolder.exist(key, null); @@ -547,31 +566,22 @@ String uuid = UUID.randomUUID().toString(); msg.setId(uuid); DeferredResult<HookResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); - DeferredResultEx<HookResult> deferredResultEx = new DeferredResultEx<>(result); result.onTimeout(() -> { - logger.info("鐐规挱鎺ュ彛绛夊緟瓒呮椂"); + logger.info("[ZLM HOOK] 鑷姩鐐规挱, 绛夊緟瓒呮椂"); // 閲婃斁rtpserver msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "鐐规挱瓒呮椂")); resultHolder.invokeResult(msg); }); - // TODO 鍦ㄧ偣鎾湭鎴愬姛鐨勬儏鍐典笅鍦ㄦ璋冪敤鎺ュ彛鐐规挱浼氬鑷磋繑鍥炵殑娴佸湴鍧�ip閿欒 - deferredResultEx.setFilter(result1 -> { - WVPResult<StreamInfo> wvpResult1 = (WVPResult<StreamInfo>) result1; - HookResult resultForEnd = new HookResult(); - resultForEnd.setCode(wvpResult1.getCode()); - resultForEnd.setMsg(wvpResult1.getMsg()); - return resultForEnd; - }); // 褰曞儚鏌ヨ浠hannelId浣滀负deviceId鏌ヨ - resultHolder.put(key, uuid, deferredResultEx); + resultHolder.put(key, uuid, result); if (!exist) { - playService.play(mediaInfo, deviceId, channelId, null, eventResult -> { - msg.setData(new HookResult(eventResult.statusCode, eventResult.msg)); + playService.play(mediaInfo, deviceId, channelId, (code, message, data) -> { + msg.setData(new HookResult(code, message)); resultHolder.invokeResult(msg); - }, null); + }); } return result; } else { @@ -628,6 +638,7 @@ if (sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); try { commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId()); } catch (SipException | InvalidArgumentException | ParseException e) { -- Gitblit v1.8.0