648540858
2024-03-21 f96250ef45af8d4c8f52fed382da058ab9330bbf
Merge branch 'master' into dev/abl支持
9个文件已修改
1个文件已添加
166 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -16,8 +16,11 @@
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
@@ -93,6 +96,12 @@
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private IStreamPushService pushService;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 添加消息处理的订阅
@@ -116,7 +125,7 @@
        // 收流端发送的停止
        if (sendRtpItem != null){
            logger.info("[收到bye] 来自{},停止通道:{}, 类型: {}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getPlayType());
            logger.info("[收到bye] 来自{},停止通道:{}, 类型: {}, callId: {}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getPlayType(), callIdHeader.getCallId());
            String streamId = sendRtpItem.getStream();
            Map<String, Object> param = new HashMap<>();
@@ -124,7 +133,19 @@
            param.put("app",sendRtpItem.getApp());
            param.put("stream",streamId);
            param.put("ssrc",sendRtpItem.getSsrc());
            logger.info("[收到bye] 停止推流:{}", streamId);
            logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId());
            if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
                // 查询这路流是否是本平台的
                StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream());
                if (push!= null && !push.isSelf()) {
                    // 不是本平台的就发送redis消息让其他wvp停止发流
                    ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
                    if (platform != null) {
                        RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId());
                        redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg);
                    }
                }else {
            MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
                    callIdHeader.getCallId(), null);
@@ -132,7 +153,7 @@
            if (userSetting.getUseCustomSsrcForParentInvite()) {
                mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
            }
            if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
                ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
                if (platform != null) {
                    MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
@@ -144,7 +165,17 @@
                    logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
                }
            }
            }else {
                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
                        callIdHeader.getCallId(), null);
                zlmServerFactory.stopSendRtpStream(mediaInfo, param);
                if (userSetting.getUseCustomSsrcForParentInvite()) {
                    mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
                }
            }
            MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            if (mediaInfo != null) {
            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
            if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
                // 来自上级平台的停止对讲
@@ -170,6 +201,7 @@
                }
            }
        }
        }
            // 可能是设备发送的停止
            SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId());
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -618,9 +618,9 @@
                }
                // 收到无人观看说明流也没有在往上级推送
                if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
                            inviteInfo.getChannelId());
                    if (sendRtpItems.size() > 0) {
                    if (!sendRtpItems.isEmpty()) {
                        for (SendRtpItem sendRtpItem : sendRtpItems) {
                            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
                            try {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
@@ -289,6 +289,10 @@
     * 调用zlm RESTful API —— stopSendRtp
     */
    public Boolean stopSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) {
        if (mediaServerItem == null) {
            logger.error("[停止RTP推流] 失败: 媒体节点为NULL");
            return false;
        }
        Boolean result = false;
        JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaServerItem, param);
        if (jsonObject == null) {
src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java
New file
@@ -0,0 +1,49 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
public class RequestStopPushStreamMsg {
    private SendRtpItem sendRtpItem;
    private String platformName;
    private int platFormIndex;
    public SendRtpItem getSendRtpItem() {
        return sendRtpItem;
    }
    public void setSendRtpItem(SendRtpItem sendRtpItem) {
        this.sendRtpItem = sendRtpItem;
    }
    public String getPlatformName() {
        return platformName;
    }
    public void setPlatformName(String platformName) {
        this.platformName = platformName;
    }
    public int getPlatFormIndex() {
        return platFormIndex;
    }
    public void setPlatFormIndex(int platFormIndex) {
        this.platFormIndex = platFormIndex;
    }
    public static RequestStopPushStreamMsg getInstance(SendRtpItem sendRtpItem, String platformName, int platFormIndex) {
        RequestStopPushStreamMsg streamMsg = new RequestStopPushStreamMsg();
        streamMsg.setSendRtpItem(sendRtpItem);
        streamMsg.setPlatformName(platformName);
        streamMsg.setPlatFormIndex(platFormIndex);
        return streamMsg;
    }
}
src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java
@@ -6,7 +6,17 @@
public class WvpRedisMsgCmd {
    /**
     * 请求获取推流信息
     */
    public static final String GET_SEND_ITEM = "GetSendItem";
    /**
     * 请求推流的请求
     */
    public static final String REQUEST_PUSH_STREAM = "RequestPushStream";
    /**
     * 停止推流的请求
     */
    public static final String REQUEST_STOP_PUSH_STREAM = "RequestStopPushStream";
}
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -262,6 +262,8 @@
        int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30);
        // 设置最小值为30
        dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, (subscribeCycleForCatalog -1) * 1000);
        catalogSubscribeTask.run();
        return true;
    }
@@ -295,6 +297,7 @@
        int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30);
        // 刷新订阅
        dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, subscribeCycleForCatalog * 1000);
        mobilePositionSubscribeTask.run();
        return true;
    }
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -133,7 +133,10 @@
                                case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
                                    RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
                                    requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
                                    break;
                                case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM:
                                    RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent());
                                    requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
                                    break;
                                default:
                                    break;
@@ -397,6 +400,19 @@
        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
    /**
     * 发送请求推流的消息
     */
    public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) {
        String key = UUID.randomUUID().toString();
        WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
                WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg));
        JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
        logger.info("[REDIS 请求其他平台停止推流] {}: {}", serverId, jsonObject);
        redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
    private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
        if (platformGbId == null) {
            platformGbId = "*";
@@ -423,4 +439,36 @@
            return null;
        }
    }
    /**
     * 处理收到的请求推流的请求
     */
    private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) {
        SendRtpItem sendRtpItem = streamMsg.getSendRtpItem();
        if (sendRtpItem == null) {
            logger.info("[REDIS 执行其他平台的请求停止推流] 失败: sendRtpItem为NULL");
            return;
        }
        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        if (mediaInfo == null) {
            // TODO 回复错误
            return;
        }
        Map<String, Object> param = new HashMap<>();
        param.put("vhost","__defaultVhost__");
        param.put("app",sendRtpItem.getApp());
        param.put("stream",sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) {
            logger.info("[REDIS 执行其他平台的请求停止推流] 成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
            // 发送redis消息
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                    sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
                    sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
            messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex());
            redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
@@ -73,7 +73,7 @@
        MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
        StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
        if (push != null) {
            List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
            List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
                    push.getGbId());
            if (!sendRtpItems.isEmpty()) {
                for (SendRtpItem sendRtpItem : sendRtpItems) {
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -181,7 +181,7 @@
     */
    void sendStreamPushRequestedMsgForStatus();
    List<SendRtpItem> querySendRTPServerByChnnelId(String channelId);
    List<SendRtpItem> querySendRTPServerByChannelId(String channelId);
    List<SendRtpItem> querySendRTPServerByStream(String stream);
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -184,7 +184,7 @@
    }
    @Override
    public List<SendRtpItem> querySendRTPServerByChnnelId(String channelId) {
    public List<SendRtpItem> querySendRTPServerByChannelId(String channelId) {
        if (channelId == null) {
            return null;
        }