648540858
2024-04-22 8cba63642fcff122907bd7d7a8d7d822555d34ca
优化notify消息处理
7个文件已修改
238 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java 163 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
@@ -66,7 +66,7 @@
    private List<String> allowedOrigins = new ArrayList<>();
    private int maxNotifyCountQueue = 10000;
    private int maxNotifyCountQueue = 100000;
    private int registerAgainAfterTime = 60;
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
@@ -96,10 +96,6 @@
                // 遍历DeviceList
                while (deviceListIterator.hasNext()) {
                    Element itemDevice = deviceListIterator.next();
                    Element channelDeviceElement = itemDevice.element("DeviceID");
                    if (channelDeviceElement == null) {
                        continue;
                    }
                    Element eventElement = itemDevice.element("Event");
                    String event;
                    if (eventElement == null) {
@@ -264,21 +260,12 @@
        }
    }
    // TODO 同一个通道如果先发送更新再发送离线可能无法正常离线
    private void executeSave(){
        try {
            executeSaveForAdd();
        } catch (Exception e) {
            logger.error("[存储收到的增加通道] 异常: ", e );
        }
        try {
            executeSaveForUpdate();
        } catch (Exception e) {
            logger.error("[存储收到的更新通道] 异常: ", e );
        }
        try {
            executeSaveForDelete();
        } catch (Exception e) {
            logger.error("[存储收到的删除通道] 异常: ", e );
        }
        try {
            executeSaveForOnline();
@@ -290,6 +277,17 @@
        } catch (Exception e) {
            logger.error("[存储收到的通道离线] 异常: ", e );
        }
        try {
            executeSaveForUpdate();
        } catch (Exception e) {
            logger.error("[存储收到的更新通道] 异常: ", e );
        }
        try {
            executeSaveForDelete();
        } catch (Exception e) {
            logger.error("[存储收到的删除通道] 异常: ", e );
        }
        dynamicTask.stop(talkKey);
    }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -11,7 +11,6 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -20,12 +19,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -68,78 +67,100 @@
    private final static String talkKey = "notify-request-for-mobile-position-task";
