648540858
2024-05-13 588b1da35a1a51ddfca76fb8ca9c2c0c6cd70038
Merge branch 'refs/heads/2.7.0' into 271-优化notify存储

# Conflicts:
# src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
# src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
# src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
16个文件已修改
1064 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java 389 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java 294 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java 205 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java 75 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
@@ -66,7 +66,7 @@
    private List<String> allowedOrigins = new ArrayList<>();
    private int maxNotifyCountQueue = 10000;
    private int maxNotifyCountQueue = 100000;
    private int registerAgainAfterTime = 60;
src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java
@@ -535,11 +535,11 @@
        this.subCount = subCount;
    }
    public boolean isHasAudio() {
    public Boolean getHasAudio() {
        return hasAudio;
    }
    public void setHasAudio(boolean hasAudio) {
    public void setHasAudio(Boolean hasAudio) {
        this.hasAudio = hasAudio;
    }
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -49,6 +49,7 @@
        ParentPlatform parentPlatform = null;
        Map<String, List<ParentPlatform>> parentPlatformMap = new HashMap<>();
        Map<String, DeviceChannel> channelMap = new HashMap<>();
        if (!ObjectUtils.isEmpty(event.getPlatformId())) {
            subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
            if (subscribe == null) {
@@ -67,6 +68,7 @@
                    for (DeviceChannel deviceChannel : event.getDeviceChannels()) {
                        List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(deviceChannel.getChannelId(), platforms);
                        parentPlatformMap.put(deviceChannel.getChannelId(), parentPlatformsForGB);
                        channelMap.put(deviceChannel.getChannelId(), deviceChannel);
                    }
                }
            }else if (event.getGbStreams() != null) {
@@ -174,7 +176,7 @@
                                }
                                logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
                                List<DeviceChannel> deviceChannelList = new ArrayList<>();
                                DeviceChannel deviceChannel = storager.queryChannelInParentPlatform(platform.getServerGBId(), gbId);
                                DeviceChannel deviceChannel = channelMap.get(gbId);
                                deviceChannelList.add(deviceChannel);
                                GbStream gbStream = storager.queryStreamInParentPlatform(platform.getServerGBId(), gbId);
                                if(gbStream != null){
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -520,9 +520,7 @@
        if (parentPlatform == null) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
        }
        logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
        String characterSet = parentPlatform.getCharacterSet();
        StringBuffer deviceStatusXml = new StringBuffer(600);
@@ -597,6 +595,7 @@
        Integer finalIndex = index;
        String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels,
                deviceChannels.size(), type, subscribeInfo);
        System.out.println(catalogXmlContent);
        logger.info("[发送NOTIFY通知]类型: {},发送数量: {}", type, channels.size());
        sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
            logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
@@ -626,7 +625,6 @@
    private  String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, List<DeviceChannel> channels, int sumNum, String type, SubscribeInfo subscribeInfo) {
        StringBuffer catalogXml = new StringBuffer(600);
        String characterSet = parentPlatform.getCharacterSet();
        catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet + "\"?>\r\n")
                .append("<Notify>\r\n")
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
@@ -6,7 +6,6 @@
import com.google.common.primitives.Bytes;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import org.apache.commons.lang3.ArrayUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
@@ -172,6 +171,7 @@
        return getRootElement(evt, "gb2312");
    }
    public Element getRootElement(RequestEvent evt, String charset) throws DocumentException {
        if (charset == null) {
            charset = "gb2312";
        }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
@@ -1,11 +1,11 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@@ -19,7 +19,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
@@ -28,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@@ -46,6 +49,7 @@
    private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
    private final List<DeviceChannel> deleteChannelList = new CopyOnWriteArrayList<>();
    private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
    @Autowired
    private UserSetting userSetting;
@@ -63,222 +67,193 @@
    private DynamicTask dynamicTask;
    @Autowired
    private CivilCodeFileConf civilCodeFileConf;
    @Autowired
    private SipConfig sipConfig;
    private final static String talkKey = "notify-request-for-catalog-task";
    @Transactional
    public void process(RequestEvent evt) {
        try {
            long start = System.currentTimeMillis();
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
        if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
            logger.error("[notify-目录订阅] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
            return;
        }
        taskQueue.offer(new HandlerCatchData(evt, null, null));
    }
            Device device = redisCatchStorage.getDevice(deviceId);
            if (device == null || !device.isOnLine()) {
                logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" ));
                return;
    @Scheduled(fixedRate = 400)   //每400毫秒执行一次
    public void executeTaskQueue(){
        if (taskQueue.isEmpty()) {
            return;
        }
        for (HandlerCatchData take : taskQueue) {
            if (take == null) {
                continue;
            }
            Element rootElement = getRootElement(evt, device.getCharset());
            if (rootElement == null) {
                logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());
                return;
            }
            Element deviceListElement = rootElement.element("DeviceList");
            if (deviceListElement == null) {
                return;
            }
            Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
            if (deviceListIterator != null) {
            RequestEvent evt = take.getEvt();
            try {
                long start = System.currentTimeMillis();
                FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
                String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
                // 遍历DeviceList
                while (deviceListIterator.hasNext()) {
                    Element itemDevice = deviceListIterator.next();
                    Element channelDeviceElement = itemDevice.element("DeviceID");
                    if (channelDeviceElement == null) {
                        continue;
                    }
                    Element eventElement = itemDevice.element("Event");
                    String event;
                    if (eventElement == null) {
                        logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" ));
                        event = CatalogEvent.ADD;
                    }else {
                        event = eventElement.getText().toUpperCase();
                    }
                    DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
                    if (channel == null) {
                        logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
                        continue;
                    }
                    if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
                        channel.setParentId(null);
                    }
                    channel.setDeviceId(device.getDeviceId());
                    logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
                    switch (event) {
                        case CatalogEvent.ON:
                            // 上线
                            logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            updateChannelOnlineList.add(channel);
                            if (updateChannelOnlineList.size() > 300) {
                                executeSaveForOnline();
                            }
                            if (userSetting.getDeviceStatusNotify()) {
                                // 发送redis消息
                                redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
                            }
                Device device = redisCatchStorage.getDevice(deviceId);
                if (device == null || !device.isOnLine()) {
                    logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId() : ""));
                    return;
                }
                Element rootElement = getRootElement(evt, device.getCharset());
                if (rootElement == null) {
                    logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());
                    return;
                }
                Element deviceListElement = rootElement.element("DeviceList");
                if (deviceListElement == null) {
                    return;
                }
                Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
                if (deviceListIterator != null) {
                            break;
                        case CatalogEvent.OFF :
                            // 离线
                            logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            }else {
                                updateChannelOfflineList.add(channel);
                                if (updateChannelOfflineList.size() > 300) {
                                    executeSaveForOffline();
                                }
                                if (userSetting.getDeviceStatusNotify()) {
                                    // 发送redis消息
                                    redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                }
                            }
                            break;
                        case CatalogEvent.VLOST:
                            // 视频丢失
                            logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            }else {
                                updateChannelOfflineList.add(channel);
                                if (updateChannelOfflineList.size() > 300) {
                                    executeSaveForOffline();
                                }
                                if (userSetting.getDeviceStatusNotify()) {
                                    // 发送redis消息
                                    redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                }
                            }
                            break;
                        case CatalogEvent.DEFECT:
                            // 故障
                            logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            }else {
                                updateChannelOfflineList.add(channel);
                                if (updateChannelOfflineList.size() > 300) {
                                    executeSaveForOffline();
                                }
                                if (userSetting.getDeviceStatusNotify()) {
                                    // 发送redis消息
                                    redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                }
                            }
                            break;
                        case CatalogEvent.ADD:
                            // 增加
                            logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            // 判断此通道是否存在
                            DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
                            if (deviceChannel != null) {
                                logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                channel.setId(deviceChannel.getId());
                                updateChannelMap.put(channel.getChannelId(), channel);
                                if (updateChannelMap.keySet().size() > 300) {
                                    executeSaveForUpdate();
                                }
                            }else {
                                addChannelMap.put(channel.getChannelId(), channel);
                                if (userSetting.getDeviceStatusNotify()) {
                                    // 发送redis消息
                                    redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                                }
                                if (addChannelMap.keySet().size() > 300) {
                                    executeSaveForAdd();
                                }
                            }
                            break;
                        case CatalogEvent.DEL:
                            // 删除
                            logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            deleteChannelList.add(channel);
                            if (userSetting.getDeviceStatusNotify()) {
                                // 发送redis消息
                                redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
                            }
                            if (deleteChannelList.size() > 300) {
                                executeSaveForDelete();
                            }
                            break;
                        case CatalogEvent.UPDATE:
                            // 更新
                            logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                            // 判断此通道是否存在
                            DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
                            if (deviceChannelForUpdate != null) {
                                channel.setId(deviceChannelForUpdate.getId());
                                channel.setUpdateTime(DateUtil.getNow());
                                updateChannelMap.put(channel.getChannelId(), channel);
                                if (updateChannelMap.keySet().size() > 300) {
                                    executeSaveForUpdate();
                                }
                            }else {
                                addChannelMap.put(channel.getChannelId(), channel);
                                if (addChannelMap.keySet().size() > 300) {
                                    executeSaveForAdd();
                                }
                                if (userSetting.getDeviceStatusNotify()) {
                                    // 发送redis消息
                                    redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                                }
                            }
                            break;
                        default:
                            logger.warn("[ NotifyCatalog ] event not found : {}", event );
                    }
                    // 转发变化信息
                    eventPublisher.catalogEventPublish(null, channel, event);
                    if (!updateChannelMap.keySet().isEmpty()
                            || !addChannelMap.keySet().isEmpty()
                            || !updateChannelOnlineList.isEmpty()
                            || !updateChannelOfflineList.isEmpty()
                            || !deleteChannelList.isEmpty()) {
                        if (!dynamicTask.contains(talkKey)) {
                            dynamicTask.startDelay(talkKey, this::executeSave, 1000);
                    // 遍历DeviceList
                    while (deviceListIterator.hasNext()) {
                        Element itemDevice = deviceListIterator.next();
                        Element eventElement = itemDevice.element("Event");
                        String event;
                        if (eventElement == null) {
                            logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId() : ""));
                            event = CatalogEvent.ADD;
                        } else {
                            event = eventElement.getText().toUpperCase();
                        }
                        DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
                        if (channel == null) {
                            logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
                            continue;
                        }
                        if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
                            channel.setParentId(null);
                        }
                        channel.setDeviceId(device.getDeviceId());
                        logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
                        switch (event) {
                            case CatalogEvent.ON:
                                // 上线
                                logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                updateChannelOnlineList.add(channel);
                                if (userSetting.getDeviceStatusNotify()) {
                                    // 发送redis消息
                                    redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
                                }
                                break;
                            case CatalogEvent.OFF:
                                // 离线
                                logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                    logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                } else {
                                    updateChannelOfflineList.add(channel);
                                    if (userSetting.getDeviceStatusNotify()) {
                                        // 发送redis消息
                                        redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                    }
                                }
                                break;
                            case CatalogEvent.VLOST:
                                // 视频丢失
                                logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                    logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                } else {
                                    updateChannelOfflineList.add(channel);
                                    if (userSetting.getDeviceStatusNotify()) {
                                        // 发送redis消息
                                        redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                    }
                                }
                                break;
                            case CatalogEvent.DEFECT:
                                // 故障
                                logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                    logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                } else {
                                    updateChannelOfflineList.add(channel);
                                    if (userSetting.getDeviceStatusNotify()) {
                                        // 发送redis消息
                                        redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                    }
                                }
                                break;
                            case CatalogEvent.ADD:
                                // 增加
                                logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                // 判断此通道是否存在
                                DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
                                if (deviceChannel != null) {
                                    logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                    channel.setId(deviceChannel.getId());
                                    channel.setHasAudio(null);
                                    updateChannelMap.put(channel.getChannelId(), channel);
                                } else {
                                    addChannelMap.put(channel.getChannelId(), channel);
                                    if (userSetting.getDeviceStatusNotify()) {
                                        // 发送redis消息
                                        redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                                    }
                                }
                                break;
                            case CatalogEvent.DEL:
                                // 删除
                                logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                deleteChannelList.add(channel);
                                if (userSetting.getDeviceStatusNotify()) {
                                    // 发送redis消息
                                    redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
                                }
                                break;
                            case CatalogEvent.UPDATE:
                                // 更新
                                logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                // 判断此通道是否存在
                                DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
                                if (deviceChannelForUpdate != null) {
                                    channel.setId(deviceChannelForUpdate.getId());
                                    channel.setUpdateTime(DateUtil.getNow());
                                    channel.setHasAudio(null);
                                    updateChannelMap.put(channel.getChannelId(), channel);
                                } else {
                                    addChannelMap.put(channel.getChannelId(), channel);
                                    if (userSetting.getDeviceStatusNotify()) {
                                        // 发送redis消息
                                        redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                                    }
                                }
                                break;
                            default:
                                logger.warn("[ NotifyCatalog ] event not found : {}", event);
                        }
                        // 转发变化信息
                        eventPublisher.catalogEventPublish(null, channel, event);
                    }
                }
            } catch (DocumentException e) {
                logger.error("未处理的异常 ", e);
            }
        } catch (DocumentException e) {
            logger.error("未处理的异常 ", e);
        }
        taskQueue.clear();
        if (!updateChannelMap.keySet().isEmpty()
                || !addChannelMap.keySet().isEmpty()
                || !updateChannelOnlineList.isEmpty()
                || !updateChannelOfflineList.isEmpty()
                || !deleteChannelList.isEmpty()) {
            executeSave();
        }
    }
    private void executeSave(){
    public void executeSave(){
        try {
            executeSaveForAdd();
        } catch (Exception e) {
            logger.error("[存储收到的增加通道] 异常: ", e );
        }
        try {
            executeSaveForUpdate();
        } catch (Exception e) {
            logger.error("[存储收到的更新通道] 异常: ", e );
        }
        try {
            executeSaveForDelete();
        } catch (Exception e) {
            logger.error("[存储收到的删除通道] 异常: ", e );
        }
        try {
            executeSaveForOnline();
@@ -290,7 +265,16 @@
        } catch (Exception e) {
            logger.error("[存储收到的通道离线] 异常: ", e );
        }
        dynamicTask.stop(talkKey);
        try {
            executeSaveForUpdate();
        } catch (Exception e) {
            logger.error("[存储收到的更新通道] 异常: ", e );
        }
        try {
            executeSaveForDelete();
        } catch (Exception e) {
            logger.error("[存储收到的删除通道] 异常: ", e );
        }
    }
    private void executeSaveForUpdate(){
@@ -300,7 +284,6 @@
            deviceChannelService.batchUpdateChannel(deviceChannels);
            updateChannelMap.clear();
        }
    }
    private void executeSaveForAdd(){
@@ -332,4 +315,8 @@
        }
    }
    /**
     * Scheduled reporter: logs the current number of entries waiting in {@code taskQueue}
     * (the pending catalog-subscription NOTIFY messages).
     */
    @Scheduled(fixedRate = 10000)   // runs every 10 seconds (fixedRate is in milliseconds)
    public void execute(){
        logger.info("[待处理Notify-目录订阅消息数量]: {}", taskQueue.size());
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java
@@ -1,17 +1,15 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -20,7 +18,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent;
@@ -29,7 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * SIP命令类型: NOTIFY请求中的移动位置请求处理
@@ -40,10 +40,7 @@
    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class);
    private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
    private final List<MobilePosition> addMobilePositionList = new CopyOnWriteArrayList();
    private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
    @Autowired
    private UserSetting userSetting;
