648540858
2023-06-07 534be3f5809f430cb46cb0fcbba99d3d425f2324
支持redis消息强制关闭流
3个文件已修改
1个文件已添加
69 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java 59 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -107,6 +107,11 @@
    public static final String VM_MSG_STREAM_PUSH_RESPONSE = "VM_MSG_STREAM_PUSH_RESPONSE";
    /**
     * redis 通知平台关闭推流
     */
    public static final String VM_MSG_STREAM_PUSH_CLOSE = "VM_MSG_STREAM_PUSH_CLOSE";
    /**
     * redis 消息请求所有的在线通道
     */
    public static final String VM_MSG_GET_ALL_ONLINE_REQUESTED = "VM_MSG_GET_ALL_ONLINE_REQUESTED";
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -43,6 +43,9 @@
    @Autowired
    private RedisPushStreamResponseListener redisPushStreamResponseListener;
    @Autowired
    private RedisCloseStreamMsgListener redisCloseStreamMsgListener;
    /**
     * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
@@ -63,6 +66,7 @@
        container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
        container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
        container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
        container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
        return container;
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -183,6 +183,7 @@
    @Override
    public boolean stop(String app, String streamId) {
        logger.info("[推流 ] 停止流: {}/{}", app, streamId);
        StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
        if (streamPushItem != null) {
            gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java
New file
@@ -0,0 +1,59 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.service.IStreamPushService;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * 接收来自redis的关闭流更新通知
 * @author lin
 */
@Component
public class RedisCloseStreamMsgListener implements MessageListener {
    private final static Logger logger = LoggerFactory.getLogger(RedisCloseStreamMsgListener.class);
    @Autowired
    private IStreamPushService pushService;
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    @Override
    public void onMessage(@NotNull Message message, byte[] bytes) {
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                        JSONObject jsonObject = JSON.parseObject(msg.getBody());
                        String app = jsonObject.getString("app");
                        String stream = jsonObject.getString("stream");
                        pushService.stop(app, stream);
                    }catch (Exception e) {
                        logger.warn("[REDIS的关闭推流通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
                        logger.error("[REDIS的关闭推流通知] 异常内容: ", e);
                    }
                }
            });
        }
    }
}