From d7a1b94f905c5f28c9c8f2d48c3f9e28ebcf9cc4 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期六, 24 九月 2022 21:04:58 +0800 Subject: [PATCH] Merge branch 'wvp-28181-2.0' --- src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java | 91 +++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 91 insertions(+), 0 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java new file mode 100644 index 0000000..415787e --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java @@ -0,0 +1,91 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.conf.UserSetting; + +import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentLinkedQueue; + + +/** + * 鎺ユ敹鍏朵粬wvp鍙戦�佹祦鍙樺寲閫氱煡 + * @author lin + */ +@Component +public class RedisStreamMsgListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class); + + @Autowired + private UserSetting userSetting; + + @Autowired + private ZLMMediaListManager zlmMediaListManager; + + private boolean taskQueueHandlerRun = false; + + private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Override + public void onMessage(Message message, byte[] bytes) { + + taskQueue.offer(message); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); + if (steamMsgJson == null) { + logger.warn("[鏀跺埌redis 娴佸彉鍖朷娑堟伅瑙f瀽澶辫触"); + continue; + } + String serverId = steamMsgJson.getString("serverId"); + + if (userSetting.getServerId().equals(serverId)) { + // 鑷繁鍙戦�佺殑娑堟伅蹇界暐鍗冲彲 + continue; + } + logger.info("[鏀跺埌redis 娴佸彉鍖朷锛� {}", new String(message.getBody())); + String app = steamMsgJson.getString("app"); + String stream = steamMsgJson.getString("stream"); + boolean register = steamMsgJson.getBoolean("register"); + String mediaServerId = steamMsgJson.getString("mediaServerId"); + MediaItem mediaItem = new MediaItem(); + mediaItem.setSeverId(serverId); + mediaItem.setApp(app); + mediaItem.setStream(stream); + mediaItem.setRegist(register); + mediaItem.setMediaServerId(mediaServerId); + mediaItem.setCreateStamp(System.currentTimeMillis()/1000); + mediaItem.setAliveSecond(0L); + mediaItem.setTotalReaderCount("0"); + mediaItem.setOriginType(0); + mediaItem.setOriginTypeStr("0"); + mediaItem.setOriginTypeStr("unknown"); + if (register) { + zlmMediaListManager.addPush(mediaItem); + }else { + zlmMediaListManager.removeMedia(app, stream); + } + } + taskQueueHandlerRun = false; + }); + } + } +} -- Gitblit v1.8.0