From c21d973977a9f1d00d26179de764687ddd0ec56c Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 24 四月 2024 14:59:41 +0800 Subject: [PATCH] 修复收到catalog消息是更新导致是否有音频的设置失效的BUG --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 677 +++++++++++++++++++++++++++++-------------------------- 1 files changed, 356 insertions(+), 321 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java old mode 100644 new mode 100755 index be6a3c3..d61a818 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,45 +1,50 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IPlayService; -import com.genersoft.iot.vmp.service.IStreamProxyService; -import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; import gov.nist.javax.sdp.TimeDescriptionImpl; import gov.nist.javax.sdp.fields.TimeField; +import gov.nist.javax.sdp.fields.URIField; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.sdp.*; @@ -50,8 +55,7 @@ import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; -import java.util.Random; -import java.util.Vector; +import java.util.*; /** * SIP鍛戒护绫诲瀷锛� INVITE璇锋眰 @@ -80,13 +84,16 @@ private IRedisCatchStorage redisCatchStorage; @Autowired + private IRedisRpcService redisRpcService; + + @Autowired + private RedisTemplate<Object, Object> redisTemplate; + + @Autowired private SSRCFactory ssrcFactory; @Autowired private DynamicTask dynamicTask; - - @Autowired - private RedisPushStreamResponseListener redisPushStreamResponseListener; @Autowired private IPlayService playService; @@ -113,14 +120,16 @@ private UserSetting userSetting; @Autowired - private ZLMMediaListManager mediaListManager; - - @Autowired private SipConfig config; + @Autowired + private VideoStreamSessionManager streamSession; @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; + private SendRtpPortManager sendRtpPortManager; + + @Autowired + private RedisPushStreamResponseListener redisPushStreamResponseListener; @Override @@ -138,12 +147,25 @@ public void process(RequestEvent evt) { // Invite Request娑堟伅瀹炵幇锛屾娑堟伅涓�鑸负绾ц仈娑堟伅锛屼笂绾х粰涓嬬骇鍙戦�佽姹傝棰戞寚浠� try { - SIPRequest request = (SIPRequest) evt.getRequest(); - String channelId = SipUtils.getChannelIdFromRequest(request); + SIPRequest request = (SIPRequest)evt.getRequest(); + String channelIdFromSub = SipUtils.getChannelIdFromRequest(request); + + // 瑙f瀽sdp娑堟伅, 浣跨敤jainsip 鑷甫鐨剆dp瑙f瀽鏂瑰紡 + String contentString = new String(request.getRawContent()); + Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); + SessionDescription sdp = gb28181Sdp.getBaseSdb(); + String sessionName = sdp.getSessionName().getValue(); + String channelIdFromSdp = null; + if(StringUtils.equalsIgnoreCase("Playback", sessionName)){ + URIField uriField = (URIField)sdp.getURI(); + channelIdFromSdp = uriField.getURI().split(":")[0]; + } + final String channelId = StringUtils.isNotBlank(channelIdFromSdp) ? channelIdFromSdp : channelIdFromSub; + String requesterId = SipUtils.getUserIdFromFromHeader(request); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); if (requesterId == null || channelId == null) { - logger.info("鏃犳硶浠嶧romHeader鐨凙ddress涓幏鍙栧埌骞冲彴id锛岃繑鍥�400"); + logger.info("鏃犳硶浠庤姹備腑鑾峰彇鍒板钩鍙癷d锛岃繑鍥�400"); // 鍙傛暟涓嶅叏锛� 鍙�400锛岃姹傞敊璇� try { responseAck(request, Response.BAD_REQUEST); @@ -153,6 +175,8 @@ return; } + logger.info("[INVITE] requesterId: {}, callId: {}, 鏉ヨ嚜锛歿}锛歿}", + requesterId, callIdHeader.getCallId(), request.getRemoteAddress(), request.getRemotePort()); // 鏌ヨ璇锋眰鏄惁鏉ヨ嚜涓婄骇骞冲彴\璁惧 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); @@ -163,7 +187,7 @@ // 鏌ヨ骞冲彴涓嬫槸鍚︽湁璇ラ�氶亾 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); - PlatformCatalog catalog = storager.getCatalog(channelId); + PlatformCatalog catalog = storager.getCatalog(requesterId, channelId); MediaServerItem mediaServerItem = null; StreamPushItem streamPushItem = null; @@ -246,12 +270,6 @@ } return; } - // 瑙f瀽sdp娑堟伅, 浣跨敤jainsip 鑷甫鐨剆dp瑙f瀽鏂瑰紡 - String contentString = new String(request.getRawContent()); - - Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); - SessionDescription sdp = gb28181Sdp.getBaseSdb(); - String sessionName = sdp.getSessionName().getValue(); Long startTime = null; Long stopTime = null; @@ -384,12 +402,15 @@ // * 2 鎺ㄦ祦涓� sendRtpItem.setStatus(1); redisCatchStorage.updateSendRTPSever(sendRtpItem); - + String sdpIp = mediaServerItemInUSe.getSdpIp(); + if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { + sdpIp = platform.getSendStreamIp(); + } StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); - content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); + content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n"); content.append("s=" + sessionName + "\r\n"); - content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); + content.append("c=IN IP4 " + sdpIp + "\r\n"); if ("Playback".equalsIgnoreCase(sessionName)) { content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); } else { @@ -400,11 +421,21 @@ // 闈炰弗鏍兼ā寮忕鍙d笉缁熶竴, 澧炲姞鍏煎鎬э紝淇敼涓轰竴涓笉涓�0鐨勭鍙� localPort = new Random().nextInt(65535) + 1; } - content.append("m=video " + localPort + " RTP/AVP 96\r\n"); + if (sendRtpItem.isTcp()) { + content.append("m=video " + localPort + " TCP/RTP/AVP 96\r\n"); + if (!sendRtpItem.isTcpActive()) { + content.append("a=setup:active\r\n"); + } else { + content.append("a=setup:passive\r\n"); + } + }else { + content.append("m=video " + localPort + " RTP/AVP 96\r\n"); + } content.append("a=sendonly\r\n"); content.append("a=rtpmap:96 PS/90000\r\n"); content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); content.append("f=\r\n"); + try { // 瓒呮椂鏈敹鍒癆ck搴旇鍥炲bye,褰撳墠绛夊緟鏃堕棿涓�10绉� @@ -418,8 +449,34 @@ logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage()); } }, 60 * 1000); - - responseSdpAck(request, content.toString(), platform); + responseSdpAck(request, content.toString(), platform); + // tcp涓诲姩妯″紡锛屽洖澶峴dp鍚庡紑鍚洃鍚� + if (sendRtpItem.isTcpActive()) { + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Map<String, Object> param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + if (!sendRtpItem.isTcpActive()) { + param.put("dst_url",sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + } + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + param.put("is_udp", is_Udp); + param.put("src_port", localPort); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + if (!sendRtpItem.isTcp()) { + // 寮�鍚痳tcp淇濇椿 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } + JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param); + if (startSendRtpStreamResult != null) { + startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader); + } + } } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍥炲SdpAck", e); } @@ -438,8 +495,11 @@ sendRtpItem.setApp("rtp"); if ("Playback".equalsIgnoreCase(sessionName)) { sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam()); - sendRtpItem.setStream(ssrcInfo.getStream()); + String startTimeStr = DateUtil.urlFormatter.format(start); + String endTimeStr = DateUtil.urlFormatter.format(end); + String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr; + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0,false, false, device.getStreamModeForParam()); + sendRtpItem.setStream(stream); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� redisCatchStorage.updateSendRTPSever(sendRtpItem); playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), @@ -485,10 +545,13 @@ errorEvent.run(code, msg, data); } }); - }else { - + } else { + sendRtpItem.setPlayType(InviteStreamType.PLAY); + String streamId = String.format("%s_%s", device.getDeviceId(), channelId); + sendRtpItem.setStream(streamId); + redisCatchStorage.updateSendRTPSever(sendRtpItem); SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> { - if (code == InviteErrorCode.SUCCESS.getCode()){ + if (code == InviteErrorCode.SUCCESS.getCode()) { hookEvent.run(code, msg, data); } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { logger.info("[涓婄骇鐐规挱]瓒呮椂, 鐢ㄦ埛锛歿}锛� 閫氶亾锛歿}", username, channelId); @@ -498,51 +561,84 @@ errorEvent.run(code, msg, data); } })); - sendRtpItem.setPlayType(InviteStreamType.PLAY); - String streamId = null; - if (mediaServerItem.isRtpEnable()) { - streamId = String.format("%s_%s", device.getDeviceId(), channelId); - }else { - streamId = String.format("%08x", Integer.parseInt(ssrcInfo.getSsrc())).toUpperCase(); - } - sendRtpItem.setStream(streamId); sendRtpItem.setSsrc(ssrcInfo.getSsrc()); redisCatchStorage.updateSendRTPSever(sendRtpItem); } } else if (gbStream != null) { - - String ssrc; - if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { - // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 - ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); - }else { - ssrc = gb28181Sdp.getSsrc(); + SendRtpItem sendRtpItem = new SendRtpItem(); + if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) { + sendRtpItem.setSsrc(gb28181Sdp.getSsrc()); } + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + sendRtpItem.setTcp(mediaTransmissionTCP); + sendRtpItem.setRtcp(platform.isRtcp()); + sendRtpItem.setPlatformName(platform.getName()); + sendRtpItem.setPlatformId(platform.getServerGBId()); + sendRtpItem.setMediaServerId(mediaServerItem.getId()); + sendRtpItem.setChannelId(channelId); + sendRtpItem.setIp(addressStr); + sendRtpItem.setPort(port); + sendRtpItem.setUsePs(true); + sendRtpItem.setApp(gbStream.getApp()); + sendRtpItem.setStream(gbStream.getStream()); + sendRtpItem.setCallId(callIdHeader.getCallId()); + sendRtpItem.setFromTag(request.getFromTag()); + sendRtpItem.setOnlyAudio(false); + sendRtpItem.setStatus(0); + sendRtpItem.setSessionName(sessionName); + // 娓呯悊鍙兘瀛樺湪鐨勭紦瀛橀伩鍏嶇敤鍒版棫鐨勬暟鎹� + List<SendRtpItem> sendRtpItemList = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, gbStream.getStream()); + if (!sendRtpItemList.isEmpty()) { + for (SendRtpItem rtpItem : sendRtpItemList) { + redisCatchStorage.deleteSendRTPServer(rtpItem); + } + } if ("push".equals(gbStream.getStreamType())) { - if (streamPushItem != null && streamPushItem.isPushIng()) { - // 鎺ㄦ祦鐘舵�� - pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } else { - // 鏈帹娴� 鎷夎捣 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + sendRtpItem.setPlayType(InviteStreamType.PUSH); + if (streamPushItem != null) { + // 浠巖edis鏌ヨ鏄惁姝e湪鎺ユ敹杩欎釜鎺ㄦ祦 + OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); + if (pushListItem != null) { + sendRtpItem.setServerId(pushListItem.getSeverId()); + sendRtpItem.setMediaServerId(pushListItem.getMediaServerId()); + + StreamPushItem transform = streamPushService.transform(pushListItem); + transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + // 寮�濮嬫帹娴� + sendPushStream(sendRtpItem, mediaServerItem, platform, request); + }else { + if (!platform.isStartOfflinePush()) { + // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 + try { + logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream()); + responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); + } + return; + } + notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); + } } } else if ("proxy".equals(gbStream.getStreamType())) { if (null != proxyByAppAndStream) { + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } if (proxyByAppAndStream.isStatus()) { - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + sendProxyStream(sendRtpItem, mediaServerItem, platform, request); } else { //寮�鍚唬鐞嗘媺娴� - notifyStreamOnline(evt, request, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } - - } } } @@ -553,60 +649,27 @@ } } + private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) { + if (jsonObject == null) { + logger.error("涓嬬骇TCP琚姩鍚姩鐩戝惉澶辫触: 璇锋鏌LM鏈嶅姟"); + } else if (jsonObject.getInteger("code") == 0) { + logger.info("璋冪敤ZLM-TCP琚姩鎺ㄦ祦鎺ュ彛, 缁撴灉锛� {}", jsonObject); + logger.info("鍚姩鐩戝惉TCP琚姩鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + } else { + logger.error("鍚姩鐩戝惉TCP琚姩鎺ㄦ祦澶辫触: {}, 鍙傛暟锛歿}",jsonObject.getString("msg"), JSON.toJSONString(param)); + } + } + /** * 瀹夋帓鎺ㄦ祦 */ - private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServerItem mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); if (streamReady != null && streamReady) { // 鑷钩鍙板唴瀹� - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { - logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage()); - } - return; - } - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setFromTag(request.getFromTag()); - - SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - - } - - } - - private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServerItem mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - // 鎺ㄦ祦 - if (streamPushItem.isSelf()) { - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); - if (streamReady != null && streamReady) { - // 鑷钩鍙板唴瀹� - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); try { responseAck(request, Response.BUSY_HERE); @@ -615,235 +678,205 @@ } return; } - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); + sendRtpItem.setPlayType(InviteStreamType.PROXY); + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); + + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); + if (response != null) { + sendRtpItem.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } + } + + private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + // 鎺ㄦ祦 + if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); + if (streamReady != null && streamReady) { + // 鑷钩鍙板唴瀹� + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { + logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�: {}", e.getMessage()); + } + return; } - sendRtpItem.setPlayType(InviteStreamType.PUSH); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - - sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); } + if (sendRtpItem.getSsrc() == null) { + // 涓婄骇骞冲彴鐐规挱鏃朵笉浣跨敤涓婄骇骞冲彴鎸囧畾鐨剆src锛屼娇鐢ㄨ嚜瀹氫箟鐨剆src锛屽弬鑰冨浗鏍囨枃妗�-鐐规挱澶栧煙璁惧濯掍綋娴丼SRC澶勭悊鏂瑰紡 + String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + sendRtpItem.setSsrc(ssrc); + } redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { // 涓嶅湪绾� 鎷夎捣 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } - } else { // 鍏朵粬骞冲彴鍐呭 - otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + otherWvpPushStream(sendRtpItem, request, platform); } } /** * 閫氱煡娴佷笂绾� */ - private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServerItem mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - if ("proxy".equals(gbStream.getStreamType())) { - // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); - // 鐩戝惉娴佷笂绾� - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId()); - zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { - OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; - logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); - dynamicTask.stop(callIdHeader.getCallId()); - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - }); - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); - zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); - }, userSetting.getPlatformPlayTimeout()); - boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); - if (!start) { - try { - responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); - } - zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); - dynamicTask.stop(callIdHeader.getCallId()); + private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎 + logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍚敤娴佸悗寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream()); + // 鐩戝惉娴佷笂绾� + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId()); + zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; + logger.info("[涓婄骇鐐规挱]鎷夋祦浠g悊宸茬粡灏辩华锛� {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); + dynamicTask.stop(sendRtpItem.getCallId()); + sendProxyStream(sendRtpItem, mediaServerItem, platform, request); + }); + dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 绛夊緟鎷夋祦浠g悊娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream()); + zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + }, userSetting.getPlatformPlayTimeout()); + boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream()); + if (!start) { + try { + responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); } - - - } else if ("push".equals(gbStream.getStreamType())) { - if (!platform.isStartOfflinePush()) { - // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 - try { - logger.info("[涓婄骇鐐规挱] 澶辫触锛屾帹娴佽澶囨湭鎺ㄦ祦锛宑hannel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream()); - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 invite 閫氶亾鏈帹娴�: {}", e.getMessage()); - } - return; - } - // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); - - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, - gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), - platform.getName(), null, gbStream.getMediaServerId()); - redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); - // 璁剧疆瓒呮椂 - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); - try { - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); - responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 - } catch (SipException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (InvalidArgumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (ParseException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - }, userSetting.getPlatformPlayTimeout()); - // 娣诲姞鐩戝惉 - int finalPort = port; - Boolean finalTcpActive = tcpActive; - - // 娣诲姞鍦ㄦ湰鏈轰笂绾跨殑閫氱煡 - mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> { - dynamicTask.stop(callIdHeader.getCallId()); - if (serverId.equals(userSetting.getServerId())) { - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, - app, stream, channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (sendRtpItem == null) { - logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (InvalidArgumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (ParseException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - return; - } - if (finalTcpActive != null) { - sendRtpItem.setTcpActive(finalTcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - - sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { - // 鍏朵粬骞冲彴鍐呭 - otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } - }); - - // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡 - redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> { - if (response.getCode() != 0) { - dynamicTask.stop(callIdHeader.getCallId()); - mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); - try { - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage()); - } - } - }); + zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); + dynamicTask.stop(sendRtpItem.getCallId()); } } /** - * 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴� + * 閫氱煡娴佷笂绾� */ - private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServerItem mediaServerItem, - int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) { - logger.info("[绾ц仈鐐规挱]鐩存挱娴佹潵鑷叾浠栧钩鍙帮紝鍙戦�乺edis娑堟伅"); - // 鍙戦�乺edis娑堟伅 - redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(), - streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId, - channelId, mediaTransmissionTCP, platform.isRtcp(), null, responseSendItemMsg -> { - SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem(); - if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) { - logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (InvalidArgumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (ParseException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - return; - } - // 鏀跺埌sendItem - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - - sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(responseSendItemMsg.getMediaServerItem(), request, sendRtpItem, platform, evt); - if (response != null) { - sendRtpItem.setToTag(response.getToTag()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); - }, (wvpResult) -> { - - // 閿欒 - if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) { - // 绂荤嚎 - // 鏌ヨ鏄惁鍦ㄦ湰鏈轰笂绾夸簡 - StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); - if (currentStreamPushItem.isPushIng()) { - // 鍦ㄧ嚎鐘舵�� - pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - - } else { - // 涓嶅湪绾� 鎷夎捣 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } - } + private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { + // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎锛屾祦涓婄嚎鍚庤 + logger.info("[ app={}, stream={} ]閫氶亾鏈帹娴侊紝鍙戦�乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", sendRtpItem.getApp(), sendRtpItem.getStream()); + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(), + platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); + // 璁剧疆瓒呮椂 + dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { + redisRpcService.stopWaitePushStreamOnline(sendRtpItem); + logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", sendRtpItem.getApp(), sendRtpItem.getStream()); + try { + responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } + }, userSetting.getPlatformPlayTimeout()); + // + long key = redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> { + dynamicTask.stop(sendRtpItem.getCallId()); + if (sendRtpItemKey == null) { + logger.warn("[绾ц仈鐐规挱] 绛夊緟鎺ㄦ祦寰楀埌缁撴灉鏈┖锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } + return; + } + SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItemFromRedis == null) { + logger.warn("[绾ц仈鐐规挱] 绛夊緟鎺ㄦ祦, 鏈壘鍒皉edis涓紦瀛樼殑鍙戞祦淇℃伅锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); + } + return; + } + if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { + logger.info("[绾ц仈鐐规挱] 绛夊緟鐨勬帹娴佸湪鏈钩鍙颁笂绾� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { + logger.warn("涓婄骇鐐规椂鍒涘缓sendRTPItem澶辫触锛屽彲鑳芥槸鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); try { responseAck(request, Response.BUSY_HERE); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲 BUSY_HERE: {}", e.getMessage()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("鏈鐞嗙殑寮傚父 ", e); } - }); + return; + } + sendRtpItem.setLocalPort(localPort); + if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { + sendRtpItem.setLocalIp(platform.getSendStreamIp()); + } + + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); + if (response != null) { + sendRtpItem.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } else { + // 鍏朵粬骞冲彴鍐呭 + otherWvpPushStream(sendRtpItemFromRedis, request, platform); + } + }); + // 娣诲姞鍥炲鐨勬嫆缁濇垨鑰呴敊璇殑閫氱煡 + // redis娑堟伅渚嬪锛� PUBLISH VM_MSG_STREAM_PUSH_RESPONSE '{"code":1,"msg":"澶辫触","app":"1","stream":"2"}' + redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { + if (response.getCode() != 0) { + dynamicTask.stop(sendRtpItem.getCallId()); + redisRpcService.stopWaitePushStreamOnline(sendRtpItem); + redisRpcService.removeCallback(key); + try { + responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鐐规挱鍥炲: {}", e.getMessage()); + } + } + }); } - public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) { + + /** + * 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴� + */ + private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) { + logger.info("[绾ц仈鐐规挱] 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem.getRedisKey()); + if (sendRtpItem == null) { + return; + } + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); + if (response != null) { + sendRtpItem.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } + + public SIPResponse sendStreamAck(SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform) { + + String sdpIp = sendRtpItem.getLocalIp(); + if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { + sdpIp = platform.getSendStreamIp(); + } StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); - content.append("o=" + sendRtpItem.getChannelId() + " 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); + content.append("o=" + sendRtpItem.getChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n"); content.append("s=Play\r\n"); - content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); + content.append("c=IN IP4 " + sdpIp + "\r\n"); content.append("t=0 0\r\n"); // 闈炰弗鏍兼ā寮忕鍙d笉缁熶竴, 澧炲姞鍏煎鎬э紝淇敼涓轰竴涓笉涓�0鐨勭鍙� int localPort = sendRtpItem.getLocalPort(); @@ -916,7 +949,10 @@ } if (device != null) { logger.info("鏀跺埌璁惧" + requesterId + "鐨勮闊冲箍鎾璉nvite璇锋眰"); - String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId() + broadcastCatch.getChannelId(); + String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId(); + if (!SipUtils.isFrontEnd(device.getDeviceId())) { + key += broadcastCatch.getChannelId(); + } dynamicTask.stop(key); try { responseAck(request, Response.TRYING); @@ -926,8 +962,6 @@ return; } String contentString = new String(request.getRawContent()); - // jainSip涓嶆敮鎸亂=瀛楁锛� 绉婚櫎绉婚櫎浠ヨВ鏋愩�� - String ssrc = "0000000404"; try { Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); @@ -944,7 +978,7 @@ Media media = mediaDescription.getMedia(); Vector mediaFormats = media.getMediaFormats(false); - if (mediaFormats.contains("8")) { +// if (mediaFormats.contains("8")) { port = media.getMediaPort(); String protocol = media.getProtocol(); // 鍖哄垎TCP鍙戞祦杩樻槸udp锛� 褰撳墠榛樿udp @@ -960,7 +994,7 @@ } } break; - } +// } } if (port == -1) { logger.info("涓嶆敮鎸佺殑濯掍綋鏍煎紡锛岃繑鍥�415"); @@ -975,7 +1009,7 @@ return; } String addressStr = sdp.getOrigin().getAddress(); - logger.info("璁惧{}璇锋眰璇煶娴侊紝鍦板潃锛歿}:{}锛宻src锛歿}, {}", requesterId, addressStr, port, ssrc, + logger.info("璁惧{}璇锋眰璇煶娴侊紝鍦板潃锛歿}:{}锛宻src锛歿}, {}", requesterId, addressStr, port, gb28181Sdp.getSsrc(), mediaTransmissionTCP ? (tcpActive ? "TCP涓诲姩" : "TCP琚姩") : "UDP"); MediaServerItem mediaServerItem = broadcastCatch.getMediaServerItem(); @@ -989,11 +1023,11 @@ } return; } - logger.info("璁惧{}璇锋眰璇煶娴侊紝 鏀舵祦鍦板潃锛歿}:{}锛宻src锛歿}, {}, 瀵硅鏂瑰紡锛歿}", requesterId, addressStr, port, ssrc, + logger.info("璁惧{}璇锋眰璇煶娴侊紝 鏀舵祦鍦板潃锛歿}:{}锛宻src锛歿}, {}, 瀵硅鏂瑰紡锛歿}", requesterId, addressStr, port, gb28181Sdp.getSsrc(), mediaTransmissionTCP ? (tcpActive ? "TCP涓诲姩" : "TCP琚姩") : "UDP", sdp.getSessionName().getValue()); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId, device.getDeviceId(), broadcastCatch.getChannelId(), mediaTransmissionTCP, false); @@ -1029,7 +1063,7 @@ Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, broadcastCatch.getApp(), broadcastCatch.getStream()); if (streamReady) { - sendOk(device, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, ssrc); + sendOk(device, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, gb28181Sdp.getSsrc()); } else { logger.warn("[璇煶閫氳瘽]锛� 鏈彂鐜板緟鎺ㄩ�佺殑娴�,app={},stream={}", broadcastCatch.getApp(), broadcastCatch.getStream()); try { @@ -1099,9 +1133,10 @@ audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok); audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse); audioBroadcastManager.update(audioBroadcastCatch); - + streamSession.put(device.getDeviceId(), sendRtpItem.getChannelId(), request.getCallIdHeader().getCallId(), sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(), sipResponse, InviteSessionType.BROADCAST); // 寮�鍚彂娴侊紝澶у崕鍦ㄦ敹鍒�200OK鍚庡氨浼氬紑濮嬪缓绔嬭繛鎺� if (!device.isBroadcastPushAfterAck()) { + logger.info("[璇煶鍠婅瘽] 鍥炲200OK鍚庡彂鐜� BroadcastPushAfterAck涓篎alse锛岀幇鍦ㄥ紑濮嬫帹娴�"); playService.startPushStream(sendRtpItem, sipResponse, parentPlatform, request.getCallIdHeader()); } -- Gitblit v1.8.0