From 8cba63642fcff122907bd7d7a8d7d822555d34ca Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期一, 22 四月 2024 20:29:36 +0800
Subject: [PATCH] 优化notify消息处理
---
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java | 26 +++---
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java | 4
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java | 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java | 163 +++++++++++++++++++++++-----------------
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java | 24 ++++-
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java | 15 +++
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java | 4
7 files changed, 143 insertions(+), 95 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
index a9f5c88..a9b17ae 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
@@ -66,7 +66,7 @@
private List<String> allowedOrigins = new ArrayList<>();
- private int maxNotifyCountQueue = 10000;
+ private int maxNotifyCountQueue = 100000;
private int registerAgainAfterTime = 60;
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
index cde70eb..c80cc88 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
@@ -96,10 +96,6 @@
// 閬嶅巻DeviceList
while (deviceListIterator.hasNext()) {
Element itemDevice = deviceListIterator.next();
- Element channelDeviceElement = itemDevice.element("DeviceID");
- if (channelDeviceElement == null) {
- continue;
- }
Element eventElement = itemDevice.element("Event");
String event;
if (eventElement == null) {
@@ -264,21 +260,12 @@
}
}
+ // TODO 鍚屼竴涓�氶亾濡傛灉鍏堝彂閫佹洿鏂板啀鍙戦�佺绾垮彲鑳芥棤娉曟甯哥绾�
private void executeSave(){
try {
executeSaveForAdd();
} catch (Exception e) {
logger.error("[瀛樺偍鏀跺埌鐨勫鍔犻�氶亾] 寮傚父锛� ", e );
- }
- try {
- executeSaveForUpdate();
- } catch (Exception e) {
- logger.error("[瀛樺偍鏀跺埌鐨勬洿鏂伴�氶亾] 寮傚父锛� ", e );
- }
- try {
- executeSaveForDelete();
- } catch (Exception e) {
- logger.error("[瀛樺偍鏀跺埌鐨勫垹闄ら�氶亾] 寮傚父锛� ", e );
}
try {
executeSaveForOnline();
@@ -290,6 +277,17 @@
} catch (Exception e) {
logger.error("[瀛樺偍鏀跺埌鐨勯�氶亾绂荤嚎] 寮傚父锛� ", e );
}
+ try {
+ executeSaveForUpdate();
+ } catch (Exception e) {
+ logger.error("[瀛樺偍鏀跺埌鐨勬洿鏂伴�氶亾] 寮傚父锛� ", e );
+ }
+ try {
+ executeSaveForDelete();
+ } catch (Exception e) {
+ logger.error("[瀛樺偍鏀跺埌鐨勫垹闄ら�氶亾] 寮傚父锛� ", e );
+ }
+
dynamicTask.stop(talkKey);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
index 89f57c2..460a507 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -11,7 +11,6 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
-import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -20,12 +19,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -68,78 +67,100 @@
private final static String talkKey = "notify-request-for-mobile-position-task";
+// @Async("taskExecutor")
public void process(RequestEvent evt) {
try {
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
-
+ long startTime = System.currentTimeMillis();
// 鍥炲 200 OK
Element rootElement = getRootElement(evt);
if (rootElement == null) {
logger.error("澶勭悊MobilePosition绉诲姩浣嶇疆Notify鏃舵湭鑾峰彇鍒版秷鎭綋,{}", evt.getRequest());
return;
}
-
+ Device device = redisCatchStorage.getDevice(deviceId);
MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setCreateTime(DateUtil.getNow());
+ List<Element> elements = rootElement.elements();
+ String channelId = null;
+ for (Element element : elements) {
+ switch (element.getName()){
+ case "DeviceID":
+ channelId = element.getStringValue();
+ if (device == null) {
+ device = redisCatchStorage.getDevice(channelId);
+ if (device == null) {
+ // 鏍规嵁閫氶亾id鏌ヨ璁惧Id
+ List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
+ if (!deviceList.isEmpty()) {
+ device = deviceList.get(0);
+ }
+ }
+ }
+ if (device == null) {
+ logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId);
+ return;
+ }
+ mobilePosition.setDeviceId(device.getDeviceId());
+ mobilePosition.setChannelId(channelId);
+ // 鍏煎璁惧閮ㄥ垎璁惧涓婃姤鏄�氶亾缂栧彿涓庤澶囩紪鍙蜂竴鑷寸殑鎯呭喌
+ if (deviceId.equals(channelId)) {
+ List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
+ if (deviceChannels.size() == 1) {
+ channelId = deviceChannels.get(0).getChannelId();
+ }
+ }
+ if (!ObjectUtils.isEmpty(device.getName())) {
+ mobilePosition.setDeviceName(device.getName());
+ }
+ mobilePosition.setDeviceId(device.getDeviceId());
+ mobilePosition.setChannelId(channelId);
+ continue;
+ case "Time":
+ String timeVal = element.getStringValue();
+ if (ObjectUtils.isEmpty(timeVal)) {
+ mobilePosition.setTime(DateUtil.getNow());
+ } else {
+ mobilePosition.setTime(SipUtils.parseTime(timeVal));
+ }
+ continue;
+ case "Longitude":
+ mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
+ continue;
+ case "Latitude":
+ mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
+ continue;
+ case "Speed":
+ String speedVal = element.getStringValue();
+ if (NumericUtil.isDouble(speedVal)) {
+ mobilePosition.setSpeed(Double.parseDouble(speedVal));
+ } else {
+ mobilePosition.setSpeed(0.0);
+ }
+ continue;
+ case "Direction":
+ String directionVal = element.getStringValue();
+ if (NumericUtil.isDouble(directionVal)) {
+ mobilePosition.setDirection(Double.parseDouble(directionVal));
+ } else {
+ mobilePosition.setDirection(0.0);
+ }
+ continue;
+ case "Altitude":
+ String altitudeVal = element.getStringValue();
+ if (NumericUtil.isDouble(altitudeVal)) {
+ mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
+ } else {
+ mobilePosition.setAltitude(0.0);
+ }
+ continue;
- Element deviceIdElement = rootElement.element("DeviceID");
- String channelId = deviceIdElement.getTextTrim().toString();
- Device device = redisCatchStorage.getDevice(deviceId);
-
- if (device == null) {
- device = redisCatchStorage.getDevice(channelId);
- if (device == null) {
- // 鏍规嵁閫氶亾id鏌ヨ璁惧Id
- List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
- if (deviceList.size() > 0) {
- device = deviceList.get(0);
- }
}
}
- if (device == null) {
- logger.warn("[mobilePosition绉诲姩浣嶇疆Notify] 鏈壘鍒伴�氶亾{}鎵�灞炵殑璁惧", channelId);
- return;
- }
- // 鍏煎璁惧閮ㄥ垎璁惧涓婃姤鏄�氶亾缂栧彿涓庤澶囩紪鍙蜂竴鑷寸殑鎯呭喌
- if (deviceId.equals(channelId)) {
- List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
- if (deviceChannels.size() == 1) {
- channelId = deviceChannels.get(0).getChannelId();
- }
- }
- if (!ObjectUtils.isEmpty(device.getName())) {
- mobilePosition.setDeviceName(device.getName());
- }
- mobilePosition.setDeviceId(device.getDeviceId());
- mobilePosition.setChannelId(channelId);
- String time = XmlUtil.getText(rootElement, "Time");
- if (ObjectUtils.isEmpty(time)) {
- mobilePosition.setTime(DateUtil.getNow());
- } else {
- mobilePosition.setTime(SipUtils.parseTime(time));
- }
-
- mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
- mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
- if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) {
- mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed")));
- } else {
- mobilePosition.setSpeed(0.0);
- }
- if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) {
- mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction")));
- } else {
- mobilePosition.setDirection(0.0);
- }
- if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) {
- mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude")));
- } else {
- mobilePosition.setAltitude(0.0);
- }
- logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
- mobilePosition.getLongitude(), mobilePosition.getLatitude());
+// logger.info("[鏀跺埌绉诲姩浣嶇疆璁㈤槄閫氱煡]锛歿}/{}->{}.{}, 鏃堕棿锛� {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
+// mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
mobilePosition.setReportSource("Mobile Position");
// 鏇存柊device channel 鐨勭粡绾害
@@ -149,13 +170,13 @@
deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime());
- updateChannelMap.put(channelId, deviceChannel);
+ updateChannelMap.put(deviceId + channelId, deviceChannel);
addMobilePositionList.add(mobilePosition);
- if(updateChannelMap.size() > 300) {
+ if(updateChannelMap.size() > 100) {
executeSaveChannel();
}
if (userSetting.isSavePositionHistory()) {
- if(addMobilePositionList.size() > 300) {
+ if(addMobilePositionList.size() > 100) {
executeSaveMobilePosition();
}
}
@@ -170,7 +191,7 @@
// deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
if (!dynamicTask.contains(talkKey)) {
- dynamicTask.startDelay(talkKey, this::executeSave, 1000);
+ dynamicTask.startDelay(talkKey, this::executeSave, 3000);
}
} catch (DocumentException e) {
@@ -186,29 +207,29 @@
dynamicTask.stop(talkKey);
}
- private void executeSaveChannel(){
+ @Async("taskExecutor")
+ public void executeSaveChannel(){
+ dynamicTask.execute();
try {
logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", updateChannelMap.size());
- ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
- deviceChannelService.batchUpdateChannelGPS(deviceChannels);
+// ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
+// deviceChannelService.batchUpdateChannelGPS(deviceChannels);
updateChannelMap.clear();
}catch (Exception e) {
}
}
-
- private void executeSaveMobilePosition(){
+ @Async("taskExecutor")
+ public void executeSaveMobilePosition(){
if (userSetting.isSavePositionHistory()) {
try {
- logger.info("[绉诲姩浣嶇疆璁㈤槄] 娣诲姞閫氶亾杞ㄨ抗鐐逛綅锛� {}", addMobilePositionList.size());
- deviceChannelService.batchAddMobilePosition(addMobilePositionList);
+// logger.info("[绉诲姩浣嶇疆璁㈤槄] 娣诲姞閫氶亾杞ㄨ抗鐐逛綅锛� {}", addMobilePositionList.size());
+// deviceChannelService.batchAddMobilePosition(addMobilePositionList);
addMobilePositionList.clear();
}catch (Exception e) {
logger.info("[绉诲姩浣嶇疆璁㈤槄] b娣诲姞閫氶亾杞ㄨ抗鐐逛綅淇濆瓨澶辫触锛� {}", addMobilePositionList.size());
}
}
}
-
-
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
index 84f44b5..2dd107a 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -25,6 +25,8 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@@ -76,6 +78,9 @@
@Autowired
private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
+ @Autowired
+ private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor;
+
private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@@ -105,10 +110,10 @@
logger.error("鏈鐞嗙殑寮傚父 ", e);
}
boolean runed = !taskQueue.isEmpty();
- logger.info("[notify] 寰呭鐞嗘秷鎭暟閲忥細 {}", taskQueue.size());
taskQueue.offer(new HandlerCatchData(evt, null, null));
if (!runed) {
taskExecutor.execute(()-> {
+// logger.warn("寮�濮嬪鐞�");
while (!taskQueue.isEmpty()) {
try {
HandlerCatchData take = taskQueue.poll();
@@ -129,8 +134,12 @@
logger.info("鎺ユ敹鍒癆larm閫氱煡");
processNotifyAlarm(take.getEvt());
} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
- logger.info("鎺ユ敹鍒癕obilePosition閫氱煡");
- processNotifyMobilePosition(take.getEvt());
+// logger.info("鎺ユ敹鍒癕obilePosition閫氱煡");
+// processNotifyMobilePosition(take.getEvt());
+ taskExecutor.execute(() -> {
+ notifyRequestForMobilePositionProcessor.process(take.getEvt());
+ });
+
} else {
logger.info("鎺ユ敹鍒版秷鎭細" + cmd);
}
@@ -147,11 +156,11 @@
*
* @param evt
*/
- private void processNotifyMobilePosition(RequestEvent evt) {
+ @Async("taskExecutor")
+ public void processNotifyMobilePosition(RequestEvent evt) {
try {
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
-
// 鍥炲 200 OK
Element rootElement = getRootElement(evt);
if (rootElement == null) {
@@ -360,4 +369,9 @@
public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
this.redisCatchStorage = redisCatchStorage;
}
+
+ @Scheduled(fixedRate = 1000) //姣�1绉掓墽琛屼竴娆�
+ public void execute(){
+ System.out.println("寰呭鐞嗘秷鎭暟閲�: " + taskQueue.size());
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
index 3760381..f766640 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
@@ -356,11 +356,11 @@
@Override
public void batchUpdateChannelGPS(List<DeviceChannel> channelList) {
-
+ channelMapper.batchUpdate(channelList);
}
@Override
public void batchAddMobilePosition(List<MobilePosition> mobilePositions) {
-
+ deviceMobilePositionMapper.batchadd(mobilePositions);
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
index 4aa9853..ae22336 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -347,8 +347,8 @@
"<if test='item.hasAudio != null'>, has_audio=#{item.hasAudio}</if>" +
"<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
"<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
- "<if test='customLongitude != null'>, custom_longitude=#{item.customLongitude}</if>" +
- "<if test='custom_latitude != null'>, custom_latitude=#{item.customLatitude}</if>" +
+ "<if test='item.customLongitude != null'>, custom_longitude=#{item.customLongitude}</if>" +
+ "<if test='item.customLatitude != null'>, custom_latitude=#{item.customLatitude}</if>" +
"<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
"<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
"<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
index 7bf243c..e3d8982 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
@@ -33,4 +33,19 @@
@Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}")
int clearMobilePositionsByDeviceId(String deviceId);
+
+ @Insert("<script> " +
+ "insert into wvp_device_mobile_position " +
+ "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
+ "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
+ "values " +
+ "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
+ "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
+ "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
+ "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
+ "#{item.createTime}) " +
+ "</foreach> " +
+ "</script>")
+ void batchadd(List<MobilePosition> mobilePositions);
+
}
--
Gitblit v1.8.0