@@ -57,158 +54,169 @@
    @Autowired
    private IDeviceChannelService deviceChannelService;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private CivilCodeFileConf civilCodeFileConf;
    @Autowired
    private SipConfig sipConfig;
    private final static String talkKey = "notify-request-for-mobile-position-task";
    public void process(RequestEvent evt) {
        try {
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
            // 回复 200 OK
            Element rootElement = getRootElement(evt);
            if (rootElement == null) {
                logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
                return;
        if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
            logger.error("[notify-移动位置] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
            return;
        }
        taskQueue.offer(new HandlerCatchData(evt, null, null));
    }
    @Scheduled(fixedRate = 200) //每200毫秒执行一次
    @Transactional
    public void executeTaskQueue() {
        if (taskQueue.isEmpty()) {
            return;
        }
        Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
        List<MobilePosition> addMobilePositionList = new ArrayList<>();
        for (HandlerCatchData take : taskQueue) {
            if (take == null) {
                continue;
            }
            MobilePosition mobilePosition = new MobilePosition();
            mobilePosition.setCreateTime(DateUtil.getNow());
            Element deviceIdElement = rootElement.element("DeviceID");
            String channelId = deviceIdElement.getTextTrim().toString();
            Device device = redisCatchStorage.getDevice(deviceId);
            if (device == null) {
                device = redisCatchStorage.getDevice(channelId);
            RequestEvent evt = take.getEvt();
            try {
                FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
                String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
                long startTime = System.currentTimeMillis();
                // 回复 200 OK
                Element rootElement = getRootElement(evt);
                if (rootElement == null) {
                    logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
                    return;
                }
                Device device = redisCatchStorage.getDevice(deviceId);
                if (device == null) {
                    // 根据通道id查询设备Id
                    List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
                    if (deviceList.size() > 0) {
                        device = deviceList.get(0);
                    logger.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId);
                    return;
                }
                MobilePosition mobilePosition = new MobilePosition();
                mobilePosition.setDeviceId(device.getDeviceId());
                mobilePosition.setDeviceName(device.getName());
                mobilePosition.setCreateTime(DateUtil.getNow());
                List<Element> elements = rootElement.elements();
                for (Element element : elements) {
                    switch (element.getName()){
                        case "DeviceID":
                            String channelId = element.getStringValue();
                            if (!deviceId.equals(channelId)) {
                                mobilePosition.setChannelId(channelId);
                            }
                            continue;
                        case "Time":
                            String timeVal = element.getStringValue();
                            if (ObjectUtils.isEmpty(timeVal)) {
                                mobilePosition.setTime(DateUtil.getNow());
                            } else {
                                mobilePosition.setTime(SipUtils.parseTime(timeVal));
                            }
                            continue;
                        case "Longitude":
                            mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
                            continue;
                        case "Latitude":
                            mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
                            continue;
                        case "Speed":
                            String speedVal = element.getStringValue();
                            if (NumericUtil.isDouble(speedVal)) {
                                mobilePosition.setSpeed(Double.parseDouble(speedVal));
                            } else {
                                mobilePosition.setSpeed(0.0);
                            }
                            continue;
                        case "Direction":
                            String directionVal = element.getStringValue();
                            if (NumericUtil.isDouble(directionVal)) {
                                mobilePosition.setDirection(Double.parseDouble(directionVal));
                            } else {
                                mobilePosition.setDirection(0.0);
                            }
                            continue;
                        case "Altitude":
                            String altitudeVal = element.getStringValue();
                            if (NumericUtil.isDouble(altitudeVal)) {
                                mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
                            } else {
                                mobilePosition.setAltitude(0.0);
                            }
                            continue;
                    }
                }
            }
            if (device == null) {
                logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
                return;
            }
            // 兼容设备部分设备上报是通道编号与设备编号一致的情况
            if (deviceId.equals(channelId)) {
                List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
                if (deviceChannels.size() == 1) {
                    channelId = deviceChannels.get(0).getChannelId();
//            logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
//                    mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
                mobilePosition.setReportSource("Mobile Position");
                // 更新device channel 的经纬度
                DeviceChannel deviceChannel = new DeviceChannel();
                deviceChannel.setDeviceId(device.getDeviceId());
                deviceChannel.setLongitude(mobilePosition.getLongitude());
                deviceChannel.setLatitude(mobilePosition.getLatitude());
                deviceChannel.setGpsTime(mobilePosition.getTime());
                updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel);
                addMobilePositionList.add(mobilePosition);
                // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
                try {
                    eventPublisher.mobilePositionEventPublish(mobilePosition);
                }catch (Exception e) {
                    logger.error("[向上级转发移动位置失败] ", e);
                }
            }
            if (!ObjectUtils.isEmpty(device.getName())) {
                mobilePosition.setDeviceName(device.getName());
            }
            mobilePosition.setDeviceName(device.getName());
            mobilePosition.setDeviceId(device.getDeviceId());
            mobilePosition.setChannelId(channelId);
            String time = XmlUtil.getText(rootElement, "Time");
            if (ObjectUtils.isEmpty(time)) {
                mobilePosition.setTime(DateUtil.getNow());
            } else {
                mobilePosition.setTime(SipUtils.parseTime(time));
            }
            mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
            mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) {
                mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed")));
            } else {
                mobilePosition.setSpeed(0.0);
            }
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) {
                mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction")));
            } else {
                mobilePosition.setDirection(0.0);
            }
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) {
                mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude")));
            } else {
                mobilePosition.setAltitude(0.0);
            }
            logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
                    mobilePosition.getLongitude(), mobilePosition.getLatitude());
            mobilePosition.setReportSource("Mobile Position");
            // 更新device channel 的经纬度
            DeviceChannel deviceChannel = new DeviceChannel();
            deviceChannel.setDeviceId(device.getDeviceId());
            deviceChannel.setChannelId(channelId);
            deviceChannel.setLongitude(mobilePosition.getLongitude());
            deviceChannel.setLatitude(mobilePosition.getLatitude());
            deviceChannel.setGpsTime(mobilePosition.getTime());
            updateChannelMap.put(channelId, deviceChannel);
            addMobilePositionList.add(mobilePosition);
            if(updateChannelMap.size() > 300) {
                executeSaveChannel();
            }
            if (userSetting.isSavePositionHistory()) {
                if(addMobilePositionList.size() > 300) {
                    executeSaveMobilePosition();
                if (mobilePosition.getChannelId() == null || mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) ) {
                    List<DeviceChannel> channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId());
                    channels.forEach(channel -> {
                        // 发送redis消息。 通知位置信息的变化
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
                        jsonObject.put("serial", channel.getDeviceId());
                        jsonObject.put("code", channel.getChannelId());
                        jsonObject.put("longitude", mobilePosition.getLongitude());
                        jsonObject.put("latitude", mobilePosition.getLatitude());
                        jsonObject.put("altitude", mobilePosition.getAltitude());
                        jsonObject.put("direction", mobilePosition.getDirection());
                        jsonObject.put("speed", mobilePosition.getSpeed());
                        redisCatchStorage.sendMobilePositionMsg(jsonObject);
                    });
                }else {
                    // 发送redis消息。 通知位置信息的变化
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
                    jsonObject.put("serial", mobilePosition.getDeviceId());
                    jsonObject.put("code", mobilePosition.getChannelId());
                    jsonObject.put("longitude", mobilePosition.getLongitude());
                    jsonObject.put("latitude", mobilePosition.getLatitude());
                    jsonObject.put("altitude", mobilePosition.getAltitude());
                    jsonObject.put("direction", mobilePosition.getDirection());
                    jsonObject.put("speed", mobilePosition.getSpeed());
                    redisCatchStorage.sendMobilePositionMsg(jsonObject);
                }
            } catch (DocumentException e) {
                logger.error("未处理的异常 ", e);
            }
