From 5564cfb384db16db972e8cb91ca55cf345cfb6ea Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期一, 13 五月 2024 17:22:36 +0800
Subject: [PATCH] 优化大量notify 移动位置订阅的入库
---
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java | 17 ++++
src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java | 13 +++
src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java | 95 +++++++++++++++++++++++
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java | 37 +--------
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java | 4
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java | 17 ----
6 files changed, 132 insertions(+), 51 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java
index df3345e..2dc66b3 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.conf.redis;
import com.alibaba.fastjson2.support.spring.data.redis.GenericFastJsonRedisSerializer;
+import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
@@ -25,4 +26,20 @@
redisTemplate.setConnectionFactory(redisConnectionFactory);
return redisTemplate;
}
+
+ @Bean
+ public RedisTemplate<String, MobilePosition> getRedisTemplateForMobilePosition(RedisConnectionFactory redisConnectionFactory) {
+ RedisTemplate<String, MobilePosition> redisTemplate = new RedisTemplate<>();
+ // 浣跨敤fastJson搴忓垪鍖�
+ GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
+ // value鍊肩殑搴忓垪鍖栭噰鐢╢astJsonRedisSerializer
+ redisTemplate.setValueSerializer(fastJsonRedisSerializer);
+ redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);
+
+ // key鐨勫簭鍒楀寲閲囩敤StringRedisSerializer
+ redisTemplate.setKeySerializer(new StringRedisSerializer());
+ redisTemplate.setHashKeySerializer(new StringRedisSerializer());
+ redisTemplate.setConnectionFactory(redisConnectionFactory);
+ return redisTemplate;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
index 013d95e..96e4eca 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -11,6 +11,7 @@
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
+import com.genersoft.iot.vmp.service.IMobilePositionService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.dom4j.DocumentException;
@@ -20,15 +21,11 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@@ -54,6 +51,9 @@
@Autowired
private IDeviceChannelService deviceChannelService;
+ @Autowired
+ private IMobilePositionService mobilePositionService;
+
public void process(RequestEvent evt) {
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
@@ -64,13 +64,10 @@
}
@Scheduled(fixedRate = 200) //姣�200姣鎵ц涓�娆�
- @Transactional
public void executeTaskQueue() {
if (taskQueue.isEmpty()) {
return;
}
- Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
- List<MobilePosition> addMobilePositionList = new ArrayList<>();
for (HandlerCatchData take : taskQueue) {
if (take == null) {
continue;
@@ -150,16 +147,7 @@
// mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
mobilePosition.setReportSource("Mobile Position");
- // 鏇存柊device channel 鐨勭粡绾害
- DeviceChannel deviceChannel = new DeviceChannel();
- deviceChannel.setDeviceId(device.getDeviceId());
- deviceChannel.setLongitude(mobilePosition.getLongitude());
- deviceChannel.setLatitude(mobilePosition.getLatitude());
- deviceChannel.setGpsTime(mobilePosition.getTime());
- updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel);
- addMobilePositionList.add(mobilePosition);
-
-
+ mobilePositionService.add(mobilePosition);
// 鍚戝叧鑱斾簡璇ラ�氶亾骞朵笖寮�鍚Щ鍔ㄤ綅缃闃呯殑涓婄骇骞冲彴鍙戦�佺Щ鍔ㄤ綅缃闃呮秷鎭�
try {
eventPublisher.mobilePositionEventPublish(mobilePosition);
@@ -199,21 +187,6 @@
}
}
taskQueue.clear();
- if(!updateChannelMap.isEmpty()) {
- List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values());
- logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", channels.size());
- deviceChannelService.batchUpdateChannel(channels);
- updateChannelMap.clear();
- }
- if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) {
- try {
- logger.info("[绉诲姩浣嶇疆璁㈤槄] 娣诲姞閫氶亾杞ㄨ抗鐐逛綅锛� {}", addMobilePositionList.size());
- deviceChannelService.batchAddMobilePosition(addMobilePositionList);
- }catch (Exception e) {
- logger.info("[绉诲姩浣嶇疆璁㈤槄] b娣诲姞閫氶亾杞ㄨ抗鐐逛綅淇濆瓨澶辫触锛� {}", addMobilePositionList.size());
- }
- addMobilePositionList.clear();
- }
}
@Scheduled(fixedRate = 10000)
public void execute(){
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java b/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java
new file mode 100644
index 0000000..9af6415
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java
@@ -0,0 +1,13 @@
+package com.genersoft.iot.vmp.service;
+
+
+import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
+
+import java.util.List;
+
+public interface IMobilePositionService {
+
+ void add(List<MobilePosition> mobilePositionList);
+
+ void add(MobilePosition mobilePosition);
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java
new file mode 100644
index 0000000..277493a
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java
@@ -0,0 +1,95 @@
+package com.genersoft.iot.vmp.service.impl;
+
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
+import com.genersoft.iot.vmp.service.IMobilePositionService;
+import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
+import com.genersoft.iot.vmp.storager.dao.DeviceMobilePositionMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+@Service
+public class MobilePositionServiceImpl implements IMobilePositionService {
+
+ @Autowired
+ private DeviceChannelMapper channelMapper;
+
+ @Autowired
+ private DeviceMobilePositionMapper mobilePositionMapper;
+
+ @Autowired
+ private UserSetting userSetting;
+
+ @Autowired
+ private RedisTemplate<String, MobilePosition> redisTemplate;
+
+ private final static Logger logger = LoggerFactory.getLogger(MobilePositionServiceImpl.class);
+
+ private final String REDIS_MOBILE_POSITION_LIST = "redis_mobile_position_list";
+
+ @Override
+ public void add(MobilePosition mobilePosition) {
+ List<MobilePosition> list = new ArrayList<>();
+ list.add(mobilePosition);
+ add(list);
+ }
+
+ @Override
+ public void add(List<MobilePosition> mobilePositionList) {
+ redisTemplate.opsForList().leftPushAll(REDIS_MOBILE_POSITION_LIST, mobilePositionList);
+ }
+
+ private List<MobilePosition> get(int length) {
+ Long size = redisTemplate.opsForList().size(REDIS_MOBILE_POSITION_LIST);
+ if (size == null || size == 0) {
+ return new ArrayList<>();
+ }
+ List<MobilePosition> mobilePositions;
+ if (size > length) {
+ mobilePositions = redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, length);
+ }else {
+ mobilePositions = redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, size);
+ }
+ return mobilePositions;
+ }
+
+
+
+ @Scheduled(fixedRate = 1000)
+ @Transactional
+ public void executeTaskQueue() {
+ int countLimit = 3000;
+ List<MobilePosition> mobilePositions = get(countLimit);
+ if (mobilePositions == null || mobilePositions.isEmpty()) {
+ return;
+ }
+ if (userSetting.getSavePositionHistory()) {
+ mobilePositionMapper.batchadd(mobilePositions);
+ }
+ logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", mobilePositions.size());
+ Map<String, DeviceChannel> updateChannelMap = new HashMap<>();
+ for (MobilePosition mobilePosition : mobilePositions) {
+ DeviceChannel deviceChannel = new DeviceChannel();
+ deviceChannel.setDeviceId(mobilePosition.getDeviceId());
+ deviceChannel.setLongitude(mobilePosition.getLongitude());
+ deviceChannel.setLatitude(mobilePosition.getLatitude());
+ deviceChannel.setGpsTime(mobilePosition.getTime());
+ updateChannelMap.put(mobilePosition.getDeviceId() + mobilePosition.getChannelId(), deviceChannel);
+ }
+ List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values());
+ channelMapper.batchUpdatePosition(channels);
+ }
+
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
index be78a52..1555221 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -401,23 +401,6 @@
" </script>"})
int updatePosition(DeviceChannel deviceChannel);
- @Update({"<script>" +
- "<foreach collection='deviceChannelList' item='item' separator=';'>" +
- " UPDATE" +
- " wvp_device_channel" +
- " SET gps_time=#{item.gpsTime}" +
- "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
- "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
- "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
- "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
- "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
- "<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" +
- "WHERE device_id=#{item.deviceId} " +
- " <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" +
- "</foreach>" +
- "</script>"})
- int batchUpdatePosition(List<DeviceChannel> deviceChannelList);
-
@Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0")
List<DeviceChannel> getAllChannelInPlay();
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
index c28b16e..5124310 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
@@ -49,7 +49,7 @@
void batchadd2(List<MobilePosition> mobilePositions);
@Insert("<script> " +
- "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
+ "<foreach collection='mobilePositions' index='index' item='item' separator=';'> " +
"insert into wvp_device_mobile_position " +
"(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
"longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
@@ -57,7 +57,7 @@
"(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
"#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
"#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
- "#{item.createTime}); " +
+ "#{item.createTime}) " +
"</foreach> " +
"</script>")
void batchadd(List<MobilePosition> mobilePositions);
--
Gitblit v1.8.0