648540858
2024-05-13 5564cfb384db16db972e8cb91ca55cf345cfb6ea
优化大量notify 移动位置订阅的入库
4个文件已修改
2个文件已添加
183 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java 95 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.conf.redis;
import com.alibaba.fastjson2.support.spring.data.redis.GenericFastJsonRedisSerializer;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
@@ -25,4 +26,20 @@
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        return redisTemplate;
    }
    @Bean
    public RedisTemplate<String, MobilePosition> getRedisTemplateForMobilePosition(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, MobilePosition> redisTemplate = new RedisTemplate<>();
        // 使用fastJson序列化
        GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
        // value值的序列化采用fastJsonRedisSerializer
        redisTemplate.setValueSerializer(fastJsonRedisSerializer);
        redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);
        // key的序列化采用StringRedisSerializer
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        return redisTemplate;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -11,6 +11,7 @@
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IMobilePositionService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.dom4j.DocumentException;
@@ -20,15 +21,11 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@@ -54,6 +51,9 @@
    @Autowired
    private IDeviceChannelService deviceChannelService;
    @Autowired
    private IMobilePositionService mobilePositionService;
    public void process(RequestEvent evt) {
        if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
@@ -64,13 +64,10 @@
    }
    @Scheduled(fixedRate = 200) //每200毫秒执行一次
    @Transactional
    public void executeTaskQueue() {
        if (taskQueue.isEmpty()) {
            return;
        }
        Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
        List<MobilePosition> addMobilePositionList = new ArrayList<>();
        for (HandlerCatchData take : taskQueue) {
            if (take == null) {
                continue;
@@ -150,16 +147,7 @@
//                    mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
                mobilePosition.setReportSource("Mobile Position");
                // 更新device channel 的经纬度
                DeviceChannel deviceChannel = new DeviceChannel();
                deviceChannel.setDeviceId(device.getDeviceId());
                deviceChannel.setLongitude(mobilePosition.getLongitude());
                deviceChannel.setLatitude(mobilePosition.getLatitude());
                deviceChannel.setGpsTime(mobilePosition.getTime());
                updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel);
                addMobilePositionList.add(mobilePosition);
                mobilePositionService.add(mobilePosition);
                // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
                try {
                    eventPublisher.mobilePositionEventPublish(mobilePosition);
@@ -199,21 +187,6 @@
            }
        }
        taskQueue.clear();
        if(!updateChannelMap.isEmpty()) {
            List<DeviceChannel>  channels = new ArrayList<>(updateChannelMap.values());
            logger.info("[移动位置订阅]更新通道位置: {}", channels.size());
            deviceChannelService.batchUpdateChannel(channels);
            updateChannelMap.clear();
        }
        if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) {
            try {
                logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
                deviceChannelService.batchAddMobilePosition(addMobilePositionList);
            }catch (Exception e) {
                logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size());
            }
            addMobilePositionList.clear();
        }
    }
    @Scheduled(fixedRate = 10000)
    public void execute(){
src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java
New file
@@ -0,0 +1,13 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import java.util.List;
public interface IMobilePositionService {
    void add(List<MobilePosition> mobilePositionList);
    void add(MobilePosition mobilePosition);
}
src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java
New file
@@ -0,0 +1,95 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.service.IMobilePositionService;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMobilePositionMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class MobilePositionServiceImpl implements IMobilePositionService {
    @Autowired
    private DeviceChannelMapper channelMapper;
    @Autowired
    private DeviceMobilePositionMapper mobilePositionMapper;
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private RedisTemplate<String, MobilePosition> redisTemplate;
    private final static Logger logger = LoggerFactory.getLogger(MobilePositionServiceImpl.class);
    private final String REDIS_MOBILE_POSITION_LIST = "redis_mobile_position_list";
    @Override
    public void add(MobilePosition mobilePosition) {
        List<MobilePosition> list = new ArrayList<>();
        list.add(mobilePosition);
        add(list);
    }
    @Override
    public void add(List<MobilePosition> mobilePositionList) {
        redisTemplate.opsForList().leftPushAll(REDIS_MOBILE_POSITION_LIST, mobilePositionList);
    }
    private List<MobilePosition> get(int length) {
        Long size = redisTemplate.opsForList().size(REDIS_MOBILE_POSITION_LIST);
        if (size == null || size == 0) {
            return new ArrayList<>();
        }
        List<MobilePosition> mobilePositions;
        if (size > length) {
            mobilePositions = redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, length);
        }else {
            mobilePositions = redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, size);
        }
        return  mobilePositions;
    }
    @Scheduled(fixedRate = 1000)
    @Transactional
    public void executeTaskQueue() {
        int countLimit = 3000;
        List<MobilePosition> mobilePositions = get(countLimit);
        if (mobilePositions == null || mobilePositions.isEmpty()) {
            return;
        }
        if (userSetting.getSavePositionHistory()) {
            mobilePositionMapper.batchadd(mobilePositions);
        }
        logger.info("[移动位置订阅]更新通道位置: {}", mobilePositions.size());
        Map<String, DeviceChannel> updateChannelMap = new HashMap<>();
        for (MobilePosition mobilePosition : mobilePositions) {
            DeviceChannel deviceChannel = new DeviceChannel();
            deviceChannel.setDeviceId(mobilePosition.getDeviceId());
            deviceChannel.setLongitude(mobilePosition.getLongitude());
            deviceChannel.setLatitude(mobilePosition.getLatitude());
            deviceChannel.setGpsTime(mobilePosition.getTime());
            updateChannelMap.put(mobilePosition.getDeviceId() + mobilePosition.getChannelId(), deviceChannel);
        }
        List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values());
        channelMapper.batchUpdatePosition(channels);
    }
}
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -401,23 +401,6 @@
            " </script>"})
    int updatePosition(DeviceChannel deviceChannel);
    /**
     * Updates position data on {@code wvp_device_channel} for a list of channels.
     *
     * <p>One UPDATE statement is generated per list element, joined with {@code ';'}.
     * {@code gps_time} is always written; each longitude/latitude column (plain, GCJ02, WGS84)
     * is written only when the corresponding property is non-null. Rows are matched by
     * {@code device_id} and, only when {@code channelId} is non-null, also by {@code channel_id}.
     *
     * <p>NOTE(review): sending several ';'-separated statements in one call presumably needs
     * multi-statement support on the JDBC connection - confirm against the datasource config.
     *
     * @param deviceChannelList channels carrying the new position values
     * @return NOTE(review): presumably the affected-row count reported by the driver - confirm
     *         what is reported for a multi-statement execution
     */
    @Update({"<script>" +
            "<foreach collection='deviceChannelList' item='item' separator=';'>" +
            " UPDATE" +
            " wvp_device_channel" +
            " SET gps_time=#{item.gpsTime}" +
            "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
            "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
            "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
            "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
            "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
            "<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" +
            "WHERE device_id=#{item.deviceId} " +
            " <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" +
            "</foreach>" +
            "</script>"})
    int batchUpdatePosition(List<DeviceChannel> deviceChannelList);
    /**
     * Selects every row of {@code wvp_device_channel} whose {@code stream_id}, after trimming,
     * has a length greater than zero.
     *
     * <p>NOTE(review): a non-empty stream_id presumably marks a channel that is currently
     * playing - confirm against the code that sets and clears stream_id.
     *
     * @return the matching channels
     */
    @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0")
    List<DeviceChannel> getAllChannelInPlay();
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
@@ -49,7 +49,7 @@
    void batchadd2(List<MobilePosition> mobilePositions);
    @Insert("<script> " +
            "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
            "<foreach collection='mobilePositions' index='index' item='item' separator=';'> " +
            "insert into wvp_device_mobile_position " +
            "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
            "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
@@ -57,7 +57,7 @@
            "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
            "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
            "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
            "#{item.createTime}); " +
            "#{item.createTime}) " +
            "</foreach> " +
            "</script>")
    void batchadd(List<MobilePosition> mobilePositions);