src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
@@ -52,7 +52,7 @@ Runnable runnable = dynamicTask.get(taskOverdueKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; subscribeTask.stop(); subscribeTask.stop(null); } // 添加任务处理订阅过期 dynamicTask.stop(taskOverdueKey); @@ -87,7 +87,7 @@ Runnable runnable = dynamicTask.get(taskOverdueKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; subscribeTask.stop(); subscribeTask.stop(null); } // 添加任务处理订阅过期 dynamicTask.stop(taskOverdueKey); src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java
@@ -1,10 +1,10 @@ package com.genersoft.iot.vmp.gb28181.task; import javax.sip.DialogState; import com.genersoft.iot.vmp.common.CommonCallback; /** * @author lin */ public interface ISubscribeTask extends Runnable{ void stop(); void stop(CommonCallback<Boolean> callback); } src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java
@@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.task.impl; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; @@ -7,14 +8,13 @@ import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import javax.sip.*; import javax.sip.DialogState; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; import javax.sip.header.ToHeader; import java.text.ParseException; import java.util.Timer; import java.util.TimerTask; /** * 目录订阅任务 @@ -71,7 +71,7 @@ } @Override public void stop() { public void stop(CommonCallback<Boolean> callback) { /** * dialog 的各个状态 * EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息 @@ -94,6 +94,9 @@ // 成功 logger.info("[取消目录订阅]成功: {}", device.getDeviceId()); } if (callback != null) { callback.run(event.getResponse().getRawContent() != null); } },eventResult -> { // 失败 logger.warn("[取消目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
@@ -1,20 +1,9 @@ package com.genersoft.iot.vmp.gb28181.task.impl; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.service.IPlatformService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.SpringBeanFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import javax.sip.DialogState; import java.util.List; /** * 向已经订阅(移动位置)的上级发送MobilePosition消息 @@ -38,7 +27,7 @@ } @Override public void stop() { public void stop(CommonCallback<Boolean> callback) { } } src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java
@@ -1,21 +1,19 @@ package com.genersoft.iot.vmp.gb28181.task.impl; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import javax.sip.*; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; import javax.sip.header.ToHeader; import java.text.ParseException; import java.util.Timer; import java.util.TimerTask; /** * 移动位置订阅的定时更新 @@ -70,7 +68,7 @@ } @Override public void stop() { public void stop(CommonCallback<Boolean> callback) { /** * dialog 的各个状态 * EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息 @@ -92,6 +90,9 @@ // 成功 logger.info("[取消移动位置订阅]成功: {}", device.getDeviceId()); } if (callback != null) { callback.run(event.getResponse().getRawContent() != null); } },eventResult -> { // 失败 logger.warn("[取消移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java
@@ -82,8 +82,9 @@ device.setIp(remoteAddressInfo.getIp()); // 设备地址变化会引起目录订阅任务失效,需要重新添加 if (device.getSubscribeCycleForCatalog() > 0) { deviceService.removeCatalogSubscribe(device); deviceService.addCatalogSubscribe(device); deviceService.removeCatalogSubscribe(device, result->{ deviceService.addCatalogSubscribe(device); }); } } if (device.getKeepaliveTime() == null) { src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
@@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.service; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; @@ -39,7 +40,7 @@ * @param device 设备信息 * @return 布尔 */ boolean removeCatalogSubscribe(Device device); boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback); /** * 添加移动位置订阅 @@ -53,7 +54,7 @@ * @param device 设备信息 * @return 布尔 */ boolean removeMobilePositionSubscribe(Device device); boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback); /** * 移除移动位置订阅 src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service.impl; import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; @@ -231,8 +232,8 @@ } } // 移除订阅 removeCatalogSubscribe(device); removeMobilePositionSubscribe(device); removeCatalogSubscribe(device, null); removeMobilePositionSubscribe(device, null); } @Override @@ -251,7 +252,7 @@ } @Override public boolean removeCatalogSubscribe(Device device) { public boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback) { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } @@ -261,7 +262,7 @@ Runnable runnable = dynamicTask.get(taskKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; subscribeTask.stop(); subscribeTask.stop(callback); } } dynamicTask.stop(taskKey); @@ -284,7 +285,7 @@ } @Override public boolean removeMobilePositionSubscribe(Device device) { public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } @@ -294,7 +295,7 @@ Runnable runnable = dynamicTask.get(taskKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; subscribeTask.stop(); subscribeTask.stop(callback); } } dynamicTask.stop(taskKey); @@ -522,39 +523,54 @@ if (!ObjectUtils.isEmpty(device.getStreamMode())) { deviceInStore.setStreamMode(device.getStreamMode()); } // 目录订阅相关的信息 if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) { if (device.getSubscribeCycleForCatalog() > 0) { // 若已开启订阅,但订阅周期不同,则先取消 if (deviceInStore.getSubscribeCycleForCatalog() != 0) { removeCatalogSubscribe(deviceInStore); removeCatalogSubscribe(deviceInStore, result->{ // 开启订阅 deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); addCatalogSubscribe(deviceInStore); // 因为是异步执行,需要在这里更新下数据 deviceMapper.updateCustom(deviceInStore); redisCatchStorage.updateDevice(deviceInStore); }); }else { // 开启订阅 deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); addCatalogSubscribe(deviceInStore); } // 开启订阅 deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); addCatalogSubscribe(deviceInStore); }else if (device.getSubscribeCycleForCatalog() == 0) { // 取消订阅 deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); removeCatalogSubscribe(deviceInStore); deviceInStore.setSubscribeCycleForCatalog(0); removeCatalogSubscribe(deviceInStore, null); } } // 移动位置订阅相关的信息 if (device.getSubscribeCycleForMobilePosition() > 0) { if (deviceInStore.getSubscribeCycleForMobilePosition() == 0 || deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) { deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval()); deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); // 开启订阅 addMobilePositionSubscribe(deviceInStore); } }else if (device.getSubscribeCycleForMobilePosition() == 0) { if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) { deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval()); deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); if (deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) { if (device.getSubscribeCycleForMobilePosition() > 0) { // 若已开启订阅,但订阅周期不同,则先取消 if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) { removeMobilePositionSubscribe(deviceInStore, result->{ // 开启订阅 deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); addMobilePositionSubscribe(deviceInStore); // 因为是异步执行,需要在这里更新下数据 deviceMapper.updateCustom(deviceInStore); redisCatchStorage.updateDevice(deviceInStore); }); }else { // 开启订阅 deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); addMobilePositionSubscribe(deviceInStore); } }else if (device.getSubscribeCycleForMobilePosition() == 0) { // 取消订阅 removeMobilePositionSubscribe(deviceInStore); deviceInStore.setSubscribeCycleForCatalog(0); removeCatalogSubscribe(deviceInStore, null); } } if (deviceInStore.getGeoCoordSys() != null) { @@ -574,9 +590,8 @@ //作为消息通道 deviceInStore.setAsMessageChannel(device.isAsMessageChannel()); // 更新redis deviceMapper.updateCustom(deviceInStore); redisCatchStorage.removeDevice(deviceInStore.getDeviceId()); redisCatchStorage.updateDevice(deviceInStore); } @Override src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java
@@ -153,10 +153,7 @@ Device device = storager.queryVideoDevice(deviceId); device.setSubscribeCycleForMobilePosition(Integer.parseInt(expires)); device.setMobilePositionSubmissionInterval(Integer.parseInt(interval)); deviceService.updateDevice(device); if (!deviceService.removeMobilePositionSubscribe(device)) { throw new ControllerException(ErrorCode.ERROR100); } deviceService.updateCustomDevice(device); } /** src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
@@ -199,7 +199,7 @@ Runnable runnable = dynamicTask.get(key); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; subscribeTask.stop(); subscribeTask.stop(null); } dynamicTask.stop(key); }