From 652489b47ef582b3f492039ab7fd58c558c1ba36 Mon Sep 17 00:00:00 2001 From: ‘sxh’ <1632740646@qq.com> Date: 星期四, 15 六月 2023 11:10:20 +0800 Subject: [PATCH] Merge remote-tracking branch 'origin/wvp-28181-2.0' into wvp-28181-2.0 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 40 +++++++++++++++++++++++++++++++--------- 1 files changed, 31 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 4260641..e97b720 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -76,11 +76,16 @@ @Autowired private IDeviceChannelService deviceChannelService; + @Autowired + private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor; + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; + + private int maxQueueCount = 30000; @Override public void afterPropertiesSet() throws Exception { @@ -91,7 +96,15 @@ @Override public void process(RequestEvent evt) { try { - responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); + logger.error("[notify] 寰呭鐞嗘秷鎭槦鍒楀凡婊� {}锛岃繑鍥�486 BUSY_HERE锛屾秷鎭笉鍋氬鐞�", userSetting.getMaxNotifyCountQueue()); + return; + }else { + responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + } + }catch (SipException | InvalidArgumentException | ParseException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); } @@ -103,6 +116,9 @@ while (!taskQueue.isEmpty()) { try { HandlerCatchData take = taskQueue.poll(); + if (take == null) { + continue; + } Element rootElement = getRootElement(take.getEvt()); if (rootElement == null) { logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest()); @@ -112,7 +128,8 @@ if (CmdType.CATALOG.equals(cmd)) { logger.info("鎺ユ敹鍒癈atalog閫氱煡"); - processNotifyCatalogList(take.getEvt()); +// processNotifyCatalogList(take.getEvt()); + notifyRequestForCatalogProcessor.process(take.getEvt()); } else if (CmdType.ALARM.equals(cmd)) { logger.info("鎺ユ敹鍒癆larm閫氱煡"); processNotifyAlarm(take.getEvt()); @@ -132,7 +149,7 @@ /** * 澶勭悊MobilePosition绉诲姩浣嶇疆Notify - * + * * @param evt */ private void processNotifyMobilePosition(RequestEvent evt) { @@ -175,7 +192,12 @@ mobilePosition.setDeviceId(device.getDeviceId()); mobilePosition.setChannelId(channelId); String time = XmlUtil.getText(rootElement, "Time"); - mobilePosition.setTime(time); + if (ObjectUtils.isEmpty(time)){ + mobilePosition.setTime(DateUtil.getNow()); + }else { + mobilePosition.setTime(SipUtils.parseTime(time)); + } + mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) { @@ -220,7 +242,7 @@ // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", time); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", deviceId); jsonObject.put("code", channelId); jsonObject.put("longitude", mobilePosition.getLongitude()); @@ -236,7 +258,7 @@ /*** * 澶勭悊alarm璁惧鎶ヨNotify - * + * * @param evt */ private void processNotifyAlarm(RequestEvent evt) { @@ -322,7 +344,7 @@ storager.updateChannelPosition(deviceChannel); // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", mobilePosition.getTime()); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", deviceChannel.getDeviceId()); jsonObject.put("code", deviceChannel.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); @@ -346,7 +368,7 @@ /*** * 澶勭悊catalog璁惧鐩綍鍒楄〃Notify - * + * * @param evt */ private void processNotifyCatalogList(RequestEvent evt) { @@ -355,7 +377,7 @@ String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); Device device = redisCatchStorage.getDevice(deviceId); - if (device == null || device.getOnline() == 0) { + if (device == null || !device.isOnLine()) { logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); return; } -- Gitblit v1.8.0