From 509d9b3cbba7ba8fdc7466c34e46db70d41517a8 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 27 二月 2024 10:22:13 +0800 Subject: [PATCH] Merge branch 'wvp-28181-2.0' into main-dev --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 117 ++++++++++++++++++++++++++++++++++++++++++---------------- 1 files changed, 85 insertions(+), 32 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java old mode 100644 new mode 100755 index be6a3c3..6de155a --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,13 +1,17 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; @@ -19,10 +23,7 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IPlayService; -import com.genersoft.iot.vmp.service.IStreamProxyService; -import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; @@ -50,6 +51,8 @@ import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; +import java.util.HashMap; +import java.util.Map; import java.util.Random; import java.util.Vector; @@ -78,6 +81,9 @@ @Autowired private IRedisCatchStorage redisCatchStorage; + + @Autowired + private IInviteStreamService inviteStreamService; @Autowired private SSRCFactory ssrcFactory; @@ -122,6 +128,9 @@ @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; + @Autowired + private VideoStreamSessionManager streamSession; + @Override public void afterPropertiesSet() throws Exception { @@ -143,7 +152,7 @@ String requesterId = SipUtils.getUserIdFromFromHeader(request); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); if (requesterId == null || channelId == null) { - logger.info("鏃犳硶浠嶧romHeader鐨凙ddress涓幏鍙栧埌骞冲彴id锛岃繑鍥�400"); + logger.info("鏃犳硶浠庤姹備腑鑾峰彇鍒板钩鍙癷d锛岃繑鍥�400"); // 鍙傛暟涓嶅叏锛� 鍙�400锛岃姹傞敊璇� try { responseAck(request, Response.BAD_REQUEST); @@ -153,6 +162,8 @@ return; } + logger.info("[INVITE] requesterId: {}, callId: {}, 鏉ヨ嚜锛歿}锛歿}", + requesterId, callIdHeader.getCallId(), request.getRemoteAddress(), request.getRemotePort()); // 鏌ヨ璇锋眰鏄惁鏉ヨ嚜涓婄骇骞冲彴\璁惧 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); @@ -163,7 +174,7 @@ // 鏌ヨ骞冲彴涓嬫槸鍚︽湁璇ラ�氶亾 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); - PlatformCatalog catalog = storager.getCatalog(channelId); + PlatformCatalog catalog = storager.getCatalog(requesterId, channelId); MediaServerItem mediaServerItem = null; StreamPushItem streamPushItem = null; @@ -400,11 +411,21 @@ // 闈炰弗鏍兼ā寮忕鍙d笉缁熶竴, 澧炲姞鍏煎鎬э紝淇敼涓轰竴涓笉涓�0鐨勭鍙� localPort = new Random().nextInt(65535) + 1; } - content.append("m=video " + localPort + " RTP/AVP 96\r\n"); + if (sendRtpItem.isTcp()) { + content.append("m=video " + localPort + " TCP/RTP/AVP 96\r\n"); + if (!sendRtpItem.isTcpActive()) { + content.append("a=setup:active\r\n"); + } else { + content.append("a=setup:passive\r\n"); + } + }else { + content.append("m=video " + localPort + " RTP/AVP 96\r\n"); + } content.append("a=sendonly\r\n"); content.append("a=rtpmap:96 PS/90000\r\n"); content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); content.append("f=\r\n"); + try { // 瓒呮椂鏈敹鍒癆ck搴旇鍥炲bye,褰撳墠绛夊緟鏃堕棿涓�10绉� @@ -418,8 +439,34 @@ logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage()); } }, 60 * 1000); - - responseSdpAck(request, content.toString(), platform); + responseSdpAck(request, content.toString(), platform); + // tcp涓诲姩妯″紡锛屽洖澶峴dp鍚庡紑鍚洃鍚� + if (sendRtpItem.isTcpActive()) { + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Map<String, Object> param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + if (!sendRtpItem.isTcpActive()) { + param.put("dst_url",sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + } + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + param.put("is_udp", is_Udp); + param.put("src_port", localPort); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + if (!sendRtpItem.isTcp()) { + // 寮�鍚痳tcp淇濇椿 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } + JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param); + if (startSendRtpStreamResult != null) { + startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader); + } + } } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍥炲SdpAck", e); } @@ -438,8 +485,10 @@ sendRtpItem.setApp("rtp"); if ("Playback".equalsIgnoreCase(sessionName)) { sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam()); - sendRtpItem.setStream(ssrcInfo.getStream()); + String startTimeStr = DateUtil.urlFormatter.format(start); + String endTimeStr = DateUtil.urlFormatter.format(end); + String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr; + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0,false, false, device.getStreamModeForParam()); // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� redisCatchStorage.updateSendRTPSever(sendRtpItem); playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), @@ -485,10 +534,13 @@ errorEvent.run(code, msg, data); } }); - }else { - + } else { + sendRtpItem.setPlayType(InviteStreamType.PLAY); + String streamId = String.format("%s_%s", device.getDeviceId(), channelId); + sendRtpItem.setStream(streamId); + redisCatchStorage.updateSendRTPSever(sendRtpItem); SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> { - if (code == InviteErrorCode.SUCCESS.getCode()){ + if (code == InviteErrorCode.SUCCESS.getCode()) { hookEvent.run(code, msg, data); } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { logger.info("[涓婄骇鐐规挱]瓒呮椂, 鐢ㄦ埛锛歿}锛� 閫氶亾锛歿}", username, channelId); @@ -498,14 +550,6 @@ errorEvent.run(code, msg, data); } })); - sendRtpItem.setPlayType(InviteStreamType.PLAY); - String streamId = null; - if (mediaServerItem.isRtpEnable()) { - streamId = String.format("%s_%s", device.getDeviceId(), channelId); - }else { - streamId = String.format("%08x", Integer.parseInt(ssrcInfo.getSsrc())).toUpperCase(); - } - sendRtpItem.setStream(streamId); sendRtpItem.setSsrc(ssrcInfo.getSsrc()); redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -550,6 +594,18 @@ logger.error("sdp瑙f瀽閿欒", e); } catch (SdpException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); + } + } + + private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) { + if (jsonObject == null) { + logger.error("涓嬬骇TCP琚姩鍚姩鐩戝惉澶辫触: 璇锋鏌LM鏈嶅姟"); + } else if (jsonObject.getInteger("code") == 0) { + logger.info("璋冪敤ZLM-TCP琚姩鎺ㄦ祦鎺ュ彛, 缁撴灉锛� {}", jsonObject); + logger.info("鍚姩鐩戝惉TCP琚姩鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + } else { + logger.error("鍚姩鐩戝惉TCP琚姩鎺ㄦ祦澶辫触: {}, 鍙傛暟锛歿}",jsonObject.getString("msg"), JSON.toJSONString(param)); } } @@ -676,8 +732,6 @@ zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); dynamicTask.stop(callIdHeader.getCallId()); } - - } else if ("push".equals(gbStream.getStreamType())) { if (!platform.isStartOfflinePush()) { // 骞冲彴璁剧疆涓叧闂簡鎷夎捣绂荤嚎鐨勬帹娴佸垯鐩存帴鍥炲 @@ -700,13 +754,10 @@ dynamicTask.startDelay(callIdHeader.getCallId(), () -> { logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); try { + redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); responseAck(request, Response.REQUEST_TIMEOUT); // 瓒呮椂 - } catch (SipException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (InvalidArgumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } catch (ParseException e) { + } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); } }, userSetting.getPlatformPlayTimeout()); @@ -717,6 +768,7 @@ // 娣诲姞鍦ㄦ湰鏈轰笂绾跨殑閫氱煡 mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> { dynamicTask.stop(callIdHeader.getCallId()); + redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); if (serverId.equals(userSetting.getServerId())) { SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, app, stream, channelId, mediaTransmissionTCP, platform.isRtcp()); @@ -781,7 +833,7 @@ // 鍙戦�乺edis娑堟伅 redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(), streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId, - channelId, mediaTransmissionTCP, platform.isRtcp(), null, responseSendItemMsg -> { + channelId, mediaTransmissionTCP, platform.isRtcp(),platform.getName(), responseSendItemMsg -> { SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem(); if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) { logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); @@ -1099,9 +1151,10 @@ audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok); audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse); audioBroadcastManager.update(audioBroadcastCatch); - + streamSession.put(device.getDeviceId(), sendRtpItem.getChannelId(), request.getCallIdHeader().getCallId(), sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(), sipResponse, InviteSessionType.BROADCAST); // 寮�鍚彂娴侊紝澶у崕鍦ㄦ敹鍒�200OK鍚庡氨浼氬紑濮嬪缓绔嬭繛鎺� if (!device.isBroadcastPushAfterAck()) { + logger.info("[璇煶鍠婅瘽] 鍥炲200OK鍚庡彂鐜� BroadcastPushAfterAck涓篎alse锛岀幇鍦ㄥ紑濮嬫帹娴�"); playService.startPushStream(sendRtpItem, sipResponse, parentPlatform, request.getCallIdHeader()); } -- Gitblit v1.8.0