From 7de73ebd2bc07a51f0f9db031d6f616bdcfe549c Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 11 十月 2023 01:36:12 +0800 Subject: [PATCH] Merge branch 'wvp-28181-2.0' into main-dev --- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 213 +++++++++++++++++++++++++++++++++++++++-------------- 1 files changed, 157 insertions(+), 56 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java old mode 100644 new mode 100755 index 1aa5895..161480b --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; @@ -23,14 +24,20 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo; +import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; @@ -66,7 +73,7 @@ private AudioBroadcastManager audioBroadcastManager; @Autowired - private ZLMRTPServerFactory zlmrtpServerFactory; + private ZLMServerFactory zlmServerFactory; @Autowired private IPlayService playService; @@ -122,6 +129,9 @@ @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; + + @Autowired + private RedisTemplate<Object, Object> redisTemplate; /** * 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆� @@ -191,8 +201,18 @@ String mediaServerId = json.getString("mediaServerId"); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); - + if (mediaInfo == null) { + return new HookResultForOnPublish(200, "success"); + } + // 鎺ㄦ祦閴存潈鐨勫鐞� if (!"rtp".equals(param.getApp())) { + StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); + if (stream != null) { + HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); + result.setEnable_audio(stream.isEnableAudio()); + result.setEnable_mp4(stream.isEnableMp4()); + return result; + } if (userSetting.getPushAuthority()) { // 鎺ㄦ祦閴存潈 if (param.getParams() == null) { @@ -231,10 +251,7 @@ HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); - if (!"rtp".equals(param.getApp())) { - result.setEnable_audio(true); - } - + result.setEnable_audio(true); taskExecutor.execute(() -> { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); if (subscribe != null) { @@ -246,10 +263,20 @@ } }); + // 鏄惁褰曞儚 if ("rtp".equals(param.getApp())) { result.setEnable_mp4(userSetting.getRecordSip()); } else { result.setEnable_mp4(userSetting.isRecordPushLive()); + } + // 鏇挎崲娴佸湴鍧� + if ("rtp".equals(param.getApp()) && !mediaInfo.isRtpEnable()) { + String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16));; + InviteInfo inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc); + if (inviteInfo != null) { + result.setStream_replace(inviteInfo.getStream()); + logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈 stream: {} 鏇挎崲涓� {}", param.getStream(), inviteInfo.getStream()); + } } List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, param.getStream()); if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) { @@ -263,7 +290,6 @@ // 濡傛灉鏄綍鍍忎笅杞藉氨璁剧疆瑙嗛闂撮殧鍗佺 if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) { result.setMp4_max_second(10); - result.setEnable_audio(true); result.setEnable_mp4(true); } // 濡傛灉鏄痶alk瀵硅锛屽垯榛樿鑾峰彇澹伴煶 @@ -290,6 +316,17 @@ } } } + if (param.getApp().equalsIgnoreCase("rtp")) { + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + param.getStream(); + OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(receiveKey); + + String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + param.getStream(); + OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(receiveKeyForPS); + if (otherRtpSendInfo != null || otherPsSendInfo != null) { + result.setEnable_mp4(true); + } + } + logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈 鍝嶅簲锛歿}->{}->>>>{}", param.getMediaServerId(), param, result); return result; } @@ -453,33 +490,35 @@ GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); if (gbStream != null) { // eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); - } - zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); } - GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); - if (gbStream != null) { - eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist() ? CatalogEvent.ON : CatalogEvent.OFF); - } - if (type != null) { - // 鍙戦�佹祦鍙樺寲redis娑堟伅 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", param.getApp()); - jsonObject.put("stream", param.getStream()); - jsonObject.put("register", param.isRegist()); - jsonObject.put("mediaServerId", param.getMediaServerId()); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); + } + GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); + if (gbStream != null) { + if (userSetting.isUsePushingAsStatus()) { + eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF); } } + if (type != null) { + // 鍙戦�佹祦鍙樺寲redis娑堟伅 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", param.getApp()); + jsonObject.put("stream", param.getStream()); + jsonObject.put("register", param.isRegist()); + jsonObject.put("mediaServerId", param.getMediaServerId()); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + } } - if (!param.isRegist()) { - List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); - if (sendRtpItems.size() > 0) { - for (SendRtpItem sendRtpItem : sendRtpItems) { - if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) { - String platformId = sendRtpItem.getPlatformId(); - ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); - Device device = deviceService.getDevice(platformId); + } + if (!param.isRegist()) { + List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); + if (sendRtpItems.size() > 0) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) { + String platformId = sendRtpItem.getPlatformId(); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); + Device device = deviceService.getDevice(platformId); try { if (platform != null) { @@ -488,6 +527,15 @@ sendRtpItem.getCallId(), sendRtpItem.getStream()); } else { cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); + if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) + || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (audioBroadcastCatch != null) { + // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁� + logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + } + } } } catch (SipException | InvalidArgumentException | ParseException | SsrcTransactionNotFoundException e) { @@ -519,8 +567,6 @@ if ("rtp".equals(param.getApp())) { ret.put("close", userSetting.getStreamOnDemand()); // 鍥芥爣娴侊紝 鐐规挱/褰曞儚鍥炴斁/褰曞儚涓嬭浇 -// StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(param.getStream()); - InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); // 鐐规挱 if (inviteInfo != null) { @@ -543,12 +589,20 @@ } redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream()); + if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + } } } } Device device = deviceService.getDevice(inviteInfo.getDeviceId()); if (device != null) { try { + // 澶氭煡璇竴娆¢槻姝㈠凡缁忚澶勭悊浜� InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); if (info != null) { @@ -567,7 +621,7 @@ return ret; } SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, param.getStream(), null); - if ("talk".equals(sendRtpItem.getApp())) { + if (sendRtpItem != null && "talk".equals(sendRtpItem.getApp())) { ret.put("close", false); return ret; } @@ -578,7 +632,7 @@ // 鎷夋祦浠g悊 StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); if (streamProxyItem != null) { - if (streamProxyItem.isEnableDisableNoneReader()) { + if (streamProxyItem.isEnableRemoveNoneReader()) { // 鏃犱汉瑙傜湅鑷姩绉婚櫎 ret.put("close", true); streamProxyService.del(param.getApp(), param.getStream()); @@ -595,7 +649,7 @@ } return ret; } - // 鎺ㄦ祦鍏锋湁涓诲姩鎬э紝鏆傛椂涓嶅仛澶勭悊 + // TODO 鎺ㄦ祦鍏锋湁涓诲姩鎬э紝鏆傛椂涓嶅仛澶勭悊 // StreamPushItem streamPushItem = streamPushService.getPush(app, streamId); // if (streamPushItem != null) { // // TODO 鍙戦�佸仠姝� @@ -623,7 +677,7 @@ if ("rtp".equals(param.getApp())) { String[] s = param.getStream().split("_"); - if (!mediaInfo.isRtpEnable() || s.length != 2) { + if ((s.length != 2 && s.length != 4)) { defaultResult.setResult(HookResult.SUCCESS()); return defaultResult; } @@ -639,33 +693,80 @@ defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg())); return defaultResult; } - logger.info("[ZLM HOOK] 娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); + if (s.length == 2) { + logger.info("[ZLM HOOK] 棰勮娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); - RequestMessage msg = new RequestMessage(); - String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; - boolean exist = resultHolder.exist(key, null); - msg.setKey(key); - String uuid = UUID.randomUUID().toString(); - msg.setId(uuid); - DeferredResult<HookResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + RequestMessage msg = new RequestMessage(); + String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; + boolean exist = resultHolder.exist(key, null); + msg.setKey(key); + String uuid = UUID.randomUUID().toString(); + msg.setId(uuid); + DeferredResult<HookResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); - result.onTimeout(() -> { - logger.info("[ZLM HOOK] 鑷姩鐐规挱, 绛夊緟瓒呮椂"); - // 閲婃斁rtpserver - msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "鐐规挱瓒呮椂")); - resultHolder.invokeResult(msg); - }); + result.onTimeout(() -> { + logger.info("[ZLM HOOK] 棰勮娴佽嚜鍔ㄧ偣鎾�, 绛夊緟瓒呮椂"); + msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "鐐规挱瓒呮椂")); + resultHolder.invokeAllResult(msg); + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); + storager.stopPlay(deviceId, channelId); + }); - // 褰曞儚鏌ヨ浠hannelId浣滀负deviceId鏌ヨ - resultHolder.put(key, uuid, result); + resultHolder.put(key, uuid, result); - if (!exist) { - playService.play(mediaInfo, deviceId, channelId, (code, message, data) -> { - msg.setData(new HookResult(code, message)); + if (!exist) { + playService.play(mediaInfo, deviceId, channelId, null, (code, message, data) -> { + msg.setData(new HookResult(code, message)); + resultHolder.invokeResult(msg); + }); + } + return result; + }else if(s.length == 4){ + // 姝ゆ椂涓哄綍鍍忓洖鏀撅紝 褰曞儚鍥炴斁鏍煎紡涓�> 璁惧ID_閫氶亾ID_寮�濮嬫椂闂確缁撴潫鏃堕棿 + String startTimeStr = s[2]; + String endTimeStr = s[3]; + if (startTimeStr == null || endTimeStr == null || startTimeStr.length() != 14 || endTimeStr.length() != 14) { + defaultResult.setResult(HookResult.SUCCESS()); + return defaultResult; + } + String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr); + String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr); + logger.info("[ZLM HOOK] 鍥炴斁娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}-{}-{}", + param.getMediaServerId(), param.getSchema(), + param.getApp(), param.getStream(), + startTime, endTime + ); + RequestMessage msg = new RequestMessage(); + String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId; + boolean exist = resultHolder.exist(key, null); + msg.setKey(key); + String uuid = UUID.randomUUID().toString(); + msg.setId(uuid); + DeferredResult<HookResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + + result.onTimeout(() -> { + logger.info("[ZLM HOOK] 鍥炴斁娴佽嚜鍔ㄧ偣鎾�, 绛夊緟瓒呮椂"); + // 閲婃斁rtpserver + msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "鐐规挱瓒呮椂")); resultHolder.invokeResult(msg); }); + + resultHolder.put(key, uuid, result); + + if (!exist) { + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaInfo, param.getStream(), null, + device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam()); + playService.playBack(mediaInfo, ssrcInfo, deviceId, channelId, startTime, endTime, (code, message, data) -> { + msg.setData(new HookResult(code, message)); + resultHolder.invokeResult(msg); + }); + } + return result; + }else { + defaultResult.setResult(HookResult.SUCCESS()); + return defaultResult; } - return result; + } else { // 鎷夋祦浠g悊 StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); -- Gitblit v1.8.0