From ec0ec5eb54723cc9aeabb4f313da2a101ab98bd2 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 07 九月 2022 16:18:35 +0800 Subject: [PATCH] 修复并发点播时可能出现的rtpServer开启但是还未收到流的情况,编码类型136,137,138默认开启音频通道 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 117 +++++++++++++++++++++++++++++++++++++++++++++++----------- 1 files changed, 94 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index e4b9e3e..82b3ba4 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -13,13 +13,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; +import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -38,7 +38,6 @@ import javax.sdp.*; import javax.sip.*; -import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; import javax.sip.message.Request; import javax.sip.message.Response; @@ -65,6 +64,8 @@ @Autowired private IStreamPushService streamPushService; + @Autowired + private IStreamProxyService streamProxyService; @Autowired private IRedisCatchStorage redisCatchStorage; @@ -86,6 +87,9 @@ @Autowired private IMediaServerService mediaServerService; + + @Autowired + private ZlmHttpHookSubscribe zlmHttpHookSubscribe; @Autowired private SIPProcessorObserver sipProcessorObserver; @@ -142,6 +146,7 @@ MediaServerItem mediaServerItem = null; StreamPushItem streamPushItem = null; + StreamProxyItem proxyByAppAndStream =null; // 涓嶆槸閫氶亾鍙兘鏄洿鎾祦 if (channel != null && gbStream == null) { if (channel.getStatus() == 0) { @@ -171,6 +176,13 @@ if ("push".equals(gbStream.getStreamType())) { streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); if (streamPushItem == null) { + logger.info("[ app={}, stream={} ]鎵句笉鍒皕lm {}锛岃繑鍥�410", gbStream.getApp(), gbStream.getStream(), mediaServerId); + responseAck(evt, Response.GONE); + return; + } + }else if("proxy".equals(gbStream.getStreamType())){ + proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream()); + if (proxyByAppAndStream == null) { logger.info("[ app={}, stream={} ]鎵句笉鍒皕lm {}锛岃繑鍥�410", gbStream.getApp(), gbStream.getStream(), mediaServerId); responseAck(evt, Response.GONE); return; @@ -237,16 +249,16 @@ String protocol = media.getProtocol(); // 鍖哄垎TCP鍙戞祦杩樻槸udp锛� 褰撳墠榛樿udp - if ("TCP/RTP/AVP".equals(protocol)) { + if ("TCP/RTP/AVP".equalsIgnoreCase(protocol)) { String setup = mediaDescription.getAttribute("setup"); if (setup != null) { mediaTransmissionTCP = true; - if ("active".equals(setup)) { + if ("active".equalsIgnoreCase(setup)) { tcpActive = true; // 涓嶆敮鎸乼cp涓诲姩 responseAck(evt, Response.NOT_IMPLEMENTED, "tcp active not support"); // 鐩綍涓嶆敮鎸佺偣鎾� return; - } else if ("passive".equals(setup)) { + } else if ("passive".equalsIgnoreCase(setup)) { tcpActive = false; } } @@ -291,11 +303,11 @@ return; } sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setPlayType("Play".equals(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); + sendRtpItem.setPlayType("Play".equalsIgnoreCase(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); Long finalStartTime = startTime; Long finalStopTime = stopTime; - ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> { + ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> { String app = responseJSON.getString("app"); String stream = responseJSON.getString("stream"); logger.info("[涓婄骇鐐规挱]涓嬬骇宸茬粡寮�濮嬫帹娴併�� 鍥炲200OK(SDP)锛� {}/{}", app, stream); @@ -310,7 +322,7 @@ content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); content.append("s=" + sessionName + "\r\n"); content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); - if ("Playback".equals(sessionName)) { + if ("Playback".equalsIgnoreCase(sessionName)) { content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); } else { content.append("t=0 0\r\n"); @@ -354,7 +366,7 @@ } }); sendRtpItem.setApp("rtp"); - if ("Playback".equals(sessionName)) { + if ("Playback".equalsIgnoreCase(sessionName)) { sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true); sendRtpItem.setStreamId(ssrcInfo.getStream()); @@ -389,7 +401,14 @@ if (playTransaction != null) { Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream()); if (!streamReady) { - playTransaction = null; + boolean hasRtpServer = mediaServerService.checkRtpServer(mediaServerItem, "rtp", playTransaction.getStream()); + if (hasRtpServer) { + logger.info("[涓婄骇鐐规挱]宸茬粡寮�鍚痳tpServer浣嗘槸灏氭湭鏀跺埌娴侊紝寮�鍚洃鍚祦鐨勫埌鏉�"); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", playTransaction.getStream(), true, "rtsp", mediaServerItem.getId()); + zlmHttpHookSubscribe.addSubscribe(hookSubscribe, hookEvent); + }else { + playTransaction = null; + } } } if (playTransaction == null) { @@ -398,6 +417,7 @@ streamId = String.format("%s_%s", device.getDeviceId(), channelId); } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false); + logger.info(JSONObject.toJSONString(ssrcInfo)); sendRtpItem.setStreamId(ssrcInfo.getStream()); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -416,14 +436,33 @@ } } } else if (gbStream != null) { - if (streamPushItem != null && streamPushItem.isPushIng()) { - // 鎺ㄦ祦鐘舵�� - pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } else { - // 鏈帹娴� 鎷夎捣 - notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + if("push".equals(gbStream.getStreamType())) { + if (streamPushItem != null && streamPushItem.isPushIng()) { + // 鎺ㄦ祦鐘舵�� + pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } else { + // 鏈帹娴� 鎷夎捣 + notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + }else if ("proxy".equals(gbStream.getStreamType())){ + if(null != proxyByAppAndStream &&proxyByAppAndStream.isStatus()){ + pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + }else{ + //寮�鍚唬鐞嗘媺娴� + boolean start1 = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); + if(start1) { + pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + }else{ + //澶辫触鍚庨�氱煡 + notifyStreamOnline(evt, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + } + } } } @@ -442,7 +481,39 @@ /** * 瀹夋帓鎺ㄦ祦 */ + private void pushProxyStream(RequestEvent evt, GbStream gbStream, ParentPlatform platform, + CallIdHeader callIdHeader, MediaServerItem mediaServerItem, + int port, Boolean tcpActive, boolean mediaTransmissionTCP, + String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + if (streamReady) { + // 鑷钩鍙板唴瀹� + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + gbStream.getApp(), gbStream.getStream(), channelId, + mediaTransmissionTCP); + if (sendRtpItem == null) { + logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); + responseAck(evt, Response.BUSY_HERE); + return; + } + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + sendRtpItem.setPlayType(InviteStreamType.PUSH); + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + sendRtpItem.setCallId(callIdHeader.getCallId()); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); + + } + + } private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, @@ -487,7 +558,6 @@ } } - /** * 閫氱煡娴佷笂绾� */ @@ -501,7 +571,8 @@ responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); } else if ("push".equals(gbStream.getStreamType())) { if (!platform.isStartOfflinePush()) { - responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable"); + // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 + responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); return; } // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 @@ -537,7 +608,7 @@ app, stream, channelId, mediaTransmissionTCP); if (sendRtpItem == null) { - logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); + logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); try { responseAck(evt, Response.BUSY_HERE); } catch (SipException e) { -- Gitblit v1.8.0