//            deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
//
//            mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
//            mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
//            mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
//            mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
//            deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
            if (!dynamicTask.contains(talkKey)) {
                dynamicTask.startDelay(talkKey, this::executeSave, 1000);
            }
        } catch (DocumentException e) {
            logger.error("未处理的异常 ", e);
        }
    }
    /**
     * Runs the two save steps in order ({@code executeSaveChannel}, then
     * {@code executeSaveMobilePosition}) and afterwards stops the dynamic task
     * registered under {@code talkKey}.
     */
    private void executeSave(){
        executeSaveChannel();
        executeSaveMobilePosition();
        // the task is stopped only after both save steps have been invoked
        dynamicTask.stop(talkKey);
    }
    private void executeSaveChannel(){
        try {
            logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size());
            ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
            deviceChannelService.batchUpdateChannelGPS(deviceChannels);
        taskQueue.clear();
        if(!updateChannelMap.isEmpty()) {
            List<DeviceChannel>  channels = new ArrayList<>(updateChannelMap.values());
            logger.info("[移动位置订阅]更新通道位置: {}", channels.size());
            deviceChannelService.batchUpdateChannel(channels);
            updateChannelMap.clear();
        }catch (Exception e) {
        }
    }
    private void executeSaveMobilePosition(){
        if (userSetting.isSavePositionHistory()) {
        if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) {
            try {
                logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
                deviceChannelService.batchAddMobilePosition(addMobilePositionList);
                addMobilePositionList.clear();
            }catch (Exception e) {
                logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size());
            }
            addMobilePositionList.clear();
        }
    }
    @Scheduled(fixedRate = 10000)
    public void execute(){
        logger.info("[待处理Notify-移动位置订阅消息数量]: {}", taskQueue.size());
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -1,12 +1,9 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
@@ -14,9 +11,7 @@
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import gov.nist.javax.sip.message.SIPRequest;
import org.dom4j.DocumentException;
import org.dom4j.Element;
@@ -24,10 +19,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
@@ -35,8 +27,6 @@
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * SIP命令类型: NOTIFY请求,这是作为上级发送订阅请求后,设备才会响应的
@@ -46,15 +36,6 @@
    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestProcessor.class);
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private IVideoManagerStorage storager;
    @Autowired
    private EventPublisher eventPublisher;
    @Autowired
    private SipConfig sipConfig;
