src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java
@@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.conf.redis; import com.alibaba.fastjson2.support.spring.data.redis.GenericFastJsonRedisSerializer; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -25,4 +26,20 @@ redisTemplate.setConnectionFactory(redisConnectionFactory); return redisTemplate; } @Bean public RedisTemplate<String, MobilePosition> getRedisTemplateForMobilePosition(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, MobilePosition> redisTemplate = new RedisTemplate<>(); // 使用fastJson序列化 GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer(); // value值的序列化采用fastJsonRedisSerializer redisTemplate.setValueSerializer(fastJsonRedisSerializer); redisTemplate.setHashValueSerializer(fastJsonRedisSerializer); // key的序列化采用StringRedisSerializer redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.setConnectionFactory(redisConnectionFactory); return redisTemplate; } } src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.service.IMobilePositionService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; import org.dom4j.DocumentException; @@ -20,15 +21,11 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -54,6 +51,9 @@ @Autowired private IDeviceChannelService deviceChannelService; @Autowired private IMobilePositionService mobilePositionService; public void process(RequestEvent evt) { if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { @@ -64,13 +64,10 @@ } @Scheduled(fixedRate = 200) //每200毫秒执行一次 @Transactional public void executeTaskQueue() { if (taskQueue.isEmpty()) { return; } Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); List<MobilePosition> addMobilePositionList = new ArrayList<>(); for (HandlerCatchData take : taskQueue) { if (take == null) { continue; @@ -150,16 +147,7 @@ // mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); mobilePosition.setReportSource("Mobile Position"); // 更新device channel 的经纬度 DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(device.getDeviceId()); deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); 
deviceChannel.setGpsTime(mobilePosition.getTime()); updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel); addMobilePositionList.add(mobilePosition); mobilePositionService.add(mobilePosition); // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 try { eventPublisher.mobilePositionEventPublish(mobilePosition); @@ -199,21 +187,6 @@ } } taskQueue.clear(); if(!updateChannelMap.isEmpty()) { List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values()); logger.info("[移动位置订阅]更新通道位置: {}", channels.size()); deviceChannelService.batchUpdateChannel(channels); updateChannelMap.clear(); } if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) { try { logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); deviceChannelService.batchAddMobilePosition(addMobilePositionList); }catch (Exception e) { logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size()); } addMobilePositionList.clear(); } } @Scheduled(fixedRate = 10000) public void execute(){ src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java
New file @@ -0,0 +1,13 @@
package com.genersoft.iot.vmp.service;

import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;

import java.util.List;

/**
 * Service contract for submitting {@link MobilePosition} records.
 *
 * <p>Only the submission methods are declared here; how and when the
 * submitted records are stored is decided by the implementation.
 */
public interface IMobilePositionService {

    /**
     * Submits a batch of mobile positions.
     *
     * @param mobilePositionList the positions to submit
     */
    void add(List<MobilePosition> mobilePositionList);

    /**
     * Submits a single mobile position.
     *
     * @param mobilePosition the position to submit
     */
    void add(MobilePosition mobilePosition);

}
src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java
New file @@ -0,0 +1,95 @@
package com.genersoft.iot.vmp.service.impl;

