src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -107,6 +107,11 @@ public static final String VM_MSG_STREAM_PUSH_RESPONSE = "VM_MSG_STREAM_PUSH_RESPONSE"; /** * Redis notification telling the platform to close a push stream */ public static final String VM_MSG_STREAM_PUSH_CLOSE = "VM_MSG_STREAM_PUSH_CLOSE"; /** * Redis message requesting all online channels */ public static final String VM_MSG_GET_ALL_ONLINE_REQUESTED = "VM_MSG_GET_ALL_ONLINE_REQUESTED"; src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -43,6 +43,9 @@ @Autowired private RedisPushStreamResponseListener redisPushStreamResponseListener; @Autowired private RedisCloseStreamMsgListener redisCloseStreamMsgListener; /** * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 @@ -63,6 +66,7 @@ container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); return container; } } src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -183,6 +183,7 @@ @Override public boolean stop(String app, String streamId) { logger.info("[推流 ] 停止流: {}/{}", app, streamId); StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); if (streamPushItem != null) { gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java
package com.genersoft.iot.vmp.service.redisMsg;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.service.IStreamPushService;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Receives "close push stream" notifications published through Redis and stops
 * the referenced stream via {@link IStreamPushService#stop(String, String)}.
 *
 * <p>Incoming messages are buffered in an in-memory queue and processed on the
 * {@code taskExecutor} pool, so the thread delivering the Redis message is not
 * blocked while a stream is being stopped. At most one drain task is active at
 * a time, which keeps messages processed in arrival order.
 *
 * <p>Expected payload: a JSON object with string fields {@code app} and
 * {@code stream}.
 *
 * @author lin
 */
@Component
public class RedisCloseStreamMsgListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(RedisCloseStreamMsgListener.class);

    @Autowired
    private IStreamPushService pushService;

    /** Messages waiting to be handled by the drain task. */
    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();

    /** {@code true} while a drain task is scheduled or running. */
    private final AtomicBoolean draining = new AtomicBoolean(false);

    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    /**
     * Queues the message and makes sure a drain task will process it.
     *
     * @param message the Redis message; its body carries the JSON payload
     * @param bytes   the pattern that matched the channel (unused)
     */
    @Override
    public void onMessage(@NotNull Message message, byte[] bytes) {
        // Enqueue first, then claim the drain flag. Deciding whether to start a
        // worker from isEmpty() *before* offering can strand a message when the
        // worker finishes in between, after which no worker is ever started again.
        taskQueue.offer(message);
        if (draining.compareAndSet(false, true)) {
            try {
                taskExecutor.execute(this::drainQueue);
            } catch (RuntimeException e) {
                // The task was not accepted; release the flag so that a later
                // message can try to schedule the drain again.
                draining.set(false);
                throw e;
            }
        }
    }

    /**
     * Processes queued messages until the queue is empty, then releases the
     * drain flag.
     */
    private void drainQueue() {
        do {
            try {
                Message msg;
                while ((msg = taskQueue.poll()) != null) {
                    handleCloseRequest(msg);
                }
            } finally {
                draining.set(false);
            }
            // A producer may have enqueued after the last poll() returned null but
            // before the flag was cleared; its compareAndSet failed, so this
            // thread has to pick the message up by re-claiming the flag.
        } while (!taskQueue.isEmpty() && draining.compareAndSet(false, true));
    }

    /**
     * Parses one notification and stops the stream it refers to. Failures are
     * logged and never propagate, so one bad message cannot abort the drain.
     */
    private void handleCloseRequest(Message msg) {
        try {
            JSONObject jsonObject = JSON.parseObject(msg.getBody());
            String app = jsonObject == null ? null : jsonObject.getString("app");
            String stream = jsonObject == null ? null : jsonObject.getString("stream");
            if (app == null || app.isEmpty() || stream == null || stream.isEmpty()) {
                // Nothing to stop without both identifiers; skip instead of
                // calling the service with null arguments.
                logger.warn("[REDIS的关闭推流通知] 消息缺少app或stream, 已忽略: {}", bodyAsText(msg));
                return;
            }
            pushService.stop(app, stream);
        } catch (Exception e) {
            // Log the payload of the message that actually failed.
            logger.warn("[REDIS的关闭推流通知] 发现未处理的异常, \r\n{}", bodyAsText(msg));
            logger.error("[REDIS的关闭推流通知] 异常内容: ", e);
        }
    }

    /** Returns the message body decoded as UTF-8 text for logging purposes. */
    private static String bodyAsText(Message msg) {
        byte[] body = msg.getBody();
        return body == null ? "null" : new String(body, StandardCharsets.UTF_8);
    }
}