@@ -76,13 +57,8 @@
    @Autowired
    private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
    private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    private int maxQueueCount = 30000;
    private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor;
    @Override
    public void afterPropertiesSet() throws Exception {
@@ -93,159 +69,33 @@
    @Override
    public void process(RequestEvent evt) {
        try {
            if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
                responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null);
                logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
                return;
            }else {
                responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
            }
        }catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("未处理的异常 ", e);
        }
        boolean runed = !taskQueue.isEmpty();
        logger.info("[notify] 待处理消息数量: {}", taskQueue.size());
        taskQueue.offer(new HandlerCatchData(evt, null, null));
        if (!runed) {
            taskExecutor.execute(()-> {
                while (!taskQueue.isEmpty()) {
                    try {
                        HandlerCatchData take = taskQueue.poll();
                        if (take == null) {
                            continue;
                        }
                        Element rootElement = getRootElement(take.getEvt());
                        if (rootElement == null) {
                            logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
                            continue;
                        }
                        String cmd = XmlUtil.getText(rootElement, "CmdType");
                        if (CmdType.CATALOG.equals(cmd)) {
                            logger.info("接收到Catalog通知");
                            notifyRequestForCatalogProcessor.process(take.getEvt());
                        } else if (CmdType.ALARM.equals(cmd)) {
                            logger.info("接收到Alarm通知");
                            processNotifyAlarm(take.getEvt());
                        } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                            logger.info("接收到MobilePosition通知");
                            processNotifyMobilePosition(take.getEvt());
                        } else {
                            logger.info("接收到消息:" + cmd);
                        }
                    } catch (DocumentException e) {
                        logger.error("处理NOTIFY消息时错误", e);
                    }
                }
            });
        }
    }
    /**
     * 处理MobilePosition移动位置Notify
     *
     * @param evt
     */
    private void processNotifyMobilePosition(RequestEvent evt) {
        try {
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
            // 回复 200 OK
            responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
            Element rootElement = getRootElement(evt);
            if (rootElement == null) {
                logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
                logger.error("处理NOTIFY消息时未获取到消息体,{}", evt.getRequest());
                responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
                return;
            }
            String cmd = XmlUtil.getText(rootElement, "CmdType");
            MobilePosition mobilePosition = new MobilePosition();
            mobilePosition.setCreateTime(DateUtil.getNow());
            Element deviceIdElement = rootElement.element("DeviceID");
            String channelId = deviceIdElement.getTextTrim().toString();
            Device device = redisCatchStorage.getDevice(deviceId);
            if (device == null) {
                device = redisCatchStorage.getDevice(channelId);
                if (device == null) {
                    // 根据通道id查询设备Id
                    List<Device> deviceList = deviceChannelService.getDeviceByChannelId(channelId);
                    if (deviceList.size() > 0) {
                        device = deviceList.get(0);
                    }
                }
            }
            if (device == null) {
                logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId);
                return;
            }
            // 兼容设备部分设备上报是通道编号与设备编号一致的情况
            if(deviceId.equals(channelId)) {
                List<DeviceChannel> deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId);
                if (deviceChannels.size() == 1) {
                    channelId = deviceChannels.get(0).getChannelId();
                }
            }
            if (!ObjectUtils.isEmpty(device.getName())) {
                mobilePosition.setDeviceName(device.getName());
            }
            mobilePosition.setDeviceId(device.getDeviceId());
            mobilePosition.setChannelId(channelId);
            String time = XmlUtil.getText(rootElement, "Time");
            if (ObjectUtils.isEmpty(time)){
                mobilePosition.setTime(DateUtil.getNow());
            }else {
                mobilePosition.setTime(SipUtils.parseTime(time));
            }
            mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
            mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) {
                mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed")));
            if (CmdType.CATALOG.equals(cmd)) {
                notifyRequestForCatalogProcessor.process(evt);
            } else if (CmdType.ALARM.equals(cmd)) {
                processNotifyAlarm(evt);
            } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
                notifyRequestForMobilePositionProcessor.process(evt);
            } else {
                mobilePosition.setSpeed(0.0);
                logger.info("接收到消息:" + cmd);
            }
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) {
                mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction")));
            } else {
                mobilePosition.setDirection(0.0);
            }
            if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) {
                mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude")));
            } else {
                mobilePosition.setAltitude(0.0);
            }
            logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
                    mobilePosition.getLongitude(), mobilePosition.getLatitude());
            mobilePosition.setReportSource("Mobile Position");
            // 更新device channel 的经纬度
            DeviceChannel deviceChannel = new DeviceChannel();
            deviceChannel.setDeviceId(device.getDeviceId());
            deviceChannel.setChannelId(channelId);
            deviceChannel.setLongitude(mobilePosition.getLongitude());
            deviceChannel.setLatitude(mobilePosition.getLatitude());
            deviceChannel.setGpsTime(mobilePosition.getTime());
