From bc38f5ef299f44f65fd34258b84272a027c10cb6 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 27 七月 2022 14:48:21 +0800 Subject: [PATCH] 修复流地址返回错误 --- src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java | 29 ++++++++++++++++++++++++++--- 1 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java index 7482833..d5a26e7 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java @@ -1,15 +1,20 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson.JSON; +import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentLinkedQueue; /** * 鎺ユ敹鏉ヨ嚜redis鐨凣PS鏇存柊閫氱煡 @@ -20,13 +25,31 @@ private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class); + private boolean taskQueueHandlerRun = false; + @Autowired private IRedisCatchStorage redisCatchStorage; + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Override public void onMessage(@NotNull Message message, byte[] bytes) { - // TODO 鍔犳秷鎭槦鍒� - GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class); - redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); + taskQueue.offer(message); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class); + redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); + } + taskQueueHandlerRun = false; + }); + } } } -- Gitblit v1.8.0