From 8cba63642fcff122907bd7d7a8d7d822555d34ca Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 22 四月 2024 20:29:36 +0800 Subject: [PATCH] 优化notify消息处理 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 24 +++++++++++++++++++----- 1 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 84f44b5..2dd107a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -25,6 +25,8 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -76,6 +78,9 @@ @Autowired private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor; + @Autowired + private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor; + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @@ -105,10 +110,10 @@ logger.error("鏈鐞嗙殑寮傚父 ", e); } boolean runed = !taskQueue.isEmpty(); - logger.info("[notify] 寰呭鐞嗘秷鎭暟閲忥細 {}", taskQueue.size()); taskQueue.offer(new HandlerCatchData(evt, null, null)); if (!runed) { taskExecutor.execute(()-> { +// logger.warn("寮�濮嬪鐞�"); while (!taskQueue.isEmpty()) { try { HandlerCatchData take = taskQueue.poll(); @@ -129,8 +134,12 @@ logger.info("鎺ユ敹鍒癆larm閫氱煡"); processNotifyAlarm(take.getEvt()); } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); - processNotifyMobilePosition(take.getEvt()); +// logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); +// processNotifyMobilePosition(take.getEvt()); + taskExecutor.execute(() -> { + notifyRequestForMobilePositionProcessor.process(take.getEvt()); + }); + } else { logger.info("鎺ユ敹鍒版秷鎭細" + cmd); } @@ -147,11 +156,11 @@ * * @param evt */ - private void processNotifyMobilePosition(RequestEvent evt) { + @Async("taskExecutor") + public void processNotifyMobilePosition(RequestEvent evt) { try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - // 鍥炲 200 OK Element rootElement = getRootElement(evt); if (rootElement == null) { @@ -360,4 +369,9 @@ public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { this.redisCatchStorage = redisCatchStorage; } + + @Scheduled(fixedRate = 1000) //姣�1绉掓墽琛屼竴娆� + public void execute(){ + System.out.println("寰呭鐞嗘秷鎭暟閲�: " + taskQueue.size()); + } } -- Gitblit v1.8.0