From 30ae9e929fad80f624ab632c53081db3d2dc9aec Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 25 五月 2023 17:28:57 +0800 Subject: [PATCH] 合并主线 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 76 ++++++++++++++++++++++--------------- 1 files changed, 45 insertions(+), 31 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index a5e27c3..5dae826 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -76,11 +76,16 @@ @Autowired private IDeviceChannelService deviceChannelService; + @Autowired + private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor; + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; + + private int maxQueueCount = 30000; @Override public void afterPropertiesSet() throws Exception { @@ -91,43 +96,52 @@ @Override public void process(RequestEvent evt) { try { - responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); + logger.error("[notify] 寰呭鐞嗘秷鎭槦鍒楀凡婊� {}锛岃繑鍥�486 BUSY_HERE锛屾秷鎭笉鍋氬鐞�", userSetting.getMaxNotifyCountQueue()); + return; + }else { + responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + } + }catch (SipException | InvalidArgumentException | ParseException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); } boolean runed = !taskQueue.isEmpty(); + logger.info("[notify] 寰呭鐞嗘秷鎭暟閲忥細 {}", taskQueue.size()); taskQueue.offer(new HandlerCatchData(evt, null, null)); if (!runed) { taskExecutor.execute(()-> { - try { - while (!taskQueue.isEmpty()) { - try { - HandlerCatchData take = taskQueue.poll(); - Element rootElement = getRootElement(take.getEvt()); - if (rootElement == null) { - logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest()); - continue; - } - String cmd = XmlUtil.getText(rootElement, "CmdType"); - - if (CmdType.CATALOG.equals(cmd)) { - logger.info("鎺ユ敹鍒癈atalog閫氱煡"); - processNotifyCatalogList(take.getEvt()); - } else if (CmdType.ALARM.equals(cmd)) { - logger.info("鎺ユ敹鍒癆larm閫氱煡"); - processNotifyAlarm(take.getEvt()); - } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); - processNotifyMobilePosition(take.getEvt()); - } else { - logger.info("鎺ユ敹鍒版秷鎭細" + cmd); - } - } catch (DocumentException e) { - logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e); + while (!taskQueue.isEmpty()) { + try { + HandlerCatchData take = taskQueue.poll(); + if (take == null) { + continue; } + Element rootElement = getRootElement(take.getEvt()); + if (rootElement == null) { + logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest()); + continue; + } + String cmd = XmlUtil.getText(rootElement, "CmdType"); + + if (CmdType.CATALOG.equals(cmd)) { + logger.info("鎺ユ敹鍒癈atalog閫氱煡"); +// processNotifyCatalogList(take.getEvt()); + notifyRequestForCatalogProcessor.process(take.getEvt()); + } else if (CmdType.ALARM.equals(cmd)) { + logger.info("鎺ユ敹鍒癆larm閫氱煡"); + processNotifyAlarm(take.getEvt()); + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { + logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); + processNotifyMobilePosition(take.getEvt()); + } else { + logger.info("鎺ユ敹鍒版秷鎭細" + cmd); + } + } catch (DocumentException e) { + logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e); } - }catch (Exception e) { - logger.error("澶勭悊NOTIFY娑堟伅鏃堕敊璇�", e); } }); } @@ -135,7 +149,7 @@ /** * 澶勭悊MobilePosition绉诲姩浣嶇疆Notify - * + * * @param evt */ private void processNotifyMobilePosition(RequestEvent evt) { @@ -239,7 +253,7 @@ /*** * 澶勭悊alarm璁惧鎶ヨNotify - * + * * @param evt */ private void processNotifyAlarm(RequestEvent evt) { @@ -349,7 +363,7 @@ /*** * 澶勭悊catalog璁惧鐩綍鍒楄〃Notify - * + * * @param evt */ private void processNotifyCatalogList(RequestEvent evt) { -- Gitblit v1.8.0