648540858
2024-03-21 3291c4b2e67d510186ca5fbfac8ec5af1a9d4f16
修复多平台推流无人观看redis通知
7个文件已修改
1个文件已添加
153 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -15,8 +15,11 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
@@ -92,6 +95,12 @@
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private IStreamPushService pushService;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
@@ -124,6 +133,18 @@
            param.put("stream",streamId);
            param.put("ssrc",sendRtpItem.getSsrc());
            logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId());
            if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
                // 查询这路流是否是本平台的
                StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream());
                if (push!= null && !push.isSelf()) {
                    // 不是本平台的就发送redis消息让其他wvp停止发流
                    ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
                    if (platform != null) {
                        RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId());
                        redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg);
                    }
                }else {
            MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
                    callIdHeader.getCallId(), null);
@@ -131,7 +152,7 @@
            if (userSetting.getUseCustomSsrcForParentInvite()) {
                mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
            }
            if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
                ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
                if (platform != null) {
                    MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
@@ -143,7 +164,17 @@
                    logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
                }
            }
            }else {
                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
                        callIdHeader.getCallId(), null);
                zlmServerFactory.stopSendRtpStream(mediaInfo, param);
                if (userSetting.getUseCustomSsrcForParentInvite()) {
                    mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
                }
            }
            MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            if (mediaInfo != null) {
            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
            if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
                // 来自上级平台的停止对讲
@@ -169,6 +200,7 @@
                }
            }
        }
        }
            // 可能是设备发送的停止
            SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId());
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -579,7 +579,7 @@
                }
                // 收到无人观看说明流也没有在往上级推送
                if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
                            inviteInfo.getChannelId());
                    if (!sendRtpItems.isEmpty()) {
                        for (SendRtpItem sendRtpItem : sendRtpItems) {
src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java
New file
@@ -0,0 +1,49 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
public class RequestStopPushStreamMsg {
    private SendRtpItem sendRtpItem;
    private String platformName;
    private int platFormIndex;
    public SendRtpItem getSendRtpItem() {
        return sendRtpItem;
    }
    public void setSendRtpItem(SendRtpItem sendRtpItem) {
        this.sendRtpItem = sendRtpItem;
    }
    public String getPlatformName() {
        return platformName;
    }
    public void setPlatformName(String platformName) {
        this.platformName = platformName;
    }
    public int getPlatFormIndex() {
        return platFormIndex;
    }
    public void setPlatFormIndex(int platFormIndex) {
        this.platFormIndex = platFormIndex;
    }
    public static RequestStopPushStreamMsg getInstance(SendRtpItem sendRtpItem, String platformName, int platFormIndex) {
        RequestStopPushStreamMsg streamMsg = new RequestStopPushStreamMsg();
        streamMsg.setSendRtpItem(sendRtpItem);
        streamMsg.setPlatformName(platformName);
        streamMsg.setPlatFormIndex(platFormIndex);
        return streamMsg;
    }
}
src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java
@@ -6,7 +6,17 @@
public class WvpRedisMsgCmd {
    /**
     * Command name: request to obtain push-stream (send item) information.
     */
    public static final String GET_SEND_ITEM = "GetSendItem";
    /**
     * Command name: request to start pushing a stream.
     */
    public static final String REQUEST_PUSH_STREAM = "RequestPushStream";
    /**
     * Command name: request to stop pushing a stream.
     */
    public static final String REQUEST_STOP_PUSH_STREAM = "RequestStopPushStream";
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -133,7 +133,10 @@
                                case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
                                    RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
                                    requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
                                    break;
                                case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM:
                                    RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent());
                                    requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
                                    break;
                                default:
                                    break;
@@ -397,6 +400,19 @@
        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
    /**
     * 发送请求推流的消息
     */
    public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) {
        String key = UUID.randomUUID().toString();
        WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
                WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg));
        JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
        logger.info("[REDIS 请求其他平台停止推流] {}: {}", serverId, jsonObject);
        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
    private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
        if (platformGbId == null) {
            platformGbId = "*";
@@ -423,4 +439,36 @@
            return null;
        }
    }
    /**
     * 处理收到的请求推流的请求
     */
    private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) {
        SendRtpItem sendRtpItem = streamMsg.getSendRtpItem();
        if (sendRtpItem == null) {
            logger.info("[REDIS 执行其他平台的请求停止推流] 失败: sendRtpItem为NULL");
            return;
        }
        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        if (mediaInfo == null) {
            // TODO 回复错误
            return;
        }
        Map<String, Object> param = new HashMap<>();
        param.put("vhost","__defaultVhost__");
        param.put("app",sendRtpItem.getApp());
        param.put("stream",sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) {
            logger.info("[REDIS 执行其他平台的请求停止推流] 成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
            // 发送redis消息
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                    sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
                    sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
            messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex());
            redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
@@ -73,7 +73,7 @@
        MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
        StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
        if (push != null) {
            List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
            List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
                    push.getGbId());
            if (!sendRtpItems.isEmpty()) {
                for (SendRtpItem sendRtpItem : sendRtpItems) {
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -181,7 +181,7 @@
     */
    void sendStreamPushRequestedMsgForStatus();
    List<SendRtpItem> querySendRTPServerByChnnelId(String channelId);
    List<SendRtpItem> querySendRTPServerByChannelId(String channelId);
    List<SendRtpItem> querySendRTPServerByStream(String stream);
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -184,7 +184,7 @@
    }
    @Override
    public List<SendRtpItem> querySendRTPServerByChnnelId(String channelId) {
    public List<SendRtpItem> querySendRTPServerByChannelId(String channelId) {
        if (channelId == null) {
            return null;
        }