src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -74,6 +74,7 @@ public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_"; public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:"; public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:"; public static final String PUSH_STREAM_ONLINE = "VMP_PUSH_STREAM_ONLINE:"; src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -29,9 +29,6 @@ private RedisAlarmMsgListener redisAlarmMsgListener; @Autowired private RedisStreamMsgListener redisStreamMsgListener; @Autowired private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; @Autowired @@ -52,6 +49,9 @@ @Autowired private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener; @Autowired private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister; /** * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 @@ -67,14 +67,14 @@ container.setConnectionFactory(connectionFactory); container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS)); container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.WAITE_SEND_PUSH_STREAM)); container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM)); container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new 
PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED)); container.addMessageListener(redisPlatformPushStreamOnlineLister, new PatternTopic(VideoManagerConstants.PUSH_STREAM_ONLINE)); return container; } } src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -132,6 +132,11 @@ */ private String receiveStream; /** * 上级的点播类型 */ private String sessionName; public String getIp() { return ip; } @@ -332,6 +337,14 @@ this.localIp = localIp; } public String getSessionName() { return sessionName; } public void setSessionName(String sessionName) { this.sessionName = sessionName; } @Override public String toString() { return "SendRtpItem{" + @@ -347,7 +360,7 @@ ", stream='" + stream + '\'' + ", tcp=" + tcp + ", tcpActive=" + tcpActive + ", localIp=" + localIp + ", localIp='" + localIp + '\'' + ", localPort=" + localPort + ", mediaServerId='" + mediaServerId + '\'' + ", serverId='" + serverId + '\'' + @@ -360,6 +373,7 @@ ", rtcp=" + rtcp + ", playType=" + playType + ", receiveStream='" + receiveStream + '\'' + ", sessionName='" + sessionName + '\'' + '}'; } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -16,7 +16,6 @@ import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.slf4j.Logger; @@ -78,9 +77,6 @@ private DynamicTask dynamicTask; @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; @Autowired private IPlayService playService; @@ -117,13 +113,7 @@ if (parentPlatform != null) { Map<String, Object> param = getSendRtpParam(sendRtpItem); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader); }); redisCatchStorage.sendStartSendRtp(sendRtpItem); } else { JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param); if (startSendRtpStreamResult != null) { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -19,7 +19,6 @@ import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; @@ -98,8 +97,6 @@ @Autowired private IStreamPushService pushService; @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; @Override public void afterPropertiesSet() throws Exception { @@ -142,7 +139,7 @@ ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); if (platform != null) { RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId()); redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg); // redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg); } }else { MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -19,7 +19,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; @@ -122,7 +122,7 @@ private UserSetting userSetting; @Autowired private ZLMMediaListManager mediaListManager; private RedisPlatformPushStreamOnlineLister mediaListManager; @Autowired private SipConfig config; @@ -568,19 +568,16 @@ } } else if (gbStream != null) { String ssrc; if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); }else { ssrc = gb28181Sdp.getSsrc(); } SendRtpItem sendRtpItem = new SendRtpItem(); sendRtpItem.setTcpActive(tcpActive); if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) { sendRtpItem.setSsrc(gb28181Sdp.getSsrc()); } if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); } sendRtpItem.setTcp(mediaTransmissionTCP); sendRtpItem.setRtcp(platform.isRtcp()); sendRtpItem.setSsrc(ssrc); sendRtpItem.setPlatformName(platform.getName()); sendRtpItem.setPlatformId(platform.getServerGBId()); sendRtpItem.setMediaServerId(mediaServerItem.getId()); @@ -593,37 +590,48 @@ sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setFromTag(request.getFromTag()); sendRtpItem.setOnlyAudio(false); sendRtpItem.setPlayType(InviteStreamType.PUSH); sendRtpItem.setStatus(0); sendRtpItem.setSessionName(sessionName); if ("push".equals(gbStream.getStreamType())) { if 
(streamPushItem != null) { // 从redis查询是否正在接收这个推流 OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); sendRtpItem.setServerId(pushListItem.getSeverId()); if (pushListItem != null) { sendRtpItem.setServerId(pushListItem.getSeverId()); StreamPushItem transform = streamPushService.transform(pushListItem); transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); // 推流状态 pushStream(sendRtpItem, mediaServerItem, platform, request); sendPushStream(sendRtpItem, mediaServerItem, platform, request); }else { // 未推流 拉起 if (!platform.isStartOfflinePush()) { // 平台设置中关闭了拉起离线的推流则直接回复 try { logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream()); responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); } return; } notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } } else if ("proxy".equals(gbStream.getStreamType())) { if (null != proxyByAppAndStream) { if (sendRtpItem.getSsrc() == null) { // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 String ssrc = "Play".equalsIgnoreCase(sessionName) ? 
ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); sendRtpItem.setSsrc(ssrc); } if (proxyByAppAndStream.isStatus()) { pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); sendProxyStream(sendRtpItem, mediaServerItem, platform, request); } else { //开启代理拉流 notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } } } } @@ -649,33 +657,23 @@ /** * 安排推流 */ private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, String channelId, String addressStr, String ssrc, String requesterId) { Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); if (streamReady != null && streamReady) { // 自平台内容 SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); try { responseAck(request, Response.BUSY_HERE); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); int localPort = sendRtpPortManager.getNextPort(mediaServerItem); if (localPort == 0) { logger.warn("服务器端口资源不足"); try { responseAck(request, Response.BUSY_HERE); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); } 
return; } return; } if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); } sendRtpItem.setPlayType(InviteStreamType.PUSH); sendRtpItem.setPlayType(InviteStreamType.PROXY); // 写入redis, 超时时回复 sendRtpItem.setStatus(1); sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setFromTag(request.getFromTag()); sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); @@ -683,12 +681,10 @@ sendRtpItem.setToTag(response.getToTag()); } redisCatchStorage.updateSendRTPSever(sendRtpItem); } } private void pushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { // 推流 if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); @@ -712,6 +708,11 @@ if (response != null) { sendRtpItem.setToTag(response.getToTag()); } if (sendRtpItem.getSsrc() == null) { // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? 
ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); sendRtpItem.setSsrc(ssrc); } redisCatchStorage.updateSendRTPSever(sendRtpItem); } else { @@ -719,10 +720,6 @@ notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); } } else { SendRtpItem sendRtpItem = new SendRtpItem(); sendRtpItem.setRtcp(platform.isRtcp()); sendRtpItem.setTcp(mediaTransmissionTCP); sendRtpItem.setTcpActive(); // 其他平台内容 otherWvpPushStream(sendRtpItem, request, platform); } @@ -733,29 +730,28 @@ */ private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { // TODO 控制启用以使设备上线 logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream()); logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", sendRtpItem.getApp(), sendRtpItem.getStream()); // 监听流上线 HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId()); HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId()); zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); dynamicTask.stop(callIdHeader.getCallId()); pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); dynamicTask.stop(sendRtpItem.getCallId()); sendProxyStream(sendRtpItem, mediaServerItem, platform, request); }); dynamicTask.startDelay(callIdHeader.getCallId(), () -> { logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream()); 
dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { logger.info("[ app={}, stream={} ] 等待拉流代理流超时", sendRtpItem.getApp(), sendRtpItem.getStream()); zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); }, userSetting.getPlatformPlayTimeout()); boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream()); if (!start) { try { responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline"); responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline"); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); } zlmHttpHookSubscribe.removeSubscribe(hookSubscribe); dynamicTask.stop(callIdHeader.getCallId()); dynamicTask.stop(sendRtpItem.getCallId()); } } @@ -763,50 +759,28 @@ * 通知流上线 */ private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) { if (!platform.isStartOfflinePush()) { // 平台设置中关闭了拉起离线的推流则直接回复 try { logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream()); responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); } return; } // 发送redis消息以使设备上线 logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream()); // 发送redis消息以使设备上线,流上线后被 logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", sendRtpItem.getApp(), sendRtpItem.getStream()); MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), platform.getName(), userSetting.getServerId(), 
gbStream.getMediaServerId()); sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); // 设置超时 dynamicTask.startDelay(callIdHeader.getCallId(), () -> { logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream()); dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream()); try { redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); responseAck(request, Response.REQUEST_TIMEOUT); // 超时 } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); } }, userSetting.getPlatformPlayTimeout()); // 写入redis待发流信息,供其他wvp读取并生成发流信息 SendRtpItem sendRtpItemTemp = new SendRtpItem(); sendRtpItemTemp.setIp(addressStr); sendRtpItemTemp.setPort(port); sendRtpItemTemp.setSsrc(ssrc); sendRtpItemTemp.setPlatformId(requesterId); sendRtpItemTemp.setPlatformName(platform.getName()); sendRtpItemTemp.setTcp(mediaTransmissionTCP); sendRtpItemTemp.setRtcp(platform.isRtcp()); sendRtpItemTemp.setTcpActive(tcpActive); sendRtpItemTemp.setPlayType(InviteStreamType.PUSH); redisCatchStorage.addWaiteSendRtpItem(sendRtpItemTemp, userSetting.getPlatformPlayTimeout()); redisCatchStorage.addWaiteSendRtpItem(sendRtpItem, userSetting.getPlatformPlayTimeout()); // 添加上线的通知 mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (sendRtpItemFromRedis) -> { dynamicTask.stop(callIdHeader.getCallId()); 
redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream()); mediaListManager.addChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream(), (sendRtpItemFromRedis) -> { dynamicTask.stop(sendRtpItem.getCallId()); redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream()); if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { int localPort = sendRtpPortManager.getNextPort(mediaServerItem); @@ -823,18 +797,18 @@ } return; } sendRtpItemTemp.setLocalPort(localPort); sendRtpItemTemp.setLocalIp(ObjectUtils.isEmpty(platform.getSendStreamIp()): ); // 写入redis, 超时时回复 sendRtpItemTemp.setStatus(1); sendRtpItemTemp.setCallId(callIdHeader.getCallId()); sendRtpItemTemp.setFromTag(request.getFromTag()); SIPResponse response = sendStreamAck(request, sendRtpItemTemp, platform); if (response != null) { sendRtpItemTemp.setToTag(response.getToTag()); sendRtpItem.setLocalPort(localPort); if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { sendRtpItem.setLocalIp(platform.getSendStreamIp()); } redisCatchStorage.updateSendRTPSever(sendRtpItemTemp); // 写入redis, 超时时回复 sendRtpItem.setStatus(1); SIPResponse response = sendStreamAck(request, sendRtpItem, platform); if (response != null) { sendRtpItem.setToTag(response.getToTag()); } redisCatchStorage.updateSendRTPSever(sendRtpItem); } else { // 其他平台内容 otherWvpPushStream(sendRtpItemFromRedis, request, platform); @@ -842,10 +816,10 @@ }); // 添加回复的拒绝或者错误的通知 redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> { redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { if (response.getCode() != 0) { dynamicTask.stop(callIdHeader.getCallId()); mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); dynamicTask.stop(sendRtpItem.getCallId()); mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream()); try 
{ responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); } catch (SipException | InvalidArgumentException | ParseException e) { src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -18,14 +18,12 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.media.zlm.dto.HookType; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -103,7 +101,7 @@ private EventPublisher eventPublisher; @Autowired private ZLMMediaListManager zlmMediaListManager; private RedisPlatformPushStreamOnlineLister zlmMediaListManager; @Autowired private ZlmHttpHookSubscribe subscribe; @@ -129,6 +127,9 @@ @Autowired private RedisTemplate<Object, Object> redisTemplate; @Autowired private IStreamPushService streamPushService; /** * 服务器定时上报时间,上报间隔可配置,默认10s上报一次 @@ -236,10 +237,7 @@ // 鉴权通过 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); } } else { zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId()); } HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); result.setEnable_audio(true); @@ -465,8 +463,7 @@ || param.getOriginType() == OriginType.RTMP_PUSH.ordinal() || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { param.setSeverId(userSetting.getServerId()); zlmMediaListManager.addPush(param); streamPushService.updatePush(param); // 冗余数据,自己系统中自用 
redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param); } @@ -483,10 +480,13 @@ } } GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); if (gbStream != null) { // eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); // 查找是否关联了国标, 关联了不删除, 置为离线 if (gbStream == null) { storager.removeMedia(param.getApp(), param.getStream()); }else { // eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); storager.mediaOffline(param.getApp(), param.getStream()); } zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); } GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); if (gbStream != null) { src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
File was deleted src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -118,4 +118,5 @@ Map<String, StreamPushItem> getAllAppAndStreamMap(); void updatePush(OnStreamChangedHookParam param); } src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -31,7 +31,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper; @@ -133,9 +132,6 @@ @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; @Autowired private ZlmHttpHookSubscribe hookSubscribe; @@ -1366,15 +1362,7 @@ param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0"); } if (mediaInfo == null) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader); }); } else { if (mediaInfo != null) { // 如果是严格模式,需要关闭端口占用 JSONObject startSendRtpStreamResult = null; if (sendRtpItem.getLocalPort() != 0) { src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -553,4 +553,21 @@
    public Map<String, StreamPushItem> getAllAppAndStreamMap() {
        return streamPushMapper.getAllAppAndStreamMap();
    }

    /**
     * Inserts or refreshes the push-stream record described by a stream-changed hook.
     * <p>
     * The hook parameter is converted with {@code transform(param)}, stamped with the
     * current push/update time, and flagged as "self" when the hook's server id equals
     * this node's server id. When no record exists for the app/stream pair a new one is
     * added (with a creation time); otherwise the existing record is updated and the
     * media server of the matching gb stream is updated as well.
     *
     * @param param the stream-changed hook parameter that carries app, stream,
     *              media server id, server id and the regist flag
     */
    @Override
    public void updatePush(OnStreamChangedHookParam param) {
        StreamPushItem pushItem = transform(param);
        StreamPushItem existing = getPush(param.getApp(), param.getStream());

        pushItem.setPushIng(param.isRegist());
        pushItem.setUpdateTime(DateUtil.getNow());
        pushItem.setPushTime(DateUtil.getNow());
        boolean pushedToThisServer = userSetting.getServerId().equals(param.getSeverId());
        pushItem.setSelf(pushedToThisServer);

        if (existing == null) {
            // First time this app/stream is seen: create the record.
            pushItem.setCreateTime(DateUtil.getNow());
            streamPushMapper.add(pushItem);
            return;
        }

        // Known stream: refresh the record and the media server of the gb stream.
        streamPushMapper.update(pushItem);
        gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId());
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java
New file
@@ -0,0 +1,97 @@
package com.genersoft.iot.vmp.service.redisMsg;

import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Redis listener for "push stream online" notifications.
 * <p>
 * Incoming messages are queued and decoded into {@link SendRtpItem} objects on the
 * task executor. For each decoded item, the one-shot callback registered for its
 * app/stream pair (if any) is invoked and then discarded.
 *
 * @author lin
 */
@Component
public class RedisPlatformPushStreamOnlineLister implements MessageListener {

    private final Logger logger = LoggerFactory.getLogger("RedisPlatformPushStreamOnlineLister");

    /** Messages received from Redis that still have to be processed. */
    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();

    /** True while a drain task is scheduled or running; guarantees at most one drainer. */
    private final AtomicBoolean draining = new AtomicBoolean(false);

    /** One-shot callbacks keyed by {@code app + "_" + stream}. */
    private final Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>();

    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    /**
     * Receives a stream-online notification through Redis; if this node has a
     * listener registered for that stream, the listener is called back.
     *
     * @param message the Redis message whose body is a JSON-encoded {@link SendRtpItem}
     * @param pattern the pattern that matched the channel (unused)
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        taskQueue.offer(message);
        scheduleDrain();
    }

    /**
     * Starts a drain task unless one is already scheduled or running.
     * The flag is reset if the executor rejects the task so later messages can retry.
     */
    private void scheduleDrain() {
        if (!draining.compareAndSet(false, true)) {
            return;
        }
        try {
            taskExecutor.execute(this::drainTaskQueue);
        } catch (RuntimeException e) {
            draining.set(false);
            throw e;
        }
    }

    /** Processes queued messages until the queue is empty. */
    private void drainTaskQueue() {
        try {
            Message msg;
            while ((msg = taskQueue.poll()) != null) {
                handleMessage(msg);
            }
        } finally {
            draining.set(false);
            // A message may have been offered after the last poll() but before the flag
            // was cleared; re-check so it is not left stranded in the queue.
            if (!taskQueue.isEmpty()) {
                scheduleDrain();
            }
        }
    }

    /**
     * Decodes one message and dispatches it. Failures are logged and swallowed so a
     * single malformed message or failing callback cannot stop the drain loop.
     */
    private void handleMessage(Message msg) {
        try {
            String body = new String(msg.getBody(), StandardCharsets.UTF_8);
            SendRtpItem sendRtpItem = JSON.parseObject(body, SendRtpItem.class);
            if (sendRtpItem == null) {
                logger.warn("onMessage: ignored message without a SendRtpItem body");
                return;
            }
            sendStreamEvent(sendRtpItem);
        } catch (RuntimeException e) {
            logger.error("onMessage: failed to handle push stream online message", e);
        }
    }

    /**
     * Invokes and discards the callback registered for the item's app/stream pair.
     * <p>
     * The callback is removed atomically before it runs, so it fires at most once and
     * a listener registered for the same key while the callback is running is kept.
     *
     * @param sendRtpItem the decoded notification; ignored when {@code null}
     */
    public void sendStreamEvent(SendRtpItem sendRtpItem) {
        if (sendRtpItem == null) {
            return;
        }
        ChannelOnlineEvent channelOnlineEventLister =
                this.channelOnPublishEvents.remove(buildKey(sendRtpItem.getApp(), sendRtpItem.getStream()));
        if (channelOnlineEventLister == null) {
            return;
        }
        try {
            channelOnlineEventLister.run(sendRtpItem);
        } catch (ParseException e) {
            logger.error("sendStreamEvent: ", e);
        }
    }

    /**
     * Registers the callback for an app/stream pair, replacing any previous one.
     *
     * @param app      application name of the stream
     * @param stream   stream id
     * @param callback callback to run when the stream-online notification arrives
     */
    public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) {
        this.channelOnPublishEvents.put(buildKey(app, stream), callback);
    }

    /**
     * Removes the callback registered for an app/stream pair, if any.
     *
     * @param app    application name of the stream
     * @param stream stream id
     */
    public void removedChannelOnlineEventLister(String app, String stream) {
        this.channelOnPublishEvents.remove(buildKey(app, stream));
    }

    /**
     * Looks up the callback registered for an app/stream pair.
     *
     * @param app    application name of the stream
     * @param stream stream id
     * @return the registered callback, or {@code null} when none is registered
     */
    public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) {
        return this.channelOnPublishEvents.get(buildKey(app, stream));
    }

    /** Builds the map key used for an app/stream pair. */
    private static String buildKey(String app, String stream) {
        return app + "_" + stream;
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
@@ -1,12 +1,16 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
@@ -18,6 +22,8 @@ import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; /**
@@ -32,10 +38,10 @@ private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); @Autowired private UserSetting userSetting; private ZLMServerFactory zlmServerFactory; @Autowired private ZlmHttpHookSubscribe hookSubscribe; private IMediaServerService mediaServerService; @Qualifier("taskExecutor") @Autowired
// Review fix in the hunk below: an unknown media server now skips the current message with
// `continue` instead of `return`, so the remaining queued messages are still drained.
// The hunk is otherwise verbatim; it interleaves removed and added lines of the diff.
// Inline comments say: "listen for stream online; once online, send the sendRtpItem message
// straight to the actual signalling handler" and "read the upstream play request from redis,
// build the sendRtpItem and send it out".
@@ -52,23 +58,14 @@ while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); try { MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class); if (messageForPushChannel == null || ObjectUtils.isEmpty(messageForPushChannel.getApp()) || ObjectUtils.isEmpty(messageForPushChannel.getStream()) || userSetting.getServerId().equals(messageForPushChannel.getServerId())){ continue; SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class); sendRtpItem.getMediaServerId(); MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaServer == null) { continue; } // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed( messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp", null); hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { // 读取redis中的上级点播信息,生成sendRtpItm发送出去 }); Map<String, Object> sendRtpParam = getSendRtpParam(sendRtpItem); sendRtp(sendRtpItem, mediaServer, sendRtpParam); }catch (Exception e) { logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
@@ -78,4 +75,48 @@ }); } }

    /**
     * Builds the request parameters for starting an RTP send on the media server.
     *
     * @param sendRtpItem the send request describing stream, destination and transport
     * @return a mutable map with the keys {@code vhost}, {@code app}, {@code stream},
     *         {@code ssrc}, {@code dst_url}, {@code dst_port}, {@code src_port}, {@code pt},
     *         {@code use_ps}, {@code only_audio}, {@code is_udp}, plus
     *         {@code udp_rtcp_timeout} when the transport is not TCP
     */
    private Map<String, Object> getSendRtpParam(SendRtpItem sendRtpItem) {
        String isUdp = sendRtpItem.isTcp() ? "0" : "1";
        Map<String, Object> param = new HashMap<>(12);
        param.put("vhost","__defaultVhost__");
        param.put("app",sendRtpItem.getApp());
        param.put("stream",sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        param.put("dst_url",sendRtpItem.getIp());
        param.put("dst_port", sendRtpItem.getPort());
        param.put("src_port", sendRtpItem.getLocalPort());
        param.put("pt", sendRtpItem.getPt());
        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
        param.put("is_udp", isUdp);
        if (!sendRtpItem.isTcp()) {
            // In UDP mode, enable RTCP keep-alive when the item asks for it.
            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
        }
        return param;
    }

    /**
     * Asks the media server to start sending RTP for the given item.
     *
     * <p>TCP-active items use the passive send call; every other item has the destination
     * address and port written into {@code param} and uses the regular send call. The local
     * port does not affect which call is made.
     *
     * @param sendRtpItem the send request
     * @param mediaInfo   the media server that should send the stream
     * @param param       request parameters; {@code dst_url}/{@code dst_port} are overwritten
     *                    for non-TCP-active items
     * @return the result object returned by the media server call
     */
    private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map<String, Object> param){
        if (sendRtpItem.isTcpActive()) {
            return zlmServerFactory.startSendRtpPassive(mediaInfo, param);
        }
        param.put("dst_url", sendRtpItem.getIp());
        param.put("dst_port", sendRtpItem.getPort());
        return zlmServerFactory.startSendRtpStream(mediaInfo, param);
    }
}
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
// Diff hunks of RedisPlatformWaitPushStreamOnlineListener (imports, injected fields, the start
// of onMessage, and the stream-change hook callback). Only parts of the class are visible.
//
// onMessage Javadoc (translated): when an upstream platform requests play, this listener waits
// for the stream to come online; once online, the callback is invoked directly if the waiter is
// on this service, otherwise other wvp nodes are notified through a redis message.
//
// Hook callback, as shown:
//   1. reads the waiting SendRtpItem from redis by the message's app/stream
//      (inline comment: "read the upstream play request from redis, build the sendRtpItem
//      and send it out");
//   2. when the item has no ssrc, picks a play ssrc if the session name equals "Play"
//      (case-insensitive) and a playback ssrc otherwise (inline comment: "do not use the ssrc
//      given by the upstream platform, use our own; see the GB standard on SSRC handling for
//      cross-domain play"), then sets ssrc, media server id and local ip on the item;
//   3. fires the local online event, then publishes the item so other wvp nodes receive it
//      (inline comment: "notify other wvp; RedisPlatformPushStreamOnlineLister receives it").
//
// NOTE(review): the result of getWaiteSendRtpItem is dereferenced without a null check —
//   confirm the entry cannot be missing or expired when the hook fires.
// NOTE(review): as shown, steps 2-3 all sit inside the `getSsrc() == null` branch, so nothing
//   is sent when the item already carries an ssrc — confirm this is intended.
@@ -2,12 +2,15 @@ import com.alibaba.fastjson2.JSON; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -35,13 +38,25 @@ private UserSetting userSetting; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private ZlmHttpHookSubscribe hookSubscribe; @Autowired private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister; @Autowired private SSRCFactory ssrcFactory; @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; /** * 当上级点播时,这里负责监听等到流上线,流上线后如果是在当前服务则直接回调,如果是其他wvp,则由redis消息进行通知 */ @Override public void onMessage(Message message, byte[] bytes) { logger.info("[REDIS消息-收到上级等到设备推流的redis消息]: {}", new String(message.getBody())); @@ -66,7 +81,17 @@ null); hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { // 读取redis中的上级点播信息,生成sendRtpItm发送出去 SendRtpItem sendRtpItem = redisCatchStorage.getWaiteSendRtpItem(messageForPushChannel.getApp(), messageForPushChannel.getStream()); if (sendRtpItem.getSsrc() == null) { // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? 
ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId()); sendRtpItem.setSsrc(ssrc); sendRtpItem.setMediaServerId(mediaServerItemInUse.getId()); sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp()); redisPlatformPushStreamOnlineLister.sendStreamEvent(sendRtpItem); // 通知其他wvp, 由RedisPlatformPushStreamOnlineLister接收此监听。 redisCatchStorage.sendPushStreamOnline(sendRtpItem); } }); src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
File was deleted src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
// Diff hunk at the end of the IRedisCatchStorage interface. Declarations shown:
//   addWaiteSendRtpItem(sendRtpItem, platformPlayTimeout) — implementation not visible here;
//       presumably stores the item until the stream comes online, with a timeout — TODO confirm.
//   getWaiteSendRtpItem(app, stream) — returns the waiting SendRtpItem for the app/stream pair.
//   sendStartSendRtp(sendRtpItem)    — hands a start-send-RTP request for the item to redis.
//   sendPushStreamOnline(sendRtpItem) — publishes a "stream online" notification for the item.
@@ -219,5 +219,9 @@ void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout); SendRtpItem getWaiteSendRtpItem(String app, String stream); void sendStartSendRtp(SendRtpItem sendRtpItem); void sendPushStreamOnline(SendRtpItem sendRtpItem); } src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -686,8 +686,21 @@ }

    /**
     * Looks up the send request that is waiting for the given stream to come online.
     *
     * @param app    application name of the stream
     * @param stream stream id
     * @return the value stored under the waiting key for this app/stream, or {@code null}
     *         when redis holds no value for it
     */
    @Override
    public SendRtpItem getWaiteSendRtpItem(String app, String stream) {
        String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream;
        return (SendRtpItem)redisTemplate.opsForValue().get(key);
    }

    /**
     * Stores the JSON form of the item under the start-send key for its app/stream.
     *
     * @param sendRtpItem the send request to hand over
     */
    @Override
    public void sendStartSendRtp(SendRtpItem sendRtpItem) {
        // NOTE(review): this writes a plain value, while START_SEND_PUSH_STREAM is also used as
        // a pub/sub topic for RedisPlatformStartSendRtpListener — confirm a publish is not
        // what was intended here.
        String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
        redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem));
    }

    /**
     * Publishes a "stream online" notification carrying the item to other wvp nodes.
     *
     * @param sendRtpItem the send request whose stream has come online
     */
    @Override
    public void sendPushStreamOnline(SendRtpItem sendRtpItem) {
        // Publish on the dedicated stream-online channel; the close-requested channel used
        // before belongs to a different listener.
        String key = VideoManagerConstants.PUSH_STREAM_ONLINE;
        logger.info("[redis发送通知] 流上线 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
        redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
    }
}