From 8cba63642fcff122907bd7d7a8d7d822555d34ca Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 22 四月 2024 20:29:36 +0800 Subject: [PATCH] 优化notify消息处理 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java | 26 +++--- src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java | 4 src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java | 2 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java | 163 +++++++++++++++++++++++----------------- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 24 ++++- src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java | 15 +++ src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java | 4 7 files changed, 143 insertions(+), 95 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index a9f5c88..a9b17ae 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -66,7 +66,7 @@ private List<String> allowedOrigins = new ArrayList<>(); - private int maxNotifyCountQueue = 10000; + private int maxNotifyCountQueue = 100000; private int registerAgainAfterTime = 60; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index cde70eb..c80cc88 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -96,10 +96,6 @@ // 閬嶅巻DeviceList while (deviceListIterator.hasNext()) { Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; - } Element eventElement = itemDevice.element("Event"); String event; if (eventElement == null) { @@ -264,21 +260,12 @@ } } + // TODO 鍚屼竴涓�氶亾濡傛灉鍏堝彂閫佹洿鏂板啀鍙戦�佺绾垮彲鑳芥棤娉曟甯哥绾� private void executeSave(){ try { executeSaveForAdd(); } catch (Exception e) { logger.error("[瀛樺偍鏀跺埌鐨勫鍔犻�氶亾] 寮傚父锛� ", e ); - } - try { - executeSaveForUpdate(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勬洿鏂伴�氶亾] 寮傚父锛� ", e ); - } - try { - executeSaveForDelete(); - } catch (Exception e) { - logger.error("[瀛樺偍鏀跺埌鐨勫垹闄ら�氶亾] 寮傚父锛� ", e ); } try { executeSaveForOnline(); @@ -290,6 +277,17 @@ } catch (Exception e) { logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾绂荤嚎] 寮傚父锛� ", e ); } + try { + executeSaveForUpdate(); + } catch (Exception e) { + logger.error("[瀛樺偍鏀跺埌鐨勬洿鏂伴�氶亾] 寮傚父锛� ", e ); + } + try { + executeSaveForDelete(); + } catch (Exception e) { + logger.error("[瀛樺偍鏀跺埌鐨勫垹闄ら�氶亾] 寮傚父锛� ", e ); + } + dynamicTask.stop(talkKey); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 89f57c2..460a507 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -11,7 +11,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -20,12 +19,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -68,78 +67,100 @@ private final static String talkKey = "notify-request-for-mobile-position-task"; +// @Async("taskExecutor") public void process(RequestEvent evt) { try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - + long startTime = System.currentTimeMillis(); // 鍥炲 200 OK Element rootElement = getRootElement(evt); if (rootElement == null) { logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest()); return; } - + Device device = redisCatchStorage.getDevice(deviceId); MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); + List<Element> elements = rootElement.elements(); + String channelId = null; + for (Element element : elements) { + switch (element.getName()){ + case "DeviceID": + channelId = element.getStringValue(); + if (device == null) { + device = redisCatchStorage.getDevice(channelId); + if (device == null) { + // 鏍规嵁閫氶亾id鏌ヨ璁惧Id + List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId); + if (!deviceList.isEmpty()) { + device = deviceList.get(0); + } + } + } + if (device == null) { + logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId); + return; + } + mobilePosition.setDeviceId(device.getDeviceId()); + mobilePosition.setChannelId(channelId); + // 鍏煎璁惧閮ㄥ垎璁惧涓婃姤鏄�氶亾缂栧彿涓庤澶囩紪鍙蜂竴鑷寸殑鎯呭喌 + if (deviceId.equals(channelId)) { + List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); + if (deviceChannels.size() == 1) { + channelId = deviceChannels.get(0).getChannelId(); + } + } + if (!ObjectUtils.isEmpty(device.getName())) { + mobilePosition.setDeviceName(device.getName()); + } + mobilePosition.setDeviceId(device.getDeviceId()); + mobilePosition.setChannelId(channelId); + continue; + case "Time": + String timeVal = element.getStringValue(); + if (ObjectUtils.isEmpty(timeVal)) { + mobilePosition.setTime(DateUtil.getNow()); + } else { + mobilePosition.setTime(SipUtils.parseTime(timeVal)); + } + continue; + case "Longitude": + mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); + continue; + case "Latitude": + mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); + continue; + case "Speed": + String speedVal = element.getStringValue(); + if (NumericUtil.isDouble(speedVal)) { + mobilePosition.setSpeed(Double.parseDouble(speedVal)); + } else { + mobilePosition.setSpeed(0.0); + } + continue; + case "Direction": + String directionVal = element.getStringValue(); + if (NumericUtil.isDouble(directionVal)) { + mobilePosition.setDirection(Double.parseDouble(directionVal)); + } else { + mobilePosition.setDirection(0.0); + } + continue; + case "Altitude": + String altitudeVal = element.getStringValue(); + if (NumericUtil.isDouble(altitudeVal)) { + mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); + } else { + mobilePosition.setAltitude(0.0); + } + continue; - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getTextTrim().toString(); - Device device = redisCatchStorage.getDevice(deviceId); - - if (device == null) { - device = redisCatchStorage.getDevice(channelId); - if (device == null) { - // 鏍规嵁閫氶亾id鏌ヨ璁惧Id - List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId); - if (deviceList.size() > 0) { - device = deviceList.get(0); - } } } - if (device == null) { - logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId); - return; - } - // 鍏煎璁惧閮ㄥ垎璁惧涓婃姤鏄�氶亾缂栧彿涓庤澶囩紪鍙蜂竴鑷寸殑鎯呭喌 - if (deviceId.equals(channelId)) { - List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); - if (deviceChannels.size() == 1) { - channelId = deviceChannels.get(0).getChannelId(); - } - } - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setChannelId(channelId); - String time = XmlUtil.getText(rootElement, "Time"); - if (ObjectUtils.isEmpty(time)) { - mobilePosition.setTime(DateUtil.getNow()); - } else { - mobilePosition.setTime(SipUtils.parseTime(time)); - } - - mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); - mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) { - mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed"))); - } else { - mobilePosition.setSpeed(0.0); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) { - mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction"))); - } else { - mobilePosition.setDirection(0.0); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) { - mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude"))); - } else { - mobilePosition.setAltitude(0.0); - } - logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), - mobilePosition.getLongitude(), mobilePosition.getLatitude()); +// logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}, 鏃堕棿锛� {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), +// mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); mobilePosition.setReportSource("Mobile Position"); // 鏇存柊device channel 鐨勭粡绾害 @@ -149,13 +170,13 @@ deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); - updateChannelMap.put(channelId, deviceChannel); + updateChannelMap.put(deviceId + channelId, deviceChannel); addMobilePositionList.add(mobilePosition); - if(updateChannelMap.size() > 300) { + if(updateChannelMap.size() > 100) { executeSaveChannel(); } if (userSetting.isSavePositionHistory()) { - if(addMobilePositionList.size() > 300) { + if(addMobilePositionList.size() > 100) { executeSaveMobilePosition(); } } @@ -170,7 +191,7 @@ // deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); if (!dynamicTask.contains(talkKey)) { - dynamicTask.startDelay(talkKey, this::executeSave, 1000); + dynamicTask.startDelay(talkKey, this::executeSave, 3000); } } catch (DocumentException e) { @@ -186,29 +207,29 @@ dynamicTask.stop(talkKey); } - private void executeSaveChannel(){ + @Async("taskExecutor") + public void executeSaveChannel(){ + dynamicTask.execute(); try { logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", updateChannelMap.size()); - ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); - deviceChannelService.batchUpdateChannelGPS(deviceChannels); +// ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); +// deviceChannelService.batchUpdateChannelGPS(deviceChannels); updateChannelMap.clear(); }catch (Exception e) { } } - - private void executeSaveMobilePosition(){ + @Async("taskExecutor") + public void executeSaveMobilePosition(){ if (userSetting.isSavePositionHistory()) { try { - logger.info("[绉诲姩浣嶇疆璁㈤槄] 娣诲姞閫氶亾杞ㄨ抗鐐逛綅锛� {}", addMobilePositionList.size()); - deviceChannelService.batchAddMobilePosition(addMobilePositionList); +// logger.info("[绉诲姩浣嶇疆璁㈤槄] 娣诲姞閫氶亾杞ㄨ抗鐐逛綅锛� {}", addMobilePositionList.size()); +// deviceChannelService.batchAddMobilePosition(addMobilePositionList); addMobilePositionList.clear(); }catch (Exception e) { logger.info("[绉诲姩浣嶇疆璁㈤槄] b娣诲姞閫氶亾杞ㄨ抗鐐逛綅淇濆瓨澶辫触锛� {}", addMobilePositionList.size()); } } } - - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 84f44b5..2dd107a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -25,6 +25,8 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -76,6 +78,9 @@ @Autowired private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor; + @Autowired + private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor; + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @@ -105,10 +110,10 @@ logger.error("鏈鐞嗙殑寮傚父 ", e); } boolean runed = !taskQueue.isEmpty(); - logger.info("[notify] 寰呭鐞嗘秷鎭暟閲忥細 {}", taskQueue.size()); taskQueue.offer(new HandlerCatchData(evt, null, null)); if (!runed) { taskExecutor.execute(()-> { +// logger.warn("寮�濮嬪鐞�"); while (!taskQueue.isEmpty()) { try { HandlerCatchData take = taskQueue.poll(); @@ -129,8 +134,12 @@ logger.info("鎺ユ敹鍒癆larm閫氱煡"); processNotifyAlarm(take.getEvt()); } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); - processNotifyMobilePosition(take.getEvt()); +// logger.info("鎺ユ敹鍒癕obilePosition閫氱煡"); +// processNotifyMobilePosition(take.getEvt()); + taskExecutor.execute(() -> { + notifyRequestForMobilePositionProcessor.process(take.getEvt()); + }); + } else { logger.info("鎺ユ敹鍒版秷鎭細" + cmd); } @@ -147,11 +156,11 @@ * * @param evt */ - private void processNotifyMobilePosition(RequestEvent evt) { + @Async("taskExecutor") + public void processNotifyMobilePosition(RequestEvent evt) { try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - // 鍥炲 200 OK Element rootElement = getRootElement(evt); if (rootElement == null) { @@ -360,4 +369,9 @@ public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { this.redisCatchStorage = redisCatchStorage; } + + @Scheduled(fixedRate = 1000) //姣�1绉掓墽琛屼竴娆� + public void execute(){ + System.out.println("寰呭鐞嗘秷鎭暟閲�: " + taskQueue.size()); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index 3760381..f766640 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -356,11 +356,11 @@ @Override public void batchUpdateChannelGPS(List<DeviceChannel> channelList) { - + channelMapper.batchUpdate(channelList); } @Override public void batchAddMobilePosition(List<MobilePosition> mobilePositions) { - + deviceMobilePositionMapper.batchadd(mobilePositions); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 4aa9853..ae22336 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -347,8 +347,8 @@ "<if test='item.hasAudio != null'>, has_audio=#{item.hasAudio}</if>" + "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" + "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" + - "<if test='customLongitude != null'>, custom_longitude=#{item.customLongitude}</if>" + - "<if test='custom_latitude != null'>, custom_latitude=#{item.customLatitude}</if>" + + "<if test='item.customLongitude != null'>, custom_longitude=#{item.customLongitude}</if>" + + "<if test='item.customLatitude != null'>, custom_latitude=#{item.customLatitude}</if>" + "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" + "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" + "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" + diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java index 7bf243c..e3d8982 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java @@ -33,4 +33,19 @@ @Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}") int clearMobilePositionsByDeviceId(String deviceId); + + @Insert("<script> " + + "insert into wvp_device_mobile_position " + + "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," + + "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+ + "values " + + "<foreach collection='mobilePositions' index='index' item='item' separator=','> " + + "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " + + "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," + + "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " + + "#{item.createTime}) " + + "</foreach> " + + "</script>") + void batchadd(List<MobilePosition> mobilePositions); + } -- Gitblit v1.8.0