From 8cba63642fcff122907bd7d7a8d7d822555d34ca Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 22 四月 2024 20:29:36 +0800 Subject: [PATCH] 优化notify消息处理 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java | 163 ++++++++++++++++++++++++++++++----------------------- 1 files changed, 92 insertions(+), 71 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 89f57c2..460a507 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -11,7 +11,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -20,12 +19,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -68,78 +67,100 @@ private final static String talkKey = "notify-request-for-mobile-position-task"; +// @Async("taskExecutor") public void process(RequestEvent evt) { try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - + long startTime = System.currentTimeMillis(); // 鍥炲 200 OK Element rootElement = getRootElement(evt); if (rootElement == null) { logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest()); return; } - + Device device = redisCatchStorage.getDevice(deviceId); MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); + List<Element> elements = rootElement.elements(); + String channelId = null; + for (Element element : elements) { + switch (element.getName()){ + case "DeviceID": + channelId = element.getStringValue(); + if (device == null) { + device = redisCatchStorage.getDevice(channelId); + if (device == null) { + // 鏍规嵁閫氶亾id鏌ヨ璁惧Id + List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId); + if (!deviceList.isEmpty()) { + device = deviceList.get(0); + } + } + } + if (device == null) { + logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId); + return; + } + mobilePosition.setDeviceId(device.getDeviceId()); + mobilePosition.setChannelId(channelId); + // 鍏煎璁惧閮ㄥ垎璁惧涓婃姤鏄�氶亾缂栧彿涓庤澶囩紪鍙蜂竴鑷寸殑鎯呭喌 + if (deviceId.equals(channelId)) { + List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); + if (deviceChannels.size() == 1) { + channelId = deviceChannels.get(0).getChannelId(); + } + } + if (!ObjectUtils.isEmpty(device.getName())) { + mobilePosition.setDeviceName(device.getName()); + } + mobilePosition.setDeviceId(device.getDeviceId()); + mobilePosition.setChannelId(channelId); + continue; + case "Time": + String timeVal = element.getStringValue(); + if (ObjectUtils.isEmpty(timeVal)) { + mobilePosition.setTime(DateUtil.getNow()); + } else { + mobilePosition.setTime(SipUtils.parseTime(timeVal)); + } + continue; + case "Longitude": + mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); + continue; + case "Latitude": + mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); + continue; + case "Speed": + String speedVal = element.getStringValue(); + if (NumericUtil.isDouble(speedVal)) { + mobilePosition.setSpeed(Double.parseDouble(speedVal)); + } else { + mobilePosition.setSpeed(0.0); + } + continue; + case "Direction": + String directionVal = element.getStringValue(); + if (NumericUtil.isDouble(directionVal)) { + mobilePosition.setDirection(Double.parseDouble(directionVal)); + } else { + mobilePosition.setDirection(0.0); + } + continue; + case "Altitude": + String altitudeVal = element.getStringValue(); + if (NumericUtil.isDouble(altitudeVal)) { + mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); + } else { + mobilePosition.setAltitude(0.0); + } + continue; - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getTextTrim().toString(); - Device device = redisCatchStorage.getDevice(deviceId); - - if (device == null) { - device = redisCatchStorage.getDevice(channelId); - if (device == null) { - // 鏍规嵁閫氶亾id鏌ヨ璁惧Id - List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId); - if (deviceList.size() > 0) { - device = deviceList.get(0); - } } } - if (device == null) { - logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId); - return; - } - // 鍏煎璁惧閮ㄥ垎璁惧涓婃姤鏄�氶亾缂栧彿涓庤澶囩紪鍙蜂竴鑷寸殑鎯呭喌 - if (deviceId.equals(channelId)) { - List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); - if (deviceChannels.size() == 1) { - channelId = deviceChannels.get(0).getChannelId(); - } - } - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setChannelId(channelId); - String time = XmlUtil.getText(rootElement, "Time"); - if (ObjectUtils.isEmpty(time)) { - mobilePosition.setTime(DateUtil.getNow()); - } else { - mobilePosition.setTime(SipUtils.parseTime(time)); - } - - mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); - mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) { - mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed"))); - } else { - mobilePosition.setSpeed(0.0); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) { - mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction"))); - } else { - mobilePosition.setDirection(0.0); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) { - mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude"))); - } else { - mobilePosition.setAltitude(0.0); - } - logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), - mobilePosition.getLongitude(), mobilePosition.getLatitude()); +// logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}, 鏃堕棿锛� {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), +// mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); mobilePosition.setReportSource("Mobile Position"); // 鏇存柊device channel 鐨勭粡绾害 @@ -149,13 +170,13 @@ deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); - updateChannelMap.put(channelId, deviceChannel); + updateChannelMap.put(deviceId + channelId, deviceChannel); addMobilePositionList.add(mobilePosition); - if(updateChannelMap.size() > 300) { + if(updateChannelMap.size() > 100) { executeSaveChannel(); } if (userSetting.isSavePositionHistory()) { - if(addMobilePositionList.size() > 300) { + if(addMobilePositionList.size() > 100) { executeSaveMobilePosition(); } } @@ -170,7 +191,7 @@ // deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); if (!dynamicTask.contains(talkKey)) { - dynamicTask.startDelay(talkKey, this::executeSave, 1000); + dynamicTask.startDelay(talkKey, this::executeSave, 3000); } } catch (DocumentException e) { @@ -186,29 +207,29 @@ dynamicTask.stop(talkKey); } - private void executeSaveChannel(){ + @Async("taskExecutor") + public void executeSaveChannel(){ + dynamicTask.execute(); try { logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", updateChannelMap.size()); - ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); - deviceChannelService.batchUpdateChannelGPS(deviceChannels); +// ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); +// deviceChannelService.batchUpdateChannelGPS(deviceChannels); updateChannelMap.clear(); }catch (Exception e) { } } - - private void executeSaveMobilePosition(){ + @Async("taskExecutor") + public void executeSaveMobilePosition(){ if (userSetting.isSavePositionHistory()) { try { - logger.info("[绉诲姩浣嶇疆璁㈤槄] 娣诲姞閫氶亾杞ㄨ抗鐐逛綅锛� {}", addMobilePositionList.size()); - deviceChannelService.batchAddMobilePosition(addMobilePositionList); +// logger.info("[绉诲姩浣嶇疆璁㈤槄] 娣诲姞閫氶亾杞ㄨ抗鐐逛綅锛� {}", addMobilePositionList.size()); +// deviceChannelService.batchAddMobilePosition(addMobilePositionList); addMobilePositionList.clear(); }catch (Exception e) { logger.info("[绉诲姩浣嶇疆璁㈤槄] b娣诲姞閫氶亾杞ㄨ抗鐐逛綅淇濆瓨澶辫触锛� {}", addMobilePositionList.size()); } } } - - } -- Gitblit v1.8.0