import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.service.IMobilePositionService;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMobilePositionMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Buffers incoming {@link MobilePosition} records in a Redis list and flushes
 * them to the database on a fixed schedule.
 *
 * <p>Producers call {@code add(...)}, which pushes onto the head of the list.
 * {@link #executeTaskQueue()} pops from the tail, so records are drained in
 * the order they were added.
 */
@Service
public class MobilePositionServiceImpl implements IMobilePositionService {

    private static final Logger logger = LoggerFactory.getLogger(MobilePositionServiceImpl.class);

    /** Redis list key that holds the pending mobile positions. */
    private static final String REDIS_MOBILE_POSITION_LIST = "redis_mobile_position_list";

    /** Maximum number of buffered positions drained by one scheduled run. */
    private static final int FLUSH_BATCH_SIZE = 3000;

    @Autowired
    private DeviceChannelMapper channelMapper;

    @Autowired
    private DeviceMobilePositionMapper mobilePositionMapper;

    @Autowired
    private UserSetting userSetting;

    @Autowired
    private RedisTemplate<String, MobilePosition> redisTemplate;

    /**
     * Queues a single position for the next flush.
     *
     * @param mobilePosition the position to queue; {@code null} is ignored so
     *                       that the flush loop never has to dereference a
     *                       missing element
     */
    @Override
    public void add(MobilePosition mobilePosition) {
        if (mobilePosition == null) {
            return;
        }
        redisTemplate.opsForList().leftPush(REDIS_MOBILE_POSITION_LIST, mobilePosition);
    }

    /**
     * Queues a batch of positions for the next flush.
     *
     * @param mobilePositionList the positions to queue; a {@code null} or
     *                           empty list is a no-op, because a push with no
     *                           values is not a valid Redis command
     */
    @Override
    public void add(List<MobilePosition> mobilePositionList) {
        if (mobilePositionList == null || mobilePositionList.isEmpty()) {
            return;
        }
        redisTemplate.opsForList().leftPushAll(REDIS_MOBILE_POSITION_LIST, mobilePositionList);
    }

    /**
     * Removes and returns up to {@code length} positions from the tail of the
     * Redis list.
     *
     * <p>A counted pop already returns at most as many elements as the list
     * holds, so no separate length query is issued; that avoids an extra round
     * trip and a stale length when producers push concurrently.
     *
     * @param length maximum number of positions to pop
     * @return the popped positions, never {@code null}; empty when nothing is
     *         buffered or {@code length} is not positive
     */
    private List<MobilePosition> get(int length) {
        if (length <= 0) {
            return new ArrayList<>();
        }
        List<MobilePosition> popped = redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, length);
        return popped == null ? new ArrayList<>() : popped;
    }

    /**
     * Drains one batch from the Redis buffer every second: optionally inserts
     * the positions as history rows, then updates the position of each
     * affected channel.
     */
    @Scheduled(fixedRate = 1000)
    @Transactional
    public void executeTaskQueue() {
        // NOTE(review): the batch is already removed from Redis at this point.
        // If a database call below throws, the transaction rolls back the SQL
        // work but the popped positions are not returned to the list.
        List<MobilePosition> mobilePositions = get(FLUSH_BATCH_SIZE);
        if (mobilePositions.isEmpty()) {
            return;
        }
        if (userSetting.getSavePositionHistory()) {
            mobilePositionMapper.batchadd(mobilePositions);
        }
        logger.info("[移动位置订阅]更新通道位置: {}", mobilePositions.size());

        // Keep one entry per device+channel; later positions in the batch
        // overwrite earlier ones for the same key.
        Map<String, DeviceChannel> updateChannelMap = new HashMap<>();
        for (MobilePosition mobilePosition : mobilePositions) {
            // NOTE(review): channelId is used for the map key but is never set
            // on the DeviceChannel itself - verify that the mapper's WHERE
            // clause still targets the intended channel only.
            DeviceChannel deviceChannel = new DeviceChannel();
            deviceChannel.setDeviceId(mobilePosition.getDeviceId());
            deviceChannel.setLongitude(mobilePosition.getLongitude());
            deviceChannel.setLatitude(mobilePosition.getLatitude());
            deviceChannel.setGpsTime(mobilePosition.getTime());
            updateChannelMap.put(mobilePosition.getDeviceId() + mobilePosition.getChannelId(), deviceChannel);
        }
        List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values());
        channelMapper.batchUpdatePosition(channels);
    }
}
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -401,23 +401,6 @@
            " </script>"})
    int updatePosition(DeviceChannel deviceChannel);

    /**
     * Updates the stored position of several channels in one call.
     *
     * <p>One UPDATE on {@code wvp_device_channel} is generated per list
     * element, with the statements joined by {@code ;}. {@code gps_time} is
     * always written; each of the six coordinate columns is written only when
     * the matching property of the item is non-null. Rows are matched by
     * {@code device_id}, and additionally by {@code channel_id} when the
     * item's {@code channelId} is non-null.
     *
     * <p>NOTE(review): sending several {@code ;}-separated statements in one
     * call presumably requires the JDBC connection to allow multiple
     * statements - confirm against the datasource configuration.
     *
     * @param deviceChannelList channels carrying the new position values
     * @return the update count reported for the call - NOTE(review): confirm
     *         what the driver reports for a multi-statement update
     */
    @Update({"<script>" +
            "<foreach collection='deviceChannelList' item='item' separator=';'>" +
            " UPDATE" +
            " wvp_device_channel" +
            " SET gps_time=#{item.gpsTime}" +
            "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
            "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
            "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
            "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
            "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
            "<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" +
            "WHERE device_id=#{item.deviceId} " +
            " <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" +
            "</foreach>" +
            "</script>"})
    int batchUpdatePosition(List<DeviceChannel> deviceChannelList);

    /**
     * Selects every row of {@code wvp_device_channel} whose {@code stream_id}
     * is non-empty after trimming.
     *
     * @return the matching channels
     */
    @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0")
    List<DeviceChannel> getAllChannelInPlay();
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
@@ -49,7 +49,7 @@ void batchadd2(List<MobilePosition> mobilePositions); @Insert("<script> " + "<foreach collection='mobilePositions' index='index' item='item' separator=','> " + "<foreach collection='mobilePositions' index='index' item='item' separator=';'> " + "insert into wvp_device_mobile_position " + "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," + "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+ @@ -57,7 +57,7 @@ "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " + "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," + "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " + "#{item.createTime}); " + "#{item.createTime}) " + "</foreach> " + "</script>") void batchadd(List<MobilePosition> mobilePositions);