src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -16,8 +16,11 @@ import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; @@ -93,6 +96,12 @@ @Autowired private UserSetting userSetting; @Autowired private IStreamPushService pushService; @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -116,7 +125,7 @@ // 收流端发送的停止 if (sendRtpItem != null){ logger.info("[收到bye] 来自{},停止通道:{}, 类型: {}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getPlayType()); logger.info("[收到bye] 来自{},停止通道:{}, 类型: {}, callId: {}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getPlayType(), callIdHeader.getCallId()); String streamId = sendRtpItem.getStream(); Map<String, Object> param = new HashMap<>(); @@ -124,7 +133,19 @@ param.put("app",sendRtpItem.getApp()); param.put("stream",streamId); param.put("ssrc",sendRtpItem.getSsrc()); logger.info("[收到bye] 停止推流:{}", streamId); logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId()); if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { // 查询这路流是否是本平台的 StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream()); if (push!= null && !push.isSelf()) { // 不是本平台的就发送redis消息让其他wvp停止发流 ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); if (platform != null) { RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId()); redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg); } }else { MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), callIdHeader.getCallId(), null); @@ -132,7 +153,7 @@ if (userSetting.getUseCustomSsrcForParentInvite()) { mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); } if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); if (platform != null) { MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, @@ -144,7 +165,17 @@ logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId()); } } }else { MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), callIdHeader.getCallId(), null); zlmServerFactory.stopSendRtpStream(mediaInfo, param); if (userSetting.getUseCustomSsrcForParentInvite()) { mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); } } MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaInfo != null) { AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) { // 来自上级平台的停止对讲 @@ -170,6 +201,7 @@ } } } } // 可能是设备发送的停止 SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId()); src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -618,9 +618,9 @@ } // 收到无人观看说明流也没有在往上级推送 if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) { List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( inviteInfo.getChannelId()); if (sendRtpItems.size() > 0) { if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); try { src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
@@ -289,6 +289,10 @@ * 调用zlm RESTful API —— stopSendRtp */ public Boolean stopSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) { if (mediaServerItem == null) { logger.error("[停止RTP推流] 失败: 媒体节点为NULL"); return false; } Boolean result = false; JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaServerItem, param); if (jsonObject == null) { src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java
New file @@ -0,0 +1,49 @@ package com.genersoft.iot.vmp.service.bean; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; public class RequestStopPushStreamMsg { private SendRtpItem sendRtpItem; private String platformName; private int platFormIndex; public SendRtpItem getSendRtpItem() { return sendRtpItem; } public void setSendRtpItem(SendRtpItem sendRtpItem) { this.sendRtpItem = sendRtpItem; } public String getPlatformName() { return platformName; } public void setPlatformName(String platformName) { this.platformName = platformName; } public int getPlatFormIndex() { return platFormIndex; } public void setPlatFormIndex(int platFormIndex) { this.platFormIndex = platFormIndex; } public static RequestStopPushStreamMsg getInstance(SendRtpItem sendRtpItem, String platformName, int platFormIndex) { RequestStopPushStreamMsg streamMsg = new RequestStopPushStreamMsg(); streamMsg.setSendRtpItem(sendRtpItem); streamMsg.setPlatformName(platformName); streamMsg.setPlatFormIndex(platFormIndex); return streamMsg; } } src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java
@@ -6,7 +6,17 @@ public class WvpRedisMsgCmd { /** * 请求获取推流信息 */ public static final String GET_SEND_ITEM = "GetSendItem"; /** * 请求推流的请求 */ public static final String REQUEST_PUSH_STREAM = "RequestPushStream"; /** * 停止推流的请求 */ public static final String REQUEST_STOP_PUSH_STREAM = "RequestStopPushStream"; } src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -262,6 +262,8 @@ int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30); // 设置最小值为30 dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, (subscribeCycleForCatalog -1) * 1000); catalogSubscribeTask.run(); return true; } @@ -295,6 +297,7 @@ int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30); // 刷新订阅 dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, subscribeCycleForCatalog * 1000); mobilePositionSubscribeTask.run(); return true; } src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -133,7 +133,10 @@ case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM: RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent()); requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; default: break; @@ -397,6 +400,19 @@ redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } /** * 发送请求推流的消息 */ public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) { String key = UUID.randomUUID().toString(); WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg)); JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); logger.info("[REDIS 请求其他平台停止推流] {}: {}", serverId, jsonObject); redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { if (platformGbId == null) { platformGbId = "*"; @@ -423,4 +439,36 @@ return null; } } /** * 处理收到的请求推流的请求 */ private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) { SendRtpItem sendRtpItem = streamMsg.getSendRtpItem(); if (sendRtpItem == null) { logger.info("[REDIS 执行其他平台的请求停止推流] 失败: sendRtpItem为NULL"); return; } MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaInfo == null) { // TODO 回复错误 return; } Map<String, Object> param = new HashMap<>(); param.put("vhost","__defaultVhost__"); param.put("app",sendRtpItem.getApp()); param.put("stream",sendRtpItem.getStream()); param.put("ssrc", sendRtpItem.getSsrc()); if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) { logger.info("[REDIS 执行其他平台的请求停止推流] 成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); // 发送redis消息 MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex()); redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); } } } src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
@@ -73,7 +73,7 @@ MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class); StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream()); if (push != null) { List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( push.getGbId()); if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -181,7 +181,7 @@ */ void sendStreamPushRequestedMsgForStatus(); List<SendRtpItem> querySendRTPServerByChnnelId(String channelId); List<SendRtpItem> querySendRTPServerByChannelId(String channelId); List<SendRtpItem> querySendRTPServerByStream(String stream); src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -184,7 +184,7 @@ } @Override public List<SendRtpItem> querySendRTPServerByChnnelId(String channelId) { public List<SendRtpItem> querySendRTPServerByChannelId(String channelId) { if (channelId == null) { return null; }