648540858
2024-04-23 66a681d67969ae64c5cfe7140cb60a7885edef86
优化notify消息中目录和移动位置的处理
8个文件已修改
494 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java 198 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java 153 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
@@ -1,7 +1,5 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
@@ -20,10 +18,10 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -39,8 +37,6 @@
    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class);
    private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>();
    private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>();
    private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
    private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
@@ -60,17 +56,14 @@
    private IDeviceChannelService deviceChannelService;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private CivilCodeFileConf civilCodeFileConf;
    @Autowired
    private SipConfig sipConfig;
    private final static String talkKey = "notify-request-for-catalog-task";
    public void process(RequestEvent evt) {
    @Transactional
    public void process(List<RequestEvent> evtList) {
        if (evtList.isEmpty()) {
            return;
        }
        for (RequestEvent evt : evtList) {
        try {
            long start = System.currentTimeMillis();
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
@@ -113,15 +106,11 @@
                        channel.setParentId(null);
                    }
                    channel.setDeviceId(device.getDeviceId());
                    logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
                        logger.info("[收到目录订阅]:{}, {}/{}",event, device.getDeviceId(), channel.getChannelId());
                    switch (event) {
                        case CatalogEvent.ON:
                            // 上线
                            logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            updateChannelOnlineList.add(channel);
                            if (updateChannelOnlineList.size() > 300) {
                                executeSaveForOnline();
                            }
                                deviceChannelService.online(channel);
                            if (userSetting.getDeviceStatusNotify()) {
                                // 发送redis消息
                                redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
@@ -129,111 +118,35 @@
                            break;
                        case CatalogEvent.OFF :
                            // 离线
                            logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            }else {
                                updateChannelOfflineList.add(channel);
                                if (updateChannelOfflineList.size() > 300) {
                                    executeSaveForOffline();
                                }
                                if (userSetting.getDeviceStatusNotify()) {
                                    // 发送redis消息
                                    redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                }
                            }
                            break;
                        case CatalogEvent.VLOST:
                            // 视频丢失
                            logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            }else {
                                updateChannelOfflineList.add(channel);
                                if (updateChannelOfflineList.size() > 300) {
                                    executeSaveForOffline();
                                }
                                if (userSetting.getDeviceStatusNotify()) {
                                    // 发送redis消息
                                    redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                }
                            }
                            break;
                        case CatalogEvent.DEFECT:
                            // 故障
                            logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                // 离线
                            if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                    logger.info("[目录订阅] 离线 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            }else {
                                updateChannelOfflineList.add(channel);
                                if (updateChannelOfflineList.size() > 300) {
                                    executeSaveForOffline();
                                }
                                    deviceChannelService.offline(channel);
                                if (userSetting.getDeviceStatusNotify()) {
                                    // 发送redis消息
                                    redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                }
                            }
                            break;
                        case CatalogEvent.ADD:
                            // 增加
                            logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            // 判断此通道是否存在
                            DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
                            if (deviceChannel != null) {
                                logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                channel.setId(deviceChannel.getId());
                                updateChannelMap.put(channel.getChannelId(), channel);
                                if (updateChannelMap.keySet().size() > 300) {
                                    executeSaveForUpdate();
                                }
                            }else {
                                addChannelMap.put(channel.getChannelId(), channel);
                                if (userSetting.getDeviceStatusNotify()) {
                                    // 发送redis消息
                                    redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                                }
                                if (addChannelMap.keySet().size() > 300) {
                                    executeSaveForAdd();
                                }
                            }
                            break;
                        case CatalogEvent.DEL:
                            // 删除
                            logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            deleteChannelList.add(channel);
                                deviceChannelService.delete(channel);
                            if (userSetting.getDeviceStatusNotify()) {
                                // 发送redis消息
                                redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
                            }
                            if (deleteChannelList.size() > 300) {
                                executeSaveForDelete();
                            }
                            break;
                            case CatalogEvent.ADD:
                        case CatalogEvent.UPDATE:
                            // 更新
                            logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            // 判断此通道是否存在
                            DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
                            if (deviceChannelForUpdate != null) {
                                channel.setId(deviceChannelForUpdate.getId());
                                channel.setUpdateTime(DateUtil.getNow());
                                updateChannelMap.put(channel.getChannelId(), channel);
                                if (updateChannelMap.keySet().size() > 300) {
                                    executeSaveForUpdate();
                                }
                            }else {
                                addChannelMap.put(channel.getChannelId(), channel);
                                if (addChannelMap.keySet().size() > 300) {
                                    executeSaveForAdd();
                                }
                                deviceChannelService.updateChannel(deviceId,channel);
                                if (userSetting.getDeviceStatusNotify()) {
                                    // 发送redis消息
                                    redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                                }
                            }
                            break;
                        default:
@@ -242,92 +155,11 @@
                    }
                    // 转发变化信息
                    eventPublisher.catalogEventPublish(null, channel, event);
                    if (!updateChannelMap.keySet().isEmpty()
                            || !addChannelMap.keySet().isEmpty()
                            || !updateChannelOnlineList.isEmpty()
                            || !updateChannelOfflineList.isEmpty()
                            || !deleteChannelList.isEmpty()) {
                        if (!dynamicTask.contains(talkKey)) {
                            dynamicTask.startDelay(talkKey, this::executeSave, 1000);
                        }
                    }
                }
            }
        } catch (DocumentException e) {
            logger.error("未处理的异常 ", e);
        }
    }
    // TODO 同一个通道如果先发送更新再发送离线可能无法正常离线
    private void executeSave(){
        try {
            executeSaveForAdd();
        } catch (Exception e) {
            logger.error("[存储收到的增加通道] 异常: ", e );
        }
        try {
            executeSaveForOnline();
        } catch (Exception e) {
            logger.error("[存储收到的通道上线] 异常: ", e );
        }
        try {
            executeSaveForOffline();
        } catch (Exception e) {
            logger.error("[存储收到的通道离线] 异常: ", e );
        }
        try {
            executeSaveForUpdate();
        } catch (Exception e) {
            logger.error("[存储收到的更新通道] 异常: ", e );
        }
        try {
            executeSaveForDelete();
        } catch (Exception e) {
            logger.error("[存储收到的删除通道] 异常: ", e );
        }
        dynamicTask.stop(talkKey);
    }
    private void executeSaveForUpdate(){
        if (!updateChannelMap.values().isEmpty()) {
            logger.info("[存储收到的更新通道], 数量: {}", updateChannelMap.size());
            ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
            deviceChannelService.batchUpdateChannel(deviceChannels);
            updateChannelMap.clear();
        }
    }
    private void executeSaveForAdd(){
        if (!addChannelMap.values().isEmpty()) {
            ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values());
            addChannelMap.clear();
            deviceChannelService.batchAddChannel(deviceChannels);
        }
    }
    private void executeSaveForDelete(){
        if (!deleteChannelList.isEmpty()) {
            deviceChannelService.deleteChannels(deleteChannelList);
            deleteChannelList.clear();
        }
    }
    private void executeSaveForOnline(){
        if (!updateChannelOnlineList.isEmpty()) {
            deviceChannelService.channelsOnline(updateChannelOnlineList);
            updateChannelOnlineList.clear();
        }
    }
    private void executeSaveForOffline(){
        if (!updateChannelOfflineList.isEmpty()) {
            deviceChannelService.channelsOffline(updateChannelOfflineList);
            updateChannelOfflineList.clear();
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -1,8 +1,6 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
@@ -19,8 +17,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent;
@@ -29,7 +27,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * SIP命令类型: NOTIFY请求中的移动位置请求处理
@@ -39,10 +36,6 @@
    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class);
    private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
    private final List<MobilePosition> addMobilePositionList = new CopyOnWriteArrayList();
    @Autowired
@@ -57,19 +50,14 @@
    @Autowired
    private IDeviceChannelService deviceChannelService;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private CivilCodeFileConf civilCodeFileConf;
    @Autowired
    private SipConfig sipConfig;
    private final static String talkKey = "notify-request-for-mobile-position-task";
    @Async("taskExecutor")
    public void process(RequestEvent evt) {
    @Transactional
    public void process(List<RequestEvent> eventList) {
        if (eventList.isEmpty()) {
            return;
        }
        Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
        List<MobilePosition> addMobilePositionList = new ArrayList<>();
        for (RequestEvent evt : eventList) {
        try {
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
@@ -81,42 +69,22 @@
                return;
            }
            Device device = redisCatchStorage.getDevice(deviceId);
                if (device == null) {
                    logger.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId);
                    return;
                }
            MobilePosition mobilePosition = new MobilePosition();
                mobilePosition.setDeviceId(device.getDeviceId());
                mobilePosition.setDeviceName(device.getName());
            mobilePosition.setCreateTime(DateUtil.getNow());
            List<Element> elements = rootElement.elements();
            String channelId = null;
            for (Element element : elements) {
                switch (element.getName()){
                    case "DeviceID":
                        channelId = element.getStringValue();
                        if (device == null) {
                            device = redisCatchStorage.getDevice(channelId);
                            if (device == null) {
                                // 根据通道id查询设备Id
                                List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
                                if (!deviceList.isEmpty()) {
                                    device = deviceList.get(0);
                                }
                            }
                        }
                        if (device == null) {
                            logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
                            return;
                        }
                        mobilePosition.setDeviceId(device.getDeviceId());
                            String channelId = element.getStringValue();
                            if (!deviceId.equals(channelId)) {
                        mobilePosition.setChannelId(channelId);
                        // 兼容设备部分设备上报是通道编号与设备编号一致的情况
                        if (deviceId.equals(channelId)) {
                            List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
                            if (deviceChannels.size() == 1) {
                                channelId = deviceChannels.get(0).getChannelId();
                            }
                        }
                        if (!ObjectUtils.isEmpty(device.getName())) {
                            mobilePosition.setDeviceName(device.getName());
                        }
                        mobilePosition.setDeviceId(device.getDeviceId());
                        mobilePosition.setChannelId(channelId);
                        continue;
                    case "Time":
                        String timeVal = element.getStringValue();
@@ -167,70 +135,65 @@
            // 更新device channel 的经纬度
            DeviceChannel deviceChannel = new DeviceChannel();
            deviceChannel.setDeviceId(device.getDeviceId());
            deviceChannel.setChannelId(channelId);
            deviceChannel.setLongitude(mobilePosition.getLongitude());
            deviceChannel.setLatitude(mobilePosition.getLatitude());
            deviceChannel.setGpsTime(mobilePosition.getTime());
            updateChannelMap.put(deviceId + channelId, deviceChannel);
                updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel);
            addMobilePositionList.add(mobilePosition);
            if(updateChannelMap.size() > 2000) {
                executeSaveChannel();
            }
            if (userSetting.isSavePositionHistory()) {
                if(addMobilePositionList.size() > 2000) {
                    executeSaveMobilePosition();
                }
            }
//            deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
//
//            mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
//            mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
//            mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
//            mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
//            deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
            if (!dynamicTask.contains(talkKey)) {
                dynamicTask.startDelay(talkKey, this::executeSave, 3000);
                // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
                try {
                    eventPublisher.mobilePositionEventPublish(mobilePosition);
                }catch (Exception e) {
                    logger.error("[向上级转发移动位置失败] ", e);
            }
                if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) {
                    List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId());
                    channels.forEach(channel -> {
                        // 发送redis消息。 通知位置信息的变化
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
                        jsonObject.put("serial", channel.getDeviceId());
                        jsonObject.put("code", channel.getChannelId());
                        jsonObject.put("longitude", mobilePosition.getLongitude());
                        jsonObject.put("latitude", mobilePosition.getLatitude());
                        jsonObject.put("altitude", mobilePosition.getAltitude());
                        jsonObject.put("direction", mobilePosition.getDirection());
                        jsonObject.put("speed", mobilePosition.getSpeed());
                        redisCatchStorage.sendMobilePositionMsg(jsonObject);
                    });
                }else {
                    // 发送redis消息。 通知位置信息的变化
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
                    jsonObject.put("serial", mobilePosition.getDeviceId());
                    jsonObject.put("code", mobilePosition.getChannelId());
                    jsonObject.put("longitude", mobilePosition.getLongitude());
                    jsonObject.put("latitude", mobilePosition.getLatitude());
                    jsonObject.put("altitude", mobilePosition.getAltitude());
                    jsonObject.put("direction", mobilePosition.getDirection());
                    jsonObject.put("speed", mobilePosition.getSpeed());
                    redisCatchStorage.sendMobilePositionMsg(jsonObject);
                }
        } catch (DocumentException e) {
            logger.error("未处理的异常 ", e);
        }
    }
    private void executeSave(){
        executeSaveChannel();
        executeSaveMobilePosition();
        dynamicTask.stop(talkKey);
    }
    @Async("taskExecutor")
    public void executeSaveChannel(){
        dynamicTask.execute();
        try {
            logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size());
            ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
            deviceChannelService.batchUpdateChannelGPS(deviceChannels);
        if(!updateChannelMap.isEmpty()) {
            List<DeviceChannel>  channels = new ArrayList<>(updateChannelMap.values());
            logger.info("[移动位置订阅]更新通道位置: {}", channels.size());
            deviceChannelService.batchUpdateChannelGPS(channels);
            updateChannelMap.clear();
        }catch (Exception e) {
        }
    }
    @Async("taskExecutor")
    public void executeSaveMobilePosition(){
        if (userSetting.isSavePositionHistory()) {
        if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) {
            try {
                logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
                deviceChannelService.batchAddMobilePosition(addMobilePositionList);
                addMobilePositionList.clear();
            }catch (Exception e) {
                logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size());
            }
            addMobilePositionList.clear();
        }
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -37,6 +37,7 @@
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -111,12 +112,17 @@
        }
        boolean runed = !taskQueue.isEmpty();
        taskQueue.offer(new HandlerCatchData(evt, null, null));
        if (!runed) {
            taskExecutor.execute(()-> {
//                logger.warn("开始处理");
                while (!taskQueue.isEmpty()) {
    }
    @Scheduled(fixedRate = 200)   //每200毫秒执行一次
    public void executeTaskQueue(){
        if (taskQueue.isEmpty()) {
            return;
        }
                    try {
                        HandlerCatchData take = taskQueue.poll();
            List<RequestEvent> catalogEventList = new ArrayList<>();
            List<RequestEvent> alarmEventList = new ArrayList<>();
            List<RequestEvent> mobilePositionEventList = new ArrayList<>();
            for (HandlerCatchData take : taskQueue) {
                        if (take == null) {
                            continue;
                        }
@@ -128,28 +134,31 @@
                        String cmd = XmlUtil.getText(rootElement, "CmdType");
                        if (CmdType.CATALOG.equals(cmd)) {
                            logger.info("接收到Catalog通知");
                            notifyRequestForCatalogProcessor.process(take.getEvt());
                    catalogEventList.add(take.getEvt());
                        } else if (CmdType.ALARM.equals(cmd)) {
                            logger.info("接收到Alarm通知");
                            processNotifyAlarm(take.getEvt());
                    alarmEventList.add(take.getEvt());
                        } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
//                            logger.info("接收到MobilePosition通知");
//                            processNotifyMobilePosition(take.getEvt());
//                            taskExecutor.execute(() -> {
                                notifyRequestForMobilePositionProcessor.process(take.getEvt());
//                            });
                    mobilePositionEventList.add(take.getEvt());
                        } else {
                            logger.info("接收到消息:" + cmd);
                }
            }
            taskQueue.clear();
            if (!alarmEventList.isEmpty()) {
                processNotifyAlarm(alarmEventList);
            }
            if (!catalogEventList.isEmpty()) {
                notifyRequestForCatalogProcessor.process(catalogEventList);
            }
            if (!mobilePositionEventList.isEmpty()) {
                notifyRequestForMobilePositionProcessor.process(mobilePositionEventList);
                        }
                    } catch (DocumentException e) {
                        logger.error("处理NOTIFY消息时错误", e);
                    }
                }
            });
        }
    }
    /**
     * 处理MobilePosition移动位置Notify
@@ -253,13 +262,13 @@
    /***
     * 处理alarm设备报警Notify
     *
     * @param evt
     */
    private void processNotifyAlarm(RequestEvent evt) {
    private void processNotifyAlarm(List<RequestEvent> evtList) {
        if (!sipConfig.isAlarm()) {
            return;
        }
        if (!evtList.isEmpty()) {
            for (RequestEvent evt : evtList) {
        try {
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
@@ -344,6 +353,8 @@
            logger.error("未处理的异常 ", e);
        }
    }
        }
    }
    public void setCmder(SIPCommander cmder) {
    }
src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
@@ -102,4 +102,9 @@
    void batchAddMobilePosition(List<MobilePosition> addMobilePositionList);
    void online(DeviceChannel channel);
    void offline(DeviceChannel channel);
    void delete(DeviceChannel channel);
}
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
@@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
@@ -248,8 +249,24 @@
    }
    @Override
    public void online(DeviceChannel channel) {
        channelMapper.online(channel.getDeviceId(), channel.getChannelId());
    }
    @Override
    public int channelsOffline(List<DeviceChannel> channels) {
        return channelMapper.batchOffline(channels);
    }
    @Override
    public void offline(DeviceChannel channel) {
        channelMapper.offline(channel.getDeviceId(), channel.getChannelId());
    }
    @Override
    public void delete(DeviceChannel channel) {
        channelMapper.del(channel.getDeviceId(), channel.getChannelId());
    }
    @Override
