From 5564cfb384db16db972e8cb91ca55cf345cfb6ea Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期一, 13 五月 2024 17:22:36 +0800
Subject: [PATCH] 优化大量notify 移动位置订阅的入库

---
 src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java                                              |   17 ++++
 src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java                                              |   13 +++
 src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java                                      |   95 +++++++++++++++++++++++
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java |   37 +--------
 src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java                                     |    4 
 src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java                                            |   17 ----
 6 files changed, 132 insertions(+), 51 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java
index df3345e..2dc66b3 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java
@@ -1,6 +1,7 @@
 package com.genersoft.iot.vmp.conf.redis;
 
 import com.alibaba.fastjson2.support.spring.data.redis.GenericFastJsonRedisSerializer;
+import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.data.redis.connection.RedisConnectionFactory;
@@ -25,4 +26,20 @@
         redisTemplate.setConnectionFactory(redisConnectionFactory);
         return redisTemplate;
     }
+
+    @Bean
+    public RedisTemplate<String, MobilePosition> getRedisTemplateForMobilePosition(RedisConnectionFactory redisConnectionFactory) {
+        RedisTemplate<String, MobilePosition> redisTemplate = new RedisTemplate<>();
+        // 浣跨敤fastJson搴忓垪鍖�
+        GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
+        // value鍊肩殑搴忓垪鍖栭噰鐢╢astJsonRedisSerializer
+        redisTemplate.setValueSerializer(fastJsonRedisSerializer);
+        redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);
+
+        // key鐨勫簭鍒楀寲閲囩敤StringRedisSerializer
+        redisTemplate.setKeySerializer(new StringRedisSerializer());
+        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
+        redisTemplate.setConnectionFactory(redisConnectionFactory);
+        return redisTemplate;
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
index 013d95e..96e4eca 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -11,6 +11,7 @@
 import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.genersoft.iot.vmp.service.IDeviceChannelService;
+import com.genersoft.iot.vmp.service.IMobilePositionService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import org.dom4j.DocumentException;
@@ -20,15 +21,11 @@
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.ObjectUtils;
 
 import javax.sip.RequestEvent;
 import javax.sip.header.FromHeader;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -54,6 +51,9 @@
 	@Autowired
 	private IDeviceChannelService deviceChannelService;
 
+	@Autowired
+	private IMobilePositionService mobilePositionService;
+
 	public void process(RequestEvent evt) {
 
 		if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
@@ -64,13 +64,10 @@
 	}
 
 	@Scheduled(fixedRate = 200) //姣�200姣鎵ц涓�娆�
