648540858
2024-02-20 a9ab5c28e9fd52c1d936a245ac46c9e556f6bc3e
优化订阅机制,需要重新订阅时,取消命令发送后再发送订阅命令 #1273
10个文件已修改
143 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java 73 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
@@ -52,7 +52,7 @@
        Runnable runnable = dynamicTask.get(taskOverdueKey);
        if (runnable instanceof ISubscribeTask) {
            ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
            subscribeTask.stop();
            subscribeTask.stop(null);
        }
        // 添加任务处理订阅过期
        dynamicTask.stop(taskOverdueKey);
@@ -87,7 +87,7 @@
        Runnable runnable = dynamicTask.get(taskOverdueKey);
        if (runnable instanceof ISubscribeTask) {
            ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
            subscribeTask.stop();
            subscribeTask.stop(null);
        }
        // 添加任务处理订阅过期
        dynamicTask.stop(taskOverdueKey);
src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java
@@ -1,10 +1,10 @@
package com.genersoft.iot.vmp.gb28181.task;
import javax.sip.DialogState;
import com.genersoft.iot.vmp.common.CommonCallback;
/**
 * A subscription task that can be run as a {@link Runnable} and stopped on request.
 *
 * @author lin
 */
public interface ISubscribeTask extends Runnable{
    // NOTE(review): this listing is a rendered diff without +/- markers; this no-argument
    // form appears to be the signature being replaced by the one below — confirm against the repository.
    void stop();
    /**
     * Stops the task, optionally reporting the outcome to the caller.
     *
     * @param callback may be {@code null}; several callers in this change set pass {@code null}.
     *                 The device-facing implementations shown in this change set invoke it with a
     *                 boolean from the cancel request's response handler; presumably this lets a
     *                 caller re-subscribe only after the cancel has been answered — TODO confirm.
     */
    void stop(CommonCallback<Boolean> callback);
}
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java
@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
@@ -7,14 +8,13 @@
import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.sip.*;
import javax.sip.DialogState;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import javax.sip.header.ToHeader;
import java.text.ParseException;
import java.util.Timer;
import java.util.TimerTask;
/**
 * 目录订阅任务
@@ -71,7 +71,7 @@
    }
    @Override
    public void stop() {
    public void stop(CommonCallback<Boolean> callback) {
        /**
         * dialog 的各个状态
         * EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息
@@ -94,6 +94,9 @@
                    // 成功
                    logger.info("[取消目录订阅]成功: {}", device.getDeviceId());
                }
                if (callback != null) {
                    callback.run(event.getResponse().getRawContent() != null);
                }
            },eventResult -> {
                // 失败
                logger.warn("[取消目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
@@ -1,20 +1,9 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.SpringBeanFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import javax.sip.DialogState;
import java.util.List;
/**
 * 向已经订阅(移动位置)的上级发送MobilePosition消息
@@ -38,7 +27,7 @@
    }
    @Override
    public void stop() {
    public void stop(CommonCallback<Boolean> callback) {
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java
@@ -1,21 +1,19 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import javax.sip.*;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import javax.sip.header.ToHeader;
import java.text.ParseException;
import java.util.Timer;
import java.util.TimerTask;
/**
 * 移动位置订阅的定时更新
@@ -70,7 +68,7 @@
    }
    @Override
    public void stop() {
    public void stop(CommonCallback<Boolean> callback) {
        /**
         * dialog 的各个状态
         * EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息
@@ -92,6 +90,9 @@
                    // 成功
                    logger.info("[取消移动位置订阅]成功: {}", device.getDeviceId());
                }
                if (callback != null) {
                    callback.run(event.getResponse().getRawContent() != null);
                }
            },eventResult -> {
                // 失败
                logger.warn("[取消移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java
@@ -82,8 +82,9 @@
            device.setIp(remoteAddressInfo.getIp());
            // 设备地址变化会引起目录订阅任务失效,需要重新添加
            if (device.getSubscribeCycleForCatalog() > 0) {
                deviceService.removeCatalogSubscribe(device);
                deviceService.addCatalogSubscribe(device);
                deviceService.removeCatalogSubscribe(device, result->{
                    deviceService.addCatalogSubscribe(device);
                });
            }
        }
        if (device.getKeepaliveTime() == null) {
src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
@@ -39,7 +40,7 @@
     * @param device 设备信息
     * @return 布尔
     */
    boolean removeCatalogSubscribe(Device device);
    boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback);
    /**
     * 添加移动位置订阅
@@ -53,7 +54,7 @@
     * @param device 设备信息
     * @return 布尔
     */
    boolean removeMobilePositionSubscribe(Device device);
    boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback);
    /**
     * 移除移动位置订阅
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
@@ -231,8 +232,8 @@
            }
        }
        // 移除订阅
        removeCatalogSubscribe(device);
        removeMobilePositionSubscribe(device);
        removeCatalogSubscribe(device, null);
        removeMobilePositionSubscribe(device, null);
    }
    @Override
@@ -251,7 +252,7 @@
    }
    @Override
    public boolean removeCatalogSubscribe(Device device) {
    public boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback) {
        if (device == null || device.getSubscribeCycleForCatalog() < 0) {
            return false;
        }
@@ -261,7 +262,7 @@
            Runnable runnable = dynamicTask.get(taskKey);
            if (runnable instanceof ISubscribeTask) {
                ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
                subscribeTask.stop();
                subscribeTask.stop(callback);
            }
        }
        dynamicTask.stop(taskKey);
@@ -284,7 +285,7 @@
    }
    @Override
    public boolean removeMobilePositionSubscribe(Device device) {
    public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) {
        if (device == null || device.getSubscribeCycleForCatalog() < 0) {
            return false;
        }
@@ -294,7 +295,7 @@
            Runnable runnable = dynamicTask.get(taskKey);
            if (runnable instanceof ISubscribeTask) {
                ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
                subscribeTask.stop();
                subscribeTask.stop(callback);
            }
        }
        dynamicTask.stop(taskKey);
@@ -522,39 +523,54 @@
        if (!ObjectUtils.isEmpty(device.getStreamMode())) {
            deviceInStore.setStreamMode(device.getStreamMode());
        }
        //  目录订阅相关的信息
        if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
            if (device.getSubscribeCycleForCatalog() > 0) {
                // 若已开启订阅,但订阅周期不同,则先取消
                if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
                    removeCatalogSubscribe(deviceInStore);
                    removeCatalogSubscribe(deviceInStore, result->{
                        // 开启订阅
                        deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
                        addCatalogSubscribe(deviceInStore);
                        // 因为是异步执行,需要在这里更新下数据
                        deviceMapper.updateCustom(deviceInStore);
                        redisCatchStorage.updateDevice(deviceInStore);
                    });
                }else {
                    // 开启订阅
                    deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
                    addCatalogSubscribe(deviceInStore);
                }
                // 开启订阅
                deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
                addCatalogSubscribe(deviceInStore);
            }else if (device.getSubscribeCycleForCatalog() == 0) {
                // 取消订阅
                deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
                removeCatalogSubscribe(deviceInStore);
                deviceInStore.setSubscribeCycleForCatalog(0);
                removeCatalogSubscribe(deviceInStore, null);
            }
        }
        // 移动位置订阅相关的信息
        if (device.getSubscribeCycleForMobilePosition() > 0) {
            if (deviceInStore.getSubscribeCycleForMobilePosition() == 0 || deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) {
                deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
                deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
                // 开启订阅
                addMobilePositionSubscribe(deviceInStore);
            }
        }else if (device.getSubscribeCycleForMobilePosition() == 0) {
            if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
                deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
                deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
        if (deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) {
            if (device.getSubscribeCycleForMobilePosition() > 0) {
                // 若已开启订阅,但订阅周期不同,则先取消
                if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
                    removeMobilePositionSubscribe(deviceInStore, result->{
                        // 开启订阅
                        deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
                        addMobilePositionSubscribe(deviceInStore);
                        // 因为是异步执行,需要在这里更新下数据
                        deviceMapper.updateCustom(deviceInStore);
                        redisCatchStorage.updateDevice(deviceInStore);
                    });
                }else {
                    // 开启订阅
                    deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
                    addMobilePositionSubscribe(deviceInStore);
                }
            }else if (device.getSubscribeCycleForMobilePosition() == 0) {
                // 取消订阅
                removeMobilePositionSubscribe(deviceInStore);
                deviceInStore.setSubscribeCycleForCatalog(0);
                removeCatalogSubscribe(deviceInStore, null);
            }
        }
        if (deviceInStore.getGeoCoordSys() != null) {
@@ -574,9 +590,8 @@
        //作为消息通道
        deviceInStore.setAsMessageChannel(device.isAsMessageChannel());
        
        // 更新redis
        deviceMapper.updateCustom(deviceInStore);
        redisCatchStorage.removeDevice(deviceInStore.getDeviceId());
        redisCatchStorage.updateDevice(deviceInStore);
    }
    @Override
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java
@@ -153,10 +153,7 @@
        Device device = storager.queryVideoDevice(deviceId);
        device.setSubscribeCycleForMobilePosition(Integer.parseInt(expires));
        device.setMobilePositionSubmissionInterval(Integer.parseInt(interval));
        deviceService.updateDevice(device);
        if (!deviceService.removeMobilePositionSubscribe(device)) {
            throw new ControllerException(ErrorCode.ERROR100);
        }
        deviceService.updateCustomDevice(device);
    }
    /**
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
@@ -199,7 +199,7 @@
                    Runnable runnable = dynamicTask.get(key);
                    if (runnable instanceof ISubscribeTask) {
                        ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
                        subscribeTask.stop();
                        subscribeTask.stop(null);
                    }
                    dynamicTask.stop(key);
                }