//    @Async("taskExecutor")
    public void process(RequestEvent evt) {
        try {
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
            long startTime = System.currentTimeMillis();
            // 回复 200 OK
            Element rootElement = getRootElement(evt);
            if (rootElement == null) {
                logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
                return;
            }
            Device device = redisCatchStorage.getDevice(deviceId);
            MobilePosition mobilePosition = new MobilePosition();
            mobilePosition.setCreateTime(DateUtil.getNow());
            List<Element> elements = rootElement.elements();
            String channelId = null;
            for (Element element : elements) {
                switch (element.getName()){
                    case "DeviceID":
                        channelId = element.getStringValue();
                        if (device == null) {
                            device = redisCatchStorage.getDevice(channelId);
                            if (device == null) {
                                // 根据通道id查询设备Id
                                List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
                                if (!deviceList.isEmpty()) {
                                    device = deviceList.get(0);
                                }
                            }
                        }
                        if (device == null) {
                            logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
                            return;
                        }
                        mobilePosition.setDeviceId(device.getDeviceId());
                        mobilePosition.setChannelId(channelId);
                        // 兼容设备部分设备上报是通道编号与设备编号一致的情况
                        if (deviceId.equals(channelId)) {
                            List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
                            if (deviceChannels.size() == 1) {
                                channelId = deviceChannels.get(0).getChannelId();
                            }
                        }
                        if (!ObjectUtils.isEmpty(device.getName())) {
                            mobilePosition.setDeviceName(device.getName());
                        }
                        mobilePosition.setDeviceId(device.getDeviceId());
                        mobilePosition.setChannelId(channelId);
                        continue;
                    case "Time":
                        String timeVal = element.getStringValue();
                        if (ObjectUtils.isEmpty(timeVal)) {
                            mobilePosition.setTime(DateUtil.getNow());
                        } else {
                            mobilePosition.setTime(SipUtils.parseTime(timeVal));
                        }
                        continue;
                    case "Longitude":
                        mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
                        continue;
                    case "Latitude":
                        mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
                        continue;
                    case "Speed":
                        String speedVal = element.getStringValue();
                        if (NumericUtil.isDouble(speedVal)) {
                            mobilePosition.setSpeed(Double.parseDouble(speedVal));
                        } else {
                            mobilePosition.setSpeed(0.0);
                        }
                        continue;
                    case "Direction":
                        String directionVal = element.getStringValue();
                        if (NumericUtil.isDouble(directionVal)) {
                            mobilePosition.setDirection(Double.parseDouble(directionVal));
                        } else {
                            mobilePosition.setDirection(0.0);
                        }
                        continue;
                    case "Altitude":
                        String altitudeVal = element.getStringValue();
                        if (NumericUtil.isDouble(altitudeVal)) {
                            mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
                        } else {
                            mobilePosition.setAltitude(0.0);
                        }
                        continue;
            Element deviceIdElement = rootElement.element("DeviceID");
            String channelId = deviceIdElement.getTextTrim().toString();
            Device device = redisCatchStorage.getDevice(deviceId);
            if (device == null) {
                device = redisCatchStorage.getDevice(channelId);
                if (device == null) {
                    // 根据通道id查询设备Id
                    List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
                    if (deviceList.size() > 0) {
                        device = deviceList.get(0);
                    }
                }
            }
            if (device == null) {
                logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
                return;
            }
            // 兼容设备部分设备上报是通道编号与设备编号一致的情况
            if (deviceId.equals(channelId)) {
                List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
                if (deviceChannels.size() == 1) {
                    channelId = deviceChannels.get(0).getChannelId();
                }
            }
            if (!ObjectUtils.isEmpty(device.getName())) {
                mobilePosition.setDeviceName(device.getName());
            }
            mobilePosition.setDeviceId(device.getDeviceId());
            mobilePosition.setChannelId(channelId);
            String time = XmlUtil.getText(rootElement, "Time");
            if (ObjectUtils.isEmpty(time)) {
                mobilePosition.setTime(DateUtil.getNow());
            } else {
                mobilePosition.setTime(SipUtils.parseTime(time));
            }
            mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
            mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) {
                mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed")));
            } else {
                mobilePosition.setSpeed(0.0);
            }
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) {
                mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction")));
            } else {
                mobilePosition.setDirection(0.0);
            }
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) {
                mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude")));
            } else {
                mobilePosition.setAltitude(0.0);
            }
            logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
                    mobilePosition.getLongitude(), mobilePosition.getLatitude());
//            logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
//                    mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
            mobilePosition.setReportSource("Mobile Position");
            // 更新device channel 的经纬度
