From 381c3bdc2079ece5147cf4cee004e9071edadf7a Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 04 五月 2023 16:04:44 +0800 Subject: [PATCH] 修复国标点播下级平台,ssrc更新的时单端口错误更新rtpserver的问题 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 184 +++++++++++++++++++++++++++++---------------- 1 files changed, 117 insertions(+), 67 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index a32fadc..7034a0e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,21 +1,21 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; @@ -39,12 +39,14 @@ import org.springframework.stereotype.Component; import javax.sdp.*; -import javax.sip.*; +import javax.sip.InvalidArgumentException; +import javax.sip.RequestEvent; +import javax.sip.SipException; import javax.sip.header.CallIdHeader; -import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; +import java.util.Random; import java.util.Vector; /** @@ -71,6 +73,9 @@ @Autowired private IRedisCatchStorage redisCatchStorage; + + @Autowired + private SSRCFactory ssrcFactory; @Autowired private DynamicTask dynamicTask; @@ -157,11 +162,6 @@ StreamProxyItem proxyByAppAndStream =null; // 涓嶆槸閫氶亾鍙兘鏄洿鎾祦 if (channel != null && gbStream == null) { -// if (channel.getStatus() == 0) { -// logger.info("閫氶亾绂荤嚎锛岃繑鍥�400"); -// responseAck(request, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline"); -// return; -// } // 閫氶亾瀛樺湪锛屽彂100锛孴RYING try { responseAck(request, Response.TRYING); @@ -232,7 +232,7 @@ } return; } else { - logger.info("閫氶亾涓嶅瓨鍦紝杩斿洖404"); + logger.info("閫氶亾涓嶅瓨鍦紝杩斿洖404: {}", channelId); try { // 閫氶亾涓嶅瓨鍦紝鍙�404锛岃祫婧愪笉瀛樺湪 responseAck(request, Response.NOT_FOUND); @@ -245,18 +245,15 @@ String contentString = new String(request.getRawContent()); // jainSip涓嶆敮鎸亂=瀛楁锛� 绉婚櫎浠ヨВ鏋愩�� - int ssrcIndex = contentString.indexOf("y="); // 妫�鏌ユ槸鍚︽湁y瀛楁 - String ssrcDefault = "0000000000"; - String ssrc; + int ssrcIndex = contentString.indexOf("y="); + SessionDescription sdp; if (ssrcIndex >= 0) { //ssrc瑙勫畾闀垮害涓�10涓瓧鑺傦紝涓嶅彇浣欎笅闀垮害浠ラ伩鍏嶅悗缁繕鏈夆�渇=鈥濆瓧娈� - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - String substring = contentString.substring(0, contentString.indexOf("y=")); + String substring = contentString.substring(0, ssrcIndex); sdp = SdpFactory.getInstance().createSessionDescription(substring); } else { - ssrc = ssrcDefault; sdp = SdpFactory.getInstance().createSessionDescription(contentString); } String sessionName = sdp.getSessionName().getValue(); @@ -318,9 +315,9 @@ return; } String username = sdp.getOrigin().getUsername(); - String addressStr = sdp.getOrigin().getAddress(); + String addressStr = sdp.getConnection().getAddress(); - logger.info("[涓婄骇鐐规挱]鐢ㄦ埛锛歿}锛� 閫氶亾锛歿}, 鍦板潃锛歿}:{}锛� ssrc锛歿}", username, channelId, addressStr, port, ssrc); + Device device = null; // 閫氳繃 channel 鍜� gbStream 鏄惁涓簄ull 鍊煎垽鏂潵婧愭槸鐩存挱娴佸悎閫傚浗鏍� if (channel != null) { @@ -344,9 +341,27 @@ } return; } + + String ssrc; + if (userSetting.getUseCustomSsrcForParentInvite() || ssrcIndex < 0) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + }else { + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + } + String streamTypeStr = null; + if (mediaTransmissionTCP) { + if (tcpActive) { + streamTypeStr = "TCP-ACTIVE"; + }else { + streamTypeStr = "TCP-PASSIVE"; + } + }else { + streamTypeStr = "UDP"; + } + logger.info("[涓婄骇鐐规挱] 骞冲彴锛歿}锛� 閫氶亾锛歿}, 鏀舵祦鍦板潃锛歿}:{}锛屾敹娴佹柟寮忥細{}, ssrc锛歿}", username, channelId, addressStr, port, streamTypeStr, ssrc); SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - device.getDeviceId(), channelId, - mediaTransmissionTCP); + device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); @@ -385,7 +400,12 @@ } else { content.append("t=0 0\r\n"); } - content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n"); + int localPort = sendRtpItem.getLocalPort(); + if (localPort == 0) { + // 闈炰弗鏍兼ā寮忕鍙d笉缁熶竴, 澧炲姞鍏煎鎬э紝淇敼涓轰竴涓笉涓�0鐨勭鍙� + localPort = new Random().nextInt(65535) + 1; + } + content.append("m=video " + localPort + " RTP/AVP 96\r\n"); content.append("a=sendonly\r\n"); content.append("a=rtpmap:96 PS/90000\r\n"); content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); @@ -405,27 +425,23 @@ }, 60 * 1000); responseSdpAck(request, content.toString(), platform); - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍥炲SdpAck", e); } }; SipSubscribe.Event errorEvent = ((event) -> { // 鏈煡閿欒銆傜洿鎺ヨ浆鍙戣澶囩偣鎾殑閿欒 try { Response response = getMessageFactory().createResponse(event.statusCode, evt.getRequest()); - sipSender.transmitRequest(response); + sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response); } catch (ParseException | SipException e) { - e.printStackTrace(); + logger.error("鏈鐞嗙殑寮傚父 ", e); } }); sendRtpItem.setApp("rtp"); if ("Playback".equalsIgnoreCase(sessionName)) { sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, device.isSsrcCheck(), true); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); sendRtpItem.setStreamId(ssrcInfo.getStream()); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -465,22 +481,24 @@ } } if (playTransaction == null) { + // 琚偣鎾殑閫氶亾鐩墠鏈鐐规挱锛屽垯寮�濮嬬偣鎾� String streamId = null; if (mediaServerItem.isRtpEnable()) { streamId = String.format("%s_%s", device.getDeviceId(), channelId); } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam()); logger.info(JSONObject.toJSONString(ssrcInfo)); sendRtpItem.setStreamId(ssrcInfo.getStream()); - sendRtpItem.setSsrc(ssrc.equals(ssrcDefault) ? ssrcInfo.getSsrc() : ssrc); +// sendRtpItem.setSsrc(ssrcInfo.getSsrc()); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� redisCatchStorage.updateSendRTPSever(sendRtpItem); playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> { logger.info("[涓婄骇鐐规挱]瓒呮椂, 鐢ㄦ埛锛歿}锛� 閫氶亾锛歿}", username, channelId); redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - }, null); + }); } else { + sendRtpItem.setStreamId(playTransaction.getStream()); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -491,6 +509,15 @@ } } } else if (gbStream != null) { + + String ssrc; + if (userSetting.getUseCustomSsrcForParentInvite() || ssrcIndex < 0) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + }else { + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + } + if("push".equals(gbStream.getStreamType())) { if (streamPushItem != null && streamPushItem.isPushIng()) { // 鎺ㄦ祦鐘舵�� @@ -502,21 +529,17 @@ mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } }else if ("proxy".equals(gbStream.getStreamType())){ - if(null != proxyByAppAndStream &&proxyByAppAndStream.isStatus()){ - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - }else{ - //寮�鍚唬鐞嗘媺娴� - boolean start1 = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); - if(start1) { + if (null != proxyByAppAndStream) { + if(proxyByAppAndStream.isStatus()){ pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); }else{ - //澶辫触鍚庨�氱煡 + //寮�鍚唬鐞嗘媺娴� notifyStreamOnline(evt, request,gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } } + } } @@ -524,7 +547,7 @@ } catch (SdpParseException e) { logger.error("sdp瑙f瀽閿欒", e); } catch (SdpException e) { - e.printStackTrace(); + logger.error("鏈鐞嗙殑寮傚父 ", e); } } @@ -539,8 +562,7 @@ if (streamReady) { // 鑷钩鍙板唴瀹� SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, - mediaTransmissionTCP); + gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); @@ -579,8 +601,7 @@ if (streamReady) { // 鑷钩鍙板唴瀹� SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, - mediaTransmissionTCP); + gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); @@ -629,15 +650,38 @@ if ("proxy".equals(gbStream.getStreamType())) { // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎 logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); - try { - responseAck(request, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); + // 鐩戝惉娴佷笂绾� + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId()); + zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, responseJSON) -> { + String app = responseJSON.getString("app"); + String stream = responseJSON.getString("stream"); + logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", app, stream); + dynamicTask.stop(callIdHeader.getCallId()); + pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + }); + dynamicTask.startDelay(callIdHeader.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); + zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + }, userSetting.getPlatformPlayTimeout()); + boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); + if (!start) { + try { + responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); + } + zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + dynamicTask.stop(callIdHeader.getCallId()); } + + + } else if ("push".equals(gbStream.getStreamType())) { if (!platform.isStartOfflinePush()) { // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 try { + logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream()); responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); @@ -658,11 +702,11 @@ mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 } catch (SipException e) { - e.printStackTrace(); + logger.error("鏈鐞嗙殑寮傚父 ", e); } catch (InvalidArgumentException e) { - e.printStackTrace(); + logger.error("鏈鐞嗙殑寮傚父 ", e); } catch (ParseException e) { - e.printStackTrace(); + logger.error("鏈鐞嗙殑寮傚父 ", e); } }, userSetting.getPlatformPlayTimeout()); // 娣诲姞鐩戝惉 @@ -674,18 +718,18 @@ dynamicTask.stop(callIdHeader.getCallId()); if (serverId.equals(userSetting.getServerId())) { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, - app, stream, channelId, mediaTransmissionTCP); + app, stream, channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); try { responseAck(request, Response.BUSY_HERE); } catch (SipException e) { - e.printStackTrace(); + logger.error("鏈鐞嗙殑寮傚父 ", e); } catch (InvalidArgumentException e) { - e.printStackTrace(); + logger.error("鏈鐞嗙殑寮傚父 ", e); } catch (ParseException e) { - e.printStackTrace(); + logger.error("鏈鐞嗙殑寮傚父 ", e); } return; } @@ -736,18 +780,18 @@ // 鍙戦�乺edis娑堟伅 redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(), streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId, - channelId, mediaTransmissionTCP, null, responseSendItemMsg -> { + channelId, mediaTransmissionTCP, platform.isRtcp(),null, responseSendItemMsg -> { SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem(); if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) { logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); try { responseAck(request, Response.BUSY_HERE); } catch (SipException e) { - e.printStackTrace(); + logger.error("鏈鐞嗙殑寮傚父 ", e); } catch (InvalidArgumentException e) { - e.printStackTrace(); + logger.error("鏈鐞嗙殑寮傚父 ", e); } catch (ParseException e) { - e.printStackTrace(); + logger.error("鏈鐞嗙殑寮傚父 ", e); } return; } @@ -800,7 +844,13 @@ content.append("s=Play\r\n"); content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); content.append("t=0 0\r\n"); - content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n"); + // 闈炰弗鏍兼ā寮忕鍙d笉缁熶竴, 澧炲姞鍏煎鎬э紝淇敼涓轰竴涓笉涓�0鐨勭鍙� + int localPort = sendRtpItem.getLocalPort(); + if(localPort == 0) + { + localPort = new Random().nextInt(65535) + 1; + } + content.append("m=video " + localPort + " RTP/AVP 96\r\n"); content.append("a=sendonly\r\n"); content.append("a=rtpmap:96 PS/90000\r\n"); if (sendRtpItem.isTcp()) { @@ -817,11 +867,11 @@ try { return responseSdpAck(request, content.toString(), platform); } catch (SipException e) { - e.printStackTrace(); + logger.error("鏈鐞嗙殑寮傚父 ", e); } catch (InvalidArgumentException e) { - e.printStackTrace(); + logger.error("鏈鐞嗙殑寮傚父 ", e); } catch (ParseException e) { - e.printStackTrace(); + logger.error("鏈鐞嗙殑寮傚父 ", e); } return null; } @@ -894,7 +944,7 @@ return; } String username = sdp.getOrigin().getUsername(); - String addressStr = sdp.getOrigin().getAddress(); + String addressStr = sdp.getConnection().getAddress(); logger.info("璁惧{}璇锋眰璇煶娴侊紝鍦板潃锛歿}:{}锛宻src锛歿}", username, addressStr, port, ssrc); } catch (SdpException e) { logger.error("[SDP瑙f瀽寮傚父]", e); -- Gitblit v1.8.0