From a9ab5c28e9fd52c1d936a245ac46c9e556f6bc3e Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 20 二月 2024 11:12:42 +0800
Subject: [PATCH] 优化订阅机制,需要重新订阅时,取消命令发送后再发送订阅命令 #1273
---
src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java | 5 +
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java | 15 +++--
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java | 73 ++++++++++++++---------
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java | 15 ++--
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java | 4
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java | 2
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java | 15 ----
src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java | 4
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java | 5 +
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java | 5 -
10 files changed, 75 insertions(+), 68 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
index d932a20..e7b7ab8 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
@@ -52,7 +52,7 @@
Runnable runnable = dynamicTask.get(taskOverdueKey);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
- subscribeTask.stop();
+ subscribeTask.stop(null);
}
// 娣诲姞浠诲姟澶勭悊璁㈤槄杩囨湡
dynamicTask.stop(taskOverdueKey);
@@ -87,7 +87,7 @@
Runnable runnable = dynamicTask.get(taskOverdueKey);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
- subscribeTask.stop();
+ subscribeTask.stop(null);
}
// 娣诲姞浠诲姟澶勭悊璁㈤槄杩囨湡
dynamicTask.stop(taskOverdueKey);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java
index a4e711d..8d1c7d2 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java
@@ -1,10 +1,10 @@
package com.genersoft.iot.vmp.gb28181.task;
-import javax.sip.DialogState;
+import com.genersoft.iot.vmp.common.CommonCallback;
/**
* @author lin
*/
public interface ISubscribeTask extends Runnable{
- void stop();
+ void stop(CommonCallback<Boolean> callback);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java
index 2ffbfe4..d9270bb 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java
@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
+import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
@@ -7,14 +8,13 @@
import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import javax.sip.*;
+import javax.sip.DialogState;
+import javax.sip.InvalidArgumentException;
+import javax.sip.ResponseEvent;
+import javax.sip.SipException;
import javax.sip.header.ToHeader;
import java.text.ParseException;
-import java.util.Timer;
-import java.util.TimerTask;
/**
* 鐩綍璁㈤槄浠诲姟
@@ -71,7 +71,7 @@
}
@Override
- public void stop() {
+ public void stop(CommonCallback<Boolean> callback) {
/**
* dialog 鐨勫悇涓姸鎬�
* EARLY-> Early state鐘舵��-鍒濆璇锋眰鍙戦�佷互鍚庯紝鏀跺埌浜嗕竴涓复鏃跺搷搴旀秷鎭�
@@ -94,6 +94,9 @@
// 鎴愬姛
logger.info("[鍙栨秷鐩綍璁㈤槄]鎴愬姛锛� {}", device.getDeviceId());
}
+ if (callback != null) {
+ callback.run(event.getResponse().getRawContent() != null);
+ }
},eventResult -> {
// 澶辫触
logger.warn("[鍙栨秷鐩綍璁㈤槄]澶辫触锛屼俊浠ゅ彂閫佸け璐ワ細 {}-{} ", device.getDeviceId(), eventResult.msg);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
index 2e792c1..a4512f3 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
@@ -1,20 +1,9 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.IPlatformService;
-import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.SpringBeanFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.scheduling.annotation.Async;
-
-import javax.sip.DialogState;
-import java.util.List;
/**
* 鍚戝凡缁忚闃�(绉诲姩浣嶇疆)鐨勪笂绾у彂閫丮obilePosition娑堟伅
@@ -38,7 +27,7 @@
}
@Override
- public void stop() {
+ public void stop(CommonCallback<Boolean> callback) {
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java
index 0abd3ca..9fed079 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java
@@ -1,21 +1,19 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
+import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import gov.nist.javax.sip.message.SIPRequest;
-import gov.nist.javax.sip.message.SIPResponse;
-import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.scheduling.annotation.Async;
-import javax.sip.*;
+import javax.sip.InvalidArgumentException;
+import javax.sip.ResponseEvent;
+import javax.sip.SipException;
import javax.sip.header.ToHeader;
import java.text.ParseException;
-import java.util.Timer;
-import java.util.TimerTask;
/**
* 绉诲姩浣嶇疆璁㈤槄鐨勫畾鏃舵洿鏂�
@@ -70,7 +68,7 @@
}
@Override
- public void stop() {
+ public void stop(CommonCallback<Boolean> callback) {
/**
* dialog 鐨勫悇涓姸鎬�
* EARLY-> Early state鐘舵��-鍒濆璇锋眰鍙戦�佷互鍚庯紝鏀跺埌浜嗕竴涓复鏃跺搷搴旀秷鎭�
@@ -92,6 +90,9 @@
// 鎴愬姛
logger.info("[鍙栨秷绉诲姩浣嶇疆璁㈤槄]鎴愬姛锛� {}", device.getDeviceId());
}
+ if (callback != null) {
+ callback.run(event.getResponse().getRawContent() != null);
+ }
},eventResult -> {
// 澶辫触
logger.warn("[鍙栨秷绉诲姩浣嶇疆璁㈤槄]澶辫触锛屼俊浠ゅ彂閫佸け璐ワ細 {}-{} ", device.getDeviceId(), eventResult.msg);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java
index 22c34a6..034e24f 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java
@@ -82,8 +82,9 @@
device.setIp(remoteAddressInfo.getIp());
// 璁惧鍦板潃鍙樺寲浼氬紩璧风洰褰曡闃呬换鍔″け鏁堬紝闇�瑕侀噸鏂版坊鍔�
if (device.getSubscribeCycleForCatalog() > 0) {
- deviceService.removeCatalogSubscribe(device);
- deviceService.addCatalogSubscribe(device);
+ deviceService.removeCatalogSubscribe(device, result->{
+ deviceService.addCatalogSubscribe(device);
+ });
}
}
if (device.getKeepaliveTime() == null) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
index e20c3bf..afa0044 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.service;
+import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
@@ -39,7 +40,7 @@
* @param device 璁惧淇℃伅
* @return 甯冨皵
*/
- boolean removeCatalogSubscribe(Device device);
+ boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback);
/**
* 娣诲姞绉诲姩浣嶇疆璁㈤槄
@@ -53,7 +54,7 @@
* @param device 璁惧淇℃伅
* @return 甯冨皵
*/
- boolean removeMobilePositionSubscribe(Device device);
+ boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback);
/**
* 绉婚櫎绉诲姩浣嶇疆璁㈤槄
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
index 1df9ee4..8665e05 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
+import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
@@ -231,8 +232,8 @@
}
}
// 绉婚櫎璁㈤槄
- removeCatalogSubscribe(device);
- removeMobilePositionSubscribe(device);
+ removeCatalogSubscribe(device, null);
+ removeMobilePositionSubscribe(device, null);
}
@Override
@@ -251,7 +252,7 @@
}
@Override
- public boolean removeCatalogSubscribe(Device device) {
+ public boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback) {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
@@ -261,7 +262,7 @@
Runnable runnable = dynamicTask.get(taskKey);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
- subscribeTask.stop();
+ subscribeTask.stop(callback);
}
}
dynamicTask.stop(taskKey);
@@ -284,7 +285,7 @@
}
@Override
- public boolean removeMobilePositionSubscribe(Device device) {
+ public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
@@ -294,7 +295,7 @@
Runnable runnable = dynamicTask.get(taskKey);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
- subscribeTask.stop();
+ subscribeTask.stop(callback);
}
}
dynamicTask.stop(taskKey);
@@ -522,39 +523,54 @@
if (!ObjectUtils.isEmpty(device.getStreamMode())) {
deviceInStore.setStreamMode(device.getStreamMode());
}
-
-
// 鐩綍璁㈤槄鐩稿叧鐨勪俊鎭�
if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
if (device.getSubscribeCycleForCatalog() > 0) {
// 鑻ュ凡寮�鍚闃咃紝浣嗚闃呭懆鏈熶笉鍚岋紝鍒欏厛鍙栨秷
if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
- removeCatalogSubscribe(deviceInStore);
+ removeCatalogSubscribe(deviceInStore, result->{
+ // 寮�鍚闃�
+ deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
+ addCatalogSubscribe(deviceInStore);
+ // 鍥犱负鏄紓姝ユ墽琛岋紝闇�瑕佸湪杩欓噷鏇存柊涓嬫暟鎹�
+ deviceMapper.updateCustom(deviceInStore);
+ redisCatchStorage.updateDevice(deviceInStore);
+ });
+ }else {
+ // 寮�鍚闃�
+ deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
+ addCatalogSubscribe(deviceInStore);
}
- // 寮�鍚闃�
- deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
- addCatalogSubscribe(deviceInStore);
+
}else if (device.getSubscribeCycleForCatalog() == 0) {
// 鍙栨秷璁㈤槄
- deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
- removeCatalogSubscribe(deviceInStore);
+ deviceInStore.setSubscribeCycleForCatalog(0);
+ removeCatalogSubscribe(deviceInStore, null);
}
}
-
// 绉诲姩浣嶇疆璁㈤槄鐩稿叧鐨勪俊鎭�
- if (device.getSubscribeCycleForMobilePosition() > 0) {
- if (deviceInStore.getSubscribeCycleForMobilePosition() == 0 || deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) {
- deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
- deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
- // 寮�鍚闃�
- addMobilePositionSubscribe(deviceInStore);
- }
- }else if (device.getSubscribeCycleForMobilePosition() == 0) {
- if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
- deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
- deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
+ if (deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) {
+ if (device.getSubscribeCycleForMobilePosition() > 0) {
+ // 鑻ュ凡寮�鍚闃咃紝浣嗚闃呭懆鏈熶笉鍚岋紝鍒欏厛鍙栨秷
+ if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
+ removeMobilePositionSubscribe(deviceInStore, result->{
+ // 寮�鍚闃�
+ deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
+ addMobilePositionSubscribe(deviceInStore);
+ // 鍥犱负鏄紓姝ユ墽琛岋紝闇�瑕佸湪杩欓噷鏇存柊涓嬫暟鎹�
+ deviceMapper.updateCustom(deviceInStore);
+ redisCatchStorage.updateDevice(deviceInStore);
+ });
+ }else {
+ // 寮�鍚闃�
+ deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
+ addMobilePositionSubscribe(deviceInStore);
+ }
+
+ }else if (device.getSubscribeCycleForMobilePosition() == 0) {
// 鍙栨秷璁㈤槄
- removeMobilePositionSubscribe(deviceInStore);
+ deviceInStore.setSubscribeCycleForCatalog(0);
+ removeCatalogSubscribe(deviceInStore, null);
}
}
if (deviceInStore.getGeoCoordSys() != null) {
@@ -574,9 +590,8 @@
//浣滀负娑堟伅閫氶亾
deviceInStore.setAsMessageChannel(device.isAsMessageChannel());
- // 鏇存柊redis
deviceMapper.updateCustom(deviceInStore);
- redisCatchStorage.removeDevice(deviceInStore.getDeviceId());
+ redisCatchStorage.updateDevice(deviceInStore);
}
@Override
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java
index 91c992f..05501cc 100755
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java
@@ -153,10 +153,7 @@
Device device = storager.queryVideoDevice(deviceId);
device.setSubscribeCycleForMobilePosition(Integer.parseInt(expires));
device.setMobilePositionSubmissionInterval(Integer.parseInt(interval));
- deviceService.updateDevice(device);
- if (!deviceService.removeMobilePositionSubscribe(device)) {
- throw new ControllerException(ErrorCode.ERROR100);
- }
+ deviceService.updateCustomDevice(device);
}
/**
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
index f0af27f..852f1c4 100755
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
@@ -199,7 +199,7 @@
Runnable runnable = dynamicTask.get(key);
if (runnable instanceof ISubscribeTask) {
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
- subscribeTask.stop();
+ subscribeTask.stop(null);
}
dynamicTask.stop(key);
}
--
Gitblit v1.8.0