src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -15,8 +15,11 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; @@ -92,6 +95,12 @@ @Autowired private UserSetting userSetting; @Autowired private IStreamPushService pushService; @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -124,6 +133,18 @@ param.put("stream",streamId); param.put("ssrc",sendRtpItem.getSsrc()); logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId()); if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { // 查询这路流是否是本平台的 StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream()); if (push!= null && !push.isSelf()) { // 不是本平台的就发送redis消息让其他wvp停止发流 ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); if (platform != null) { RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId()); redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg); } }else { MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), callIdHeader.getCallId(), null); @@ -131,7 +152,7 @@ if (userSetting.getUseCustomSsrcForParentInvite()) { mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); } if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); if (platform != null) { MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, @@ -143,7 +164,17 @@ logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId()); } } }else { MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), callIdHeader.getCallId(), null); zlmServerFactory.stopSendRtpStream(mediaInfo, param); if (userSetting.getUseCustomSsrcForParentInvite()) { mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); } } MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaInfo != null) { AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) { // 来自上级平台的停止对讲 @@ -169,6 +200,7 @@ } } } } // 可能是设备发送的停止 SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId()); src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -579,7 +579,7 @@ } // 收到无人观看说明流也没有在往上级推送 if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) { List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( inviteInfo.getChannelId()); if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java
New file @@ -0,0 +1,49 @@ package com.genersoft.iot.vmp.service.bean; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; public class RequestStopPushStreamMsg { private SendRtpItem sendRtpItem; private String platformName; private int platFormIndex; public SendRtpItem getSendRtpItem() { return sendRtpItem; } public void setSendRtpItem(SendRtpItem sendRtpItem) { this.sendRtpItem = sendRtpItem; } public String getPlatformName() { return platformName; } public void setPlatformName(String platformName) { this.platformName = platformName; } public int getPlatFormIndex() { return platFormIndex; } public void setPlatFormIndex(int platFormIndex) { this.platFormIndex = platFormIndex; } public static RequestStopPushStreamMsg getInstance(SendRtpItem sendRtpItem, String platformName, int platFormIndex) { RequestStopPushStreamMsg streamMsg = new RequestStopPushStreamMsg(); streamMsg.setSendRtpItem(sendRtpItem); streamMsg.setPlatformName(platformName); streamMsg.setPlatFormIndex(platFormIndex); return streamMsg; } } src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java
@@ -6,7 +6,17 @@ public class WvpRedisMsgCmd { /** * 请求获取推流信息 */ public static final String GET_SEND_ITEM = "GetSendItem"; /** * 请求推流的请求 */ public static final String REQUEST_PUSH_STREAM = "RequestPushStream"; /** * 停止推流的请求 */ public static final String REQUEST_STOP_PUSH_STREAM = "RequestStopPushStream"; } src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -133,7 +133,10 @@ case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM: RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent()); requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; default: break; @@ -397,6 +400,19 @@ redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } /** * 发送请求推流的消息 */ public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) { String key = UUID.randomUUID().toString(); WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg)); JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); logger.info("[REDIS 请求其他平台停止推流] {}: {}", serverId, jsonObject); redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { if (platformGbId == null) { platformGbId = "*"; @@ -423,4 +439,36 @@ return null; } } /** * 处理收到的请求推流的请求 */ private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) { SendRtpItem sendRtpItem = streamMsg.getSendRtpItem(); if (sendRtpItem == null) { logger.info("[REDIS 执行其他平台的请求停止推流] 失败: sendRtpItem为NULL"); return; } MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaInfo == null) { // TODO 回复错误 return; } Map<String, Object> param = new HashMap<>(); param.put("vhost","__defaultVhost__"); param.put("app",sendRtpItem.getApp()); param.put("stream",sendRtpItem.getStream()); param.put("ssrc", sendRtpItem.getSsrc()); if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) { logger.info("[REDIS 执行其他平台的请求停止推流] 成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); // 发送redis消息 MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex()); redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); } } } src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
@@ -73,7 +73,7 @@ MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class); StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream()); if (push != null) { List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( push.getGbId()); if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -181,7 +181,7 @@ */ void sendStreamPushRequestedMsgForStatus(); List<SendRtpItem> querySendRTPServerByChnnelId(String channelId); List<SendRtpItem> querySendRTPServerByChannelId(String channelId); List<SendRtpItem> querySendRTPServerByStream(String stream); src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -184,7 +184,7 @@ } @Override public List<SendRtpItem> querySendRTPServerByChnnelId(String channelId) { public List<SendRtpItem> querySendRTPServerByChannelId(String channelId) { if (channelId == null) { return null; }