@@ -149,13 +170,13 @@
            deviceChannel.setLongitude(mobilePosition.getLongitude());
            deviceChannel.setLatitude(mobilePosition.getLatitude());
            deviceChannel.setGpsTime(mobilePosition.getTime());
            updateChannelMap.put(channelId, deviceChannel);
            updateChannelMap.put(deviceId + channelId, deviceChannel);
            addMobilePositionList.add(mobilePosition);
            if(updateChannelMap.size() > 300) {
            if(updateChannelMap.size() > 100) {
                executeSaveChannel();
            }
            if (userSetting.isSavePositionHistory()) {
                if(addMobilePositionList.size() > 300) {
                if(addMobilePositionList.size() > 100) {
                    executeSaveMobilePosition();
                }
            }
@@ -170,7 +191,7 @@
//            deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
            if (!dynamicTask.contains(talkKey)) {
                dynamicTask.startDelay(talkKey, this::executeSave, 1000);
                dynamicTask.startDelay(talkKey, this::executeSave, 3000);
            }
        } catch (DocumentException e) {
@@ -186,29 +207,29 @@
        dynamicTask.stop(talkKey);
    }
    private void executeSaveChannel(){
    @Async("taskExecutor")
    public void executeSaveChannel(){
        dynamicTask.execute();
        try {
            logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size());
            ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
            deviceChannelService.batchUpdateChannelGPS(deviceChannels);
//            ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
//            deviceChannelService.batchUpdateChannelGPS(deviceChannels);
            updateChannelMap.clear();
        }catch (Exception e) {
        }
    }
    private void executeSaveMobilePosition(){
    @Async("taskExecutor")
    public void executeSaveMobilePosition(){
        if (userSetting.isSavePositionHistory()) {
            try {
                logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
                deviceChannelService.batchAddMobilePosition(addMobilePositionList);
//                logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
//                deviceChannelService.batchAddMobilePosition(addMobilePositionList);
                addMobilePositionList.clear();
            }catch (Exception e) {
                logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size());
            }
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -25,6 +25,8 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@@ -76,6 +78,9 @@
    @Autowired
    private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
    @Autowired
    private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor;
    private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
@@ -105,10 +110,10 @@
            logger.error("未处理的异常 ", e);
        }
        boolean runed = !taskQueue.isEmpty();
        logger.info("[notify] 待处理消息数量: {}", taskQueue.size());
        taskQueue.offer(new HandlerCatchData(evt, null, null));
        if (!runed) {
            taskExecutor.execute(()-> {
//                logger.warn("开始处理");
                while (!taskQueue.isEmpty()) {
                    try {
                        HandlerCatchData take = taskQueue.poll();
@@ -129,8 +134,12 @@
                            logger.info("接收到Alarm通知");
                            processNotifyAlarm(take.getEvt());
                        } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                            logger.info("接收到MobilePosition通知");
                            processNotifyMobilePosition(take.getEvt());
//                            logger.info("接收到MobilePosition通知");
//                            processNotifyMobilePosition(take.getEvt());
                            taskExecutor.execute(() -> {
                                notifyRequestForMobilePositionProcessor.process(take.getEvt());
                            });
                        } else {
                            logger.info("接收到消息:" + cmd);
                        }
@@ -147,11 +156,11 @@
     *
     * @param evt
     */
    private void processNotifyMobilePosition(RequestEvent evt) {
    @Async("taskExecutor")
    public void processNotifyMobilePosition(RequestEvent evt) {
        try {
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
            // 回复 200 OK
            Element rootElement = getRootElement(evt);
            if (rootElement == null) {
@@ -360,4 +369,9 @@
    public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
        this.redisCatchStorage = redisCatchStorage;
    }
    @Scheduled(fixedRate = 1000)   //每1秒执行一次
    public void execute(){
        System.out.println("待处理消息数量: " + taskQueue.size());
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
@@ -356,11 +356,11 @@
    @Override
    public void batchUpdateChannelGPS(List<DeviceChannel> channelList) {
        channelMapper.batchUpdate(channelList);
    }
    @Override
    public void batchAddMobilePosition(List<MobilePosition> mobilePositions) {
        deviceMobilePositionMapper.batchadd(mobilePositions);
    }
}
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -347,8 +347,8 @@
            "<if test='item.hasAudio != null'>, has_audio=#{item.hasAudio}</if>" +
            "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
            "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
            "<if test='customLongitude != null'>, custom_longitude=#{item.customLongitude}</if>" +
            "<if test='custom_latitude != null'>, custom_latitude=#{item.customLatitude}</if>" +
            "<if test='item.customLongitude != null'>, custom_longitude=#{item.customLongitude}</if>" +
            "<if test='item.customLatitude != null'>, custom_latitude=#{item.customLatitude}</if>" +
            "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
            "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
            "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
@@ -33,4 +33,19 @@
    @Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}")
    int clearMobilePositionsByDeviceId(String deviceId);
    @Insert("<script> " +
            "insert into wvp_device_mobile_position " +
            "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
            "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
            "values " +
            "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
            "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
            "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
            "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
            "#{item.createTime}) " +
            "</foreach> " +
            "</script>")
    void batchadd(List<MobilePosition> mobilePositions);
}