From b4168c02cba462571dd3f5bdc1d0b1ffddbc938a Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 16 四月 2024 00:10:38 +0800 Subject: [PATCH] 优化多wvp国标级联推流 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 188 ++++++++++++++++++++-------------------------- 1 files changed, 81 insertions(+), 107 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 3539922..f12f38a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -19,7 +19,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; -import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; +import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; @@ -122,7 +122,7 @@ private UserSetting userSetting; @Autowired - private ZLMMediaListManager mediaListManager; + private RedisPlatformPushStreamOnlineLister mediaListManager; @Autowired private SipConfig config; @@ -568,19 +568,16 @@ } } else if (gbStream != null) { - - String ssrc; - if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { - // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 - ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); - }else { - ssrc = gb28181Sdp.getSsrc(); - } SendRtpItem sendRtpItem = new SendRtpItem(); - sendRtpItem.setTcpActive(tcpActive); + if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) { + sendRtpItem.setSsrc(gb28181Sdp.getSsrc()); + } + + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } sendRtpItem.setTcp(mediaTransmissionTCP); sendRtpItem.setRtcp(platform.isRtcp()); - sendRtpItem.setSsrc(ssrc); sendRtpItem.setPlatformName(platform.getName()); sendRtpItem.setPlatformId(platform.getServerGBId()); sendRtpItem.setMediaServerId(mediaServerItem.getId()); @@ -593,37 +590,48 @@ sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setFromTag(request.getFromTag()); sendRtpItem.setOnlyAudio(false); - sendRtpItem.setPlayType(InviteStreamType.PUSH); sendRtpItem.setStatus(0); + sendRtpItem.setSessionName(sessionName); if ("push".equals(gbStream.getStreamType())) { if (streamPushItem != null) { // 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦 OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); - sendRtpItem.setServerId(pushListItem.getSeverId()); if (pushListItem != null) { + sendRtpItem.setServerId(pushListItem.getSeverId()); StreamPushItem transform = streamPushService.transform(pushListItem); transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); // 鎺ㄦ祦鐘舵�� - pushStream(sendRtpItem, mediaServerItem, platform, request); + sendPushStream(sendRtpItem, mediaServerItem, platform, request); }else { - // 鏈帹娴� 鎷夎捣 + if (!platform.isStartOfflinePush()) { + // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 + try { + logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream()); + responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); + } + return; + } notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } } else if ("proxy".equals(gbStream.getStreamType())) { if (null != proxyByAppAndStream) { + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } if (proxyByAppAndStream.isStatus()) { - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + sendProxyStream(sendRtpItem, mediaServerItem, platform, request); } else { //寮�鍚唬鐞嗘媺娴� notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } - - } } } @@ -649,33 +657,23 @@ /** * 瀹夋帓鎺ㄦ祦 */ - private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServerItem mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); if (streamReady != null && streamReady) { // 鑷钩鍙板唴瀹� - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { - logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage()); + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { + logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage()); + } + return; } - return; - } - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); + sendRtpItem.setPlayType(InviteStreamType.PROXY); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setFromTag(request.getFromTag()); sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); @@ -683,12 +681,10 @@ sendRtpItem.setToTag(response.getToTag()); } redisCatchStorage.updateSendRTPSever(sendRtpItem); - } - } - private void pushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { // 鎺ㄦ祦 if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); @@ -712,6 +708,11 @@ if (response != null) { sendRtpItem.setToTag(response.getToTag()); } + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } redisCatchStorage.updateSendRTPSever(sendRtpItem); } else { @@ -719,10 +720,6 @@ notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } else { - SendRtpItem sendRtpItem = new SendRtpItem(); - sendRtpItem.setRtcp(platform.isRtcp()); - sendRtpItem.setTcp(mediaTransmissionTCP); - sendRtpItem.setTcpActive(); // 鍏朵粬骞冲彴鍐呭 otherWvpPushStream(sendRtpItem, request, platform); } @@ -733,29 +730,28 @@ */ private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); + logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream()); // 鐩戝惉娴佷笂绾� - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId()); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId()); zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); - dynamicTask.stop(callIdHeader.getCallId()); - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + dynamicTask.stop(sendRtpItem.getCallId()); + sendProxyStream(sendRtpItem, mediaServerItem, platform, request); }); - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); + dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream()); zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); }, userSetting.getPlatformPlayTimeout()); - boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); + boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream()); if (!start) { try { - responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline"); + responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline"); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); } zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); - dynamicTask.stop(callIdHeader.getCallId()); + dynamicTask.stop(sendRtpItem.getCallId()); } } @@ -763,50 +759,28 @@ * 閫氱煡娴佷笂绾� */ private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { - if (!platform.isStartOfflinePush()) { - // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 - try { - logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream()); - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); - } - return; - } - // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); - + // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎锛屾祦涓婄嚎鍚庤 + logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream()); MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, - gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), - platform.getName(), userSetting.getServerId(), gbStream.getMediaServerId()); + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(), + platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); // 璁剧疆瓒呮椂 - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); + dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream()); try { - redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); + redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); + mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); } }, userSetting.getPlatformPlayTimeout()); - // 鍐欏叆redis寰呭彂娴佷俊鎭紝渚涘叾浠杦vp璇诲彇骞剁敓鎴愬彂娴佷俊鎭� - SendRtpItem sendRtpItemTemp = new SendRtpItem(); - sendRtpItemTemp.setIp(addressStr); - sendRtpItemTemp.setPort(port); - sendRtpItemTemp.setSsrc(ssrc); - sendRtpItemTemp.setPlatformId(requesterId); - sendRtpItemTemp.setPlatformName(platform.getName()); - sendRtpItemTemp.setTcp(mediaTransmissionTCP); - sendRtpItemTemp.setRtcp(platform.isRtcp()); - sendRtpItemTemp.setTcpActive(tcpActive); - sendRtpItemTemp.setPlayType(InviteStreamType.PUSH); - redisCatchStorage.addWaiteSendRtpItem(sendRtpItemTemp, userSetting.getPlatformPlayTimeout()); + redisCatchStorage.addWaiteSendRtpItem(sendRtpItem, userSetting.getPlatformPlayTimeout()); // 娣诲姞涓婄嚎鐨勯�氱煡 - mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (sendRtpItemFromRedis) -> { - dynamicTask.stop(callIdHeader.getCallId()); - redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); + mediaListManager.addChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream(), (sendRtpItemFromRedis) -> { + dynamicTask.stop(sendRtpItem.getCallId()); + redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { int localPort = sendRtpPortManager.getNextPort(mediaServerItem); @@ -823,18 +797,18 @@ } return; } - sendRtpItemTemp.setLocalPort(localPort); - sendRtpItemTemp.setLocalIp(ObjectUtils.isEmpty(platform.getSendStreamIp()): ); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - sendRtpItemTemp.setStatus(1); - sendRtpItemTemp.setCallId(callIdHeader.getCallId()); - - sendRtpItemTemp.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(request, sendRtpItemTemp, platform); - if (response != null) { - sendRtpItemTemp.setToTag(response.getToTag()); + sendRtpItem.setLocalPort(localPort); + if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { + sendRtpItem.setLocalIp(platform.getSendStreamIp()); } - redisCatchStorage.updateSendRTPSever(sendRtpItemTemp); + + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); + if (response != null) { + sendRtpItem.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); } else { // 鍏朵粬骞冲彴鍐呭 otherWvpPushStream(sendRtpItemFromRedis, request, platform); @@ -842,10 +816,10 @@ }); // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡 - redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> { + redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { if (response.getCode() != 0) { - dynamicTask.stop(callIdHeader.getCallId()); - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); + dynamicTask.stop(sendRtpItem.getCallId()); + mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); try { responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); } catch (SipException | InvalidArgumentException | ParseException e) { -- Gitblit v1.8.0