648540858
2024-03-21 3291c4b2e67d510186ca5fbfac8ec5af1a9d4f16
修复多平台推流无人观看redis通知
7个文件已修改
1个文件已添加
233 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 116 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -15,8 +15,11 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
@@ -92,6 +95,12 @@
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private IStreamPushService pushService;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
@@ -124,58 +133,81 @@
            param.put("stream",streamId);
            param.put("ssrc",sendRtpItem.getSsrc());
            logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId());
            MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
                    callIdHeader.getCallId(), null);
            zlmServerFactory.stopSendRtpStream(mediaInfo, param);
            if (userSetting.getUseCustomSsrcForParentInvite()) {
                mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
            }
            if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
                ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
                if (platform != null) {
                    MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                            sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
                            sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
                    messageForPushChannel.setPlatFormIndex(platform.getId());
                    redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
                // 查询这路流是否是本平台的
                StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream());
                if (push!= null && !push.isSelf()) {
                    // 不是本平台的就发送redis消息让其他wvp停止发流
                    ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
                    if (platform != null) {
                        RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId());
                        redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg);
                    }
                }else {
                    logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
                    MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                    redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
                            callIdHeader.getCallId(), null);
                    zlmServerFactory.stopSendRtpStream(mediaInfo, param);
                    if (userSetting.getUseCustomSsrcForParentInvite()) {
                        mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
                    }
                    ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
                    if (platform != null) {
                        MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
                                sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
                        messageForPushChannel.setPlatFormIndex(platform.getId());
                        redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
                    }else {
                        logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
                    }
                }
            }else {
                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
                        callIdHeader.getCallId(), null);
                zlmServerFactory.stopSendRtpStream(mediaInfo, param);
                if (userSetting.getUseCustomSsrcForParentInvite()) {
                    mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
                }
            }
            MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            if (mediaInfo != null) {
                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
                    // 来自上级平台的停止对讲
                    logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                }
            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
            if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
                // 来自上级平台的停止对讲
                logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
            }
            int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
            if (totalReaderCount <= 0) {
                logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
                if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
                    Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
                    if (device == null) {
                        logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
                    }
                    try {
                        logger.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null);
                    } catch (InvalidArgumentException | ParseException | SipException |
                             SsrcTransactionNotFoundException e) {
                        logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
                int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
                if (totalReaderCount <= 0) {
                    logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
                    if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
                        Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
                        if (device == null) {
                            logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
                        }
                        try {
                            logger.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null);
                        } catch (InvalidArgumentException | ParseException | SipException |
                                 SsrcTransactionNotFoundException e) {
                            logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
                        }
                    }
                }
            }
        }
            // 可能是设备发送的停止
            SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId());
            if (ssrcTransaction == null) {
                return;
            }
            logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
        // 可能是设备发送的停止
        SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId());
        if (ssrcTransaction == null) {
            return;
        }
        logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
        ParentPlatform platform = platformService.queryPlatformByServerGBId(ssrcTransaction.getDeviceId());
        if (platform != null ) {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -579,7 +579,7 @@
                }
                // 收到无人观看说明流也没有在往上级推送
                if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
                            inviteInfo.getChannelId());
                    if (!sendRtpItems.isEmpty()) {
                        for (SendRtpItem sendRtpItem : sendRtpItems) {
src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java
New file
@@ -0,0 +1,49 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
public class RequestStopPushStreamMsg {
    private SendRtpItem sendRtpItem;
    private String platformName;
    private int platFormIndex;
    public SendRtpItem getSendRtpItem() {
        return sendRtpItem;
    }
    public void setSendRtpItem(SendRtpItem sendRtpItem) {
        this.sendRtpItem = sendRtpItem;
    }
    public String getPlatformName() {
        return platformName;
    }
    public void setPlatformName(String platformName) {
        this.platformName = platformName;
    }
    public int getPlatFormIndex() {
        return platFormIndex;
    }
    public void setPlatFormIndex(int platFormIndex) {
        this.platFormIndex = platFormIndex;
    }
    public static RequestStopPushStreamMsg getInstance(SendRtpItem sendRtpItem, String platformName, int platFormIndex) {
        RequestStopPushStreamMsg streamMsg = new RequestStopPushStreamMsg();
        streamMsg.setSendRtpItem(sendRtpItem);
        streamMsg.setPlatformName(platformName);
        streamMsg.setPlatFormIndex(platFormIndex);
        return streamMsg;
    }
}
src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java
@@ -6,7 +6,17 @@
/**
 * Command names used in the redis messages exchanged between WVP nodes.
 */
public class WvpRedisMsgCmd {
    /**
     * Request to obtain the send (push) item information.
     */
    public static final String GET_SEND_ITEM = "GetSendItem";
    /**
     * Request to start pushing a stream.
     */
    public static final String REQUEST_PUSH_STREAM = "RequestPushStream";
    /**
     * Request to stop pushing a stream.
     */
    public static final String REQUEST_STOP_PUSH_STREAM = "RequestStopPushStream";
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -133,7 +133,10 @@
                                case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
                                    RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
                                    requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
                                    break;
                                case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM:
                                    RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent());
                                    requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
                                    break;
                                default:
                                    break;
@@ -397,6 +400,19 @@
        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
    /**
     * 发送请求推流的消息
     */
    public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) {
        String key = UUID.randomUUID().toString();
        WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
                WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg));
        JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
        logger.info("[REDIS 请求其他平台停止推流] {}: {}", serverId, jsonObject);
        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
    private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
        if (platformGbId == null) {
            platformGbId = "*";
@@ -423,4 +439,36 @@
            return null;
        }
    }
    /**
     * 处理收到的请求推流的请求
     */
    private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) {
        SendRtpItem sendRtpItem = streamMsg.getSendRtpItem();
        if (sendRtpItem == null) {
            logger.info("[REDIS 执行其他平台的请求停止推流] 失败: sendRtpItem为NULL");
            return;
        }
        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        if (mediaInfo == null) {
            // TODO 回复错误
            return;
        }
        Map<String, Object> param = new HashMap<>();
        param.put("vhost","__defaultVhost__");
        param.put("app",sendRtpItem.getApp());
        param.put("stream",sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) {
            logger.info("[REDIS 执行其他平台的请求停止推流] 成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
            // 发送redis消息
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                    sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
                    sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
            messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex());
            redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
@@ -73,7 +73,7 @@
        MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
        StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
        if (push != null) {
            List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
            List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
                    push.getGbId());
            if (!sendRtpItems.isEmpty()) {
                for (SendRtpItem sendRtpItem : sendRtpItems) {
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -181,7 +181,7 @@
     */
    void sendStreamPushRequestedMsgForStatus();
    List<SendRtpItem> querySendRTPServerByChnnelId(String channelId);
    List<SendRtpItem> querySendRTPServerByChannelId(String channelId);
    List<SendRtpItem> querySendRTPServerByStream(String stream);
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -184,7 +184,7 @@
    }
    @Override
    public List<SendRtpItem> querySendRTPServerByChnnelId(String channelId) {
    public List<SendRtpItem> querySendRTPServerByChannelId(String channelId) {
        if (channelId == null) {
            return null;
        }