From ce950dea4aef933a12e78e4fe1f535ba9a83c3df Mon Sep 17 00:00:00 2001 From: leesam <leesam@leesam.cn> Date: 星期三, 10 四月 2024 22:18:40 +0800 Subject: [PATCH] Merge branch 'refs/heads/master' into develop-add-api-key --- src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java | 25 +-- src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java | 19 ++ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java | 2 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java | 12 + src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 11 - src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java | 57 ++++++ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java | 29 --- src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java | 18 ++ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 54 ++---- src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java | 33 ++- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java | 32 +-- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java | 8 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java | 19 +- src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java | 7 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java | 20 -- src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java | 15 - src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java | 4 src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java | 8 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java | 60 ++----- 19 files changed, 207 insertions(+), 226 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index 30193d2..1740260 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.gb28181.bean; +import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; + public class SendRtpItem { /** @@ -122,6 +124,23 @@ */ private String receiveStream; + public static SendRtpItem getInstance(RequestPushStreamMsg requestPushStreamMsg) { + SendRtpItem sendRtpItem = new SendRtpItem(); + sendRtpItem.setMediaServerId(requestPushStreamMsg.getMediaServerId()); + sendRtpItem.setApp(requestPushStreamMsg.getApp()); + sendRtpItem.setStream(requestPushStreamMsg.getStream()); + sendRtpItem.setIp(requestPushStreamMsg.getIp()); + sendRtpItem.setPort(requestPushStreamMsg.getPort()); + sendRtpItem.setSsrc(requestPushStreamMsg.getSsrc()); + sendRtpItem.setTcp(requestPushStreamMsg.isTcp()); + sendRtpItem.setLocalPort(requestPushStreamMsg.getSrcPort()); + sendRtpItem.setPt(requestPushStreamMsg.getPt()); + sendRtpItem.setUsePs(requestPushStreamMsg.isPs()); + sendRtpItem.setOnlyAudio(requestPushStreamMsg.isOnlyAudio()); + return sendRtpItem; + + } + public String getIp() { return ip; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index dab039f..c4c23d0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -75,8 +75,6 @@ @Autowired private IMediaServerService mediaServerService; - @Autowired - private ZLMServerFactory zlmServerFactory; /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 51f5c67..1c8353d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -13,11 +13,10 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -64,9 +63,6 @@ @Autowired private SipSubscribe sipSubscribe; - - @Autowired - private ZLMServerFactory zlmServerFactory; @Autowired private SipLayer sipLayer; @@ -846,7 +842,7 @@ MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem != null) { mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); - zlmServerFactory.closeRtpServer(mediaServerItem, sendRtpItem.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream()); } SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem); if (byeRequest == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 5410d67..10922f4 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -1,22 +1,17 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; -import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; @@ -28,17 +23,12 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; -import javax.sip.SipException; import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; -import java.text.ParseException; -import java.util.HashMap; -import java.util.Map; /** * SIP鍛戒护绫诲瀷锛� ACK璇锋眰 @@ -70,12 +60,6 @@ @Autowired private IDeviceService deviceService; - - @Autowired - private ZLMServerFactory zlmServerFactory; - - @Autowired - private HookSubscribe hookSubscribe; @Autowired private IMediaServerService mediaServerService; @@ -122,11 +106,8 @@ if (parentPlatform != null) { if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { - RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( - sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(), - sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), - sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); - redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { + RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem); + redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> { playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader); }); } else { @@ -134,7 +115,7 @@ if (sendRtpItem.isTcpActive()) { mediaServerService.startSendRtpPassive(mediaInfo, parentPlatform, sendRtpItem, null); } else { - mediaServerService.startSendRtpStream(mediaInfo, parentPlatform, sendRtpItem); + mediaServerService.startSendRtp(mediaInfo, parentPlatform, sendRtpItem); } }catch (ControllerException e) { logger.error("RTP鎺ㄦ祦澶辫触: {}", e.getMessage()); @@ -159,7 +140,7 @@ if (sendRtpItem.isTcpActive()) { mediaServerService.startSendRtpPassive(mediaInfo, null, sendRtpItem, null); } else { - mediaServerService.startSendRtpStream(mediaInfo, null, sendRtpItem); + mediaServerService.startSendRtp(mediaInfo, null, sendRtpItem); } }catch (ControllerException e) { logger.error("RTP鎺ㄦ祦澶辫触: {}", e.getMessage()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index b2d14a0..302b694 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -6,16 +6,15 @@ import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; -import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; -import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; +import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; @@ -36,8 +35,6 @@ import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.text.ParseException; -import java.util.HashMap; -import java.util.Map; /** * SIP鍛戒护绫诲瀷锛� BYE璇锋眰 @@ -76,12 +73,6 @@ private IVideoManagerStorage storager; @Autowired - private ZLMServerFactory zlmServerFactory; - - @Autowired - private SSRCFactory ssrcFactory; - - @Autowired private IMediaServerService mediaServerService; @Autowired @@ -110,7 +101,6 @@ /** * 澶勭悊BYE璇锋眰 - * @param evt */ @Override public void process(RequestEvent evt) { @@ -128,11 +118,6 @@ logger.info("[鏀跺埌bye] 鏉ヨ嚜{}锛屽仠姝㈤�氶亾锛歿}, 绫诲瀷锛� {}, callId: {}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getPlayType(), callIdHeader.getCallId()); String streamId = sendRtpItem.getStream(); - Map<String, Object> param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",streamId); - param.put("ssrc",sendRtpItem.getSsrc()); logger.info("[鏀跺埌bye] 鍋滄鎺ㄦ祦锛歿}, 濯掍綋鑺傜偣锛� {}", streamId, sendRtpItem.getMediaServerId()); if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { @@ -149,7 +134,7 @@ MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), callIdHeader.getCallId(), null); - zlmServerFactory.stopSendRtpStream(mediaInfo, param); + mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); if (userSetting.getUseCustomSsrcForParentInvite()) { mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); } @@ -169,13 +154,13 @@ MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), callIdHeader.getCallId(), null); - zlmServerFactory.stopSendRtpStream(mediaInfo, param); + mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); if (userSetting.getUseCustomSsrcForParentInvite()) { mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); } } - MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - if (mediaInfo != null) { + MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaServer != null) { AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) { // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁� @@ -183,8 +168,9 @@ audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); } - int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); - if (totalReaderCount <= 0) { + MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), streamId); + + if (mediaInfo.getReaderCount() <= 0) { logger.info("[鏀跺埌bye] {} 鏃犲叾瀹冭鐪嬭�咃紝閫氱煡璁惧鍋滄鎺ㄦ祦", streamId); if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index b4d183e..46e779d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; @@ -24,7 +25,6 @@ import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IInviteStreamService; @@ -61,7 +61,6 @@ import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; -import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.Vector; @@ -112,9 +111,6 @@ @Autowired private AudioBroadcastManager audioBroadcastManager; - - @Autowired - private ZLMServerFactory zlmServerFactory; @Autowired private IMediaServerService mediaServerService; @@ -382,8 +378,9 @@ } else { streamTypeStr = "UDP"; } - logger.info("[涓婄骇Invite] {}, 骞冲彴锛歿}锛� 閫氶亾锛歿}, 鏀舵祦鍦板潃锛歿}:{}锛屾敹娴佹柟寮忥細{}, ssrc锛歿}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc); - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + logger.info("[涓婄骇Invite] {}, 骞冲彴锛歿}锛� 閫氶亾锛歿}, 鏀舵祦鍦板潃锛歿}:{}锛屾敹娴佹柟寮忥細{}, ssrc锛歿}", + sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc); + SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (tcpActive != null) { @@ -462,30 +459,10 @@ responseSdpAck(request, content.toString(), platform); // tcp涓诲姩妯″紡锛屽洖澶峴dp鍚庡紑鍚洃鍚� if (sendRtpItem.isTcpActive()) { - MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Map<String, Object> param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStream()); - param.put("ssrc", sendRtpItem.getSsrc()); - if (!sendRtpItem.isTcpActive()) { - param.put("dst_url",sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - } - String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; - param.put("is_udp", is_Udp); - param.put("src_port", localPort); - param.put("pt", sendRtpItem.getPt()); - param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); - param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - if (!sendRtpItem.isTcp()) { - // 寮�鍚痳tcp淇濇椿 - param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); - } - JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param); - if (startSendRtpStreamResult != null) { - startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader); - } + MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + try { + mediaServerService.startSendRtpPassive(mediaServer, platform, sendRtpItem, 5); + }catch (ControllerException e) {} } } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍥炲SdpAck", e); @@ -638,13 +615,14 @@ * 瀹夋帓鎺ㄦ祦 */ private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform, - CallIdHeader callIdHeader, MediaServer mediaServerItem, + CallIdHeader callIdHeader, MediaServer mediaServer, int port, Boolean tcpActive, boolean mediaTransmissionTCP, String channelId, String addressStr, String ssrc, String requesterId) { - Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + Boolean streamReady = mediaServerService.isStreamReady(mediaServer, gbStream.getApp(), gbStream.getStream()); if (streamReady != null && streamReady) { + // 鑷钩鍙板唴瀹� - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServer, addressStr, port, ssrc, requesterId, gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { @@ -665,7 +643,7 @@ sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setFromTag(request.getFromTag()); - SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt); + SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt); if (response != null) { sendRtpItem.setToTag(response.getToTag()); } @@ -684,7 +662,7 @@ Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); if (streamReady != null && streamReady) { // 鑷钩鍙板唴瀹� - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { @@ -794,7 +772,7 @@ dynamicTask.stop(callIdHeader.getCallId()); redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); if (serverId.equals(userSetting.getServerId())) { - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, + SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, app, stream, channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { @@ -1074,7 +1052,7 @@ mediaTransmissionTCP ? (tcpActive ? "TCP涓诲姩" : "TCP琚姩") : "UDP", sdp.getSessionName().getValue()); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId, + SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId, device.getDeviceId(), broadcastCatch.getChannelId(), mediaTransmissionTCP, false); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java index 08be1f3..46f9642 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java @@ -1,16 +1,15 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd; -import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IPlatformService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -61,9 +60,6 @@ @Autowired private AudioBroadcastManager audioBroadcastManager; - - @Autowired - private ZLMServerFactory zlmServerFactory; @Autowired private IRedisCatchStorage redisCatchStorage; @@ -155,12 +151,13 @@ } }else { // 鍙戞祦 - JSONObject jsonObject = zlmServerFactory.startSendRtp(hookData.getMediaServer(), sendRtpItem); - if (jsonObject != null && jsonObject.getInteger("code") == 0 ) { - logger.info("[璇煶鍠婅瘽] 鑷姩鎺ㄦ祦鎴愬姛, device: {}, channel: {}", device.getDeviceId(), targetId); - }else { - logger.info("[璇煶鍠婅瘽] 鎺ㄦ祦澶辫触, 缁撴灉锛� {}", jsonObject); + try { + mediaServerService.startSendRtp(hookData.getMediaServer(),null, sendRtpItem); + }catch (ControllerException e) { + logger.info("[璇煶鍠婅瘽] 鎺ㄦ祦澶辫触, 缁撴灉锛� {}", e.getMessage()); + return; } + logger.info("[璇煶鍠婅瘽] 鑷姩鎺ㄦ祦鎴愬姛, device: {}, channel: {}", device.getDeviceId(), targetId); } } }else { diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index fb56ab6..671790a 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -141,5 +141,10 @@ void startSendRtpPassive(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem, Integer timeout); - void startSendRtpStream(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem); + void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem); + + SendRtpItem createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean mediaTransmissionTCP, boolean rtcp); + + SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId, + String app, String stream, String channelId, boolean tcp, boolean rtcp); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 6214ccb..d513286 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -19,6 +19,7 @@ import com.genersoft.iot.vmp.media.service.IMediaNodeServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; @@ -82,6 +83,9 @@ @Autowired private MediaConfig mediaConfig; + + @Autowired + private SendRtpPortManager sendRtpPortManager; @@ -812,7 +816,7 @@ } @Override - public void startSendRtpStream(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem) { + public void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem) { IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); if (mediaNodeServerService == null) { logger.info("[startSendRtpStream] 澶辫触, mediaServer鐨勭被鍨嬶細 {}锛屾湭鎵惧埌瀵瑰簲鐨勫疄鐜扮被", mediaServer.getType()); @@ -821,7 +825,10 @@ logger.info("[寮�濮嬫帹娴乚 rtp/{}, 鐩爣={}:{}锛孲SRC={}, RTCP={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem); - sendPlatformStartPlayMsg(platform, sendRtpItem); + if (platform != null) { + sendPlatformStartPlayMsg(platform, sendRtpItem); + } + } @@ -834,4 +841,50 @@ redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel); } } + + @Override + public SendRtpItem createSendRtpItem(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean isTcp, boolean rtcp) { + int localPort = sendRtpPortManager.getNextPort(mediaServer); + if (localPort == 0) { + return null; + } + SendRtpItem sendRtpItem = new SendRtpItem(); + sendRtpItem.setIp(ip); + sendRtpItem.setPort(port); + sendRtpItem.setSsrc(ssrc); + sendRtpItem.setPlatformId(deviceId); + sendRtpItem.setDeviceId(deviceId); + sendRtpItem.setChannelId(channelId); + sendRtpItem.setTcp(isTcp); + sendRtpItem.setRtcp(rtcp); + sendRtpItem.setApp("rtp"); + sendRtpItem.setLocalPort(localPort); + sendRtpItem.setServerId(userSetting.getServerId()); + sendRtpItem.setMediaServerId(mediaServer.getId()); + return sendRtpItem; + } + + @Override + public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId, + String app, String stream, String channelId, boolean tcp, boolean rtcp){ + + int localPort = sendRtpPortManager.getNextPort(serverItem); + if (localPort == 0) { + return null; + } + SendRtpItem sendRtpItem = new SendRtpItem(); + sendRtpItem.setIp(ip); + sendRtpItem.setPort(port); + sendRtpItem.setSsrc(ssrc); + sendRtpItem.setApp(app); + sendRtpItem.setStream(stream); + sendRtpItem.setPlatformId(platformId); + sendRtpItem.setChannelId(channelId); + sendRtpItem.setTcp(tcp); + sendRtpItem.setLocalPort(localPort); + sendRtpItem.setServerId(userSetting.getServerId()); + sendRtpItem.setMediaServerId(serverItem.getId()); + sendRtpItem.setRtcp(rtcp); + return sendRtpItem; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index 84df2e7..80699d2 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -3,15 +3,13 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.*; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IStreamProxyService; +import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; import com.genersoft.iot.vmp.utils.DateUtil; import org.slf4j.Logger; @@ -20,7 +18,7 @@ import org.springframework.stereotype.Component; import java.text.ParseException; -import java.util.*; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** @@ -38,25 +36,15 @@ private GbStreamMapper gbStreamMapper; @Autowired - private PlatformGbStreamMapper platformGbStreamMapper; - - @Autowired private IStreamPushService streamPushService; - @Autowired - private IStreamProxyService streamProxyService; @Autowired private StreamPushMapper streamPushMapper; @Autowired - private HookSubscribe subscribe; - - @Autowired private UserSetting userSetting; - @Autowired - private ZLMServerFactory zlmServerFactory; @Autowired private IMediaServerService mediaServerService; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index 707ea01..8a9cafc 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -316,11 +316,23 @@ if (timeout != null) { param.put("close_delay_ms", timeout); } + if (!sendRtpItem.isTcp()) { + // 寮�鍚痳tcp淇濇椿 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } + if (!sendRtpItem.isTcpActive()) { + param.put("dst_url",sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + } JSONObject jsonObject = zlmServerFactory.startSendRtpPassive(mediaServer, param, null); if (jsonObject == null || jsonObject.getInteger("code") != 0 ) { + logger.error("鍚姩鐩戝惉TCP琚姩鎺ㄦ祦澶辫触: {}, 鍙傛暟锛歿}", jsonObject.getString("msg"), JSON.toJSONString(param)); throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg")); } + logger.info("璋冪敤ZLM-TCP琚姩鎺ㄦ祦鎺ュ彛, 缁撴灉锛� {}", jsonObject); + logger.info("鍚姩鐩戝惉TCP琚姩鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}->{}:{}, " , sendRtpItem.getApp(), sendRtpItem.getStream(), + jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index 14025ce..1d150d5 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -5,7 +5,6 @@ import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.bean.MediaServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,9 +24,6 @@ @Autowired private UserSetting userSetting; - - @Autowired - private HookSubscribe hookSubscribe; @Autowired private SendRtpPortManager sendRtpPortManager; diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java index 5827d01..9b446f6 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.service.bean; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; + /** * redis娑堟伅锛氳姹備笅绾ф帹閫佹祦淇℃伅 * @author lin @@ -80,6 +82,22 @@ return requestPushStreamMsg; } + public static RequestPushStreamMsg getInstance(SendRtpItem sendRtpItem) { + RequestPushStreamMsg requestPushStreamMsg = new RequestPushStreamMsg(); + requestPushStreamMsg.setMediaServerId(sendRtpItem.getMediaServerId()); + requestPushStreamMsg.setApp(sendRtpItem.getApp()); + requestPushStreamMsg.setStream(sendRtpItem.getStream()); + requestPushStreamMsg.setIp(sendRtpItem.getIp()); + requestPushStreamMsg.setPort(sendRtpItem.getPort()); + requestPushStreamMsg.setSsrc(sendRtpItem.getSsrc()); + requestPushStreamMsg.setTcp(sendRtpItem.isTcp()); + requestPushStreamMsg.setSrcPort(sendRtpItem.getLocalPort()); + requestPushStreamMsg.setPt(sendRtpItem.getPt()); + requestPushStreamMsg.setPs(sendRtpItem.isUsePs()); + requestPushStreamMsg.setOnlyAudio(sendRtpItem.isOnlyAudio()); + return requestPushStreamMsg; + } + public String getMediaServerId() { return mediaServerId; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index aa39f41..6554817 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -1,7 +1,9 @@ package com.genersoft.iot.vmp.service.impl; import com.baomidou.dynamic.datasource.annotation.DS; -import com.genersoft.iot.vmp.common.*; +import com.genersoft.iot.vmp.common.InviteInfo; +import com.genersoft.iot.vmp.common.InviteSessionStatus; +import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; @@ -11,13 +13,12 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.HookData; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.IPlatformService; import com.genersoft.iot.vmp.service.IPlayService; @@ -41,7 +42,9 @@ import javax.sip.ResponseEvent; import javax.sip.SipException; import java.text.ParseException; -import java.util.*; +import java.util.List; +import java.util.UUID; +import java.util.Vector; /** * @author lin @@ -76,9 +79,6 @@ private DynamicTask dynamicTask; @Autowired - private ZLMServerFactory zlmServerFactory; - - @Autowired private SubscribeHolder subscribeHolder; @Autowired @@ -86,9 +86,6 @@ @Autowired private UserSetting userSetting; - - @Autowired - private HookSubscribe subscribe; @Autowired private VideoStreamSessionManager streamSession; @@ -437,11 +434,7 @@ ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null); MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Map<String, Object> param = new HashMap<>(3); - param.put("vhost", "__defaultVhost__"); - param.put("app", sendRtpItem.getApp()); - param.put("stream", sendRtpItem.getStream()); - zlmServerFactory.stopSendRtpStream(mediaInfo, param); + mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 7406483..fc6a025 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.service.impl; -import com.alibaba.fastjson2.JSONObject; import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.common.*; import com.genersoft.iot.vmp.conf.DynamicTask; @@ -25,7 +24,6 @@ import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.service.*; @@ -1421,11 +1419,8 @@ MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaInfo == null) { - RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( - sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(), - sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), - sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); - redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { + RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem); + redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> { startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader); }); } else { @@ -1433,7 +1428,7 @@ if (sendRtpItem.isTcpActive()) { mediaServerService.startSendRtpPassive(mediaInfo, platform, sendRtpItem, null); } else { - mediaServerService.startSendRtpStream(mediaInfo, platform, sendRtpItem); + mediaServerService.startSendRtp(mediaInfo, platform, sendRtpItem); } }catch (ControllerException e) { logger.error("RTP鎺ㄦ祦澶辫触: {}", e.getMessage()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 6692aa8..f611480 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -9,15 +9,14 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.bean.MediaInfo; +import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.IGbStreamService; @@ -63,9 +62,6 @@ @Autowired private IVideoManagerStorage videoManagerStorager; - - @Autowired - private ZLMServerFactory zlmServerFactory; @Autowired private StreamProxyMapper streamProxyMapper; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index 14287e0..68595a8 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -5,12 +5,12 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.event.hook.Hook; -import com.genersoft.iot.vmp.media.event.hook.HookType; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -27,7 +27,6 @@ import org.springframework.stereotype.Component; import java.text.ParseException; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -73,9 +72,6 @@ private RedisTemplate<Object, Object> redisTemplate; @Autowired - private ZLMServerFactory zlmServerFactory; - - @Autowired private IMediaServerService mediaServerService; @Autowired @@ -101,7 +97,7 @@ } public interface PlayMsgCallbackForStartSendRtpStream{ - void handler(JSONObject jsonObject); + void handler(); } public interface PlayMsgErrorCallback{ @@ -181,11 +177,10 @@ String serial = wvpRedisMsg.getSerial(); switch (wvpResult.getCode()) { case 0: - JSONObject jsonObject = (JSONObject)wvpResult.getData(); PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); if (playMsgCallback != null) { callbacksForError.remove(serial); - playMsgCallback.handler(jsonObject); + playMsgCallback.handler(); } break; case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: @@ -219,36 +214,24 @@ * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰 */ private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) { - MediaServer mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); - if (mediaInfo == null) { + MediaServer mediaServer = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); + if (mediaServer == null) { // TODO 鍥炲閿欒 return; } - String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1"; - Map<String, Object> param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",requestPushStreamMsg.getApp()); - param.put("stream",requestPushStreamMsg.getStream()); - param.put("ssrc", requestPushStreamMsg.getSsrc()); - param.put("dst_url",requestPushStreamMsg.getIp()); - param.put("dst_port", requestPushStreamMsg.getPort()); - param.put("is_udp", is_Udp); - param.put("src_port", requestPushStreamMsg.getSrcPort()); - param.put("pt", requestPushStreamMsg.getPt()); - param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0"); - param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0"); - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaInfo, param); + SendRtpItem sendRtpItem = SendRtpItem.getInstance(requestPushStreamMsg); + + try { + mediaServerService.startSendRtp(mediaServer, null, sendRtpItem); + }catch (ControllerException e) { + return; + } + // 鍥炲娑堟伅 - responsePushStream(jsonObject, fromId, serial); - } - - private void responsePushStream(JSONObject content, String toId, String serial) { - WVPResult<JSONObject> result = new WVPResult<>(); result.setCode(0); - result.setData(content); - WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, + WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), fromId, WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result)); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); @@ -317,7 +300,7 @@ * 灏嗚幏鍙栧埌鐨剆endItem鍙戦�佸嚭鍘� */ private void responseSendItem(MediaServer mediaServerItem, RequestSendItemMsg content, String toId, String serial) { - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), + SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, content.getIp(), content.getPort(), content.getSsrc(), content.getPlatformId(), content.getApp(), content.getStream(), content.getChannelId(), content.getTcp(), content.getRtcp()); @@ -453,13 +436,8 @@ // TODO 鍥炲閿欒 return; } - Map<String, Object> param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStream()); - param.put("ssrc", sendRtpItem.getSsrc()); - if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) { + if (mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc())) { logger.info("[REDIS 鎵ц鍏朵粬骞冲彴鐨勮姹傚仠姝㈡帹娴乚 鎴愬姛锛� {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); // 鍙戦�乺edis娑堟伅 MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java index e7cba6b..fcfeff3 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java @@ -8,7 +8,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; @@ -25,7 +24,6 @@ import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -57,9 +55,6 @@ @Autowired private IMediaServerService mediaServerService; - @Autowired - private ZLMServerFactory zlmServerFactory; - private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>(); @@ -88,16 +83,10 @@ } if (push.isSelf()) { // 鍋滄鍚戜笂绾ф帹娴� - String streamId = sendRtpItem.getStream(); - Map<String, Object> param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",streamId); - param.put("ssrc",sendRtpItem.getSsrc()); - logger.info("[REDIS娑堟伅-鎺ㄦ祦缁撴潫] 鍋滄鍚戜笂绾ф帹娴侊細{}", streamId); + logger.info("[REDIS娑堟伅-鎺ㄦ祦缁撴潫] 鍋滄鍚戜笂绾ф帹娴侊細{}", sendRtpItem.getStream()); MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream()); - zlmServerFactory.stopSendRtpStream(mediaInfo, param); + mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java index 6401a8c..7f1c869 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java @@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo; @@ -81,8 +82,8 @@ logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}", isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP琚姩", callBack); - MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer(); - if (mediaServerItem == null) { + MediaServer mediaServer = mediaServerService.getDefaultMediaServer(); + if (mediaServer == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"娌℃湁鍙敤鐨凪ediaServer"); } if (stream == null) { @@ -100,13 +101,14 @@ } } String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + callId + "_" + stream; - int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, false, tcpMode); - if (localPort == 0) { + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, ssrcInt + "", false, false, null, false, false, false, tcpMode); + + if (ssrcInfo.getPort() == 0) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "鑾峰彇绔彛澶辫触"); } // 娉ㄥ唽鍥炶皟濡傛灉rtp鏀舵祦瓒呮椂鍒欓�氳繃鍥炶皟鍙戦�侀�氱煡 if (callBack != null) { - Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServerItem.getId()); + Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServer.getId()); // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� hookSubscribe.addSubscribe(hook, (hookData)->{ @@ -128,8 +130,8 @@ }); } OtherPsSendInfo otherPsSendInfo = new OtherPsSendInfo(); - otherPsSendInfo.setReceiveIp(mediaServerItem.getSdpIp()); - otherPsSendInfo.setReceivePort(localPort); + otherPsSendInfo.setReceiveIp(mediaServer.getSdpIp()); + otherPsSendInfo.setReceivePort(ssrcInfo.getPort()); otherPsSendInfo.setCallId(callId); otherPsSendInfo.setStream(stream); @@ -138,9 +140,9 @@ if (isSend != null && isSend) { String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId; // 棰勫垱寤哄彂娴佷俊鎭� - int port = sendRtpPortManager.getNextPort(mediaServerItem); + int port = sendRtpPortManager.getNextPort(mediaServer); - otherPsSendInfo.setSendLocalIp(mediaServerItem.getSdpIp()); + otherPsSendInfo.setSendLocalIp(mediaServer.getSdpIp()); otherPsSendInfo.setSendLocalPort(port); // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 redisTemplate.opsForValue().set(key, otherPsSendInfo, 300, TimeUnit.SECONDS); @@ -156,7 +158,7 @@ public void closeRtpServer(String stream) { logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍏抽棴鏀舵祦] stream->{}", stream); MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer(); - zlmServerFactory.closeRtpServer(mediaServerItem,stream); + mediaServerService.closeRTPServer(mediaServerItem, stream); String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_*_" + stream; List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey); if (!scan.isEmpty()) { @@ -198,7 +200,7 @@ app, stream, callId); - MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer(); + MediaServer mediaServer = mediaServerService.getDefaultMediaServer(); String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId; OtherPsSendInfo sendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(key); if (sendInfo == null) { @@ -224,9 +226,10 @@ param.put("src_port", sendInfo.getSendLocalPort()); - Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, app, stream); + Boolean streamReady = mediaServerService.isStreamReady(mediaServer, app, stream); if (streamReady) { - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, param); +// mediaServerService.startSendRtp(mediaServer, ); if (jsonObject.getInteger("code") == 0) { logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] 瑙嗛娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, param); redisTemplate.opsForValue().set(key, sendInfo); @@ -238,7 +241,7 @@ }else { logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] 娴佷笉瀛樺湪锛岀瓑寰呮祦涓婄嚎锛宑allId->{}", callId); String uuid = UUID.randomUUID().toString(); - Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServerItem.getId()); + Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServer.getId()); dynamicTask.startDelay(uuid, ()->{ logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] 绛夊緟娴佷笂绾胯秴鏃� callId->{}", callId); redisTemplate.delete(key); @@ -257,7 +260,7 @@ } catch (InterruptedException e) { throw new RuntimeException(e); } - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, param); if (jsonObject.getInteger("code") == 0) { logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] 瑙嗛娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, param); redisTemplate.opsForValue().set(key, finalSendInfo); -- Gitblit v1.8.0