From 8b0ff3767b23f9479a0b3ebe8e343ed6470bd194 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 09 八月 2022 14:41:21 +0800 Subject: [PATCH] Merge branch 'wvp-28181-2.0' --- src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 115 insertions(+), 0 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java new file mode 100644 index 0000000..50e894a --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java @@ -0,0 +1,115 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto; +import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; + + +/** + * 鎺ユ敹redis鍙戦�佺殑鎺ㄦ祦璁惧涓婄嚎涓嬬嚎閫氱煡 + * @author lin + */ +@Component +public class RedisPushStreamStatusMsgListener implements MessageListener, ApplicationRunner { + + private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusMsgListener.class); + + private boolean taskQueueHandlerRun = false; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private IStreamPushService streamPushService; + + @Autowired + private DynamicTask dynamicTask; + + + + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Override + public void onMessage(Message message, byte[] bytes) { + // TODO 澧炲姞闃熷垪 + logger.warn("[REDIS娑堟伅-鎺ㄦ祦璁惧鐘舵�佸彉鍖朷锛� {}", new String(message.getBody())); + taskQueue.offer(message); + + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class); + if (statusChangeFromPushStream == null) { + logger.warn("[REDIS娑堟伅]鎺ㄦ祦璁惧鐘舵�佸彉鍖栨秷鎭В鏋愬け璐�"); + return; + } + // 鍙栨秷瀹氭椂浠诲姟 + dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); + if (statusChangeFromPushStream.isSetAllOffline()) { + // 鎵�鏈夎澶囩绾� + streamPushService.allStreamOffline(); + } + if (statusChangeFromPushStream.getOfflineStreams() != null + && statusChangeFromPushStream.getOfflineStreams().size() > 0) { + // 鏇存柊閮ㄥ垎璁惧绂荤嚎 + streamPushService.offline(statusChangeFromPushStream.getOfflineStreams()); + } + if (statusChangeFromPushStream.getOnlineStreams() != null && + statusChangeFromPushStream.getOnlineStreams().size() > 0) { + // 鏇存柊閮ㄥ垎璁惧涓婄嚎 + streamPushService.online(statusChangeFromPushStream.getOnlineStreams()); + } + } + taskQueueHandlerRun = false; + }); + } + } + + @Override + public void run(ApplicationArguments args) throws Exception { + // 鍚姩鏃惰缃墍鏈夋帹娴侀�氶亾绂荤嚎锛屽彂璧锋煡璇㈣姹� + redisCatchStorage.sendStreamPushRequestedMsgForStatus(); + dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, ()->{ + logger.info("[REDIS娑堟伅]鏈敹鍒皉edis鍥炲鎺ㄦ祦璁惧鐘舵�侊紝鎵ц鎺ㄦ祦璁惧绂荤嚎"); + // 浜旂鏀朵笉鍒拌姹傚氨璁剧疆閫氶亾绂荤嚎锛岀劧鍚庨�氱煡涓婄骇绂荤嚎 + streamPushService.allStreamOffline(); + }, 5000); + } + +} -- Gitblit v1.8.0