From 30ae9e929fad80f624ab632c53081db3d2dc9aec Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 25 五月 2023 17:28:57 +0800 Subject: [PATCH] 合并主线 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java | 208 +++++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 183 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index b9d2c4c..ad015d9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -1,23 +1,34 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.utils.GitUtil; import gov.nist.javax.sip.message.MessageFactoryImpl; import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -27,7 +38,9 @@ import org.springframework.util.ObjectUtils; import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; import javax.sip.SipException; +import javax.sip.SipFactory; import javax.sip.header.CallIdHeader; import javax.sip.header.WWWAuthenticateHeader; import javax.sip.message.Request; @@ -63,28 +76,61 @@ private SIPSender sipSender; @Autowired + private ZlmHttpHookSubscribe subscribe; + + @Autowired + private UserSetting userSetting; + + + @Autowired + private VideoStreamSessionManager streamSession; + + @Autowired private DynamicTask dynamicTask; + + @Autowired + private GitUtil gitUtil; @Override public void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException { - register(parentPlatform, null, null, errorEvent, okEvent, false, true); + register(parentPlatform, null, null, errorEvent, okEvent, true); } @Override - public void unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException { - register(parentPlatform, null, null, errorEvent, okEvent, false, false); + public void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException { + + register(parentPlatform, sipTransactionInfo, null, errorEvent, okEvent, true); } @Override - public void register(ParentPlatform parentPlatform, @Nullable String callId, @Nullable WWWAuthenticateHeader www, - SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) throws SipException, InvalidArgumentException, ParseException { + public void unregister(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException { + register(parentPlatform, sipTransactionInfo, null, errorEvent, okEvent, false); + } + + @Override + public void register(ParentPlatform parentPlatform, @Nullable SipTransactionInfo sipTransactionInfo, @Nullable WWWAuthenticateHeader www, + SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean isRegister) throws SipException, InvalidArgumentException, ParseException { Request request; - if (!registerAgain ) { - CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport()); + CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport()); + String fromTag = SipUtils.getNewFromTag(); + String toTag = null; + if (sipTransactionInfo != null ) { + if (sipTransactionInfo.getCallId() != null) { + callIdHeader.setCallId(sipTransactionInfo.getCallId()); + } + if (sipTransactionInfo.getFromTag() != null) { + fromTag = sipTransactionInfo.getFromTag(); + } + if (sipTransactionInfo.getToTag() != null) { + toTag = sipTransactionInfo.getToTag(); + } + } + + if (www == null ) { request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, - redisCatchStorage.getCSEQ(), SipUtils.getNewFromTag(), - SipUtils.getNewViaTag(), callIdHeader, isRegister); + redisCatchStorage.getCSEQ(), fromTag, + toTag, callIdHeader, isRegister? parentPlatform.getExpires() : 0); // 灏� callid 鍐欏叆缂撳瓨锛� 绛夋敞鍐屾垚鍔熷彲浠ユ洿鏂扮姸鎬� String callIdFromHeader = callIdHeader.getCallId(); redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, PlatformRegisterInfo.getInstance(parentPlatform.getServerGBId(), isRegister)); @@ -102,8 +148,7 @@ }); }else { - CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport()); - request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, SipUtils.getNewFromTag(), null, callId, www, callIdHeader, isRegister); + request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, fromTag, toTag, www, callIdHeader, isRegister? parentPlatform.getExpires() : 0); } sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, null, okEvent); @@ -225,6 +270,7 @@ catalogXml.append("<IPAddress>" + channel.getIpAddress() + "</IPAddress>\r\n"); catalogXml.append("<Port>" + channel.getPort() + "</Port>\r\n"); catalogXml.append("<Password>" + channel.getPort() + "</Password>\r\n"); + catalogXml.append("<PTZType>" + channel.getPTZType() + "</PTZType>\r\n"); catalogXml.append("<Status>" + (channel.getStatus() == 1?"ON":"OFF") + "</Status>\r\n"); catalogXml.append("<Longitude>" + (channel.getLongitudeWgs84() != 0? channel.getLongitudeWgs84():channel.getLongitude()) @@ -265,6 +311,9 @@ String callId = request.getCallIdHeader().getCallId(); + logger.info("[鍛戒护鍙戦�乚 鍥芥爣绾ц仈{} 鐩綍鏌ヨ鍥炲: 鍏眥}鏉★紝宸插彂閫亄}鏉�", parentPlatform.getServerGBId(), + channels.size(), Math.min(index + parentPlatform.getCatalogGroup(), channels.size())); + logger.debug(catalogXml); if (sendAfterResponse) { // 榛樿鎸夌収鏀跺埌200鍥炲鍚庡彂閫佷笅涓�鏉★紝 濡傛灉瓒呮椂鏀朵笉鍒板洖澶嶏紝灏变互30姣鐨勯棿闅旂洿鎺ュ彂閫併�� dynamicTask.startDelay(timeoutTaskKey, ()->{ @@ -316,17 +365,22 @@ if (parentPlatform == null) { return; } + String deviceId = device == null ? parentPlatform.getDeviceGBId() : device.getDeviceId(); + String deviceName = device == null ? parentPlatform.getName() : device.getName(); + String manufacturer = device == null ? "WVP-28181-PRO" : device.getManufacturer(); + String model = device == null ? "platform" : device.getModel(); + String firmware = device == null ? gitUtil.getBuildVersion() : device.getFirmware(); String characterSet = parentPlatform.getCharacterSet(); StringBuffer deviceInfoXml = new StringBuffer(600); deviceInfoXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet + "\"?>\r\n"); deviceInfoXml.append("<Response>\r\n"); deviceInfoXml.append("<CmdType>DeviceInfo</CmdType>\r\n"); deviceInfoXml.append("<SN>" +sn + "</SN>\r\n"); - deviceInfoXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n"); - deviceInfoXml.append("<DeviceName>" + device.getName() + "</DeviceName>\r\n"); - deviceInfoXml.append("<Manufacturer>" + device.getManufacturer() + "</Manufacturer>\r\n"); - deviceInfoXml.append("<Model>" + device.getModel() + "</Model>\r\n"); - deviceInfoXml.append("<Firmware>" + device.getFirmware() + "</Firmware>\r\n"); + deviceInfoXml.append("<DeviceID>" + deviceId + "</DeviceID>\r\n"); + deviceInfoXml.append("<DeviceName>" + deviceName + "</DeviceName>\r\n"); + deviceInfoXml.append("<Manufacturer>" + manufacturer + "</Manufacturer>\r\n"); + deviceInfoXml.append("<Model>" + model + "</Model>\r\n"); + deviceInfoXml.append("<Firmware>" + firmware + "</Firmware>\r\n"); deviceInfoXml.append("<Result>OK</Result>\r\n"); deviceInfoXml.append("</Response>\r\n"); @@ -335,6 +389,7 @@ Request request = headerProviderPlatformProvider.createMessageRequest(parentPlatform, deviceInfoXml.toString(), fromTag, SipUtils.getNewViaTag(), callIdHeader); sipSender.transmitRequest(parentPlatform.getDeviceIp(), request); } + /** * 鍚戜笂绾у洖澶岲eviceStatus鏌ヨ淇℃伅 @@ -465,10 +520,10 @@ private void sendNotify(ParentPlatform parentPlatform, String catalogXmlContent, SubscribeInfo subscribeInfo, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent ) throws SipException, ParseException, InvalidArgumentException { - MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipLayer.getSipFactory().createMessageFactory(); + MessageFactoryImpl messageFactory = (MessageFactoryImpl) SipFactory.getInstance().createMessageFactory(); String characterSet = parentPlatform.getCharacterSet(); - // 璁剧疆缂栫爜锛� 闃叉涓枃涔辩爜 - messageFactory.setDefaultContentEncodingCharset(characterSet); + // 璁剧疆缂栫爜锛� 闃叉涓枃涔辩爜 + messageFactory.setDefaultContentEncodingCharset(characterSet); SIPRequest notifyRequest = headerProviderPlatformProvider.createNotifyRequest(parentPlatform, catalogXmlContent, subscribeInfo); @@ -680,26 +735,129 @@ } @Override - public void streamByeCmd(ParentPlatform parentPlatform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException { + public synchronized void streamByeCmd(ParentPlatform platform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException { if (sendRtpItem == null ) { logger.info("[鍚戜笂绾у彂閫丅YE]锛� sendRtpItem 涓篘ULL"); return; } - if (parentPlatform == null) { + if (platform == null) { logger.info("[鍚戜笂绾у彂閫丅YE]锛� platform 涓篘ULL"); return; } - logger.info("[鍚戜笂绾у彂閫丅YE]锛� {}/{}", parentPlatform.getServerGBId(), sendRtpItem.getChannelId()); + logger.info("[鍚戜笂绾у彂閫丅YE]锛� {}/{}", platform.getServerGBId(), sendRtpItem.getChannelId()); String mediaServerId = sendRtpItem.getMediaServerId(); MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem != null) { mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); - zlmrtpServerFactory.closeRtpServer(mediaServerItem, sendRtpItem.getStreamId()); + zlmrtpServerFactory.closeRtpServer(mediaServerItem, sendRtpItem.getStream()); } - SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(parentPlatform, sendRtpItem); + SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem); if (byeRequest == null) { logger.warn("[鍚戜笂绾у彂閫乥ye]锛氭棤娉曞垱寤� byeRequest"); } - sipSender.transmitRequest(parentPlatform.getDeviceIp(),byeRequest); + sipSender.transmitRequest(platform.getDeviceIp(),byeRequest); + } + + @Override + public void streamByeCmd(ParentPlatform platform, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException { + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(platform.getServerGBId(), channelId, callId, stream); + if (ssrcTransaction == null) { + throw new SsrcTransactionNotFoundException(platform.getServerGBId(), channelId, callId, stream); + } + + mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); + mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream()); + streamSession.remove(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); + + Request byteRequest = headerProviderPlatformProvider.createByteRequest(platform, channelId, ssrcTransaction.getSipTransactionInfo()); + sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), byteRequest, null, okEvent); + } + + @Override + public void broadcastResultCmd(ParentPlatform platform, DeviceChannel deviceChannel, String sn, boolean result, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException { + if (platform == null || deviceChannel == null) { + return; + } + String characterSet = platform.getCharacterSet(); + StringBuffer mediaStatusXml = new StringBuffer(200); + mediaStatusXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet + "\"?>\r\n"); + mediaStatusXml.append("<Response>\r\n"); + mediaStatusXml.append("<CmdType>Broadcast</CmdType>\r\n"); + mediaStatusXml.append("<SN>" + sn + "</SN>\r\n"); + mediaStatusXml.append("<DeviceID>" + deviceChannel.getChannelId() + "</DeviceID>\r\n"); + mediaStatusXml.append("<Result>" + (result?"OK":"ERROR") + "</Result>\r\n"); + mediaStatusXml.append("</Response>\r\n"); + + CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(platform.getDeviceIp(), platform.getTransport()); + + SIPRequest messageRequest = (SIPRequest)headerProviderPlatformProvider.createMessageRequest(platform, mediaStatusXml.toString(), + SipUtils.getNewFromTag(), SipUtils.getNewViaTag(), callIdHeader); + + sipSender.transmitRequest(platform.getDeviceIp(),messageRequest, errorEvent, okEvent); + } + + @Override + public void broadcastInviteCmd(ParentPlatform platform, String channelId, MediaServerItem mediaServerItem, + SSRCInfo ssrcInfo, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, + SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException { + String stream = ssrcInfo.getStream(); + + if (platform == null) { + return; + } + + logger.info("{} 鍒嗛厤鐨刏LM涓�: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { + if (event != null) { + event.response(mediaServerItemInUse, json); + subscribe.removeSubscribe(hookSubscribe); + } + }); + String sdpIp = mediaServerItem.getSdpIp(); + + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n"); + content.append("s=Play\r\n"); + content.append("c=IN IP4 " + sdpIp + "\r\n"); + content.append("t=0 0\r\n"); + + if ("TCP-PASSIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { + content.append("m=audio " + ssrcInfo.getPort() + " TCP/RTP/AVP 8 96\r\n"); + } else if ("TCP-ACTIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { + content.append("m=audio " + ssrcInfo.getPort() + " TCP/RTP/AVP 8 96\r\n"); + } else if ("UDP".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { + content.append("m=audio " + ssrcInfo.getPort() + " RTP/AVP 8 96\r\n"); + } + + content.append("a=recvonly\r\n"); + content.append("a=rtpmap:8 PCMA/8000\r\n"); + content.append("a=rtpmap:96 PS/90000\r\n"); + if ("TCP-PASSIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { + content.append("a=setup:passive\r\n"); + content.append("a=connection:new\r\n"); + }else if ("TCP-ACTIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { + content.append("a=setup:active\r\n"); + content.append("a=connection:new\r\n"); + } + + content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc + CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(platform.getDeviceIp()), platform.getTransport()); + + Request request = headerProviderPlatformProvider.createInviteRequest(platform, channelId, + content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), ssrcInfo.getSsrc(), + callIdHeader); + sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> { + streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + subscribe.removeSubscribe(hookSubscribe); + errorEvent.response(e); + }), e -> { + ResponseEvent responseEvent = (ResponseEvent) e.event; + SIPResponse response = (SIPResponse) responseEvent.getResponse(); + streamSession.put(platform.getServerGBId(), channelId, callIdHeader.getCallId(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.BROADCAST); + okEvent.response(e); + }); } } -- Gitblit v1.8.0