//            deviceChannel = deviceChannelService.updateGps(deviceChannel, device);
//
//            mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
//            mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
//            mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
//            mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
            deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
        } catch (DocumentException  e) {
        } catch (SipException | InvalidArgumentException | ParseException e) {
            logger.error("未处理的异常 ", e);
        } catch (DocumentException e) {
            throw new RuntimeException(e);
        }
    }
    }
    /***
     * 处理alarm设备报警Notify
     *
     * @param evt
     */
    private void processNotifyAlarm(RequestEvent evt) {
        if (!sipConfig.isAlarm()) {
@@ -336,28 +186,5 @@
        }
    }
    /**
     * No-op setter: the supplied commander is accepted and ignored.
     *
     * @param cmder unused
     */
    public void setCmder(SIPCommander cmder) {
    }
    /**
     * Replaces the {@code storager} field with the given instance.
     *
     * @param storager storage implementation to use from now on
     */
    public void setStorager(IVideoManagerStorage storager) {
        this.storager = storager;
    }
    /**
     * Assigns the given publisher to the {@code publisher} field.
     * NOTE(review): the field declaration is not visible in this excerpt — confirm it exists.
     *
     * @param publisher event publisher to store
     */
    public void setPublisher(EventPublisher publisher) {
        this.publisher = publisher;
    }
    /**
     * No-op setter: the supplied Redis utility is accepted and ignored.
     *
     * @param redis unused
     */
    public void setRedis(RedisUtil redis) {
    }
    /**
     * No-op setter: the supplied holder is accepted and ignored.
     *
     * @param deferredResultHolder unused
     */
    public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) {
    }
    /**
     * Returns the value currently held in the {@code redisCatchStorage} field.
     *
     * @return the stored {@link IRedisCatchStorage} reference
     */
    public IRedisCatchStorage getRedisCatchStorage() {
        return redisCatchStorage;
    }
    /**
     * Replaces the {@code redisCatchStorage} field with the given instance.
     *
     * @param redisCatchStorage storage implementation to use from now on
     */
    public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
        this.redisCatchStorage = redisCatchStorage;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -289,7 +289,7 @@
                String channelId = ssrcTransactionForAll.get(0).getChannelId();
                DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
                if (deviceChannel != null) {
                    result.setEnable_audio(deviceChannel.isHasAudio());
                    result.setEnable_audio(deviceChannel.getHasAudio());
                }
                // 如果是录像下载就设置视频间隔十秒
                if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) {
src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
@@ -102,4 +102,9 @@
    void batchAddMobilePosition(List<MobilePosition> addMobilePositionList);
    void online(DeviceChannel channel);
    void offline(DeviceChannel channel);
    void delete(DeviceChannel channel);
}
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
@@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
@@ -248,8 +249,24 @@
    }
    @Override
    public void online(DeviceChannel channel) {
        channelMapper.online(channel.getDeviceId(), channel.getChannelId());
    }
    @Override
    public int channelsOffline(List<DeviceChannel> channels) {
        return channelMapper.batchOffline(channels);
    }
    @Override
    public void offline(DeviceChannel channel) {
        channelMapper.offline(channel.getDeviceId(), channel.getChannelId());
    }
    @Override
    public void delete(DeviceChannel channel) {
        channelMapper.del(channel.getDeviceId(), channel.getChannelId());
    }
    @Override
