From d4f6ec39b7e0421757a6b9d1a68b1c4610ea2e8c Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 14 三月 2024 14:54:20 +0800 Subject: [PATCH] 优化CivilCode缓存 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 167 +++++++++++++++++-------------------------------------- 1 files changed, 52 insertions(+), 115 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java old mode 100644 new mode 100755 index 7366f30..435f35f --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -1,11 +1,11 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.conf.CivilCodeFileConf; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; @@ -37,7 +37,6 @@ import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; -import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -76,11 +75,19 @@ @Autowired private IDeviceChannelService deviceChannelService; + @Autowired + private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor; + + @Autowired + private CivilCodeFileConf civilCodeFileConf; + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; + + private int maxQueueCount = 30000; @Override public void afterPropertiesSet() throws Exception { @@ -91,17 +98,29 @@ @Override public void process(RequestEvent evt) { try { - responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); + logger.error("[notify] 寰呭鐞嗘秷鎭槦鍒楀凡婊� {}锛岃繑鍥�486 BUSY_HERE锛屾秷鎭笉鍋氬鐞�", userSetting.getMaxNotifyCountQueue()); + return; + }else { + responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + } + }catch (SipException | InvalidArgumentException | ParseException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); } boolean runed = !taskQueue.isEmpty(); + logger.info("[notify] 寰呭鐞嗘秷鎭暟閲忥細 {}", taskQueue.size()); taskQueue.offer(new HandlerCatchData(evt, null, null)); if (!runed) { taskExecutor.execute(()-> { while (!taskQueue.isEmpty()) { try { HandlerCatchData take = taskQueue.poll(); + if (take == null) { + continue; + } Element rootElement = getRootElement(take.getEvt()); if (rootElement == null) { logger.error("澶勭悊NOTIFY娑堟伅鏃舵湭鑾峰彇鍒版秷鎭綋,{}", take.getEvt().getRequest()); @@ -111,7 +130,7 @@ if (CmdType.CATALOG.equals(cmd)) { logger.info("鎺ユ敹鍒癈atalog閫氱煡"); - processNotifyCatalogList(take.getEvt()); + notifyRequestForCatalogProcessor.process(take.getEvt()); } else if (CmdType.ALARM.equals(cmd)) { logger.info("鎺ユ敹鍒癆larm閫氱煡"); processNotifyAlarm(take.getEvt()); @@ -131,7 +150,7 @@ /** * 澶勭悊MobilePosition绉诲姩浣嶇疆Notify - * + * * @param evt */ private void processNotifyMobilePosition(RequestEvent evt) { @@ -148,29 +167,38 @@ MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); + Element deviceIdElement = rootElement.element("DeviceID"); String channelId = deviceIdElement.getTextTrim().toString(); Device device = redisCatchStorage.getDevice(deviceId); if (device == null) { - // 鏍规嵁閫氶亾id鏌ヨ璁惧Id - List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId); - if (deviceList.size() > 0) { - device = deviceList.get(0); - }else { - logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId); - return; + device = redisCatchStorage.getDevice(channelId); + if (device == null) { + // 鏍规嵁閫氶亾id鏌ヨ璁惧Id + List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId); + if (deviceList.size() > 0) { + device = deviceList.get(0); + } } } - if (device != null) { - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } + if (device == null) { + logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId); + return; } - mobilePosition.setDeviceId(XmlUtil.getText(rootElement, "DeviceID")); + if (!ObjectUtils.isEmpty(device.getName())) { + mobilePosition.setDeviceName(device.getName()); + } + + mobilePosition.setDeviceId(device.getDeviceId()); mobilePosition.setChannelId(channelId); String time = XmlUtil.getText(rootElement, "Time"); - mobilePosition.setTime(time); + if (ObjectUtils.isEmpty(time)){ + mobilePosition.setTime(DateUtil.getNow()); + }else { + mobilePosition.setTime(SipUtils.parseTime(time)); + } + mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) { @@ -192,7 +220,6 @@ mobilePosition.getLongitude(), mobilePosition.getLatitude()); mobilePosition.setReportSource("Mobile Position"); - // 鏇存柊device channel 鐨勭粡绾害 DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(device.getDeviceId()); @@ -212,10 +239,12 @@ } storager.updateChannelPosition(deviceChannel); + // 鍚戝叧鑱斾簡璇ラ�氶亾骞朵笖寮�鍚Щ鍔ㄤ綅缃闃呯殑涓婄骇骞冲彴鍙戦�佺Щ鍔ㄤ綅缃闃呮秷鎭� + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", time); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", deviceId); jsonObject.put("code", channelId); jsonObject.put("longitude", mobilePosition.getLongitude()); @@ -231,7 +260,7 @@ /*** * 澶勭悊alarm璁惧鎶ヨNotify - * + * * @param evt */ private void processNotifyAlarm(RequestEvent evt) { @@ -288,6 +317,7 @@ logger.info("[鏀跺埌Notify-Alarm]锛歿}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); if ("4".equals(deviceAlarm.getAlarmMethod())) { MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setChannelId(channelId); mobilePosition.setCreateTime(DateUtil.getNow()); mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); mobilePosition.setTime(deviceAlarm.getAlarmTime()); @@ -317,7 +347,7 @@ storager.updateChannelPosition(deviceChannel); // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", mobilePosition.getTime()); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", deviceChannel.getDeviceId()); jsonObject.put("code", deviceChannel.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); @@ -333,99 +363,6 @@ // 鍥炲200 OK if (redisCatchStorage.deviceIsOnline(deviceId)) { publisher.deviceAlarmEventPublish(deviceAlarm); - } - } catch (DocumentException e) { - logger.error("鏈鐞嗙殑寮傚父 ", e); - } - } - - /*** - * 澶勭悊catalog璁惧鐩綍鍒楄〃Notify - * - * @param evt - */ - private void processNotifyCatalogList(RequestEvent evt) { - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null || device.getOnline() == 0) { - logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸璁惧宸茬粡绂荤嚎", (device != null ? device.getDeviceId():"" )); - return; - } - Element rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ 鏀跺埌鐩綍璁㈤槄 ] content cannot be null, {}", evt.getRequest()); - return; - } - Element deviceListElement = rootElement.element("DeviceList"); - if (deviceListElement == null) { - return; - } - Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - - // 閬嶅巻DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; - } - Element eventElement = itemDevice.element("Event"); - String event; - if (eventElement == null) { - logger.warn("[鏀跺埌鐩綍璁㈤槄]锛歿}, 浣嗘槸Event涓虹┖, 璁句负榛樿鍊� ADD", (device != null ? device.getDeviceId():"" )); - event = CatalogEvent.ADD; - }else { - event = eventElement.getText().toUpperCase(); - } - DeviceChannel channel = XmlUtil.channelContentHander(itemDevice, device, event); - channel.setDeviceId(device.getDeviceId()); - logger.info("[鏀跺埌鐩綍璁㈤槄]锛歿}/{}", device.getDeviceId(), channel.getChannelId()); - switch (event) { - case CatalogEvent.ON: - // 涓婄嚎 - logger.info("[鏀跺埌閫氶亾涓婄嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - storager.deviceChannelOnline(deviceId, channel.getChannelId()); - break; - case CatalogEvent.OFF : - // 绂荤嚎 - logger.info("[鏀跺埌閫氶亾绂荤嚎閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - storager.deviceChannelOffline(deviceId, channel.getChannelId()); - break; - case CatalogEvent.VLOST: - // 瑙嗛涓㈠け - logger.info("[鏀跺埌閫氶亾瑙嗛涓㈠け閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - storager.deviceChannelOffline(deviceId, channel.getChannelId()); - break; - case CatalogEvent.DEFECT: - // 鏁呴殰 - break; - case CatalogEvent.ADD: - // 澧炲姞 - logger.info("[鏀跺埌澧炲姞閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - deviceChannelService.updateChannel(deviceId, channel); - break; - case CatalogEvent.DEL: - // 鍒犻櫎 - logger.info("[鏀跺埌鍒犻櫎閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - storager.delChannel(deviceId, channel.getChannelId()); - break; - case CatalogEvent.UPDATE: - // 鏇存柊 - logger.info("[鏀跺埌鏇存柊閫氶亾閫氱煡] 鏉ヨ嚜璁惧: {}, 閫氶亾 {}", device.getDeviceId(), channel.getChannelId()); - deviceChannelService.updateChannel(deviceId, channel); - break; - default: - logger.warn("[ NotifyCatalog ] event not found 锛� {}", event ); - - } - // 杞彂鍙樺寲淇℃伅 - eventPublisher.catalogEventPublish(null, channel, event); - - } } } catch (DocumentException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); -- Gitblit v1.8.0