From 5564cfb384db16db972e8cb91ca55cf345cfb6ea Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 13 五月 2024 17:22:36 +0800 Subject: [PATCH] 优化大量notify 移动位置订阅的入库 --- src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java | 17 ++++ src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java | 13 +++ src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java | 95 +++++++++++++++++++++++ src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java | 37 +-------- src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java | 4 src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java | 17 ---- 6 files changed, 132 insertions(+), 51 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java index df3345e..2dc66b3 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.conf.redis; import com.alibaba.fastjson2.support.spring.data.redis.GenericFastJsonRedisSerializer; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -25,4 +26,20 @@ redisTemplate.setConnectionFactory(redisConnectionFactory); return redisTemplate; } + + @Bean + public RedisTemplate<String, MobilePosition> getRedisTemplateForMobilePosition(RedisConnectionFactory redisConnectionFactory) { + RedisTemplate<String, MobilePosition> redisTemplate = new RedisTemplate<>(); + // 浣跨敤fastJson搴忓垪鍖� + GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer(); + // value鍊肩殑搴忓垪鍖栭噰鐢╢astJsonRedisSerializer + redisTemplate.setValueSerializer(fastJsonRedisSerializer); + redisTemplate.setHashValueSerializer(fastJsonRedisSerializer); + + // key鐨勫簭鍒楀寲閲囩敤StringRedisSerializer + redisTemplate.setKeySerializer(new StringRedisSerializer()); + redisTemplate.setHashKeySerializer(new StringRedisSerializer()); + redisTemplate.setConnectionFactory(redisConnectionFactory); + return redisTemplate; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 013d95e..96e4eca 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.service.IDeviceChannelService; +import com.genersoft.iot.vmp.service.IMobilePositionService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; import org.dom4j.DocumentException; @@ -20,15 +21,11 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -54,6 +51,9 @@ @Autowired private IDeviceChannelService deviceChannelService; + @Autowired + private IMobilePositionService mobilePositionService; + public void process(RequestEvent evt) { if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { @@ -64,13 +64,10 @@ } @Scheduled(fixedRate = 200) //姣�200姣鎵ц涓�娆� - @Transactional public void executeTaskQueue() { if (taskQueue.isEmpty()) { return; } - Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); - List<MobilePosition> addMobilePositionList = new ArrayList<>(); for (HandlerCatchData take : taskQueue) { if (take == null) { continue; @@ -150,16 +147,7 @@ // mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); mobilePosition.setReportSource("Mobile Position"); - // 鏇存柊device channel 鐨勭粡绾害 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel); - addMobilePositionList.add(mobilePosition); - - + mobilePositionService.add(mobilePosition); // 鍚戝叧鑱斾簡璇ラ�氶亾骞朵笖寮�鍚Щ鍔ㄤ綅缃闃呯殑涓婄骇骞冲彴鍙戦�佺Щ鍔ㄤ綅缃闃呮秷鎭� try { eventPublisher.mobilePositionEventPublish(mobilePosition); @@ -199,21 +187,6 @@ } } taskQueue.clear(); - if(!updateChannelMap.isEmpty()) { - List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values()); - logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", channels.size()); - deviceChannelService.batchUpdateChannel(channels); - updateChannelMap.clear(); - } - if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) { - try { - logger.info("[绉诲姩浣嶇疆璁㈤槄] 娣诲姞閫氶亾杞ㄨ抗鐐逛綅锛� {}", addMobilePositionList.size()); - deviceChannelService.batchAddMobilePosition(addMobilePositionList); - }catch (Exception e) { - logger.info("[绉诲姩浣嶇疆璁㈤槄] b娣诲姞閫氶亾杞ㄨ抗鐐逛綅淇濆瓨澶辫触锛� {}", addMobilePositionList.size()); - } - addMobilePositionList.clear(); - } } @Scheduled(fixedRate = 10000) public void execute(){ diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java b/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java new file mode 100644 index 0000000..9af6415 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java @@ -0,0 +1,13 @@ +package com.genersoft.iot.vmp.service; + + +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; + +import java.util.List; + +public interface IMobilePositionService { + + void add(List<MobilePosition> mobilePositionList); + + void add(MobilePosition mobilePosition); +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java new file mode 100644 index 0000000..277493a --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java @@ -0,0 +1,95 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import com.genersoft.iot.vmp.service.IMobilePositionService; +import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; +import com.genersoft.iot.vmp.storager.dao.DeviceMobilePositionMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +@Service +public class MobilePositionServiceImpl implements IMobilePositionService { + + @Autowired + private DeviceChannelMapper channelMapper; + + @Autowired + private DeviceMobilePositionMapper mobilePositionMapper; + + @Autowired + private UserSetting userSetting; + + @Autowired + private RedisTemplate<String, MobilePosition> redisTemplate; + + private final static Logger logger = LoggerFactory.getLogger(MobilePositionServiceImpl.class); + + private final String REDIS_MOBILE_POSITION_LIST = "redis_mobile_position_list"; + + @Override + public void add(MobilePosition mobilePosition) { + List<MobilePosition> list = new ArrayList<>(); + list.add(mobilePosition); + add(list); + } + + @Override + public void add(List<MobilePosition> mobilePositionList) { + redisTemplate.opsForList().leftPushAll(REDIS_MOBILE_POSITION_LIST, mobilePositionList); + } + + private List<MobilePosition> get(int length) { + Long size = redisTemplate.opsForList().size(REDIS_MOBILE_POSITION_LIST); + if (size == null || size == 0) { + return new ArrayList<>(); + } + List<MobilePosition> mobilePositions; + if (size > length) { + mobilePositions = redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, length); + }else { + mobilePositions = redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, size); + } + return mobilePositions; + } + + + + @Scheduled(fixedRate = 1000) + @Transactional + public void executeTaskQueue() { + int countLimit = 3000; + List<MobilePosition> mobilePositions = get(countLimit); + if (mobilePositions == null || mobilePositions.isEmpty()) { + return; + } + if (userSetting.getSavePositionHistory()) { + mobilePositionMapper.batchadd(mobilePositions); + } + logger.info("[绉诲姩浣嶇疆璁㈤槄]鏇存柊閫氶亾浣嶇疆锛� {}", mobilePositions.size()); + Map<String, DeviceChannel> updateChannelMap = new HashMap<>(); + for (MobilePosition mobilePosition : mobilePositions) { + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(mobilePosition.getDeviceId()); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); + updateChannelMap.put(mobilePosition.getDeviceId() + mobilePosition.getChannelId(), deviceChannel); + } + List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values()); + channelMapper.batchUpdatePosition(channels); + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index be78a52..1555221 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -401,23 +401,6 @@ " </script>"}) int updatePosition(DeviceChannel deviceChannel); - @Update({"<script>" + - "<foreach collection='deviceChannelList' item='item' separator=';'>" + - " UPDATE" + - " wvp_device_channel" + - " SET gps_time=#{item.gpsTime}" + - "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" + - "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" + - "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" + - "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" + - "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" + - "<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" + - "WHERE device_id=#{item.deviceId} " + - " <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" + - "</foreach>" + - "</script>"}) - int batchUpdatePosition(List<DeviceChannel> deviceChannelList); - @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0") List<DeviceChannel> getAllChannelInPlay(); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java index c28b16e..5124310 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java @@ -49,7 +49,7 @@ void batchadd2(List<MobilePosition> mobilePositions); @Insert("<script> " + - "<foreach collection='mobilePositions' index='index' item='item' separator=','> " + + "<foreach collection='mobilePositions' index='index' item='item' separator=';'> " + "insert into wvp_device_mobile_position " + "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," + "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+ @@ -57,7 +57,7 @@ "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " + "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," + "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " + - "#{item.createTime}); " + + "#{item.createTime}) " + "</foreach> " + "</script>") void batchadd(List<MobilePosition> mobilePositions); -- Gitblit v1.8.0