@@ -258,15 +275,23 @@
    }
    @Override
    public void batchUpdateChannel(List<DeviceChannel> channels) {
    public synchronized void batchUpdateChannel(List<DeviceChannel> channels) {
        String now = DateUtil.getNow();
        for (DeviceChannel channel : channels) {
            channel.setUpdateTime(now);
        }
        channelMapper.batchUpdate(channels);
        for (DeviceChannel channel : channels) {
            if (channel.getParentId() != null) {
                channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId());
        int limitCount = 1000;
        if (!channels.isEmpty()) {
            if (channels.size() > limitCount) {
                for (int i = 0; i < channels.size(); i += limitCount) {
                    int toIndex = i + limitCount;
                    if (i + limitCount > channels.size()) {
                        toIndex = channels.size();
                    }
                    channelMapper.batchUpdate(channels.subList(i, toIndex));
                }
            }else {
                channelMapper.batchUpdate(channels);
            }
        }
    }
@@ -355,15 +380,45 @@
    }
    /**
     * Stamps every channel in the list with the current time, fills in a missing GPS time,
     * and writes the positions through {@code channelMapper.batchUpdatePosition} in slices
     * of at most 1000 entries.
     *
     * @param channelList channels to write; each entry's update time is overwritten and a
     *                    {@code null} GPS time is replaced with the current time
     */
    @Override
    @Transactional
    public void batchUpdateChannelGPS(List<DeviceChannel> channelList) {
        // NOTE(review): this writes the full list before the timestamps below are set, and the same
        // list is written again further down — looks like a leftover of the pre-batching version; confirm.
        channelMapper.batchUpdatePosition(channelList);
        for (DeviceChannel deviceChannel : channelList) {
            deviceChannel.setUpdateTime(DateUtil.getNow());
            if (deviceChannel.getGpsTime() == null) {
                deviceChannel.setGpsTime(DateUtil.getNow());
            }
        }
        // maximum number of channels passed to the mapper in one call
        int count = 1000;
        if (channelList.size() > count) {
            for (int i = 0; i < channelList.size(); i+=count) {
                int toIndex = i+count;
                // clamp the last slice to the end of the list
                if ( i + count > channelList.size()) {
                    toIndex = channelList.size();
                }
                List<DeviceChannel> channels = channelList.subList(i, toIndex);
                channelMapper.batchUpdatePosition(channels);
            }
        }else {
            // small enough to go through in a single call
            channelMapper.batchUpdatePosition(channelList);
        }
    }
    /**
     * Hands the given mobile-position records to {@code deviceMobilePositionMapper} for insertion.
     *
     * @param mobilePositions records to insert
     */
    @Override
    @Transactional
    public void batchAddMobilePosition(List<MobilePosition> mobilePositions) {
        // NOTE(review): both batchInsert (here) and batchadd (at the end) are invoked with the same
        // list, which would insert every record twice — looks like two sides of a merge; confirm which to keep.
        deviceMobilePositionMapper.batchInsert(mobilePositions);
        // Disabled variant that split the list into slices of 500 before calling batchadd:
//        int count = 500;
//        if (mobilePositions.size() > count) {
//            for (int i = 0; i < mobilePositions.size(); i+=count) {
//                int toIndex = i+count;
//                if ( i + count > mobilePositions.size()) {
//                    toIndex = mobilePositions.size();
//                }
//                List<MobilePosition> mobilePositionsSub = mobilePositions.subList(i, toIndex);
//                deviceMobilePositionMapper.batchadd(mobilePositionsSub);
//            }
//        }else {
//            deviceMobilePositionMapper.batchadd(mobilePositions);
//        }
        deviceMobilePositionMapper.batchadd(mobilePositions);
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -149,7 +149,6 @@
            logger.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
        }
        Device device = redisCatchStorage.getDevice(deviceId);
        if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) {
            logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
@@ -163,6 +162,8 @@
        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
        if (inviteInfo != null ) {
            if (inviteInfo.getStreamInfo() == null) {
                // 释放生成的ssrc,使用上一次申请的
                ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
                // 点播发起了但是尚未成功, 仅注册回调等待结果即可
                inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
                logger.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -170,15 +170,19 @@
            callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null);
            return;
        }
        HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
        hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
            StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
                    mediaInfo, param.getApp(), param.getStream(), null, null);
            callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
        });
        if (param.isEnable()) {
            String talkKey = UUID.randomUUID().toString();
            String delayTalkKey = UUID.randomUUID().toString();
            HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
            hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
                dynamicTask.stop(delayTalkKey);
                hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
                        mediaInfo, param.getApp(), param.getStream(), null, null);
                callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
            });
            dynamicTask.startDelay(delayTalkKey, ()->{
                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
                if (streamInfo != null) {
@@ -324,6 +328,9 @@
            zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
        }
        if ("ffmpeg".equalsIgnoreCase(param.getType())){
            if (param.getTimeoutMs() == 0) {
                param.setTimeoutMs(15);
            }
            result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl().trim(), param.getDstUrl(),
                    param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(),
                    param.getFfmpegCmdKey());
@@ -379,6 +386,7 @@
            gbStreamMapper.del(app, stream);
            videoManagerStorager.deleteStreamProxy(app, stream);
            redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
            redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PUSH", app, stream);
            JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
            if (jsonObject != null && jsonObject.getInteger("code") == 0) {
                logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", app, stream);
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -401,6 +401,23 @@
            " </script>"})
    int updatePosition(DeviceChannel deviceChannel);
    @Update({"<script>" +
            "<foreach collection='deviceChannelList' item='item' separator=';'>" +
            " UPDATE" +
            " wvp_device_channel" +
            " SET gps_time=#{item.gpsTime}" +
            "<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
            "<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
            "<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
            "<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
            "<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
            "<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" +
            "WHERE device_id=#{item.deviceId} " +
            " <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" +
            "</foreach>" +
            "</script>"})
    int batchUpdatePosition(List<DeviceChannel> deviceChannelList);
    /**
     * Selects every row of {@code wvp_device_channel} whose {@code stream_id}
     * is non-empty after trimming.
     * NOTE(review): judging by the method name, a non-blank stream id marks a
     * channel that is currently being played - confirm.
     *
     * @return the matching channels
     */
    @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0")
    List<DeviceChannel> getAllChannelInPlay();
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java
@@ -13,8 +13,7 @@
public interface DeviceMobilePositionMapper {
    @Insert("INSERT INTO wvp_device_mobile_position (device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source,longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
            "VALUES (#{deviceId}, #{channelId}, #{deviceName}, #{time}, #{longitude}, #{latitude}, #{altitude}, " +
            "#{speed}, #{direction}, #{reportSource}, #{longitudeGcj02}, #{latitudeGcj02}, #{longitudeWgs84}, #{latitudeWgs84}, #{createTime})")
            "VALUES (#{deviceId}, #{channelId}, #{deviceName}, #{time}, #{longitude}, #{latitude}, #{altitude}, #{speed}, #{direction}, #{reportSource}, #{longitudeGcj02}, #{latitudeGcj02}, #{longitudeWgs84}, #{latitudeWgs84}, #{createTime})")
    int insertNewPosition(MobilePosition mobilePosition);
    @Select(value = {" <script>" +
@@ -34,19 +33,33 @@
    /**
     * Deletes all rows of {@code wvp_device_mobile_position} whose
     * {@code device_id} equals the given id.
     *
     * @param deviceId value matched against the {@code device_id} column
     * @return the row count reported for the DELETE statement
     */
    @Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}")
    int clearMobilePositionsByDeviceId(String deviceId);
    @Insert("<script> " +
            "insert into wvp_device_mobile_position " +
            "(device_id,channel_id, device_name,time,longitude," +
            "latitude,altitude,speed,direction," +
            "report_source,longitude_gcj02,latitude_gcj02," +
            "longitude_wgs84,latitude_wgs84,create_time)"+
            "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
            "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
            "values " +
            "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
            "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
            "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
            "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, " +
            "#{item.longitudeWgs84}, #{item.latitudeWgs84}, #{item.createTime}) " +
            "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
            "#{item.createTime}) " +
            "</foreach> " +
            "</script>")
    void batchInsert(List<MobilePosition> mobilePositions);
    void batchadd2(List<MobilePosition> mobilePositions);
    @Insert("<script> " +
            "<foreach collection='mobilePositions' index='index' item='item' separator=','> " +
            "insert into wvp_device_mobile_position " +
            "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
            "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
            "values " +
            "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
            "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
            "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
            "#{item.createTime}); " +
            "</foreach> " +
            "</script>")
    void batchadd(List<MobilePosition> mobilePositions);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -7,7 +7,6 @@
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -139,7 +138,7 @@
            gbIdSet.add(deviceChannel.getChannelId());
            if (allChannelMap.containsKey(deviceChannel.getChannelId())) {
                deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId());
                deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio());
                deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).getHasAudio());
                if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){
                    List<String> strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId());
                    if (!CollectionUtils.isEmpty(strings)){
@@ -275,7 +274,7 @@
                    deviceChannel.setUpdateTime(DateUtil.getNow());
                    if (allChannelMap.containsKey(deviceChannel.getChannelId())) {
                        deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId());
                        deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio());
                        deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).getHasAudio());
                        if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){
                            List<String> strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId());
                            if (!CollectionUtils.isEmpty(strings)){