From f4960b2618f8f19f3bcbdda9683ec051f16949de Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 14 六月 2023 17:04:41 +0800 Subject: [PATCH] 修复移动位置获取接口超时 #857 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 30 ++++++++++++++++++++++++------ 1 files changed, 24 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index a0038ca..cb780e7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -76,11 +76,16 @@ @Autowired private IDeviceChannelService deviceChannelService; + @Autowired + private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor; + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; + + private int maxQueueCount = 30000; @Override public void afterPropertiesSet() throws Exception { @@ -91,17 +96,29 @@ @Override public void process(RequestEvent evt) { try { - responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); + logger.error("[notify] 寰呭鐞嗘秷鎭槦鍒楀凡婊� {}锛岃繑鍥�486 BUSY_HERE锛屾秷鎭笉鍋氬鐞�", userSetting.getMaxNotifyCountQueue()); + return; + }else { + responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + } + }catch (SipException | InvalidArgumentException | ParseException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); } boolean runed = !taskQueue.isEmpty(); + logger.info("[notify] 寰呭鐞嗘秷鎭暟閲忥細 {}", taskQueue.size()); taskQueue.offer(new HandlerCatchData(evt, null, null)); if (!runed) { taskExecutor.execute(()-> { while (!taskQueue.isEmpty()) { try { HandlerCatchData take = taskQueue.poll(); + if (take == null) { + continue; + } Element rootElement = getRootElement(take.getEvt()); if (rootElement == null) { logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest()); @@ -111,7 +128,8 @@ if (CmdType.CATALOG.equals(cmd)) { logger.info("鎺ユ敹鍒癈atalog閫氱煡"); - processNotifyCatalogList(take.getEvt()); +// processNotifyCatalogList(take.getEvt()); + notifyRequestForCatalogProcessor.process(take.getEvt()); } else if (CmdType.ALARM.equals(cmd)) { logger.info("鎺ユ敹鍒癆larm閫氱煡"); processNotifyAlarm(take.getEvt()); @@ -131,7 +149,7 @@ /** * 澶勭悊MobilePosition绉诲姩浣嶇疆Notify - * + * * @param evt */ private void processNotifyMobilePosition(RequestEvent evt) { @@ -235,7 +253,7 @@ /*** * 澶勭悊alarm璁惧鎶ヨNotify - * + * * @param evt */ private void processNotifyAlarm(RequestEvent evt) { @@ -345,7 +363,7 @@ /*** * 澶勭悊catalog璁惧鐩綍鍒楄〃Notify - * + * * @param evt */ private void processNotifyCatalogList(RequestEvent evt) { @@ -354,7 +372,7 @@ String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); Device device = redisCatchStorage.getDevice(deviceId); - if (device == null || device.getOnline() == 0) { + if (device == null || !device.isOnLine()) { logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); return; } -- Gitblit v1.8.0