@@ -355,12 +372,45 @@
    }
    @Override
    @Transactional
    public void batchUpdateChannelGPS(List<DeviceChannel> channelList) {
        channelMapper.batchUpdate(channelList);
        for (DeviceChannel deviceChannel : channelList) {
            deviceChannel.setUpdateTime(DateUtil.getNow());
            if (deviceChannel.getGpsTime() == null) {
                deviceChannel.setGpsTime(DateUtil.getNow());
            }
        }
        int count = 1000;
        if (channelList.size() > count) {
            for (int i = 0; i < channelList.size(); i+=count) {
                int toIndex = i+count;
                if ( i + count > channelList.size()) {
                    toIndex = channelList.size();
                }
                List<DeviceChannel> channels = channelList.subList(i, toIndex);
                channelMapper.batchUpdatePosition(channels);
            }
        }else {
            channelMapper.batchUpdatePosition(channelList);
        }
    }
    @Override
    @Transactional
    public void batchAddMobilePosition(List<MobilePosition> mobilePositions) {
//        int count = 500;
//        if (mobilePositions.size() > count) {
//            for (int i = 0; i < mobilePositions.size(); i+=count) {
//                int toIndex = i+count;
//                if ( i + count > mobilePositions.size()) {
//                    toIndex = mobilePositions.size();
//                }
//                List<MobilePosition> mobilePositionsSub = mobilePositions.subList(i, toIndex);
//                deviceMobilePositionMapper.batchadd(mobilePositionsSub);
//            }
//        }else {
//            deviceMobilePositionMapper.batchadd(mobilePositions);
//        }
        deviceMobilePositionMapper.batchadd(mobilePositions);
    }
}
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -401,6 +401,23 @@
            " </script>"})
    int updatePosition(DeviceChannel deviceChannel);
    @Update({"<script>" +
            "<foreach collection='deviceChannelList' item='item' separator=';'>" +
            " UPDATE" +
            " wvp_device_channel" +
            " SET gps_time=#{item.gpsTime}" +
            "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
            "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
            "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
            "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
            "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
            "<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" +
            "WHERE device_id=#{item.deviceId} " +
            " <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" +
            "</foreach>" +
            "</script>"})
    int batchUpdatePosition(List<DeviceChannel> deviceChannelList);
    @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0")
    List<DeviceChannel> getAllChannelInPlay();
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
@@ -46,6 +46,20 @@
            "#{item.createTime}) " +
            "</foreach> " +
            "</script>")
    void batchadd2(List<MobilePosition> mobilePositions);
    @Insert("<script> " +
            "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
            "insert into wvp_device_mobile_position " +
            "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
            "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
            "values " +
            "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
            "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
            "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
            "#{item.createTime}); " +
            "</foreach> " +
            "</script>")
    void batchadd(List<MobilePosition> mobilePositions);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -570,7 +570,7 @@
    @Override
    public void sendMobilePositionMsg(JSONObject jsonObject) {
        String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
        logger.info("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString());
//        logger.info("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString());
        redisTemplate.convertAndSend(key, jsonObject);
    }