From e0344ccf9725fe3d22a90ab11257396071e7f55f Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 14 六月 2022 14:37:34 +0800 Subject: [PATCH] 国标级联推送推流 支持多wvp间自动选择与推送 --- src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 1 web_src/src/components/dialog/rtcPlayer.vue | 18 src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java | 13 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java | 6 src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java | 1 src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java | 4 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java | 2 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java | 2 src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java | 12 src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java | 83 + src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java | 5 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java | 2 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 8 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java | 10 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java | 2 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java | 2 src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java | 4 src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java | 15 src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java | 17 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java | 5 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java | 13 src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java | 377 ++++++++ src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsg.java | 116 ++ src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java | 14 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java | 2 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java | 108 - src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java | 2 src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java | 170 ++++ src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java | 1 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java | 1208 ++++++++++++++++------------ src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java | 9 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 25 src/main/java/com/genersoft/iot/vmp/service/bean/RequestSendItemMsg.java | 173 ++++ src/main/java/com/genersoft/iot/vmp/service/bean/ResponseSendItemMsg.java | 31 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java | 7 sql/update.sql | 12 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java | 15 37 files changed, 1,831 insertions(+), 664 deletions(-) diff --git a/sql/update.sql b/sql/update.sql index 9c18c26..0abc544 100644 --- a/sql/update.sql +++ b/sql/update.sql @@ -1,12 +1,4 @@ -alter table parent_platform - add startOfflinePush int default 0 null; +alter table stream_push + add serverId varchar(50) not null; -alter table parent_platform - add administrativeDivision varchar(50) not null; - -alter table parent_platform - add catalogGroup int default 1 null; - -alter table device - add ssrcCheck int default 0 null; diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index eb98f6f..57f764b 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -97,4 +97,5 @@ //************************** 绗笁鏂� **************************************** public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_"; public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_"; + } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java index 6b45eb5..ec1f9ba 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java @@ -2,7 +2,9 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.service.impl.RedisAlarmMsgListener; -import com.genersoft.iot.vmp.service.impl.RedisGPSMsgListener; +import com.genersoft.iot.vmp.service.impl.RedisGpsMsgListener; +import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; +import com.genersoft.iot.vmp.service.impl.RedisStreamMsgListener; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -47,10 +49,16 @@ private int poolMaxWait; @Autowired - private RedisGPSMsgListener redisGPSMsgListener; + private RedisGpsMsgListener redisGPSMsgListener; @Autowired private RedisAlarmMsgListener redisAlarmMsgListener; + + @Autowired + private RedisStreamMsgListener redisStreamMsgListener; + + @Autowired + private RedisGbPlayMsgListener redisGbPlayMsgListener; @Bean public JedisPool jedisPool() { @@ -98,6 +106,8 @@ container.setConnectionFactory(connectionFactory); container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS)); container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); + container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); + container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); return container; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index c7f6182..41e1af7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -72,6 +72,11 @@ private String mediaServerId; /** + * 浣跨敤鐨勬湇鍔$殑ID + */ + private String serverId; + + /** * invite鐨刢allId */ private String CallId; @@ -259,4 +264,12 @@ public void setOnlyAudio(boolean onlyAudio) { this.onlyAudio = onlyAudio; } + + public String getServerId() { + return serverId; + } + + public void setServerId(String serverId) { + this.serverId = serverId; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java index c416766..66b57fe 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java @@ -71,7 +71,9 @@ String gbId = gbStream.getGbId(); GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId); if (gpsMsgInfo != null) { // 鏃犳渶鏂颁綅缃笉鍙戦�� - logger.info("鏃犳渶鏂颁綅缃笉鍙戦��"); + if (logger.isDebugEnabled()) { + logger.debug("鏃犳渶鏂颁綅缃笉鍙戦��"); + } // 缁忕含搴﹂兘涓�0涓嶅彂閫� if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) { continue; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index b3d67de..78b8e62 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -16,6 +16,8 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; +import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.SerializeUtils; @@ -43,7 +45,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class); - private String method = "ACK"; + private final String method = "ACK"; @Autowired private SIPProcessorObserver sipProcessorObserver; @@ -77,6 +79,9 @@ @Autowired private ISIPCommanderForPlatform commanderForPlatform; + + @Autowired + private RedisGbPlayMsgListener redisGbPlayMsgListener; /** @@ -114,78 +119,41 @@ param.put("pt", sendRtpItem.getPt()); param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - if (jsonObject == null) { - logger.error("RTP鎺ㄦ祦澶辫触: 璇锋鏌LM鏈嶅姟"); - } else if (jsonObject.getInteger("code") == 0) { - logger.info("RTP鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); - byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); - sendRtpItem.setDialog(dialogByteArray); - byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); - sendRtpItem.setTransaction(transactionByteArray); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { - logger.error("RTP鎺ㄦ祦澶辫触: {}, 鍙傛暟锛歿}",jsonObject.getString("msg"),JSONObject.toJSON(param)); - if (sendRtpItem.isOnlyAudio()) { - // TODO 鍙兘鏄闊冲璁� - }else { - // 鍚戜笂绾у钩鍙� - commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); - } + if (mediaInfo == null) { + RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), + sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); + redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, jsonObject->{ + startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader); + }); + }else { + JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader); } -// if (streamInfo == null) { // 娴佽繕娌′笂鏉ワ紝瀵规柟灏卞洖澶峚ck -// logger.info("鐩戝惉娴佷互绛夊緟娴佷笂绾�1 rtp/{}", sendRtpItem.getStreamId()); -// // 鐩戝惉娴佷笂绾� -// // 娣诲姞璁㈤槄 -// JSONObject subscribeKey = new JSONObject(); -// subscribeKey.put("app", "rtp"); -// subscribeKey.put("stream", sendRtpItem.getStreamId()); -// subscribeKey.put("regist", true); -// subscribeKey.put("schema", "rtmp"); -// subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); -// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, -// (MediaServerItem mediaServerItemInUse, JSONObject json)->{ -// Map<String, Object> param = new HashMap<>(); -// param.put("vhost","__defaultVhost__"); -// param.put("app",json.getString("app")); -// param.put("stream",json.getString("stream")); -// param.put("ssrc", sendRtpItem.getSsrc()); -// param.put("dst_url",sendRtpItem.getIp()); -// param.put("dst_port", sendRtpItem.getPort()); -// param.put("is_udp", is_Udp); -// param.put("src_port", sendRtpItem.getLocalPort()); -// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); -// }); -// }else { -// Map<String, Object> param = new HashMap<>(); -// param.put("vhost","__defaultVhost__"); -// param.put("app",streamInfo.getApp()); -// param.put("stream",streamInfo.getStream()); -// param.put("ssrc", sendRtpItem.getSsrc()); -// param.put("dst_url",sendRtpItem.getIp()); -// param.put("dst_port", sendRtpItem.getPort()); -// param.put("is_udp", is_Udp); -// param.put("src_port", sendRtpItem.getLocalPort()); -// -// JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); -// if (jsonObject.getInteger("code") != 0) { -// logger.info("鐩戝惉娴佷互绛夊緟娴佷笂绾�2 {}/{}", streamInfo.getApp(), streamInfo.getStream()); -// // 鐩戝惉娴佷笂绾� -// // 娣诲姞璁㈤槄 -// JSONObject subscribeKey = new JSONObject(); -// subscribeKey.put("app", "rtp"); -// subscribeKey.put("stream", streamInfo.getStream()); -// subscribeKey.put("regist", true); -// subscribeKey.put("schema", "rtmp"); -// subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); -// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, -// (MediaServerItem mediaServerItemInUse, JSONObject json)->{ -// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); -// }); -// } -// } + } + } + private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) { + if (jsonObject == null) { + logger.error("RTP鎺ㄦ祦澶辫触: 璇锋鏌LM鏈嶅姟"); + } else if (jsonObject.getInteger("code") == 0) { + logger.info("RTP鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } else { + logger.error("RTP鎺ㄦ祦澶辫触: {}, 鍙傛暟锛歿}",jsonObject.getString("msg"),JSONObject.toJSON(param)); + if (sendRtpItem.isOnlyAudio()) { + // TODO 鍙兘鏄闊冲璁� + }else { + // 鍚戜笂绾у钩鍙� + commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); + } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 7531809..3268880 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -107,13 +107,9 @@ cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null); } if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { - MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); - messageForPushChannel.setType(0); - messageForPushChannel.setGbId(sendRtpItem.getChannelId()); - messageForPushChannel.setApp(sendRtpItem.getApp()); - messageForPushChannel.setStream(sendRtpItem.getStreamId()); - messageForPushChannel.setMediaServerId(sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormId(sendRtpItem.getPlatformId()); + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId()); redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java index 0a818ee..b04352a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java @@ -15,7 +15,7 @@ @Component public class CancelRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { - private String method = "CANCEL"; + private final String method = "CANCEL"; @Autowired private SIPProcessorObserver sipProcessorObserver; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 75b4114..19908e4 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -17,10 +17,13 @@ import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; +import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -50,562 +53,709 @@ @Component public class InviteRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { - private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class); + private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class); - private String method = "INVITE"; + private final String method = "INVITE"; - @Autowired - private SIPCommanderFroPlatform cmderFroPlatform; + @Autowired + private SIPCommanderFroPlatform cmderFroPlatform; - @Autowired - private IVideoManagerStorage storager; + @Autowired + private IVideoManagerStorage storager; - @Autowired - private IRedisCatchStorage redisCatchStorage; + @Autowired + private IStreamPushService streamPushService; - @Autowired - private DynamicTask dynamicTask; + @Autowired + private IRedisCatchStorage redisCatchStorage; - @Autowired - private SIPCommander cmder; + @Autowired + private DynamicTask dynamicTask; - @Autowired - private IPlayService playService; + @Autowired + private SIPCommander cmder; - @Autowired - private ISIPCommander commander; + @Autowired + private IPlayService playService; - @Autowired - private ZLMRTPServerFactory zlmrtpServerFactory; + @Autowired + private ISIPCommander commander; - @Autowired - private IMediaServerService mediaServerService; + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; - @Autowired - private SIPProcessorObserver sipProcessorObserver; + @Autowired + private IMediaServerService mediaServerService; - @Autowired - private VideoStreamSessionManager sessionManager; + @Autowired + private SIPProcessorObserver sipProcessorObserver; - @Autowired - private UserSetting userSetting; + @Autowired + private VideoStreamSessionManager sessionManager; - @Autowired - private ZLMMediaListManager mediaListManager; + @Autowired + private UserSetting userSetting; + + @Autowired + private ZLMMediaListManager mediaListManager; - @Override - public void afterPropertiesSet() throws Exception { - // 娣诲姞娑堟伅澶勭悊鐨勮闃� - sipProcessorObserver.addRequestProcessor(method, this); - } - - /** - * 澶勭悊invite璇锋眰 - * - * @param evt - * 璇锋眰娑堟伅 - */ - @Override - public void process(RequestEvent evt) { - // Invite Request娑堟伅瀹炵幇锛屾娑堟伅涓�鑸负绾ц仈娑堟伅锛屼笂绾х粰涓嬬骇鍙戦�佽姹傝棰戞寚浠� - try { - Request request = evt.getRequest(); - SipURI sipURI = (SipURI) request.getRequestURI(); - //浠巗ubject璇诲彇channelId,涓嶅啀浠巖equest-line璇诲彇銆� 鏈変簺骞冲彴request-line鏄钩鍙板浗鏍囩紪鐮侊紝涓嶆槸璁惧鍥芥爣缂栫爜銆� - //String channelId = sipURI.getUser(); - String channelId = SipUtils.getChannelIdFromHeader(request); - String requesterId = SipUtils.getUserIdFromFromHeader(request); - CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); - if (requesterId == null || channelId == null) { - logger.info("鏃犳硶浠嶧romHeader鐨凙ddress涓幏鍙栧埌骞冲彴id锛岃繑鍥�400"); - responseAck(evt, Response.BAD_REQUEST); // 鍙傛暟涓嶅叏锛� 鍙�400锛岃姹傞敊璇� - return; - } - - // 鏌ヨ璇锋眰鏄惁鏉ヨ嚜涓婄骇骞冲彴\璁惧 - ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); - if (platform == null) { - inviteFromDeviceHandle(evt, requesterId); - }else { - // 鏌ヨ骞冲彴涓嬫槸鍚︽湁璇ラ�氶亾 - DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); - GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); - PlatformCatalog catalog = storager.getCatalog(channelId); - MediaServerItem mediaServerItem = null; - // 涓嶆槸閫氶亾鍙兘鏄洿鎾祦 - if (channel != null && gbStream == null ) { - if (channel.getStatus() == 0) { - logger.info("閫氶亾绂荤嚎锛岃繑鍥�400"); - responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline"); - return; - } - responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 閫氶亾瀛樺湪锛屽彂181锛屽懠鍙浆鎺ヤ腑 - }else if(channel == null && gbStream != null){ - String mediaServerId = gbStream.getMediaServerId(); - mediaServerItem = mediaServerService.getOne(mediaServerId); - if (mediaServerItem == null) { - logger.info("[ app={}, stream={} ]鎵句笉鍒皕lm {}锛岃繑鍥�410",gbStream.getApp(), gbStream.getStream(), mediaServerId); - responseAck(evt, Response.GONE); - return; - } - responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 閫氶亾瀛樺湪锛屽彂181锛屽懠鍙浆鎺ヤ腑 - }else if (catalog != null) { - responseAck(evt, Response.BAD_REQUEST, "catalog channel can not play"); // 鐩綍涓嶆敮鎸佺偣鎾� - return; - } else { - logger.info("閫氶亾涓嶅瓨鍦紝杩斿洖404"); - responseAck(evt, Response.NOT_FOUND); // 閫氶亾涓嶅瓨鍦紝鍙�404锛岃祫婧愪笉瀛樺湪 - return; - } - // 瑙f瀽sdp娑堟伅, 浣跨敤jainsip 鑷甫鐨剆dp瑙f瀽鏂瑰紡 - String contentString = new String(request.getRawContent()); - - // jainSip涓嶆敮鎸亂=瀛楁锛� 绉婚櫎浠ヨВ鏋愩�� - int ssrcIndex = contentString.indexOf("y="); - // 妫�鏌ユ槸鍚︽湁y瀛楁 - String ssrcDefault = "0000000000"; - String ssrc; - SessionDescription sdp; - if (ssrcIndex >= 0) { - //ssrc瑙勫畾闀垮害涓�10瀛楄妭锛屼笉鍙栦綑涓嬮暱搴︿互閬垮厤鍚庣画杩樻湁鈥渇=鈥濆瓧娈� - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - String substring = contentString.substring(0, contentString.indexOf("y=")); - sdp = SdpFactory.getInstance().createSessionDescription(substring); - }else { - ssrc = ssrcDefault; - sdp = SdpFactory.getInstance().createSessionDescription(contentString); - } - String sessionName = sdp.getSessionName().getValue(); - - Long startTime = null; - Long stopTime = null; - Instant start = null; - Instant end = null; - if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) { - TimeDescriptionImpl timeDescription = (TimeDescriptionImpl)(sdp.getTimeDescriptions(false).get(0)); - TimeField startTimeFiled = (TimeField)timeDescription.getTime(); - startTime = startTimeFiled.getStartTime(); - stopTime = startTimeFiled.getStopTime(); - - start = Instant.ofEpochSecond(startTime); - end = Instant.ofEpochSecond(stopTime); - } - // 鑾峰彇鏀寔鐨勬牸寮� - Vector mediaDescriptions = sdp.getMediaDescriptions(true); - // 鏌ョ湅鏄惁鏀寔PS 璐熻浇96 - //String ip = null; - int port = -1; - boolean mediaTransmissionTCP = false; - Boolean tcpActive = null; - for (Object description : mediaDescriptions) { - MediaDescription mediaDescription = (MediaDescription) description; - Media media = mediaDescription.getMedia(); - - Vector mediaFormats = media.getMediaFormats(false); - if (mediaFormats.contains("96")) { - port = media.getMediaPort(); - //String mediaType = media.getMediaType(); - String protocol = media.getProtocol(); - - // 鍖哄垎TCP鍙戞祦杩樻槸udp锛� 褰撳墠榛樿udp - if ("TCP/RTP/AVP".equals(protocol)) { - String setup = mediaDescription.getAttribute("setup"); - if (setup != null) { - mediaTransmissionTCP = true; - if ("active".equals(setup)) { - tcpActive = true; - // 涓嶆敮鎸乼cp涓诲姩 - responseAck(evt, Response.NOT_IMPLEMENTED, "tcp active not support"); // 鐩綍涓嶆敮鎸佺偣鎾� - return; - } else if ("passive".equals(setup)) { - tcpActive = false; - } - } - } - break; - } - } - if (port == -1) { - logger.info("涓嶆敮鎸佺殑濯掍綋鏍煎紡锛岃繑鍥�415"); - // 鍥炲涓嶆敮鎸佺殑鏍煎紡 - responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 涓嶆敮鎸佺殑鏍煎紡锛屽彂415 - return; - } - String username = sdp.getOrigin().getUsername(); - String addressStr = sdp.getOrigin().getAddress(); - - logger.info("[涓婄骇鐐规挱]鐢ㄦ埛锛歿}锛� 閫氶亾锛歿}, 鍦板潃锛歿}:{}锛� ssrc锛歿}", username, channelId, addressStr, port, ssrc); - Device device = null; - // 閫氳繃 channel 鍜� gbStream 鏄惁涓簄ull 鍊煎垽鏂潵婧愭槸鐩存挱娴佸悎閫傚浗鏍� - if (channel != null) { - device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId); - if (device == null) { - logger.warn("鐐规挱骞冲彴{}鐨勯�氶亾{}鏃舵湭鎵惧埌璁惧淇℃伅", requesterId, channel); - responseAck(evt, Response.SERVER_INTERNAL_ERROR); - return; - } - mediaServerItem = playService.getNewMediaServerItem(device); - if (mediaServerItem == null) { - logger.warn("鏈壘鍒板彲鐢ㄧ殑zlm"); - responseAck(evt, Response.BUSY_HERE); - return; - } - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - device.getDeviceId(), channelId, - mediaTransmissionTCP); - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - if (sendRtpItem == null) { - logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - responseAck(evt, Response.BUSY_HERE); - return; - } - sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setPlayType("Play".equals(sessionName)?InviteStreamType.PLAY:InviteStreamType.PLAYBACK); - - Long finalStartTime = startTime; - Long finalStopTime = stopTime; - ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ - String app = responseJSON.getString("app"); - String stream = responseJSON.getString("stream"); - logger.info("[涓婄骇鐐规挱]涓嬬骇宸茬粡寮�濮嬫帹娴併�� 鍥炲200OK(SDP)锛� {}/{}", app, stream); - // * 0 绛夊緟璁惧鎺ㄦ祦涓婃潵 - // * 1 涓嬬骇宸茬粡鎺ㄦ祦锛岀瓑寰呬笂绾у钩鍙板洖澶峚ck - // * 2 鎺ㄦ祦涓� - sendRtpItem.setStatus(1); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - - StringBuffer content = new StringBuffer(200); - content.append("v=0\r\n"); - content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); - content.append("s=" + sessionName+"\r\n"); - content.append("c=IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); - if ("Playback".equals(sessionName)) { - content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); - }else { - content.append("t=0 0\r\n"); - } - content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); - content.append("a=sendonly\r\n"); - content.append("a=rtpmap:96 PS/90000\r\n"); - content.append("y="+ ssrc + "\r\n"); - content.append("f=\r\n"); - - try { - // 瓒呮椂鏈敹鍒癆ck搴旇鍥炲bye,褰撳墠绛夊緟鏃堕棿涓�10绉� - dynamicTask.startDelay(callIdHeader.getCallId(), ()->{ - logger.info("Ack 绛夊緟瓒呮椂"); - mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), ssrc); - // 鍥炲bye - cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); - }, 60*1000); - responseSdpAck(evt, content.toString(), platform); - - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } - }; - SipSubscribe.Event errorEvent = ((event) -> { - // 鏈煡閿欒銆傜洿鎺ヨ浆鍙戣澶囩偣鎾殑閿欒 - Response response = null; - try { - response = getMessageFactory().createResponse(event.statusCode, evt.getRequest()); - ServerTransaction serverTransaction = getServerTransaction(evt); - serverTransaction.sendResponse(response); - if (serverTransaction.getDialog() != null) { - serverTransaction.getDialog().delete(); - } - } catch (ParseException | SipException | InvalidArgumentException e) { - e.printStackTrace(); - } - }); - sendRtpItem.setApp("rtp"); - if ("Playback".equals(sessionName)) { - sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true); - sendRtpItem.setStreamId(ssrcInfo.getStream()); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - redisCatchStorage.updateSendRTPSever(sendRtpItem); - playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), - DateUtil.formatter.format(end), null, result -> { - if (result.getCode() != 0){ - logger.warn("褰曞儚鍥炴斁澶辫触"); - if (result.getEvent() != null) { - errorEvent.response(result.getEvent()); - } - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - try { - responseAck(evt, Response.REQUEST_TIMEOUT); - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } - }else { - if (result.getMediaServerItem() != null) { - hookEvent.response(result.getMediaServerItem(), result.getResponse()); - } - } - }); - }else { - sendRtpItem.setPlayType(InviteStreamType.PLAY); - SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); - if (playTransaction != null) { - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream()); - if (!streamReady) { - playTransaction = null; - } - } - if (playTransaction == null) { - String streamId = null; - if (mediaServerItem.isRtpEnable()) { - streamId = String.format("%s_%s", device.getDeviceId(), channelId); - } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false); - sendRtpItem.setStreamId(ssrcInfo.getStream()); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - redisCatchStorage.updateSendRTPSever(sendRtpItem); - playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg)->{ - logger.info("[涓婄骇鐐规挱]瓒呮椂, 鐢ㄦ埛锛歿}锛� 閫氶亾锛歿}", username, channelId); - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - }, null); - }else { - sendRtpItem.setStreamId(playTransaction.getStream()); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - redisCatchStorage.updateSendRTPSever(sendRtpItem); - JSONObject jsonObject = new JSONObject(); - jsonObject.put("app", sendRtpItem.getApp()); - jsonObject.put("stream", sendRtpItem.getStreamId()); - hookEvent.response(mediaServerItem, jsonObject); - } - } - }else if (gbStream != null) { - - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); - if (!streamReady ) { - if ("proxy".equals(gbStream.getStreamType())) { - // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾绂荤嚎锛屽惎鐢ㄦ祦鍚庡紑濮嬫帹娴�",gbStream.getApp(), gbStream.getStream()); - responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); - }else if ("push".equals(gbStream.getStreamType())) { - if (!platform.isStartOfflinePush()) { - responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable"); - return; - } - // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 - logger.info("[ app={}, stream={} ]閫氶亾绂荤嚎锛屽彂閫乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�",gbStream.getApp(), gbStream.getStream()); - MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); - messageForPushChannel.setType(1); - messageForPushChannel.setGbId(gbStream.getGbId()); - messageForPushChannel.setApp(gbStream.getApp()); - messageForPushChannel.setStream(gbStream.getStream()); - // TODO 鑾峰彇浣庤礋杞界殑鑺傜偣 - messageForPushChannel.setMediaServerId(gbStream.getMediaServerId()); - messageForPushChannel.setPlatFormId(platform.getServerGBId()); - messageForPushChannel.setPlatFormName(platform.getName()); - redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); - // 璁剧疆瓒呮椂 - dynamicTask.startDelay(callIdHeader.getCallId(), ()->{ - logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); - try { - mediaListManager.removedChannelOnlineEventLister(gbStream.getGbId()); - responseAck(evt, Response.REQUEST_TIMEOUT); // 瓒呮椂 - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } - }, userSetting.getPlatformPlayTimeout()); - // 娣诲姞鐩戝惉 - MediaServerItem finalMediaServerItem = mediaServerItem; - int finalPort = port; - boolean finalMediaTransmissionTCP = mediaTransmissionTCP; - Boolean finalTcpActive = tcpActive; - mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream)->{ - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(finalMediaServerItem, addressStr, finalPort, ssrc, requesterId, - app, stream, channelId, finalMediaTransmissionTCP); - - if (sendRtpItem == null) { - logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - try { - responseAck(evt, Response.BUSY_HERE); - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } - return; - } - if (finalTcpActive != null) { - sendRtpItem.setTcpActive(finalTcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); - sendRtpItem.setDialog(dialogByteArray); - byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); - sendRtpItem.setTransaction(transactionByteArray); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - sendStreamAck(finalMediaServerItem, sendRtpItem, platform, evt); - - }); - } - }else { - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, - mediaTransmissionTCP); + @Autowired + private RedisGbPlayMsgListener redisGbPlayMsgListener; - if (sendRtpItem == null) { - logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); - responseAck(evt, Response.BUSY_HERE); - return; - } - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); - sendRtpItem.setDialog(dialogByteArray); - byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); - sendRtpItem.setTransaction(transactionByteArray); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); - } + @Override + public void afterPropertiesSet() throws Exception { + // 娣诲姞娑堟伅澶勭悊鐨勮闃� + sipProcessorObserver.addRequestProcessor(method, this); + } + + /** + * 澶勭悊invite璇锋眰 + * + * @param evt 璇锋眰娑堟伅 + */ + @Override + public void process(RequestEvent evt) { + // Invite Request娑堟伅瀹炵幇锛屾娑堟伅涓�鑸负绾ц仈娑堟伅锛屼笂绾х粰涓嬬骇鍙戦�佽姹傝棰戞寚浠� + try { + Request request = evt.getRequest(); + SipURI sipUri = (SipURI) request.getRequestURI(); + //浠巗ubject璇诲彇channelId,涓嶅啀浠巖equest-line璇诲彇銆� 鏈変簺骞冲彴request-line鏄钩鍙板浗鏍囩紪鐮侊紝涓嶆槸璁惧鍥芥爣缂栫爜銆� + //String channelId = sipURI.getUser(); + String channelId = SipUtils.getChannelIdFromHeader(request); + String requesterId = SipUtils.getUserIdFromFromHeader(request); + CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); + if (requesterId == null || channelId == null) { + logger.info("鏃犳硶浠嶧romHeader鐨凙ddress涓幏鍙栧埌骞冲彴id锛岃繑鍥�400"); + // 鍙傛暟涓嶅叏锛� 鍙�400锛岃姹傞敊璇� + responseAck(evt, Response.BAD_REQUEST); + return; + } + + // 鏌ヨ璇锋眰鏄惁鏉ヨ嚜涓婄骇骞冲彴\璁惧 + ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); + if (platform == null) { + inviteFromDeviceHandle(evt, requesterId); + } else { + // 鏌ヨ骞冲彴涓嬫槸鍚︽湁璇ラ�氶亾 + DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); + GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); + PlatformCatalog catalog = storager.getCatalog(channelId); + + MediaServerItem mediaServerItem = null; + StreamPushItem streamPushItem = null; + // 涓嶆槸閫氶亾鍙兘鏄洿鎾祦 + if (channel != null && gbStream == null) { + if (channel.getStatus() == 0) { + logger.info("閫氶亾绂荤嚎锛岃繑鍥�400"); + responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline"); + return; + } + responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 閫氶亾瀛樺湪锛屽彂181锛屽懠鍙浆鎺ヤ腑 + } else if (channel == null && gbStream != null) { + + String mediaServerId = gbStream.getMediaServerId(); + mediaServerItem = mediaServerService.getOne(mediaServerId); + if (mediaServerItem == null) { + if ("proxy".equals(gbStream.getStreamType())) { + logger.info("[ app={}, stream={} ]鎵句笉鍒皕lm {}锛岃繑鍥�410", gbStream.getApp(), gbStream.getStream(), mediaServerId); + responseAck(evt, Response.GONE); + return; + } else { + streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); + if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) { + logger.info("[ app={}, stream={} ]鎵句笉鍒皕lm {}锛岃繑鍥�410", gbStream.getApp(), gbStream.getStream(), mediaServerId); + responseAck(evt, Response.GONE); + return; + } + } + } else { + if ("push".equals(gbStream.getStreamType())) { + streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); + if (streamPushItem == null) { + logger.info("[ app={}, stream={} ]鎵句笉鍒皕lm {}锛岃繑鍥�410", gbStream.getApp(), gbStream.getStream(), mediaServerId); + responseAck(evt, Response.GONE); + return; + } + } + } + responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 閫氶亾瀛樺湪锛屽彂181锛屽懠鍙浆鎺ヤ腑 + } else if (catalog != null) { + responseAck(evt, Response.BAD_REQUEST, "catalog channel can not play"); // 鐩綍涓嶆敮鎸佺偣鎾� + return; + } else { + logger.info("閫氶亾涓嶅瓨鍦紝杩斿洖404"); + responseAck(evt, Response.NOT_FOUND); // 閫氶亾涓嶅瓨鍦紝鍙�404锛岃祫婧愪笉瀛樺湪 + return; + } + // 瑙f瀽sdp娑堟伅, 浣跨敤jainsip 鑷甫鐨剆dp瑙f瀽鏂瑰紡 + String contentString = new String(request.getRawContent()); + + // jainSip涓嶆敮鎸亂=瀛楁锛� 绉婚櫎浠ヨВ鏋愩�� + int ssrcIndex = contentString.indexOf("y="); + // 妫�鏌ユ槸鍚︽湁y瀛楁 + String ssrcDefault = "0000000000"; + String ssrc; + SessionDescription sdp; + if (ssrcIndex >= 0) { + //ssrc瑙勫畾闀垮害涓�10瀛楄妭锛屼笉鍙栦綑涓嬮暱搴︿互閬垮厤鍚庣画杩樻湁鈥渇=鈥濆瓧娈� + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + String substring = contentString.substring(0, contentString.indexOf("y=")); + sdp = SdpFactory.getInstance().createSessionDescription(substring); + } else { + ssrc = ssrcDefault; + sdp = SdpFactory.getInstance().createSessionDescription(contentString); + } + String sessionName = sdp.getSessionName().getValue(); + + Long startTime = null; + Long stopTime = null; + Instant start = null; + Instant end = null; + if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) { + TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) (sdp.getTimeDescriptions(false).get(0)); + TimeField startTimeFiled = (TimeField) timeDescription.getTime(); + startTime = startTimeFiled.getStartTime(); + stopTime = startTimeFiled.getStopTime(); + + start = Instant.ofEpochSecond(startTime); + end = Instant.ofEpochSecond(stopTime); + } + // 鑾峰彇鏀寔鐨勬牸寮� + Vector mediaDescriptions = sdp.getMediaDescriptions(true); + // 鏌ョ湅鏄惁鏀寔PS 璐熻浇96 + //String ip = null; + int port = -1; + boolean mediaTransmissionTCP = false; + Boolean tcpActive = null; + for (Object description : mediaDescriptions) { + MediaDescription mediaDescription = (MediaDescription) description; + Media media = mediaDescription.getMedia(); + + Vector mediaFormats = media.getMediaFormats(false); + if (mediaFormats.contains("96")) { + port = media.getMediaPort(); + //String mediaType = media.getMediaType(); + String protocol = media.getProtocol(); + + // 鍖哄垎TCP鍙戞祦杩樻槸udp锛� 褰撳墠榛樿udp + if ("TCP/RTP/AVP".equals(protocol)) { + String setup = mediaDescription.getAttribute("setup"); + if (setup != null) { + mediaTransmissionTCP = true; + if ("active".equals(setup)) { + tcpActive = true; + // 涓嶆敮鎸乼cp涓诲姩 + responseAck(evt, Response.NOT_IMPLEMENTED, "tcp active not support"); // 鐩綍涓嶆敮鎸佺偣鎾� + return; + } else if ("passive".equals(setup)) { + tcpActive = false; + } + } + } + break; + } + } + if (port == -1) { + logger.info("涓嶆敮鎸佺殑濯掍綋鏍煎紡锛岃繑鍥�415"); + // 鍥炲涓嶆敮鎸佺殑鏍煎紡 + responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 涓嶆敮鎸佺殑鏍煎紡锛屽彂415 + return; + } + String username = sdp.getOrigin().getUsername(); + String addressStr = sdp.getOrigin().getAddress(); + + logger.info("[涓婄骇鐐规挱]鐢ㄦ埛锛歿}锛� 閫氶亾锛歿}, 鍦板潃锛歿}:{}锛� ssrc锛歿}", username, channelId, addressStr, port, ssrc); + Device device = null; + // 閫氳繃 channel 鍜� gbStream 鏄惁涓簄ull 鍊煎垽鏂潵婧愭槸鐩存挱娴佸悎閫傚浗鏍� + if (channel != null) { + device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId); + if (device == null) { + logger.warn("鐐规挱骞冲彴{}鐨勯�氶亾{}鏃舵湭鎵惧埌璁惧淇℃伅", requesterId, channel); + responseAck(evt, Response.SERVER_INTERNAL_ERROR); + return; + } + mediaServerItem = playService.getNewMediaServerItem(device); + if (mediaServerItem == null) { + logger.warn("鏈壘鍒板彲鐢ㄧ殑zlm"); + responseAck(evt, Response.BUSY_HERE); + return; + } + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + device.getDeviceId(), channelId, + mediaTransmissionTCP); + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + if (sendRtpItem == null) { + logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); + responseAck(evt, Response.BUSY_HERE); + return; + } + sendRtpItem.setCallId(callIdHeader.getCallId()); + sendRtpItem.setPlayType("Play".equals(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); + + Long finalStartTime = startTime; + Long finalStopTime = stopTime; + ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> { + String app = responseJSON.getString("app"); + String stream = responseJSON.getString("stream"); + logger.info("[涓婄骇鐐规挱]涓嬬骇宸茬粡寮�濮嬫帹娴併�� 鍥炲200OK(SDP)锛� {}/{}", app, stream); + // * 0 绛夊緟璁惧鎺ㄦ祦涓婃潵 + // * 1 涓嬬骇宸茬粡鎺ㄦ祦锛岀瓑寰呬笂绾у钩鍙板洖澶峚ck + // * 2 鎺ㄦ祦涓� + sendRtpItem.setStatus(1); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); + content.append("s=" + sessionName + "\r\n"); + content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); + if ("Playback".equals(sessionName)) { + content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); + } else { + content.append("t=0 0\r\n"); + } + content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n"); + content.append("a=sendonly\r\n"); + content.append("a=rtpmap:96 PS/90000\r\n"); + content.append("y=" + ssrc + "\r\n"); + content.append("f=\r\n"); + + try { + // 瓒呮椂鏈敹鍒癆ck搴旇鍥炲bye,褰撳墠绛夊緟鏃堕棿涓�10绉� + dynamicTask.startDelay(callIdHeader.getCallId(), () -> { + logger.info("Ack 绛夊緟瓒呮椂"); + mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), ssrc); + // 鍥炲bye + cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); + }, 60 * 1000); + responseSdpAck(evt, content.toString(), platform); + + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + }; + SipSubscribe.Event errorEvent = ((event) -> { + // 鏈煡閿欒銆傜洿鎺ヨ浆鍙戣澶囩偣鎾殑閿欒 + Response response = null; + try { + response = getMessageFactory().createResponse(event.statusCode, evt.getRequest()); + ServerTransaction serverTransaction = getServerTransaction(evt); + serverTransaction.sendResponse(response); + if (serverTransaction.getDialog() != null) { + serverTransaction.getDialog().delete(); + } + } catch (ParseException | SipException | InvalidArgumentException e) { + e.printStackTrace(); + } + }); + sendRtpItem.setApp("rtp"); + if ("Playback".equals(sessionName)) { + sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true); + sendRtpItem.setStreamId(ssrcInfo.getStream()); + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + redisCatchStorage.updateSendRTPSever(sendRtpItem); + playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), + DateUtil.formatter.format(end), null, result -> { + if (result.getCode() != 0) { + logger.warn("褰曞儚鍥炴斁澶辫触"); + if (result.getEvent() != null) { + errorEvent.response(result.getEvent()); + } + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); + try { + responseAck(evt, Response.REQUEST_TIMEOUT); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + } else { + if (result.getMediaServerItem() != null) { + hookEvent.response(result.getMediaServerItem(), result.getResponse()); + } + } + }); + } else { + sendRtpItem.setPlayType(InviteStreamType.PLAY); + SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); + if (playTransaction != null) { + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream()); + if (!streamReady) { + playTransaction = null; + } + } + if (playTransaction == null) { + String streamId = null; + if (mediaServerItem.isRtpEnable()) { + streamId = String.format("%s_%s", device.getDeviceId(), channelId); + } + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false); + sendRtpItem.setStreamId(ssrcInfo.getStream()); + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + redisCatchStorage.updateSendRTPSever(sendRtpItem); + playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> { + logger.info("[涓婄骇鐐规挱]瓒呮椂, 鐢ㄦ埛锛歿}锛� 閫氶亾锛歿}", username, channelId); + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); + }, null); + } else { + sendRtpItem.setStreamId(playTransaction.getStream()); + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + redisCatchStorage.updateSendRTPSever(sendRtpItem); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("app", sendRtpItem.getApp()); + jsonObject.put("stream", sendRtpItem.getStreamId()); + hookEvent.response(mediaServerItem, jsonObject); + } + } + } else if (gbStream != null) { + if (streamPushItem.isStatus()) { + // 鍦ㄧ嚎鐘舵�� + pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } else { + // 涓嶅湪绾� 鎷夎捣 + notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + + } + + } + + } catch (SipException | InvalidArgumentException | ParseException e) { + e.printStackTrace(); + logger.warn("sdp瑙f瀽閿欒"); + e.printStackTrace(); + } catch (SdpParseException e) { + e.printStackTrace(); + } catch (SdpException e) { + e.printStackTrace(); + } + } + + /** + * 瀹夋帓鎺ㄦ祦 + */ + + private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, + CallIdHeader callIdHeader, MediaServerItem mediaServerItem, + int port, Boolean tcpActive, boolean mediaTransmissionTCP, + String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { + // 鎺ㄦ祦 + if (streamPushItem.getServerId().equals(userSetting.getServerId())) { + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + if (streamReady) { + // 鑷钩鍙板唴瀹� + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + gbStream.getApp(), gbStream.getStream(), channelId, + mediaTransmissionTCP); + + if (sendRtpItem == null) { + logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); + responseAck(evt, Response.BUSY_HERE); + return; + } + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + sendRtpItem.setPlayType(InviteStreamType.PUSH); + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + sendRtpItem.setCallId(callIdHeader.getCallId()); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); + } else { + // 涓嶅湪绾� 鎷夎捣 + notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + + } else { + // 鍏朵粬骞冲彴鍐呭 + otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + + } + + /** + * 閫氱煡娴佷笂绾� + */ + private void notifyStreamOnline(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, + CallIdHeader callIdHeader, MediaServerItem mediaServerItem, + int port, Boolean tcpActive, boolean mediaTransmissionTCP, + String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { + if ("proxy".equals(gbStream.getStreamType())) { + // TODO 鎺у埗鍚敤浠ヤ娇璁惧涓婄嚎 + logger.info("[ app={}, stream={} ]閫氶亾绂荤嚎锛屽惎鐢ㄦ祦鍚庡紑濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); + responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); + } else if ("push".equals(gbStream.getStreamType())) { + if (!platform.isStartOfflinePush()) { + responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable"); + return; + } + // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 + logger.info("[ app={}, stream={} ]閫氶亾绂荤嚎锛屽彂閫乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�", gbStream.getApp(), gbStream.getStream()); + + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, + gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), + platform.getName(), null, gbStream.getMediaServerId()); + redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); + // 璁剧疆瓒呮椂 + dynamicTask.startDelay(callIdHeader.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", gbStream.getApp(), gbStream.getStream()); + try { + mediaListManager.removedChannelOnlineEventLister(gbStream.getGbId()); + responseAck(evt, Response.REQUEST_TIMEOUT); // 瓒呮椂 + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + }, userSetting.getPlatformPlayTimeout()); + // 娣诲姞鐩戝惉 + int finalPort = port; + Boolean finalTcpActive = tcpActive; + + // 娣诲姞鍦ㄦ湰鏈轰笂绾跨殑閫氱煡 + mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream, serverId) -> { + dynamicTask.stop(callIdHeader.getCallId()); + if (serverId.equals(userSetting.getServerId())) { + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, + app, stream, channelId, mediaTransmissionTCP); + + if (sendRtpItem == null) { + logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); + try { + responseAck(evt, Response.BUSY_HERE); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + return; + } + if (finalTcpActive != null) { + sendRtpItem.setTcpActive(finalTcpActive); + } + sendRtpItem.setPlayType(InviteStreamType.PUSH); + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + sendRtpItem.setCallId(callIdHeader.getCallId()); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); + } else { + // 鍏朵粬骞冲彴鍐呭 + otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + }); + } + } + + /** + * 鏉ヨ嚜鍏朵粬wvp鐨勬帹娴� + */ + private void otherWvpPushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, + CallIdHeader callIdHeader, MediaServerItem mediaServerItem, + int port, Boolean tcpActive, boolean mediaTransmissionTCP, + String channelId, String addressStr, String ssrc, String requesterId) { + logger.info("[绾ц仈鐐规挱]鐩存挱娴佹潵鑷叾浠栧钩鍙帮紝鍙戦�乺edis娑堟伅"); + // 鍙戦�乺edis娑堟伅 + redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(), + streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId, + channelId, mediaTransmissionTCP, null, responseSendItemMsg -> { + SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem(); + if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) { + logger.warn("鏈嶅姟鍣ㄧ鍙h祫婧愪笉瓒�"); + try { + responseAck(evt, Response.BUSY_HERE); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + return; + } + // 鏀跺埌sendItem + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + sendRtpItem.setPlayType(InviteStreamType.PUSH); + // 鍐欏叆redis锛� 瓒呮椂鏃跺洖澶� + sendRtpItem.setStatus(1); + sendRtpItem.setCallId(callIdHeader.getCallId()); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + sendStreamAck(responseSendItemMsg.getMediaServerItem(), sendRtpItem, platform, evt); + }, (wvpResult) -> { + try { + // 閿欒 + if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) { + // 绂荤嚎 + // 鏌ヨ鏄惁鍦ㄦ湰鏈轰笂绾夸簡 + StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); + if (currentStreamPushItem.isStatus()) { + // 鍦ㄧ嚎鐘舵�� + pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + + } else { + // 涓嶅湪绾� 鎷夎捣 + notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + } + } catch (InvalidArgumentException e) { + throw new RuntimeException(e); + } catch (ParseException e) { + throw new RuntimeException(e); + } catch (SipException e) { + throw new RuntimeException(e); + } - } + try { + responseAck(evt, Response.BUSY_HERE); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + return; + }); + } - } + public void sendStreamAck(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) { - } catch (SipException | InvalidArgumentException | ParseException e) { - e.printStackTrace(); - logger.warn("sdp瑙f瀽閿欒"); - e.printStackTrace(); - } catch (SdpParseException e) { - e.printStackTrace(); - } catch (SdpException e) { - e.printStackTrace(); - } - } + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o=" + sendRtpItem.getChannelId() + " 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); + content.append("s=Play\r\n"); + content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); + content.append("t=0 0\r\n"); + content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n"); + content.append("a=sendonly\r\n"); + content.append("a=rtpmap:96 PS/90000\r\n"); + if (sendRtpItem.isTcp()) { + content.append("a=connection:new\r\n"); + if (!sendRtpItem.isTcpActive()) { + content.append("a=setup:active\r\n"); + } else { + content.append("a=setup:passive\r\n"); + } + } + content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); + content.append("f=\r\n"); - public void sendStreamAck(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt){ + try { + responseSdpAck(evt, content.toString(), platform); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + } - StringBuffer content = new StringBuffer(200); - content.append("v=0\r\n"); - content.append("o="+ sendRtpItem.getChannelId() +" 0 0 IN IP4 "+ mediaServerItem.getSdpIp()+"\r\n"); - content.append("s=Play\r\n"); - content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); - content.append("t=0 0\r\n"); - content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); - content.append("a=sendonly\r\n"); - content.append("a=rtpmap:96 PS/90000\r\n"); - if (sendRtpItem.isTcp()) { - content.append("a=connection:new\r\n"); - if (!sendRtpItem.isTcpActive()) { - content.append("a=setup:active\r\n"); - }else { - content.append("a=setup:passive\r\n"); - } - } - content.append("y="+ sendRtpItem.getSsrc() + "\r\n"); - content.append("f=\r\n"); + public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException { - try { - responseSdpAck(evt, content.toString(), platform); - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } - } + // 闈炰笂绾у钩鍙拌姹傦紝鏌ヨ鏄惁璁惧璇锋眰锛堥�氬父涓烘帴鏀惰闊冲箍鎾殑璁惧锛� + Device device = redisCatchStorage.getDevice(requesterId); + Request request = evt.getRequest(); + if (device != null) { + logger.info("鏀跺埌璁惧" + requesterId + "鐨勮闊冲箍鎾璉nvite璇锋眰"); + responseAck(evt, Response.TRYING); - public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException { + String contentString = new String(request.getRawContent()); + // jainSip涓嶆敮鎸亂=瀛楁锛� 绉婚櫎绉婚櫎浠ヨВ鏋愩�� + String substring = contentString; + String ssrc = "0000000404"; + int ssrcIndex = contentString.indexOf("y="); + if (ssrcIndex > 0) { + substring = contentString.substring(0, ssrcIndex); + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + } + ssrcIndex = substring.indexOf("f="); + if (ssrcIndex > 0) { + substring = contentString.substring(0, ssrcIndex); + } + SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); - // 闈炰笂绾у钩鍙拌姹傦紝鏌ヨ鏄惁璁惧璇锋眰锛堥�氬父涓烘帴鏀惰闊冲箍鎾殑璁惧锛� - Device device = redisCatchStorage.getDevice(requesterId); - Request request = evt.getRequest(); - if (device != null) { - logger.info("鏀跺埌璁惧" + requesterId + "鐨勮闊冲箍鎾璉nvite璇锋眰"); - responseAck(evt, Response.TRYING); + // 鑾峰彇鏀寔鐨勬牸寮� + Vector mediaDescriptions = sdp.getMediaDescriptions(true); + // 鏌ョ湅鏄惁鏀寔PS 璐熻浇96 + int port = -1; + //boolean recvonly = false; + boolean mediaTransmissionTCP = false; + Boolean tcpActive = null; + for (int i = 0; i < mediaDescriptions.size(); i++) { + MediaDescription mediaDescription = (MediaDescription) mediaDescriptions.get(i); + Media media = mediaDescription.getMedia(); - String contentString = new String(request.getRawContent()); - // jainSip涓嶆敮鎸亂=瀛楁锛� 绉婚櫎绉婚櫎浠ヨВ鏋愩�� - String substring = contentString; - String ssrc = "0000000404"; - int ssrcIndex = contentString.indexOf("y="); - if (ssrcIndex > 0) { - substring = contentString.substring(0, ssrcIndex); - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - } - ssrcIndex = substring.indexOf("f="); - if (ssrcIndex > 0) { - substring = contentString.substring(0, ssrcIndex); - } - SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); + Vector mediaFormats = media.getMediaFormats(false); + if (mediaFormats.contains("8")) { + port = media.getMediaPort(); + String protocol = media.getProtocol(); + // 鍖哄垎TCP鍙戞祦杩樻槸udp锛� 褰撳墠榛樿udp + if ("TCP/RTP/AVP".equals(protocol)) { + String setup = mediaDescription.getAttribute("setup"); + if (setup != null) { + mediaTransmissionTCP = true; + if ("active".equals(setup)) { + tcpActive = true; + } else if ("passive".equals(setup)) { + tcpActive = false; + } + } + } + break; + } + } + if (port == -1) { + logger.info("涓嶆敮鎸佺殑濯掍綋鏍煎紡锛岃繑鍥�415"); + // 鍥炲涓嶆敮鎸佺殑鏍煎紡 + responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 涓嶆敮鎸佺殑鏍煎紡锛屽彂415 + return; + } + String username = sdp.getOrigin().getUsername(); + String addressStr = sdp.getOrigin().getAddress(); + logger.info("璁惧{}璇锋眰璇煶娴侊紝鍦板潃锛歿}:{}锛宻src锛歿}", username, addressStr, port, ssrc); - // 鑾峰彇鏀寔鐨勬牸寮� - Vector mediaDescriptions = sdp.getMediaDescriptions(true); - // 鏌ョ湅鏄惁鏀寔PS 璐熻浇96 - int port = -1; - //boolean recvonly = false; - boolean mediaTransmissionTCP = false; - Boolean tcpActive = null; - for (int i = 0; i < mediaDescriptions.size(); i++) { - MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i); - Media media = mediaDescription.getMedia(); - - Vector mediaFormats = media.getMediaFormats(false); - if (mediaFormats.contains("8")) { - port = media.getMediaPort(); - String protocol = media.getProtocol(); - // 鍖哄垎TCP鍙戞祦杩樻槸udp锛� 褰撳墠榛樿udp - if ("TCP/RTP/AVP".equals(protocol)) { - String setup = mediaDescription.getAttribute("setup"); - if (setup != null) { - mediaTransmissionTCP = true; - if ("active".equals(setup)) { - tcpActive = true; - } else if ("passive".equals(setup)) { - tcpActive = false; - } - } - } - break; - } - } - if (port == -1) { - logger.info("涓嶆敮鎸佺殑濯掍綋鏍煎紡锛岃繑鍥�415"); - // 鍥炲涓嶆敮鎸佺殑鏍煎紡 - responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 涓嶆敮鎸佺殑鏍煎紡锛屽彂415 - return; - } - String username = sdp.getOrigin().getUsername(); - String addressStr = sdp.getOrigin().getAddress(); - logger.info("璁惧{}璇锋眰璇煶娴侊紝鍦板潃锛歿}:{}锛宻src锛歿}", username, addressStr, port, ssrc); - - } else { - logger.warn("鏉ヨ嚜鏃犳晥璁惧/骞冲彴鐨勮姹�"); - responseAck(evt, Response.BAD_REQUEST); - } - } + } else { + logger.warn("鏉ヨ嚜鏃犳晥璁惧/骞冲彴鐨勮姹�"); + responseAck(evt, Response.BAD_REQUEST); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index cebc2ed..e914339 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -41,7 +41,7 @@ private final Logger logger = LoggerFactory.getLogger(RegisterRequestProcessor.class); - public String method = "REGISTER"; + public final String method = "REGISTER"; @Autowired private SipConfig sipConfig; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 87adc3e..57e8045 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -40,9 +40,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { private Logger logger = LoggerFactory.getLogger(RecordInfoResponseMessageHandler.class); - public static volatile List<String> threadNameList = new ArrayList(); private final String cmdType = "RecordInfo"; - private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_"; private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java index 64933b8..ff63fad 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java @@ -17,7 +17,7 @@ @Component public class ByeResponseProcessor extends SIPResponseProcessorAbstract { - private String method = "BYE"; + private final String method = "BYE"; @Autowired private SipLayer sipLayer; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java index 80d7e2b..775beeb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java @@ -17,7 +17,7 @@ @Component public class CancelResponseProcessor extends SIPResponseProcessorAbstract { - private String method = "CANCEL"; + private final String method = "CANCEL"; @Autowired private SipLayer sipLayer; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java index c81aabb..89958e9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java @@ -31,7 +31,7 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract { private final static Logger logger = LoggerFactory.getLogger(InviteResponseProcessor.class); - private String method = "INVITE"; + private final String method = "INVITE"; @Autowired private SipLayer sipLayer; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java index 02f5e1d..f3a9f65 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java @@ -27,7 +27,7 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { private Logger logger = LoggerFactory.getLogger(RegisterResponseProcessor.class); - private String method = "REGISTER"; + private final String method = "REGISTER"; @Autowired private ISIPCommanderForPlatform sipCommanderForPlatform; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 528609d..4ea9cf1 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -397,21 +397,22 @@ if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { - streamPushItem = zlmMediaListManager.addPush(item); + item.setSeverId(userSetting.getServerId()); + zlmMediaListManager.addPush(item); } - List<GbStream> gbStreams = new ArrayList<>(); - if (streamPushItem == null || streamPushItem.getGbId() == null) { - GbStream gbStream = storager.getGbStream(app, streamId); - gbStreams.add(gbStream); - }else { - if (streamPushItem.getGbId() != null) { - gbStreams.add(streamPushItem); - } - } - if (gbStreams.size() > 0) { +// List<GbStream> gbStreams = new ArrayList<>(); +// if (streamPushItem == null || streamPushItem.getGbId() == null) { +// GbStream gbStream = storager.getGbStream(app, streamId); +// gbStreams.add(gbStream); +// }else { +// if (streamPushItem.getGbId() != null) { +// gbStreams.add(streamPushItem); +// } +// } +// if (gbStreams.size() > 0) { // eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON); - } +// } }else { // 鍏煎娴佹敞閿�鏃剁被鍨嬩粠redis璁板綍鑾峰彇 diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index 9beac16..959c06e 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -24,6 +24,9 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +/** + * @author lin + */ @Component public class ZLMMediaListManager { @@ -147,7 +150,6 @@ } } } - // StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(transform.getApp(), transform.getStream()); List<GbStream> gbStreamList = gbStreamMapper.selectByGBId(transform.getGbId()); if (gbStreamList != null && gbStreamList.size() == 1) { transform.setGbStreamId(gbStreamList.get(0).getGbStreamId()); @@ -162,12 +164,11 @@ } if (transform != null) { if (channelOnlineEvents.get(transform.getGbId()) != null) { - channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream()); + channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream(), transform.getServerId()); channelOnlineEvents.remove(transform.getGbId()); } } } - storager.updateMedia(transform); return transform; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index bd3dcb1..210aa72 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -2,6 +2,7 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import org.slf4j.Logger; @@ -19,6 +20,9 @@ @Autowired private ZLMRESTfulUtils zlmresTfulUtils; + + @Autowired + private UserSetting userSetting; private int[] portRangeArray = new int[2]; @@ -197,6 +201,7 @@ sendRtpItem.setTcp(tcp); sendRtpItem.setApp("rtp"); sendRtpItem.setLocalPort(localPort); + sendRtpItem.setServerId(userSetting.getServerId()); sendRtpItem.setMediaServerId(serverItem.getId()); return sendRtpItem; } @@ -238,6 +243,7 @@ sendRtpItem.setChannelId(channelId); sendRtpItem.setTcp(tcp); sendRtpItem.setLocalPort(localPort); + sendRtpItem.setServerId(userSetting.getServerId()); sendRtpItem.setMediaServerId(serverItem.getId()); return sendRtpItem; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java index ba7daec..21e6ca0 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java @@ -1,6 +1,9 @@ package com.genersoft.iot.vmp.media.zlm.dto; +/** + * @author lin + */ public interface ChannelOnlineEvent { - void run(String app, String stream); + void run(String app, String stream, String serverId); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java index ad158ec..8abac5b 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java @@ -61,9 +61,14 @@ private String originUrl; /** - * 鏈嶅姟鍣╥d + * 娴佸獟浣撴湇鍔″櫒id */ private String mediaServerId; + + /** + * 鏈嶅姟鍣╥d + */ + private String severId; /** * GMT unix绯荤粺鏃堕棿鎴筹紝鍗曚綅绉� @@ -414,4 +419,12 @@ public void setStreamInfo(StreamInfo streamInfo) { this.streamInfo = streamInfo; } + + public String getSeverId() { + return severId; + } + + public void setSeverId(String severId) { + this.severId = severId; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java index 81c9c76..ceb48b3 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java @@ -81,6 +81,11 @@ */ private String mediaServerId; + /** + * 浣跨敤鐨勬湇鍔D + */ + private String serverId; + public String getVhost() { return vhost; } @@ -219,5 +224,13 @@ public void setMediaServerId(String mediaServerId) { this.mediaServerId = mediaServerId; } + + public String getServerId() { + return serverId; + } + + public void setServerId(String serverId) { + this.serverId = serverId; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java index 5bdd9f5..9d15c1f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java @@ -23,7 +23,6 @@ private IVideoManagerStorage storager; - @Scheduled(fixedRate = 30 * 1000) //姣�30绉掓墽琛屼竴娆� public void execute(){ List<GPSMsgInfo> gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java index 3ab1b80..6dcc515 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java @@ -1,7 +1,10 @@ package com.genersoft.iot.vmp.service.bean; +import java.util.stream.Stream; + /** * 褰撲笂绾у钩鍙� + * @author lin */ public class MessageForPushChannel { /** @@ -45,6 +48,20 @@ */ private String mediaServerId; + public static MessageForPushChannel getInstance(int type, String app, String stream, String gbId, + String platFormId, String platFormName, String serverId, + String mediaServerId){ + MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); + messageForPushChannel.setType(type); + messageForPushChannel.setGbId(gbId); + messageForPushChannel.setApp(app); + messageForPushChannel.setStream(stream); + messageForPushChannel.setMediaServerId(mediaServerId); + messageForPushChannel.setPlatFormId(platFormId); + messageForPushChannel.setPlatFormName(platFormName); + return messageForPushChannel; + } + public int getType() { return type; diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java new file mode 100644 index 0000000..5827d01 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java @@ -0,0 +1,170 @@ +package com.genersoft.iot.vmp.service.bean; + +/** + * redis娑堟伅锛氳姹備笅绾ф帹閫佹祦淇℃伅 + * @author lin + */ +public class RequestPushStreamMsg { + + + /** + * 涓嬬骇鏈嶅姟ID + */ + private String mediaServerId; + + /** + * 娴両D + */ + private String app; + + /** + * 搴旂敤鍚� + */ + private String stream; + + /** + * 鐩爣IP + */ + private String ip; + + /** + * 鐩爣绔彛 + */ + private int port; + + /** + * ssrc + */ + private String ssrc; + + /** + * 鏄惁浣跨敤TCP鏂瑰紡 + */ + private boolean tcp; + + /** + * 鏈湴浣跨敤鐨勭鍙� + */ + private int srcPort; + + /** + * 鍙戦�佹椂锛宺tp鐨刾t锛坲int8_t锛�,涓嶄紶鏃堕粯璁や负96 + */ + private int pt; + + /** + * 鍙戦�佹椂锛宺tp鐨勮礋杞界被鍨嬨�備负true鏃讹紝璐熻浇涓簆s锛涗负false鏃讹紝涓篹s锛� + */ + private boolean ps; + + /** + * 鏄惁鍙湁闊抽 + */ + private boolean onlyAudio; + + + public static RequestPushStreamMsg getInstance(String mediaServerId, String app, String stream, String ip, int port, String ssrc, + boolean tcp, int srcPort, int pt, boolean ps, boolean onlyAudio) { + RequestPushStreamMsg requestPushStreamMsg = new RequestPushStreamMsg(); + requestPushStreamMsg.setMediaServerId(mediaServerId); + requestPushStreamMsg.setApp(app); + requestPushStreamMsg.setStream(stream); + requestPushStreamMsg.setIp(ip); + requestPushStreamMsg.setPort(port); + requestPushStreamMsg.setSsrc(ssrc); + requestPushStreamMsg.setTcp(tcp); + requestPushStreamMsg.setSrcPort(srcPort); + requestPushStreamMsg.setPt(pt); + requestPushStreamMsg.setPs(ps); + requestPushStreamMsg.setOnlyAudio(onlyAudio); + return requestPushStreamMsg; + } + + public String getMediaServerId() { + return mediaServerId; + } + + public void setMediaServerId(String mediaServerId) { + this.mediaServerId = mediaServerId; + } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getSsrc() { + return ssrc; + } + + public void setSsrc(String ssrc) { + this.ssrc = ssrc; + } + + public boolean isTcp() { + return tcp; + } + + public void setTcp(boolean tcp) { + this.tcp = tcp; + } + + public int getSrcPort() { + return srcPort; + } + + public void setSrcPort(int srcPort) { + this.srcPort = srcPort; + } + + public int getPt() { + return pt; + } + + public void setPt(int pt) { + this.pt = pt; + } + + public boolean isPs() { + return ps; + } + + public void setPs(boolean ps) { + this.ps = ps; + } + + public boolean isOnlyAudio() { + return onlyAudio; + } + + public void setOnlyAudio(boolean onlyAudio) { + this.onlyAudio = onlyAudio; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/RequestSendItemMsg.java b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestSendItemMsg.java new file mode 100644 index 0000000..66689fa --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestSendItemMsg.java @@ -0,0 +1,173 @@ +package com.genersoft.iot.vmp.service.bean; + +/** + * redis娑堟伅锛氳姹備笅绾у洖澶嶆帹閫佷俊鎭� + * @author lin + */ +public class RequestSendItemMsg { + + /** + * 涓嬬骇鏈嶅姟ID + */ + private String serverId; + + /** + * 涓嬬骇鏈嶅姟ID + */ + private String mediaServerId; + + /** + * 娴両D + */ + private String app; + + /** + * 搴旂敤鍚� + */ + private String stream; + + /** + * 鐩爣IP + */ + private String ip; + + /** + * 鐩爣绔彛 + */ + private int port; + + /** + * ssrc + */ + private String ssrc; + + /** + * 骞冲彴鍥芥爣缂栧彿 + */ + private String platformId; + + /** + * 骞冲彴鍚嶇О + */ + private String platformName; + + /** + * 閫氶亾ID + */ + private String channelId; + + + /** + * 鏄惁浣跨敤TCP + */ + private Boolean isTcp; + + + + + public static RequestSendItemMsg getInstance(String serverId, String mediaServerId, String app, String stream, String ip, int port, + String ssrc, String platformId, String channelId, Boolean isTcp, String platformName) { + RequestSendItemMsg requestSendItemMsg = new RequestSendItemMsg(); + requestSendItemMsg.setServerId(serverId); + requestSendItemMsg.setMediaServerId(mediaServerId); + requestSendItemMsg.setApp(app); + requestSendItemMsg.setStream(stream); + requestSendItemMsg.setIp(ip); + requestSendItemMsg.setPort(port); + requestSendItemMsg.setSsrc(ssrc); + requestSendItemMsg.setPlatformId(platformId); + requestSendItemMsg.setPlatformName(platformName); + requestSendItemMsg.setChannelId(channelId); + requestSendItemMsg.setTcp(isTcp); + + return requestSendItemMsg; + } + + public String getServerId() { + return serverId; + } + + public void setServerId(String serverId) { + this.serverId = serverId; + } + + public String getMediaServerId() { + return mediaServerId; + } + + public void setMediaServerId(String mediaServerId) { + this.mediaServerId = mediaServerId; + } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getSsrc() { + return ssrc; + } + + public void setSsrc(String ssrc) { + this.ssrc = ssrc; + } + + public String getPlatformId() { + return platformId; + } + + public void setPlatformId(String platformId) { + this.platformId = platformId; + } + + public String getPlatformName() { + return platformName; + } + + public void setPlatformName(String platformName) { + this.platformName = platformName; + } + + public String getChannelId() { + return channelId; + } + + public void setChannelId(String channelId) { + this.channelId = channelId; + } + + public Boolean getTcp() { + return isTcp; + } + + public void setTcp(Boolean tcp) { + isTcp = tcp; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/ResponseSendItemMsg.java b/src/main/java/com/genersoft/iot/vmp/service/bean/ResponseSendItemMsg.java new file mode 100644 index 0000000..501621b --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/ResponseSendItemMsg.java @@ -0,0 +1,31 @@ +package com.genersoft.iot.vmp.service.bean; + +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; + +/** + * redis娑堟伅锛氫笅绾у洖澶嶆帹閫佷俊鎭� + * @author lin + */ +public class ResponseSendItemMsg { + + private SendRtpItem sendRtpItem; + + private MediaServerItem mediaServerItem; + + public SendRtpItem getSendRtpItem() { + return sendRtpItem; + } + + public void setSendRtpItem(SendRtpItem sendRtpItem) { + this.sendRtpItem = sendRtpItem; + } + + public MediaServerItem getMediaServerItem() { + return mediaServerItem; + } + + public void setMediaServerItem(MediaServerItem mediaServerItem) { + this.mediaServerItem = mediaServerItem; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsg.java b/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsg.java new file mode 100644 index 0000000..12d79cb --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsg.java @@ -0,0 +1,116 @@ +package com.genersoft.iot.vmp.service.bean; + +/** + * @author lin + */ +public class WvpRedisMsg { + + public static WvpRedisMsg getInstance(String fromId, String toId, String type, String cmd, String serial, String content){ + WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); + wvpRedisMsg.setFromId(fromId); + wvpRedisMsg.setToId(toId); + wvpRedisMsg.setType(type); + wvpRedisMsg.setCmd(cmd); + wvpRedisMsg.setSerial(serial); + wvpRedisMsg.setContent(content); + return wvpRedisMsg; + } + + private String fromId; + + private String toId; + /** + * req 璇锋眰, res 鍥炲 + */ + private String type; + private String cmd; + + /** + * 娑堟伅鐨処D + */ + private String serial; + private Object content; + + private final static String requestTag = "req"; + private final static String responseTag = "res"; + + public static WvpRedisMsg getRequestInstance(String fromId, String toId, String cmd, String serial, Object content) { + WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); + wvpRedisMsg.setType(requestTag); + wvpRedisMsg.setFromId(fromId); + wvpRedisMsg.setToId(toId); + wvpRedisMsg.setCmd(cmd); + wvpRedisMsg.setSerial(serial); + wvpRedisMsg.setContent(content); + return wvpRedisMsg; + } + + public static WvpRedisMsg getResponseInstance() { + WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); + wvpRedisMsg.setType(responseTag); + return wvpRedisMsg; + } + + public static WvpRedisMsg getResponseInstance(String fromId, String toId, String cmd, String serial, Object content) { + WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); + wvpRedisMsg.setType(responseTag); + wvpRedisMsg.setFromId(fromId); + wvpRedisMsg.setToId(toId); + wvpRedisMsg.setCmd(cmd); + wvpRedisMsg.setSerial(serial); + wvpRedisMsg.setContent(content); + return wvpRedisMsg; + } + + public static boolean isRequest(WvpRedisMsg wvpRedisMsg) { + return requestTag.equals(wvpRedisMsg.getType()); + } + + public String getSerial() { + return serial; + } + + public void setSerial(String serial) { + this.serial = serial; + } + + public String getFromId() { + return fromId; + } + + public void setFromId(String fromId) { + this.fromId = fromId; + } + + public String getToId() { + return toId; + } + + public void setToId(String toId) { + this.toId = toId; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getCmd() { + return cmd; + } + + public void setCmd(String cmd) { + this.cmd = cmd; + } + + public Object getContent() { + return content; + } + + public void setContent(Object content) { + this.content = content; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java b/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java new file mode 100644 index 0000000..cb11886 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java @@ -0,0 +1,12 @@ +package com.genersoft.iot.vmp.service.bean; + +/** + * @author lin + */ + +public class WvpRedisMsgCmd { + + public static final String GET_SEND_ITEM = "GetSendItem"; + public static final String REQUEST_PUSH_STREAM = "RequestPushStream"; + +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java new file mode 100644 index 0000000..638ea41 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java @@ -0,0 +1,377 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.bean.*; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.stereotype.Component; + +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import java.text.ParseException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * 鐩戝惉涓嬬骇鍙戦�佹帹閫佷俊鎭紝骞跺彂閫佸浗鏍囨帹娴佹秷鎭笂绾� + * @author lin + */ +@Component +public class RedisGbPlayMsgListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class); + + public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM"; + + /** + * 娴佸獟浣撲笉瀛樺湪鐨勯敊璇帥 + */ + public static final int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1; + + /** + * 绂荤嚎鐨勯敊璇帥 + */ + public static final int ERROR_CODE_OFFLINE = -2; + + /** + * 瓒呮椂鐨勯敊璇帥 + */ + public static final int ERROR_CODE_TIMEOUT = -3; + + private Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>(); + private Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); + private Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>(); + + @Autowired + private UserSetting userSetting; + + @Autowired + private RedisUtil redis; + + @Autowired + private ZLMMediaListManager zlmMediaListManager; + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private DynamicTask dynamicTask; + + @Autowired + private ZLMMediaListManager mediaListManager; + + @Autowired + private ZLMHttpHookSubscribe subscribe; + + + public interface PlayMsgCallback{ + void handler(ResponseSendItemMsg responseSendItemMsg); + } + + public interface PlayMsgCallbackForStartSendRtpStream{ + void handler(JSONObject jsonObject); + } + + public interface PlayMsgErrorCallback{ + void handler(WVPResult wvpResult); + } + + @Override + public void onMessage(Message message, byte[] bytes) { + JSONObject msgJSON = JSON.parseObject(message.getBody(), JSONObject.class); + WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); + if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { + return; + } + if (WvpRedisMsg.isRequest(wvpRedisMsg)) { + logger.info("[鏀跺埌REDIS閫氱煡] 璇锋眰锛� {}", new String(message.getBody())); + + switch (wvpRedisMsg.getCmd()){ + case WvpRedisMsgCmd.GET_SEND_ITEM: + RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); + requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); + break; + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: + RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);; + requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); + break; + default: + break; + } + + }else { + logger.info("[鏀跺埌REDIS閫氱煡] 鍥炲锛� {}", new String(message.getBody())); + switch (wvpRedisMsg.getCmd()){ + case WvpRedisMsgCmd.GET_SEND_ITEM: + + WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); + + String key = wvpRedisMsg.getSerial(); + switch (content.getCode()) { + case 0: + ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); + PlayMsgCallback playMsgCallback = callbacks.get(key); + if (playMsgCallback != null) { + callbacksForError.remove(key); + playMsgCallback.handler(responseSendItemMsg); + } + break; + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: + case ERROR_CODE_OFFLINE: + case ERROR_CODE_TIMEOUT: + PlayMsgErrorCallback errorCallback = callbacksForError.get(key); + if (errorCallback != null) { + callbacks.remove(key); + errorCallback.handler(content); + } + break; + default: + break; + } + break; + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: + WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); + String serial = wvpRedisMsg.getSerial(); + switch (wvpResult.getCode()) { + case 0: + JSONObject jsonObject = (JSONObject)wvpResult.getData(); + PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); + if (playMsgCallback != null) { + callbacksForError.remove(serial); + playMsgCallback.handler(jsonObject); + } + break; + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: + case ERROR_CODE_OFFLINE: + case ERROR_CODE_TIMEOUT: + PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); + if (errorCallback != null) { + callbacks.remove(serial); + errorCallback.handler(wvpResult); + } + break; + default: + break; + } + break; + default: + break; + } + } + + + + + } + + /** + * 澶勭悊鏀跺埌鐨勮姹傛帹娴佺殑璇锋眰 + */ + private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) { + MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); + if (mediaInfo == null) { + // TODO 鍥炲閿欒 + return; + } + String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1"; + Map<String, Object> param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",requestPushStreamMsg.getApp()); + param.put("stream",requestPushStreamMsg.getStream()); + param.put("ssrc", requestPushStreamMsg.getSsrc()); + param.put("dst_url",requestPushStreamMsg.getIp()); + param.put("dst_port", requestPushStreamMsg.getPort()); + param.put("is_udp", is_Udp); + param.put("src_port", requestPushStreamMsg.getSrcPort()); + param.put("pt", requestPushStreamMsg.getPt()); + param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0"); + param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0"); + JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + // 鍥炲娑堟伅 + responsePushStream(jsonObject, fromId, serial); + } + + private void responsePushStream(JSONObject content, String toId, String serial) { + + WVPResult<JSONObject> result = new WVPResult<>(); + result.setCode(0); + result.setData(content); + + WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, + WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result); + JSONObject jsonObject = (JSONObject)JSON.toJSON(response); + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } + + /** + * 澶勭悊鏀跺埌鐨勮姹俿endItem鐨勮姹� + */ + private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) { + MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); + if (mediaServerItem == null) { + logger.info("[鍥炲鎺ㄦ祦淇℃伅] 娴佸獟浣搟}涓嶅瓨鍦� ", content.getMediaServerId()); + + WVPResult<SendRtpItem> result = new WVPResult<>(); + result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND); + result.setMsg("娴佸獟浣撲笉瀛樺湪"); + + WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, + WvpRedisMsgCmd.GET_SEND_ITEM, serial, result); + + JSONObject jsonObject = (JSONObject)JSON.toJSON(response); + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + return; + } + // 纭畾娴佹槸鍚﹀湪绾� + boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); + if (streamReady) { + logger.info("[鍥炲鎺ㄦ祦淇℃伅] {}/{}", content.getApp(), content.getStream()); + responseSendItem(mediaServerItem, content, toId, serial); + }else { + // 娴佸凡缁忕绾� + // 鍙戦�乺edis娑堟伅浠ヤ娇璁惧涓婄嚎 + logger.info("[ app={}, stream={} ]閫氶亾绂荤嚎锛屽彂閫乺edis淇℃伅鎺у埗璁惧寮�濮嬫帹娴�",content.getApp(), content.getStream()); + + String taskKey = UUID.randomUUID().toString(); + // 璁剧疆瓒呮椂 + dynamicTask.startDelay(taskKey, ()->{ + logger.info("[ app={}, stream={} ] 绛夊緟璁惧寮�濮嬫帹娴佽秴鏃�", content.getApp(), content.getStream()); + WVPResult<SendRtpItem> result = new WVPResult<>(); + result.setCode(ERROR_CODE_TIMEOUT); + WvpRedisMsg response = WvpRedisMsg.getResponseInstance( + userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result + ); + JSONObject jsonObject = (JSONObject)JSON.toJSON(response); + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + }, userSetting.getPlatformPlayTimeout()); + + // 娣诲姞璁㈤槄 + JSONObject subscribeKey = new JSONObject(); + subscribeKey.put("app", content.getApp()); + subscribeKey.put("stream", content.getStream()); + subscribeKey.put("regist", true); + subscribeKey.put("schema", "rtmp"); + subscribeKey.put("mediaServerId", mediaServerItem.getId()); + subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, + (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + dynamicTask.stop(taskKey); + responseSendItem(mediaServerItem, content, toId, serial); + }); + + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(), + content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(), + content.getMediaServerId()); + redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); + + } + } + + /** + * 灏嗚幏鍙栧埌鐨剆endItem鍙戦�佸嚭鍘� + */ + private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) { + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), + content.getPort(), content.getSsrc(), content.getPlatformId(), + content.getApp(), content.getStream(), content.getChannelId(), + content.getTcp()); + + WVPResult<ResponseSendItemMsg> result = new WVPResult<>(); + result.setCode(0); + ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg(); + responseSendItemMsg.setSendRtpItem(sendRtpItem); + responseSendItemMsg.setMediaServerItem(mediaServerItem); + result.setData(responseSendItemMsg); + + WvpRedisMsg response = WvpRedisMsg.getResponseInstance( + userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result + ); + JSONObject jsonObject = (JSONObject)JSON.toJSON(response); + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } + + /** + * 鍙戦�佹秷鎭姹備笅绾х敓鎴愭帹娴佷俊鎭� + * @param serverId 涓嬬骇鏈嶅姟ID + * @param app 搴旂敤鍚� + * @param stream 娴両D + * @param ip 鐩爣IP + * @param port 鐩爣绔彛 + * @param ssrc ssrc + * @param platformId 骞冲彴鍥芥爣缂栧彿 + * @param channelId 閫氶亾ID + * @param isTcp 鏄惁浣跨敤TCP + * @param callback 寰楀埌淇℃伅鐨勫洖璋� + */ + public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc, + String platformId, String channelId, boolean isTcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) { + RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance( + serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, platformName); + requestSendItemMsg.setServerId(serverId); + String key = UUID.randomUUID().toString(); + WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM, + key, requestSendItemMsg); + + JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); + logger.info("[璇锋眰鎺ㄦ祦SendItem] {}: {}", serverId, jsonObject); + callbacks.put(key, callback); + callbacksForError.put(key, errorCallback); + dynamicTask.startDelay(key, ()->{ + callbacks.remove(key); + callbacksForError.remove(key); + WVPResult<Object> wvpResult = new WVPResult<>(); + wvpResult.setCode(ERROR_CODE_TIMEOUT); + wvpResult.setMsg("timeout"); + errorCallback.handler(wvpResult); + }, userSetting.getPlatformPlayTimeout()); + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } + + /** + * 鍙戦�佽姹傛帹娴佺殑娑堟伅 + * @param param 鎺ㄦ祦鍙傛暟 + * @param callback 鍥炶皟 + */ + public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) { + String key = UUID.randomUUID().toString(); + WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, + WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, param); + + JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); + logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] {}: {}", serverId, jsonObject); + dynamicTask.startDelay(key, ()->{ + callbacksForStartSendRtpStream.remove(key); + callbacksForError.remove(key); + }, userSetting.getPlatformPlayTimeout()); + callbacksForStartSendRtpStream.put(key, callback); + callbacksForError.put(key, (wvpResult)->{ + logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] 澶辫触: {}", wvpResult.getMsg()); + callbacksForStartSendRtpStream.remove(key); + callbacksForError.remove(key); + }); + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java similarity index 66% rename from src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java rename to src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java index c688d13..238aafd 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java @@ -3,6 +3,7 @@ import com.alibaba.fastjson.JSON; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -10,17 +11,23 @@ import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; +/** + * 鎺ユ敹鏉ヨ嚜redis鐨凣PS鏇存柊閫氱煡 + * @author lin + */ @Component -public class RedisGPSMsgListener implements MessageListener { +public class RedisGpsMsgListener implements MessageListener { - private final static Logger logger = LoggerFactory.getLogger(RedisGPSMsgListener.class); + private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class); @Autowired private IRedisCatchStorage redisCatchStorage; @Override - public void onMessage(Message message, byte[] bytes) { - logger.info("鏀跺埌鏉ヨ嚜REDIS鐨凣PS閫氱煡锛� {}", new String(message.getBody())); + public void onMessage(@NotNull Message message, byte[] bytes) { + if (logger.isDebugEnabled()) { + logger.debug("鏀跺埌鏉ヨ嚜REDIS鐨凣PS閫氱煡锛� {}", new String(message.getBody())); + } GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class); redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java new file mode 100644 index 0000000..07fffdc --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java @@ -0,0 +1,83 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.utils.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.stereotype.Component; + + +/** + * @author lin + */ +@Component +public class RedisStreamMsgListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class); + + @Autowired + private ISIPCommander commander; + + @Autowired + private ISIPCommanderForPlatform commanderForPlatform; + + @Autowired + private IVideoManagerStorage storage; + + @Autowired + private UserSetting userSetting; + + @Autowired + private ZLMMediaListManager zlmMediaListManager; + + @Override + public void onMessage(Message message, byte[] bytes) { + + JSONObject steamMsgJson = JSON.parseObject(message.getBody(), JSONObject.class); + if (steamMsgJson == null) { + logger.warn("[REDIS鐨凙LARM閫氱煡]娑堟伅瑙f瀽澶辫触"); + return; + } + String serverId = steamMsgJson.getString("serverId"); + + if (userSetting.getServerId().equals(serverId)) { + // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲 + return; + } + logger.info("[REDIS閫氱煡] 娴佸彉鍖栵細 {}", new String(message.getBody())); + String app = steamMsgJson.getString("app"); + String stream = steamMsgJson.getString("stream"); + boolean register = steamMsgJson.getBoolean("register"); + String mediaServerId = steamMsgJson.getString("mediaServerId"); + MediaItem mediaItem = new MediaItem(); + mediaItem.setSeverId(serverId); + mediaItem.setApp(app); + mediaItem.setStream(stream); + mediaItem.setRegist(register); + mediaItem.setMediaServerId(mediaServerId); + mediaItem.setCreateStamp(System.currentTimeMillis()/1000); + mediaItem.setAliveSecond(0L); + mediaItem.setTotalReaderCount("0"); + mediaItem.setOriginType(0); + mediaItem.setOriginTypeStr("0"); + mediaItem.setOriginTypeStr("unknown"); + + zlmMediaListManager.addPush(mediaItem); + + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index d710dad..1e00faa 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -107,6 +107,7 @@ streamPushItem.setStatus(true); streamPushItem.setStreamType("push"); streamPushItem.setVhost(item.getVhost()); + streamPushItem.setServerId(item.getSeverId()); return streamPushItem; } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java index d94669b..6b86280 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java @@ -357,6 +357,15 @@ /** + * 鑾峰彇浣嗕釜鎺ㄦ祦 + * @param app + * @param stream + * @return + */ + StreamPushItem getMedia(String app, String stream); + + + /** * 娓呯┖鎺ㄦ祦鍒楄〃 */ void clearMediaList(); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java index 909d3a8..ebd3478 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java @@ -14,9 +14,9 @@ public interface StreamPushMapper { @Insert("INSERT INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " + - "createStamp, aliveSecond, mediaServerId) VALUES" + + "createStamp, aliveSecond, mediaServerId, serverId) VALUES" + "('${app}', '${stream}', '${totalReaderCount}', '${originType}', '${originTypeStr}', " + - "'${createStamp}', '${aliveSecond}', '${mediaServerId}' )") + "'${createStamp}', '${aliveSecond}', '${mediaServerId}' , '${serverId}' )") int add(StreamPushItem streamPushItem); @Update("UPDATE stream_push " + diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 39daeda..5377e23 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -587,11 +587,11 @@ String scanKey = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId() + "_*"; List<GPSMsgInfo> result = new ArrayList<>(); List<Object> keys = redis.scan(scanKey); - for (int i = 0; i < keys.size(); i++) { - String key = (String) keys.get(i); + for (Object o : keys) { + String key = (String) o; GPSMsgInfo gpsMsgInfo = (GPSMsgInfo) redis.get(key); if (!gpsMsgInfo.isStored()) { // 鍙彇娌℃湁瀛樿繃寰� - result.add((GPSMsgInfo)redis.get(key)); + result.add((GPSMsgInfo) redis.get(key)); } } @@ -667,7 +667,7 @@ @Override public void sendStreamPushRequestedMsg(MessageForPushChannel msg) { String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED; - logger.info("[redis 鎺ㄦ祦琚姹傞�氱煡] {}: {}-{}", key, msg.getApp(), msg.getStream()); + logger.info("[redis 鎺ㄦ祦琚姹傞�氱煡] {}: {}/{}", key, msg.getApp(), msg.getStream()); redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg)); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 338bb77..ac870f7 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -885,6 +885,11 @@ } @Override + public StreamPushItem getMedia(String app, String stream) { + return streamPushMapper.selectOne(app, stream); + } + + @Override public void clearMediaList() { streamPushMapper.clear(); } diff --git a/web_src/src/components/dialog/rtcPlayer.vue b/web_src/src/components/dialog/rtcPlayer.vue index 75c18f3..4737849 100644 --- a/web_src/src/components/dialog/rtcPlayer.vue +++ b/web_src/src/components/dialog/rtcPlayer.vue @@ -7,11 +7,11 @@ </template> <script> +let webrtcPlayer = null; export default { name: 'rtcPlayer', data() { return { - webrtcPlayer: null, timer: null }; }, @@ -35,7 +35,7 @@ }, methods: { play: function (url) { - this.webrtcPlayer = new ZLMRTCClient.Endpoint({ + webrtcPlayer = new ZLMRTCClient.Endpoint({ element: document.getElementById('webRtcPlayerBox'),// video 鏍囩 debug: true,// 鏄惁鎵撳嵃鏃ュ織 zlmsdpUrl: url,//娴佸湴鍧� @@ -45,17 +45,17 @@ videoEnable: false, recvOnly: true, }) - this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ICE_CANDIDATE_ERROR,(e)=>{// ICE 鍗忓晢鍑洪敊 + webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ICE_CANDIDATE_ERROR,(e)=>{// ICE 鍗忓晢鍑洪敊 console.error('ICE 鍗忓晢鍑洪敊') this.eventcallbacK("ICE ERROR", "ICE 鍗忓晢鍑洪敊") }); - this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_REMOTE_STREAMS,(e)=>{//鑾峰彇鍒颁簡杩滅娴侊紝鍙互鎾斁 + webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_REMOTE_STREAMS,(e)=>{//鑾峰彇鍒颁簡杩滅娴侊紝鍙互鎾斁 console.error('鎾斁鎴愬姛',e.streams) this.eventcallbacK("playing", "鎾斁鎴愬姛") }); - this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_OFFER_ANWSER_EXCHANGE_FAILED,(e)=>{// offer anwser 浜ゆ崲澶辫触 + webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_OFFER_ANWSER_EXCHANGE_FAILED,(e)=>{// offer anwser 浜ゆ崲澶辫触 console.error('offer anwser 浜ゆ崲澶辫触',e) this.eventcallbacK("OFFER ANSWER ERROR ", "offer anwser 浜ゆ崲澶辫触") if (e.code ==-400 && e.msg=="娴佷笉瀛樺湪"){ @@ -68,7 +68,7 @@ } }); - this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_LOCAL_STREAM,(s)=>{// 鑾峰彇鍒颁簡鏈湴娴� + webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_LOCAL_STREAM,(s)=>{// 鑾峰彇鍒颁簡鏈湴娴� // document.getElementById('selfVideo').srcObject=s; this.eventcallbacK("LOCAL STREAM", "鑾峰彇鍒颁簡鏈湴娴�") @@ -76,9 +76,9 @@ }, pause: function () { - if (this.webrtcPlayer != null) { - this.webrtcPlayer.close(); - this.webrtcPlayer = null; + if (webrtcPlayer != null) { + webrtcPlayer.close(); + webrtcPlayer = null; } }, -- Gitblit v1.8.0