From 8cba63642fcff122907bd7d7a8d7d822555d34ca Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 22 四月 2024 20:29:36 +0800 Subject: [PATCH] 优化notify消息处理 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 84 ++++++++++++++++-------------------------- 1 files changed, 32 insertions(+), 52 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index d35c6a6..2dd107a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -1,12 +1,9 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.conf.CivilCodeFileConf; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; @@ -28,6 +25,8 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -38,7 +37,6 @@ import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; -import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -81,7 +79,7 @@ private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor; @Autowired - private CivilCodeFileConf civilCodeFileConf; + private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor; private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @@ -100,7 +98,6 @@ @Override public void process(RequestEvent evt) { try { - if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); logger.error("[notify] 寰呭鐞嗘秷鎭槦鍒楀凡婊� {}锛岃繑鍥�486 BUSY_HERE锛屾秷鎭笉鍋氬鐞�", userSetting.getMaxNotifyCountQueue()); @@ -113,10 +110,10 @@ logger.error("鏈鐞嗙殑寮傚父 ", e); } boolean runed = !taskQueue.isEmpty(); - logger.info("[notify] 寰呭鐞嗘秷鎭暟閲忥細 {}", taskQueue.size()); taskQueue.offer(new HandlerCatchData(evt, null, null)); if (!runed) { taskExecutor.execute(()-> { +// logger.warn("寮�濮嬪鐞�"); while (!taskQueue.isEmpty()) { try { HandlerCatchData take = taskQueue.poll(); @@ -137,8 +134,12 @@ logger.info("鎺ユ敹鍒癆larm閫氱煡"); processNotifyAlarm(take.getEvt()); } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); - processNotifyMobilePosition(take.getEvt()); +// logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); +// processNotifyMobilePosition(take.getEvt()); + taskExecutor.execute(() -> { + notifyRequestForMobilePositionProcessor.process(take.getEvt()); + }); + } else { logger.info("鎺ユ敹鍒版秷鎭細" + cmd); } @@ -155,11 +156,11 @@ * * @param evt */ - private void processNotifyMobilePosition(RequestEvent evt) { + @Async("taskExecutor") + public void processNotifyMobilePosition(RequestEvent evt) { try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - // 鍥炲 200 OK Element rootElement = getRootElement(evt); if (rootElement == null) { @@ -187,6 +188,13 @@ if (device == null) { logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId); return; + } + // 鍏煎璁惧閮ㄥ垎璁惧涓婃姤鏄�氶亾缂栧彿涓庤澶囩紪鍙蜂竴鑷寸殑鎯呭喌 + if(deviceId.equals(channelId)) { + List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); + if (deviceChannels.size() == 1) { + channelId = deviceChannels.get(0).getChannelId(); + } } if (!ObjectUtils.isEmpty(device.getName())) { mobilePosition.setDeviceName(device.getName()); @@ -222,7 +230,6 @@ mobilePosition.getLongitude(), mobilePosition.getLatitude()); mobilePosition.setReportSource("Mobile Position"); - // 鏇存柊device channel 鐨勭粡绾害 DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(device.getDeviceId()); @@ -230,30 +237,15 @@ deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); - deviceChannel = deviceChannelService.updateGps(deviceChannel, device); +// deviceChannel = deviceChannelService.updateGps(deviceChannel, device); +// +// mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); +// mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); +// mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); +// mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); - if (userSetting.getSavePositionHistory()) { - storager.insertMobilePosition(mobilePosition); - } - - storager.updateChannelPosition(deviceChannel); - - // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� - JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); - jsonObject.put("serial", deviceId); - jsonObject.put("code", channelId); - jsonObject.put("longitude", mobilePosition.getLongitude()); - jsonObject.put("latitude", mobilePosition.getLatitude()); - jsonObject.put("altitude", mobilePosition.getAltitude()); - jsonObject.put("direction", mobilePosition.getDirection()); - jsonObject.put("speed", mobilePosition.getSpeed()); - redisCatchStorage.sendMobilePositionMsg(jsonObject); } catch (DocumentException e) { logger.error("鏈鐞嗙殑寮傚父 ", e); } @@ -341,25 +333,8 @@ mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - if (userSetting.getSavePositionHistory()) { - storager.insertMobilePosition(mobilePosition); - } - - storager.updateChannelPosition(deviceChannel); - // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� - JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); - jsonObject.put("serial", deviceChannel.getDeviceId()); - jsonObject.put("code", deviceChannel.getChannelId()); - jsonObject.put("longitude", mobilePosition.getLongitude()); - jsonObject.put("latitude", mobilePosition.getLatitude()); - jsonObject.put("altitude", mobilePosition.getAltitude()); - jsonObject.put("direction", mobilePosition.getDirection()); - jsonObject.put("speed", mobilePosition.getSpeed()); - redisCatchStorage.sendMobilePositionMsg(jsonObject); - + deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); } - // TODO: 闇�瑕佸疄鐜板瓨鍌ㄦ姤璀︿俊鎭�佹姤璀﹀垎绫� // 鍥炲200 OK if (redisCatchStorage.deviceIsOnline(deviceId)) { @@ -394,4 +369,9 @@ public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { this.redisCatchStorage = redisCatchStorage; } + + @Scheduled(fixedRate = 1000) //姣�1绉掓墽琛屼竴娆� + public void execute(){ + System.out.println("寰呭鐞嗘秷鎭暟閲�: " + taskQueue.size()); + } } -- Gitblit v1.8.0