-	@Transactional
 	public void executeTaskQueue() {
 		if (taskQueue.isEmpty()) {
 			return;
 		}
-		Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
-		List<MobilePosition> addMobilePositionList = new ArrayList<>();
 		for (HandlerCatchData take : taskQueue) {
 			if (take == null) {
 				continue;
@@ -150,16 +147,7 @@
 //					mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
 				mobilePosition.setReportSource("Mobile Position");
 
-				// 鏇存柊device channel 鐨勭粡绾害
-				DeviceChannel deviceChannel = new DeviceChannel();
-				deviceChannel.setDeviceId(device.getDeviceId());
-				deviceChannel.setLongitude(mobilePosition.getLongitude());
-				deviceChannel.setLatitude(mobilePosition.getLatitude());
-				deviceChannel.setGpsTime(mobilePosition.getTime());
-				updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel);
-				addMobilePositionList.add(mobilePosition);
-
-
+				mobilePositionService.add(mobilePosition);
 				// 鍚戝叧鑱斾簡璇ラ�氶亾骞朵笖寮�鍚Щ鍔ㄤ綅缃闃呯殑涓婄骇骞冲彴鍙戦�佺Щ鍔ㄤ綅缃闃呮秷鎭�
 				try {
 					eventPublisher.mobilePositionEventPublish(mobilePosition);
@@ -199,21 +187,6 @@
 			}
 		}
 		taskQueue.clear();
-		if(!updateChannelMap.isEmpty()) {
-			List<DeviceChannel>  channels = new ArrayList<>(updateChannelMap.values());
-			logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", channels.size());
-			deviceChannelService.batchUpdateChannel(channels);
-			updateChannelMap.clear();
-		}
-		if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) {
-			try {
-				logger.info("[绉诲姩浣嶇疆璁㈤槄] 娣诲姞閫氶亾杞ㄨ抗鐐逛綅锛� {}", addMobilePositionList.size());
-				deviceChannelService.batchAddMobilePosition(addMobilePositionList);
-			}catch (Exception e) {
-				logger.info("[绉诲姩浣嶇疆璁㈤槄] b娣诲姞閫氶亾杞ㄨ抗鐐逛綅淇濆瓨澶辫触锛� {}", addMobilePositionList.size());
-			}
-			addMobilePositionList.clear();
-		}
 	}
 	@Scheduled(fixedRate = 10000)
 	public void execute(){
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java b/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java
new file mode 100644
index 0000000..9af6415
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java
@@ -0,0 +1,13 @@
+package com.genersoft.iot.vmp.service;
+
+
+import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
+
+import java.util.List;
+
+public interface IMobilePositionService {
+
+    void add(List<MobilePosition> mobilePositionList);
+
+    void add(MobilePosition mobilePosition);
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java
new file mode 100644
index 0000000..277493a
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java
@@ -0,0 +1,95 @@
+package com.genersoft.iot.vmp.service.impl;
+
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
+import com.genersoft.iot.vmp.service.IMobilePositionService;
+import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
+import com.genersoft.iot.vmp.storager.dao.DeviceMobilePositionMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+@Service
+public class MobilePositionServiceImpl implements IMobilePositionService {
+
+    @Autowired
+    private DeviceChannelMapper channelMapper;
+
+    @Autowired
+    private DeviceMobilePositionMapper mobilePositionMapper;
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private RedisTemplate<String, MobilePosition> redisTemplate;
+
+    private final static Logger logger = LoggerFactory.getLogger(MobilePositionServiceImpl.class);
+
+    private final String REDIS_MOBILE_POSITION_LIST = "redis_mobile_position_list";
+
+    @Override
+    public void add(MobilePosition mobilePosition) {
+        List<MobilePosition> list = new ArrayList<>();
+        list.add(mobilePosition);
+        add(list);
+    }
+
+    @Override
+    public void add(List<MobilePosition> mobilePositionList) {
+        redisTemplate.opsForList().leftPushAll(REDIS_MOBILE_POSITION_LIST, mobilePositionList);
+    }
+
+    private List<MobilePosition> get(int length) {
+        Long size = redisTemplate.opsForList().size(REDIS_MOBILE_POSITION_LIST);
+        if (size == null || size == 0) {
+            return new ArrayList<>();
+        }
+        List<MobilePosition> mobilePositions;
+        if (size > length) {
+            mobilePositions = redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, length);
+        }else {
+            mobilePositions = redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, size);
+        }
+        return  mobilePositions;
+    }
+
+
+
+    @Scheduled(fixedRate = 1000)
+    @Transactional
+    public void executeTaskQueue() {
+        int countLimit = 3000;
+        List<MobilePosition> mobilePositions = get(countLimit);
+        if (mobilePositions == null || mobilePositions.isEmpty()) {
+            return;
+        }
+        if (userSetting.getSavePositionHistory()) {
+            mobilePositionMapper.batchadd(mobilePositions);
+        }
+        logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", mobilePositions.size());
+        Map<String, DeviceChannel> updateChannelMap = new HashMap<>();
+        for (MobilePosition mobilePosition : mobilePositions) {
+            DeviceChannel deviceChannel = new DeviceChannel();
+            deviceChannel.setDeviceId(mobilePosition.getDeviceId());
+            deviceChannel.setLongitude(mobilePosition.getLongitude());
+            deviceChannel.setLatitude(mobilePosition.getLatitude());
+            deviceChannel.setGpsTime(mobilePosition.getTime());
+            updateChannelMap.put(mobilePosition.getDeviceId() + mobilePosition.getChannelId(), deviceChannel);
+        }
+        List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values());
+        channelMapper.batchUpdatePosition(channels);
+    }
+
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
index be78a52..1555221 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -401,23 +401,6 @@
             " </script>"})
     int updatePosition(DeviceChannel deviceChannel);
 
-    @Update({"<script>" +
-            "<foreach collection='deviceChannelList' item='item' separator=';'>" +
-            " UPDATE" +
-            " wvp_device_channel" +
-            " SET gps_time=#{item.gpsTime}" +
-            "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
-            "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
-            "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
-            "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
-            "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
-            "<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" +
-            "WHERE device_id=#{item.deviceId} " +
-            " <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" +
-            "</foreach>" +
-            "</script>"})
-    int batchUpdatePosition(List<DeviceChannel> deviceChannelList);
-
     @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0")
     List<DeviceChannel> getAllChannelInPlay();
 
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
index c28b16e..5124310 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
@@ -49,7 +49,7 @@
     void batchadd2(List<MobilePosition> mobilePositions);
 
     @Insert("<script> " +
-            "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
+            "<foreach collection='mobilePositions' index='index' item='item' separator=';'> " +
             "insert into wvp_device_mobile_position " +
             "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
             "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
@@ -57,7 +57,7 @@
             "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
             "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
             "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
-            "#{item.createTime}); " +
+            "#{item.createTime}) " +
             "</foreach> " +
             "</script>")
     void batchadd(List<MobilePosition> mobilePositions);

--
Gitblit v1.8.0