From b4168c02cba462571dd3f5bdc1d0b1ffddbc938a Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 16 四月 2024 00:10:38 +0800 Subject: [PATCH] 优化多wvp国标级联推流 --- src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java | 10 src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 17 + src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java | 16 + src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 14 - src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java | 97 ++++++++ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java | 12 - src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java | 27 ++ src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java | 1 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java | 77 +++++- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 188 +++++++--------- /dev/null | 97 -------- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 26 +- src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 13 + src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java | 5 src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 4 src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java | 1 16 files changed, 335 insertions(+), 270 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index af574b9..efe4088 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -74,6 +74,7 @@ public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_"; public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:"; public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:"; + public static final String PUSH_STREAM_ONLINE = "VMP_PUSH_STREAM_ONLINE:"; diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java index fb88f54..5385efa 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -29,9 +29,6 @@ private RedisAlarmMsgListener redisAlarmMsgListener; @Autowired - private RedisStreamMsgListener redisStreamMsgListener; - - @Autowired private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; @Autowired @@ -52,6 +49,9 @@ @Autowired private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener; + @Autowired + private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister; + /** * redis娑堟伅鐩戝惉鍣ㄥ鍣� 鍙互娣诲姞澶氫釜鐩戝惉涓嶅悓璇濋鐨剅edis鐩戝惉鍣紝鍙渶瑕佹妸娑堟伅鐩戝惉鍣ㄥ拰鐩稿簲鐨勬秷鎭闃呭鐞嗗櫒缁戝畾锛岃娑堟伅鐩戝惉鍣� @@ -67,14 +67,14 @@ container.setConnectionFactory(connectionFactory); container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS)); container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); - container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); - container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.WAITE_SEND_PUSH_STREAM)); container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM)); + container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED)); + container.addMessageListener(redisPlatformPushStreamOnlineLister, new PatternTopic(VideoManagerConstants.PUSH_STREAM_ONLINE)); return container; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index c0507df..f1744d1 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -132,6 +132,11 @@ */ private String receiveStream; + /** + * 涓婄骇鐨勭偣鎾被鍨� + */ + private String sessionName; + public String getIp() { return ip; } @@ -332,6 +337,14 @@ this.localIp = localIp; } + public String getSessionName() { + return sessionName; + } + + public void setSessionName(String sessionName) { + this.sessionName = sessionName; + } + @Override public String toString() { return "SendRtpItem{" + @@ -347,7 +360,7 @@ ", stream='" + stream + '\'' + ", tcp=" + tcp + ", tcpActive=" + tcpActive + - ", localIp=" + localIp + + ", localIp='" + localIp + '\'' + ", localPort=" + localPort + ", mediaServerId='" + mediaServerId + '\'' + ", serverId='" + serverId + '\'' + @@ -360,6 +373,7 @@ ", rtcp=" + rtcp + ", playType=" + playType + ", receiveStream='" + receiveStream + '\'' + + ", sessionName='" + sessionName + '\'' + '}'; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 242e5ef..af7db2e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -16,7 +16,6 @@ import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; -import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.slf4j.Logger; @@ -78,9 +77,6 @@ private DynamicTask dynamicTask; @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; - - @Autowired private IPlayService playService; @@ -117,13 +113,7 @@ if (parentPlatform != null) { Map<String, Object> param = getSendRtpParam(sendRtpItem); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { - RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( - sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(), - sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), - sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); - redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { - playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader); - }); + redisCatchStorage.sendStartSendRtp(sendRtpItem); } else { JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param); if (startSendRtpStreamResult != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index ff7427b..69f8142 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -19,7 +19,6 @@ import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg; -import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; @@ -98,8 +97,6 @@ @Autowired private IStreamPushService pushService; - @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; @Override public void afterPropertiesSet() throws Exception { @@ -142,7 +139,7 @@ ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); if (platform != null) { RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId()); - redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg); +// redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg); } }else { MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 3539922..f12f38a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -19,7 +19,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; -import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; +import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; @@ -122,7 +122,7 @@ private UserSetting userSetting; @Autowired - private ZLMMediaListManager mediaListManager; + private RedisPlatformPushStreamOnlineLister mediaListManager; @Autowired private SipConfig config; @@ -568,19 +568,16 @@ } } else if (gbStream != null) { - - String ssrc; - if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { - // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 - ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); - }else { - ssrc = gb28181Sdp.getSsrc(); - } SendRtpItem sendRtpItem = new SendRtpItem(); - sendRtpItem.setTcpActive(tcpActive); + if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) { + sendRtpItem.setSsrc(gb28181Sdp.getSsrc()); + } + + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } sendRtpItem.setTcp(mediaTransmissionTCP); sendRtpItem.setRtcp(platform.isRtcp()); - sendRtpItem.setSsrc(ssrc); sendRtpItem.setPlatformName(platform.getName()); sendRtpItem.setPlatformId(platform.getServerGBId()); sendRtpItem.setMediaServerId(mediaServerItem.getId()); @@ -593,37 +590,48 @@ sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setFromTag(request.getFromTag()); sendRtpItem.setOnlyAudio(false); - sendRtpItem.setPlayType(InviteStreamType.PUSH); sendRtpItem.setStatus(0); + sendRtpItem.setSessionName(sessionName); if ("push".equals(gbStream.getStreamType())) { if (streamPushItem != null) { // 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦 OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); - sendRtpItem.setServerId(pushListItem.getSeverId()); if (pushListItem != null) { + sendRtpItem.setServerId(pushListItem.getSeverId()); StreamPushItem transform = streamPushService.transform(pushListItem); transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); // 鎺ㄦ祦鐘舵�� - pushStream(sendRtpItem, mediaServerItem, platform, request); + sendPushStream(sendRtpItem, mediaServerItem, platform, request); }else { - // 鏈帹娴� 鎷夎捣 + if (!platform.isStartOfflinePush()) { + // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 + try { + logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream()); + responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); + } + return; + } notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } } else if ("proxy".equals(gbStream.getStreamType())) { if (null != proxyByAppAndStream) { + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } if (proxyByAppAndStream.isStatus()) { - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + sendProxyStream(sendRtpItem, mediaServerItem, platform, request); } else { //寮�鍚唬鐞嗘媺娴� notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } - - } } } @@ -649,33 +657,23 @@ /** * 瀹夋帓鎺ㄦ祦 */ - private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServerItem mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); if (streamReady != null && streamReady) { // 鑷钩鍙板唴瀹� - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { - logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage()); + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { + logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage()); + } + return; } - return; - } - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); + sendRtpItem.setPlayType(InviteStreamType.PROXY); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setFromTag(request.getFromTag()); sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); @@ -683,12 +681,10 @@ sendRtpItem.setToTag(response.getToTag()); } redisCatchStorage.updateSendRTPSever(sendRtpItem); - } - } - private void pushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { // 鎺ㄦ祦 if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); @@ -712,6 +708,11 @@ if (response != null) { sendRtpItem.setToTag(response.getToTag()); } + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } redisCatchStorage.updateSendRTPSever(sendRtpItem); } else { @@ -719,10 +720,6 @@ notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } else { - SendRtpItem sendRtpItem = new SendRtpItem(); - sendRtpItem.setRtcp(platform.isRtcp()); - sendRtpItem.setTcp(mediaTransmissionTCP); - sendRtpItem.setTcpActive(); // 鍏朵粬骞冲彴鍐呭 otherWvpPushStream(sendRtpItem, request, platform); } @@ -733,29 +730,28 @@ */ private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); + logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream()); // 鐩戝惉娴佷笂绾� - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId()); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId()); zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); - dynamicTask.stop(callIdHeader.getCallId()); - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + dynamicTask.stop(sendRtpItem.getCallId()); + sendProxyStream(sendRtpItem, mediaServerItem, platform, request); }); - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); + dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream()); zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); }, userSetting.getPlatformPlayTimeout()); - boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); + boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream()); if (!start) { try { - responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline"); + responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline"); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); } zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); - dynamicTask.stop(callIdHeader.getCallId()); + dynamicTask.stop(sendRtpItem.getCallId()); } } @@ -763,50 +759,28 @@ * 閫氱煡娴佷笂绾� */ private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { - if (!platform.isStartOfflinePush()) { - // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 - try { - logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream()); - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); - } - return; - } - // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); - + // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎锛屾祦涓婄嚎鍚庤 + logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream()); MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, - gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), - platform.getName(), userSetting.getServerId(), gbStream.getMediaServerId()); + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(), + platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); // 璁剧疆瓒呮椂 - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); + dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream()); try { - redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); + redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); + mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); } }, userSetting.getPlatformPlayTimeout()); - // 鍐欏叆redis寰呭彂娴佷俊鎭紝渚涘叾浠杦vp璇诲彇骞剁敓鎴愬彂娴佷俊鎭� - SendRtpItem sendRtpItemTemp = new SendRtpItem(); - sendRtpItemTemp.setIp(addressStr); - sendRtpItemTemp.setPort(port); - sendRtpItemTemp.setSsrc(ssrc); - sendRtpItemTemp.setPlatformId(requesterId); - sendRtpItemTemp.setPlatformName(platform.getName()); - sendRtpItemTemp.setTcp(mediaTransmissionTCP); - sendRtpItemTemp.setRtcp(platform.isRtcp()); - sendRtpItemTemp.setTcpActive(tcpActive); - sendRtpItemTemp.setPlayType(InviteStreamType.PUSH); - redisCatchStorage.addWaiteSendRtpItem(sendRtpItemTemp, userSetting.getPlatformPlayTimeout()); + redisCatchStorage.addWaiteSendRtpItem(sendRtpItem, userSetting.getPlatformPlayTimeout()); // 娣诲姞涓婄嚎鐨勯�氱煡 - mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (sendRtpItemFromRedis) -> { - dynamicTask.stop(callIdHeader.getCallId()); - redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); + mediaListManager.addChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream(), (sendRtpItemFromRedis) -> { + dynamicTask.stop(sendRtpItem.getCallId()); + redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { int localPort = sendRtpPortManager.getNextPort(mediaServerItem); @@ -823,18 +797,18 @@ } return; } - sendRtpItemTemp.setLocalPort(localPort); - sendRtpItemTemp.setLocalIp(ObjectUtils.isEmpty(platform.getSendStreamIp()): ); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - sendRtpItemTemp.setStatus(1); - sendRtpItemTemp.setCallId(callIdHeader.getCallId()); - - sendRtpItemTemp.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(request, sendRtpItemTemp, platform); - if (response != null) { - sendRtpItemTemp.setToTag(response.getToTag()); + sendRtpItem.setLocalPort(localPort); + if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { + sendRtpItem.setLocalIp(platform.getSendStreamIp()); } - redisCatchStorage.updateSendRTPSever(sendRtpItemTemp); + + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); + if (response != null) { + sendRtpItem.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); } else { // 鍏朵粬骞冲彴鍐呭 otherWvpPushStream(sendRtpItemFromRedis, request, platform); @@ -842,10 +816,10 @@ }); // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡 - redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> { + redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { if (response.getCode() != 0) { - dynamicTask.stop(callIdHeader.getCallId()); - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); + dynamicTask.stop(sendRtpItem.getCallId()); + mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); try { responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); } catch (SipException | InvalidArgumentException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 65685aa..346b4a2 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -18,14 +18,12 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; -import com.genersoft.iot.vmp.media.zlm.dto.HookType; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -103,7 +101,7 @@ private EventPublisher eventPublisher; @Autowired - private ZLMMediaListManager zlmMediaListManager; + private RedisPlatformPushStreamOnlineLister zlmMediaListManager; @Autowired private ZlmHttpHookSubscribe subscribe; @@ -129,6 +127,9 @@ @Autowired private RedisTemplate<Object, Object> redisTemplate; + + @Autowired + private IStreamPushService streamPushService; /** * 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆� @@ -236,10 +237,7 @@ // 閴存潈閫氳繃 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); } - } else { - zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId()); } - HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); result.setEnable_audio(true); @@ -465,8 +463,7 @@ || param.getOriginType() == OriginType.RTMP_PUSH.ordinal() || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { param.setSeverId(userSetting.getServerId()); - zlmMediaListManager.addPush(param); - + streamPushService.updatePush(param); // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤 redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param); } @@ -483,10 +480,13 @@ } } GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); - if (gbStream != null) { -// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); + // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎 + if (gbStream == null) { + storager.removeMedia(param.getApp(), param.getStream()); + }else { +// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); + storager.mediaOffline(param.getApp(), param.getStream()); } - zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); } GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); if (gbStream != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java deleted file mode 100755 index cbc5fde..0000000 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ /dev/null @@ -1,138 +0,0 @@ -package com.genersoft.iot.vmp.media.zlm; - -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.media.zlm.dto.*; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IStreamProxyService; -import com.genersoft.iot.vmp.service.IStreamPushService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; -import com.genersoft.iot.vmp.utils.DateUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.text.ParseException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -/** - * @author lin - */ -@Component -public class ZLMMediaListManager { - - private Logger logger = LoggerFactory.getLogger("ZLMMediaListManager"); - - @Autowired - private ZLMRESTfulUtils zlmresTfulUtils; - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - @Autowired - private IVideoManagerStorage storager; - - @Autowired - private GbStreamMapper gbStreamMapper; - - @Autowired - private PlatformGbStreamMapper platformGbStreamMapper; - - @Autowired - private IStreamPushService streamPushService; - - @Autowired - private IStreamProxyService streamProxyService; - - @Autowired - private StreamPushMapper streamPushMapper; - - @Autowired - private ZlmHttpHookSubscribe subscribe; - - @Autowired - private UserSetting userSetting; - - @Autowired - private ZLMServerFactory zlmServerFactory; - - @Autowired - private IMediaServerService mediaServerService; - - private Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>(); - - public StreamPushItem addPush(OnStreamChangedHookParam onStreamChangedHookParam) { - StreamPushItem transform = streamPushService.transform(onStreamChangedHookParam); - StreamPushItem pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); - transform.setPushIng(onStreamChangedHookParam.isRegist()); - transform.setUpdateTime(DateUtil.getNow()); - transform.setPushTime(DateUtil.getNow()); - transform.setSelf(userSetting.getServerId().equals(onStreamChangedHookParam.getSeverId())); - if (pushInDb == null) { - transform.setCreateTime(DateUtil.getNow()); - streamPushMapper.add(transform); - }else { - streamPushMapper.update(transform); - gbStreamMapper.updateMediaServer(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), onStreamChangedHookParam.getMediaServerId()); - } - ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream()); - if ( channelOnlineEventLister != null) { - try { - channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());; - } catch (ParseException e) { - logger.error("addPush: ", e); - } - removedChannelOnlineEventLister(transform.getApp(), transform.getStream()); - } - return transform; - } - - public void sendStreamEvent(String app, String stream, String mediaServerId) { - MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); - // 鏌ョ湅鎺ㄦ祦鐘舵�� - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); - if (streamReady != null && streamReady) { - ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(app, stream); - if (channelOnlineEventLister != null) { - try { - channelOnlineEventLister.run(app, stream, mediaServerId); - } catch (ParseException e) { - logger.error("sendStreamEvent: ", e); - } - removedChannelOnlineEventLister(app, stream); - } - } - } - - public int removeMedia(String app, String streamId) { - // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎 - GbStream gbStream = gbStreamMapper.selectOne(app, streamId); - int result; - if (gbStream == null) { - result = storager.removeMedia(app, streamId); - }else { - result =storager.mediaOffline(app, streamId); - } - return result; - } - - public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) { - this.channelOnPublishEvents.put(app + "_" + stream, callback); - } - - public void removedChannelOnlineEventLister(String app, String stream) { - this.channelOnPublishEvents.remove(app + "_" + stream); - } - - public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) { - return this.channelOnPublishEvents.get(app + "_" + stream); - } - -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 10b1eff..32a42b1 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -118,4 +118,5 @@ Map<String, StreamPushItem> getAllAppAndStreamMap(); + void updatePush(OnStreamChangedHookParam param); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 8c0e9b0..bc05276 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -31,7 +31,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.*; -import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper; @@ -133,9 +132,6 @@ @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; - - @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; @Autowired private ZlmHttpHookSubscribe hookSubscribe; @@ -1366,15 +1362,7 @@ param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0"); } - if (mediaInfo == null) { - RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( - sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(), - sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), - sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); - redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { - startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader); - }); - } else { + if (mediaInfo != null) { // 濡傛灉鏄弗鏍兼ā寮忥紝闇�瑕佸叧闂鍙e崰鐢� JSONObject startSendRtpStreamResult = null; if (sendRtpItem.getLocalPort() != 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index e2d7e68..bb51edc 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -553,4 +553,21 @@ public Map<String, StreamPushItem> getAllAppAndStreamMap() { return streamPushMapper.getAllAppAndStreamMap(); } + + @Override + public void updatePush(OnStreamChangedHookParam param) { + StreamPushItem transform = transform(param); + StreamPushItem pushInDb = getPush(param.getApp(), param.getStream()); + transform.setPushIng(param.isRegist()); + transform.setUpdateTime(DateUtil.getNow()); + transform.setPushTime(DateUtil.getNow()); + transform.setSelf(userSetting.getServerId().equals(param.getSeverId())); + if (pushInDb == null) { + transform.setCreateTime(DateUtil.getNow()); + streamPushMapper.add(transform); + }else { + streamPushMapper.update(transform); + gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId()); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java new file mode 100755 index 0000000..8ad0807 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java @@ -0,0 +1,97 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IStreamProxyService; +import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; +import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; +import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; +import com.genersoft.iot.vmp.utils.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.text.ParseException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * @author lin + */ +@Component +public class RedisPlatformPushStreamOnlineLister implements MessageListener { + + private final Logger logger = LoggerFactory.getLogger("RedisPlatformPushStreamOnlineLister"); + + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + /** + * 閫氳繃redis娑堟伅鎺ユ敹娴佷笂绾跨殑閫氱煡锛屽鏋滄湰鏈虹敱瀵硅繖涓祦鐨勭洃鍚紝鍒欏洖璋� + */ + @Override + public void onMessage(Message message, byte[] pattern) { + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class); + sendStreamEvent(sendRtpItem); + } + }); + } + } + + private final Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>(); + + public void sendStreamEvent(SendRtpItem sendRtpItem) { + // 鏌ョ湅鎺ㄦ祦鐘舵�� + ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); + if (channelOnlineEventLister != null) { + try { + channelOnlineEventLister.run(sendRtpItem); + } catch (ParseException e) { + logger.error("sendStreamEvent: ", e); + } + removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); + } + } + + public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) { + this.channelOnPublishEvents.put(app + "_" + stream, callback); + } + + public void removedChannelOnlineEventLister(String app, String stream) { + this.channelOnPublishEvents.remove(app + "_" + stream); + } + + public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) { + return this.channelOnPublishEvents.get(app + "_" + stream); + } + + +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java index 14a96e8..25dd334 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java @@ -1,12 +1,16 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,6 +22,8 @@ import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -32,10 +38,10 @@ private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); @Autowired - private UserSetting userSetting; + private ZLMServerFactory zlmServerFactory; @Autowired - private ZlmHttpHookSubscribe hookSubscribe; + private IMediaServerService mediaServerService; @Qualifier("taskExecutor") @Autowired @@ -52,23 +58,14 @@ while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); try { - MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class); - if (messageForPushChannel == null - || ObjectUtils.isEmpty(messageForPushChannel.getApp()) - || ObjectUtils.isEmpty(messageForPushChannel.getStream()) - || userSetting.getServerId().equals(messageForPushChannel.getServerId())){ - continue; + SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class); + sendRtpItem.getMediaServerId(); + MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaServer == null) { + return; } - - // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰� - HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( - messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp", - null); - hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { - // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘� - - }); - + Map<String, Object> sendRtpParam = getSendRtpParam(sendRtpItem); + sendRtp(sendRtpItem, mediaServer, sendRtpParam); }catch (Exception e) { logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message)); @@ -78,4 +75,48 @@ }); } } + + private Map<String, Object> getSendRtpParam(SendRtpItem sendRtpItem) { + String isUdp = sendRtpItem.isTcp() ? "0" : "1"; + Map<String, Object> param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("dst_url",sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + param.put("is_udp", isUdp); + if (!sendRtpItem.isTcp()) { + // udp妯″紡涓嬪紑鍚痳tcp淇濇椿 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } + return param; + } + + private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map<String, Object> param){ + JSONObject startSendRtpStreamResult = null; + if (sendRtpItem.getLocalPort() != 0) { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param); + } + }else { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param); + } + } + return startSendRtpStreamResult; + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java index f3b415d..25600a2 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java @@ -2,12 +2,15 @@ import com.alibaba.fastjson2.JSON; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -35,13 +38,25 @@ private UserSetting userSetting; @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired private ZlmHttpHookSubscribe hookSubscribe; + + @Autowired + private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister; + + @Autowired + private SSRCFactory ssrcFactory; @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; + /** + * 褰撲笂绾х偣鎾椂锛岃繖閲岃礋璐g洃鍚瓑鍒版祦涓婄嚎锛屾祦涓婄嚎鍚庡鏋滄槸鍦ㄥ綋鍓嶆湇鍔″垯鐩存帴鍥炶皟锛屽鏋滄槸鍏朵粬wvp锛屽垯鐢眗edis娑堟伅杩涜閫氱煡 + */ @Override public void onMessage(Message message, byte[] bytes) { logger.info("[REDIS娑堟伅-鏀跺埌涓婄骇绛夊埌璁惧鎺ㄦ祦鐨剅edis娑堟伅]锛� {}", new String(message.getBody())); @@ -66,7 +81,17 @@ null); hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘� - + SendRtpItem sendRtpItem = redisCatchStorage.getWaiteSendRtpItem(messageForPushChannel.getApp(), messageForPushChannel.getStream()); + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId()); + sendRtpItem.setSsrc(ssrc); + sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); + sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); + redisPlatformPushStreamOnlineLister.sendStreamEvent(sendRtpItem); + // 閫氱煡鍏朵粬wvp锛� 鐢盧edisPlatformPushStreamOnlineLister鎺ユ敹姝ょ洃鍚�� + redisCatchStorage.sendPushStreamOnline(sendRtpItem); + } }); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java deleted file mode 100755 index 7d5ba60..0000000 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java +++ /dev/null @@ -1,97 +0,0 @@ -package com.genersoft.iot.vmp.service.redisMsg; - -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; -import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; - -import java.text.ParseException; -import java.util.concurrent.ConcurrentLinkedQueue; - - -/** - * 鎺ユ敹鍏朵粬wvp鍙戦�佹祦鍙樺寲閫氱煡 - * @author lin - */ -@Component -public class RedisStreamMsgListener implements MessageListener { - - private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class); - - @Autowired - private UserSetting userSetting; - - @Autowired - private ZLMMediaListManager zlmMediaListManager; - - private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - @Override - public void onMessage(Message message, byte[] bytes) { - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); - if (steamMsgJson == null) { - logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触"); - continue; - } - String serverId = steamMsgJson.getString("serverId"); - - if (userSetting.getServerId().equals(serverId)) { - // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲 - continue; - } - logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody())); - String app = steamMsgJson.getString("app"); - String stream = steamMsgJson.getString("stream"); - boolean register = steamMsgJson.getBoolean("register"); - String mediaServerId = steamMsgJson.getString("mediaServerId"); - OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam(); - onStreamChangedHookParam.setSeverId(serverId); - onStreamChangedHookParam.setApp(app); - onStreamChangedHookParam.setStream(stream); - onStreamChangedHookParam.setRegist(register); - onStreamChangedHookParam.setMediaServerId(mediaServerId); - onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000); - onStreamChangedHookParam.setAliveSecond(0L); - onStreamChangedHookParam.setTotalReaderCount("0"); - onStreamChangedHookParam.setOriginType(0); - onStreamChangedHookParam.setOriginTypeStr("0"); - onStreamChangedHookParam.setOriginTypeStr("unknown"); - ChannelOnlineEvent channelOnlineEventLister = zlmMediaListManager.getChannelOnlineEventLister(app, stream); - if ( channelOnlineEventLister != null) { - try { - channelOnlineEventLister.run(app, stream, serverId);; - } catch (ParseException e) { - logger.error("addPush: ", e); - } - zlmMediaListManager.removedChannelOnlineEventLister(app, stream); - } - }catch (Exception e) { - logger.warn("[REDIS娑堟伅-娴佸彉鍖朷 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message)); - logger.error("[REDIS娑堟伅-娴佸彉鍖朷 寮傚父鍐呭锛� ", e); - } - } - }); - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 1e5e93d..108bc17 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -219,5 +219,9 @@ void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout); + SendRtpItem getWaiteSendRtpItem(String app, String stream); + void sendStartSendRtp(SendRtpItem sendRtpItem); + + void sendPushStreamOnline(SendRtpItem sendRtpItem); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 60084df..7088837 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -686,8 +686,21 @@ } @Override + public SendRtpItem getWaiteSendRtpItem(String app, String stream) { + String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream; + return (SendRtpItem)redisTemplate.opsForValue().get(key); + } + + @Override public void sendStartSendRtp(SendRtpItem sendRtpItem) { String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem)); } + + @Override + public void sendPushStreamOnline(SendRtpItem sendRtpItem) { + String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED; + logger.info("[redis鍙戦�侀�氱煡] 娴佷笂绾� {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId()); + redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem)); + } } -- Gitblit v1.8.0