From d41d6b34af2485198ed01e1888db1571e4da1a6a Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 23 四月 2024 20:59:20 +0800 Subject: [PATCH] Merge branch 'refs/heads/2.7.0' --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 511 ++++++++++++++++++++++++++------------------------------ 1 files changed, 240 insertions(+), 271 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 46e779d..f1f277f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -31,11 +31,17 @@ import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; +import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -51,6 +57,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.sdp.*; @@ -64,6 +71,7 @@ import java.util.Map; import java.util.Random; import java.util.Vector; +import java.util.*; /** * SIP鍛戒护绫诲瀷锛� INVITE璇锋眰 @@ -92,16 +100,16 @@ private IRedisCatchStorage redisCatchStorage; @Autowired - private IInviteStreamService inviteStreamService; + private IRedisRpcService redisRpcService; + + @Autowired + private RedisTemplate<Object, Object> redisTemplate; @Autowired private SSRCFactory ssrcFactory; @Autowired private DynamicTask dynamicTask; - - @Autowired - private RedisPushStreamResponseListener redisPushStreamResponseListener; @Autowired private IPlayService playService; @@ -125,17 +133,16 @@ private UserSetting userSetting; @Autowired - private ZLMMediaListManager mediaListManager; - - @Autowired private SipConfig config; - - - @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; @Autowired private VideoStreamSessionManager streamSession; + + @Autowired + private SendRtpPortManager sendRtpPortManager; + + @Autowired + private RedisPushStreamResponseListener redisPushStreamResponseListener; @Override @@ -552,43 +559,79 @@ } } else if (gbStream != null) { - - String ssrc; - if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { - // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 - ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); - }else { - ssrc = gb28181Sdp.getSsrc(); + SendRtpItem sendRtpItem = new SendRtpItem(); + if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) { + sendRtpItem.setSsrc(gb28181Sdp.getSsrc()); } + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + sendRtpItem.setTcp(mediaTransmissionTCP); + sendRtpItem.setRtcp(platform.isRtcp()); + sendRtpItem.setPlatformName(platform.getName()); + sendRtpItem.setPlatformId(platform.getServerGBId()); + sendRtpItem.setMediaServerId(mediaServerItem.getId()); + sendRtpItem.setChannelId(channelId); + sendRtpItem.setIp(addressStr); + sendRtpItem.setPort(port); + sendRtpItem.setUsePs(true); + sendRtpItem.setApp(gbStream.getApp()); + sendRtpItem.setStream(gbStream.getStream()); + sendRtpItem.setCallId(callIdHeader.getCallId()); + sendRtpItem.setFromTag(request.getFromTag()); + sendRtpItem.setOnlyAudio(false); + sendRtpItem.setStatus(0); + sendRtpItem.setSessionName(sessionName); + // 娓呯悊鍙兘瀛樺湪鐨勭紦瀛橀伩鍏嶇敤鍒版棫鐨勬暟鎹� + List<SendRtpItem> sendRtpItemList = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, gbStream.getStream()); + if (!sendRtpItemList.isEmpty()) { + for (SendRtpItem rtpItem : sendRtpItemList) { + redisCatchStorage.deleteSendRTPServer(rtpItem); + } + } if ("push".equals(gbStream.getStreamType())) { + sendRtpItem.setPlayType(InviteStreamType.PUSH); if (streamPushItem != null) { // 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦 StreamPushItem pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); if (pushListItem != null) { - pushListItem.setSelf(userSetting.getServerId().equals(pushListItem.getServerId())); - // 鎺ㄦ祦鐘舵�� - pushStream(evt, request, gbStream, pushListItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + sendRtpItem.setServerId(pushListItem.getSeverId()); + sendRtpItem.setMediaServerId(pushListItem.getMediaServerId()); + + StreamPushItem transform = streamPushService.transform(pushListItem); + transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + // 寮�濮嬫帹娴� + sendPushStream(sendRtpItem, mediaServerItem, platform, request); }else { - // 鏈帹娴� 鎷夎捣 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + if (!platform.isStartOfflinePush()) { + // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 + try { + logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream()); + responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); + } + return; + } + notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } } else if ("proxy".equals(gbStream.getStreamType())) { if (null != proxyByAppAndStream) { + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } if (proxyByAppAndStream.isStatus()) { - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + sendProxyStream(sendRtpItem, mediaServerItem, platform, request); } else { //寮�鍚唬鐞嗘媺娴� - notifyStreamOnline(evt, request, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } - - } } } @@ -614,58 +657,13 @@ /** * 瀹夋帓鎺ㄦ祦 */ - private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServer mediaServer, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - Boolean streamReady = mediaServerService.isStreamReady(mediaServer, gbStream.getApp(), gbStream.getStream()); + private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); if (streamReady != null && streamReady) { // 鑷钩鍙板唴瀹� - SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServer, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { - logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage()); - } - return; - } - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setFromTag(request.getFromTag()); - - SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - - } - - } - - private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServer mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - // 鎺ㄦ祦 - if (streamPushItem.isSelf()) { - Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); - if (streamReady != null && streamReady) { - // 鑷钩鍙板唴瀹� - SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); try { responseAck(request, Response.BUSY_HERE); @@ -674,226 +672,197 @@ } return; } - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); + sendRtpItem.setPlayType(InviteStreamType.PROXY); + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); + + SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt); + if (response != null) { + sendRtpItem.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } + } + + private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + // 鎺ㄦ祦 + if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); + if (streamReady != null && streamReady) { + // 鑷钩鍙板唴瀹� + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { + logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage()); + } + return; } - sendRtpItem.setPlayType(InviteStreamType.PUSH); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - - sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); } + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { // 涓嶅湪绾� 鎷夎捣 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } - } else { // 鍏朵粬骞冲彴鍐呭 - otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + otherWvpPushStream(sendRtpItem, request, platform); } } /** * 閫氱煡娴佷笂绾� */ - private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServer mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - if ("proxy".equals(gbStream.getStreamType())) { - // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); - // 鐩戝惉娴佷笂绾� - Hook hook = Hook.getInstance(HookType.on_media_arrival, gbStream.getApp(), gbStream.getStream(), mediaServerItem.getId()); - this.hookSubscribe.addSubscribe(hook, (hookData) -> { - logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", hookData.getApp(), hookData.getStream()); - dynamicTask.stop(callIdHeader.getCallId()); - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - }); - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); - this.hookSubscribe.removeSubscribe(hook); - }, userSetting.getPlatformPlayTimeout()); - boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); - if (!start) { - try { - responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); - } - this.hookSubscribe.removeSubscribe(hook); - dynamicTask.stop(callIdHeader.getCallId()); + private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎 + logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream()); + // 鐩戝惉娴佷笂绾� + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId()); + zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; + logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); + dynamicTask.stop(sendRtpItem.getCallId()); + sendProxyStream(sendRtpItem, mediaServerItem, platform, request); + }); + dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream()); + zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + }, userSetting.getPlatformPlayTimeout()); + boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream()); + if (!start) { + try { + responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); } - } else if ("push".equals(gbStream.getStreamType())) { - if (!platform.isStartOfflinePush()) { - // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 - try { - logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream()); - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); - } - return; - } - // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); - - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, - gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), - platform.getName(), null, gbStream.getMediaServerId()); - redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); - // 璁剧疆瓒呮椂 - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); - try { - redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); - responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - }, userSetting.getPlatformPlayTimeout()); - // 娣诲姞鐩戝惉 - int finalPort = port; - Boolean finalTcpActive = tcpActive; - - // 娣诲姞鍦ㄦ湰鏈轰笂绾跨殑閫氱煡 - mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> { - dynamicTask.stop(callIdHeader.getCallId()); - redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); - if (serverId.equals(userSetting.getServerId())) { - SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, - app, stream, channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { - logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (InvalidArgumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (ParseException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - return; - } - if (finalTcpActive != null) { - sendRtpItem.setTcpActive(finalTcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - - sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { - // 鍏朵粬骞冲彴鍐呭 - otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } - }); - - // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡 - redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> { - if (response.getCode() != 0) { - dynamicTask.stop(callIdHeader.getCallId()); - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); - try { - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage()); - } - } - }); + zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + dynamicTask.stop(sendRtpItem.getCallId()); } } /** - * 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴� + * 閫氱煡娴佷笂绾� */ - private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServer mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - logger.info("[绾ц仈鐐规挱]鐩存挱娴佹潵鑷叾浠栧钩鍙帮紝鍙戦�乺edis娑堟伅"); - // 鍙戦�乺edis娑堟伅 - redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(), - streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId, - channelId, mediaTransmissionTCP, platform.isRtcp(),platform.getName(), responseSendItemMsg -> { - SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem(); - if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) { - logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (InvalidArgumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (ParseException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - return; - } - // 鏀跺埌sendItem - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - - sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(responseSendItemMsg.getMediaServerItem(), request, sendRtpItem, platform, evt); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - }, (wvpResult) -> { - - // 閿欒 - if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) { - // 绂荤嚎 - // 鏌ヨ鏄惁鍦ㄦ湰鏈轰笂绾夸簡 - StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); - if (currentStreamPushItem.isPushIng()) { - // 鍦ㄧ嚎鐘舵�� - pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - - } else { - // 涓嶅湪绾� 鎷夎捣 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } - } + private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎锛屾祦涓婄嚎鍚庤 + logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream()); + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(), + platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); + // 璁剧疆瓒呮椂 + dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { + redisRpcService.stopWaitePushStreamOnline(sendRtpItem); + logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream()); + try { + responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } + }, userSetting.getPlatformPlayTimeout()); + // + long key = redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> { + dynamicTask.stop(sendRtpItem.getCallId()); + if (sendRtpItemKey == null) { + logger.warn("[绾ц仈鐐规挱] 绛夊緟鎺ㄦ祦寰楀埌缁撴灉鏈┖锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } + return; + } + SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItemFromRedis == null) { + logger.warn("[绾ц仈鐐规挱] 绛夊緟鎺ㄦ祦, 鏈壘鍒皉edis涓紦瀛樼殑鍙戞祦淇℃伅锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } + return; + } + if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { + logger.info("[绾ц仈鐐规挱] 绛夊緟鐨勬帹娴佸湪鏈钩鍙颁笂绾� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { + logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); try { responseAck(request, Response.BUSY_HERE); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲 BUSY_HERE: {}", e.getMessage()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); } - }); + return; + } + sendRtpItem.setLocalPort(localPort); + if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { + sendRtpItem.setLocalIp(platform.getSendStreamIp()); + } + + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); + if (response != null) { + sendRtpItem.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } else { + // 鍏朵粬骞冲彴鍐呭 + otherWvpPushStream(sendRtpItemFromRedis, request, platform); + } + }); + // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡 + // redis娑堟伅渚嬪锛� PUBLISH VM_MSG_STREAM_PUSH_RESPONSE '{"code":1,"msg":"澶辫触","app":"1","stream":"2"}' + redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { + if (response.getCode() != 0) { + dynamicTask.stop(sendRtpItem.getCallId()); + redisRpcService.stopWaitePushStreamOnline(sendRtpItem); + redisRpcService.removeCallback(key); + try { + responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage()); + } + } + }); } - public SIPResponse sendStreamAck(MediaServer mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) { - String sdpIp = mediaServerItem.getSdpIp(); + + /** + * 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴� + */ + private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) { + logger.info("[绾ц仈鐐规挱] 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem.getRedisKey()); + if (sendRtpItem == null) { + return; + } + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); + if (response != null) { + sendRtpItem.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } + + public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) { + + String sdpIp = sendRtpItem.getLocalIp(); if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { sdpIp = platform.getSendStreamIp(); } -- Gitblit v1.8.0