From a574ff094428decbdc35332d184cd0d210716a44 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 22 九月 2022 16:56:20 +0800 Subject: [PATCH] 修复使用队列导致的问题 --- src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java | 8 +++++--- 1 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java index 84f5ef7..8d1b066 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java @@ -16,6 +16,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; +import javax.validation.constraints.NotNull; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -36,19 +37,20 @@ private boolean taskQueueHandlerRun = false; - private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; @Override - public void onMessage(Message message, byte[] bytes) { + public void onMessage(@NotNull Message message, byte[] bytes) { logger.info("鏀跺埌鏉ヨ嚜REDIS鐨凙LARM閫氱煡锛� {}", new String(message.getBody())); taskQueue.offer(message); if (!taskQueueHandlerRun) { taskQueueHandlerRun = true; + logger.info("[绾跨▼姹犱俊鎭痌娲诲姩绾跨▼鏁帮細{}, 鏈�澶х嚎绋嬫暟锛� {}", taskExecutor.getActiveCount(), taskExecutor.getMaxPoolSize()); taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); @@ -56,7 +58,7 @@ AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class); if (alarmChannelMessage == null) { logger.warn("[REDIS鐨凙LARM閫氱煡]娑堟伅瑙f瀽澶辫触"); - return; + continue; } String gbId = alarmChannelMessage.getGbId(); -- Gitblit v1.8.0