From a9ab5c28e9fd52c1d936a245ac46c9e556f6bc3e Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 20 二月 2024 11:12:42 +0800
Subject: [PATCH] 优化订阅机制,需要重新订阅时,取消命令发送后再发送订阅命令 #1273

---
 src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java                                                               |    5 +
 src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java                                               |   15 +++--
 src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java                                                       |   73 ++++++++++++++---------
 src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java                                        |   15 ++--
 src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java                                                         |    4 
 src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java                                                  |    2 
 src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java                                 |   15 ----
 src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java                                                          |    4 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java |    5 +
 src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java                             |    5 -
 10 files changed, 75 insertions(+), 68 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
index d932a20..e7b7ab8 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
@@ -52,7 +52,7 @@
         Runnable runnable = dynamicTask.get(taskOverdueKey);
         if (runnable instanceof ISubscribeTask) {
             ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
-            subscribeTask.stop();
+            subscribeTask.stop(null);
         }
         // 娣诲姞浠诲姟澶勭悊璁㈤槄杩囨湡
         dynamicTask.stop(taskOverdueKey);
@@ -87,7 +87,7 @@
         Runnable runnable = dynamicTask.get(taskOverdueKey);
         if (runnable instanceof ISubscribeTask) {
             ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
-            subscribeTask.stop();
+            subscribeTask.stop(null);
         }
         // 娣诲姞浠诲姟澶勭悊璁㈤槄杩囨湡
         dynamicTask.stop(taskOverdueKey);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java
index a4e711d..8d1c7d2 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java
@@ -1,10 +1,10 @@
 package com.genersoft.iot.vmp.gb28181.task;
 
-import javax.sip.DialogState;
+import com.genersoft.iot.vmp.common.CommonCallback;
 
 /**
  * @author lin
  */
 public interface ISubscribeTask extends Runnable{
-    void stop();
+    void stop(CommonCallback<Boolean> callback);
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java
index 2ffbfe4..d9270bb 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java
@@ -1,5 +1,6 @@
 package com.genersoft.iot.vmp.gb28181.task.impl;
 
+import com.genersoft.iot.vmp.common.CommonCallback;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
@@ -7,14 +8,13 @@
 import gov.nist.javax.sip.message.SIPRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
-import javax.sip.*;
+import javax.sip.DialogState;
+import javax.sip.InvalidArgumentException;
+import javax.sip.ResponseEvent;
+import javax.sip.SipException;
 import javax.sip.header.ToHeader;
 import java.text.ParseException;
-import java.util.Timer;
-import java.util.TimerTask;
 
 /**
  * 鐩綍璁㈤槄浠诲姟
@@ -71,7 +71,7 @@
     }
 
     @Override
-    public void stop() {
+    public void stop(CommonCallback<Boolean> callback) {
         /**
          * dialog 鐨勫悇涓姸鎬�
          * EARLY-> Early state鐘舵��-鍒濆璇锋眰鍙戦�佷互鍚庯紝鏀跺埌浜嗕竴涓复鏃跺搷搴旀秷鎭�
@@ -94,6 +94,9 @@
                     // 鎴愬姛
                     logger.info("[鍙栨秷鐩綍璁㈤槄]鎴愬姛锛� {}", device.getDeviceId());
                 }
+                if (callback != null) {
+                    callback.run(event.getResponse().getRawContent() != null);
+                }
             },eventResult -> {
                 // 澶辫触
                 logger.warn("[鍙栨秷鐩綍璁㈤槄]澶辫触锛屼俊浠ゅ彂閫佸け璐ワ細 {}-{} ", device.getDeviceId(), eventResult.msg);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
index 2e792c1..a4512f3 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
@@ -1,20 +1,9 @@
 package com.genersoft.iot.vmp.gb28181.task.impl;
 
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.common.CommonCallback;
 import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.service.IPlatformService;
-import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.SpringBeanFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.scheduling.annotation.Async;
-
-import javax.sip.DialogState;
-import java.util.List;
 
 /**
  * 鍚戝凡缁忚闃�(绉诲姩浣嶇疆)鐨勪笂绾у彂閫丮obilePosition娑堟伅
@@ -38,7 +27,7 @@
     }
 
     @Override
-    public void stop() {
+    public void stop(CommonCallback<Boolean> callback) {
 
     }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java
index 0abd3ca..9fed079 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java
@@ -1,21 +1,19 @@
 package com.genersoft.iot.vmp.gb28181.task.impl;
 
+import com.genersoft.iot.vmp.common.CommonCallback;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import gov.nist.javax.sip.message.SIPRequest;
-import gov.nist.javax.sip.message.SIPResponse;
-import org.dom4j.Element;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.scheduling.annotation.Async;
 
-import javax.sip.*;
+import javax.sip.InvalidArgumentException;
+import javax.sip.ResponseEvent;
+import javax.sip.SipException;
 import javax.sip.header.ToHeader;
 import java.text.ParseException;
-import java.util.Timer;
-import java.util.TimerTask;
 
 /**
  * 绉诲姩浣嶇疆璁㈤槄鐨勫畾鏃舵洿鏂�
@@ -70,7 +68,7 @@
     }
 
     @Override
-    public void stop() {
+    public void stop(CommonCallback<Boolean> callback) {
         /**
          * dialog 鐨勫悇涓姸鎬�
          * EARLY-> Early state鐘舵��-鍒濆璇锋眰鍙戦�佷互鍚庯紝鏀跺埌浜嗕竴涓复鏃跺搷搴旀秷鎭�
@@ -92,6 +90,9 @@
                     // 鎴愬姛
                     logger.info("[鍙栨秷绉诲姩浣嶇疆璁㈤槄]鎴愬姛锛� {}", device.getDeviceId());
                 }
+                if (callback != null) {
+                    callback.run(event.getResponse().getRawContent() != null);
+                }
             },eventResult -> {
                 // 澶辫触
                 logger.warn("[鍙栨秷绉诲姩浣嶇疆璁㈤槄]澶辫触锛屼俊浠ゅ彂閫佸け璐ワ細 {}-{} ", device.getDeviceId(), eventResult.msg);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java
index 22c34a6..034e24f 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java
@@ -82,8 +82,9 @@
             device.setIp(remoteAddressInfo.getIp());
             // 璁惧鍦板潃鍙樺寲浼氬紩璧风洰褰曡闃呬换鍔″け鏁堬紝闇�瑕侀噸鏂版坊鍔�
             if (device.getSubscribeCycleForCatalog() > 0) {
-                deviceService.removeCatalogSubscribe(device);
-                deviceService.addCatalogSubscribe(device);
+                deviceService.removeCatalogSubscribe(device, result->{
+                    deviceService.addCatalogSubscribe(device);
+                });
             }
         }
         if (device.getKeepaliveTime() == null) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
index e20c3bf..afa0044 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
@@ -1,5 +1,6 @@
 package com.genersoft.iot.vmp.service;
 
+import com.genersoft.iot.vmp.common.CommonCallback;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
 import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
@@ -39,7 +40,7 @@
      * @param device 璁惧淇℃伅
      * @return 甯冨皵
      */
-    boolean removeCatalogSubscribe(Device device);
+    boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback);
 
     /**
      * 娣诲姞绉诲姩浣嶇疆璁㈤槄
@@ -53,7 +54,7 @@
      * @param device 璁惧淇℃伅
      * @return 甯冨皵
      */
-    boolean removeMobilePositionSubscribe(Device device);
+    boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback);
 
     /**
      * 绉婚櫎绉诲姩浣嶇疆璁㈤槄
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
index 1df9ee4..8665e05 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -1,6 +1,7 @@
 package com.genersoft.iot.vmp.service.impl;
 
 import com.baomidou.dynamic.datasource.annotation.DS;
+import com.genersoft.iot.vmp.common.CommonCallback;
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
@@ -231,8 +232,8 @@
             }
         }
         // 绉婚櫎璁㈤槄
-        removeCatalogSubscribe(device);
-        removeMobilePositionSubscribe(device);
+        removeCatalogSubscribe(device, null);
+        removeMobilePositionSubscribe(device, null);
     }
 
     @Override
@@ -251,7 +252,7 @@
     }
 
     @Override
-    public boolean removeCatalogSubscribe(Device device) {
+    public boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback) {
         if (device == null || device.getSubscribeCycleForCatalog() < 0) {
             return false;
         }
@@ -261,7 +262,7 @@
             Runnable runnable = dynamicTask.get(taskKey);
             if (runnable instanceof ISubscribeTask) {
                 ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
-                subscribeTask.stop();
+                subscribeTask.stop(callback);
             }
         }
         dynamicTask.stop(taskKey);
@@ -284,7 +285,7 @@
     }
 
     @Override
-    public boolean removeMobilePositionSubscribe(Device device) {
+    public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) {
         if (device == null || device.getSubscribeCycleForCatalog() < 0) {
             return false;
         }
@@ -294,7 +295,7 @@
             Runnable runnable = dynamicTask.get(taskKey);
             if (runnable instanceof ISubscribeTask) {
                 ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
-                subscribeTask.stop();
+                subscribeTask.stop(callback);
             }
         }
         dynamicTask.stop(taskKey);
@@ -522,39 +523,54 @@
         if (!ObjectUtils.isEmpty(device.getStreamMode())) {
             deviceInStore.setStreamMode(device.getStreamMode());
         }
-
-
         //  鐩綍璁㈤槄鐩稿叧鐨勪俊鎭�
         if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
             if (device.getSubscribeCycleForCatalog() > 0) {
                 // 鑻ュ凡寮�鍚闃咃紝浣嗚闃呭懆鏈熶笉鍚岋紝鍒欏厛鍙栨秷
                 if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
-                    removeCatalogSubscribe(deviceInStore);
+                    removeCatalogSubscribe(deviceInStore, result->{
+                        // 寮�鍚闃�
+                        deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
+                        addCatalogSubscribe(deviceInStore);
+                        // 鍥犱负鏄紓姝ユ墽琛岋紝闇�瑕佸湪杩欓噷鏇存柊涓嬫暟鎹�
+                        deviceMapper.updateCustom(deviceInStore);
+                        redisCatchStorage.updateDevice(deviceInStore);
+                    });
+                }else {
+                    // 寮�鍚闃�
+                    deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
+                    addCatalogSubscribe(deviceInStore);
                 }
-                // 寮�鍚闃�
-                deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
-                addCatalogSubscribe(deviceInStore);
+
             }else if (device.getSubscribeCycleForCatalog() == 0) {
                 // 鍙栨秷璁㈤槄
-                deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
-                removeCatalogSubscribe(deviceInStore);
+                deviceInStore.setSubscribeCycleForCatalog(0);
+                removeCatalogSubscribe(deviceInStore, null);
             }
         }
-
         // 绉诲姩浣嶇疆璁㈤槄鐩稿叧鐨勪俊鎭�
-        if (device.getSubscribeCycleForMobilePosition() > 0) {
-            if (deviceInStore.getSubscribeCycleForMobilePosition() == 0 || deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) {
-                deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
-                deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
-                // 寮�鍚闃�
-                addMobilePositionSubscribe(deviceInStore);
-            }
-        }else if (device.getSubscribeCycleForMobilePosition() == 0) {
-            if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
-                deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
-                deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
+        if (deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) {
+            if (device.getSubscribeCycleForMobilePosition() > 0) {
+                // 鑻ュ凡寮�鍚闃咃紝浣嗚闃呭懆鏈熶笉鍚岋紝鍒欏厛鍙栨秷
+                if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
+                    removeMobilePositionSubscribe(deviceInStore, result->{
+                        // 寮�鍚闃�
+                        deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
+                        addMobilePositionSubscribe(deviceInStore);
+                        // 鍥犱负鏄紓姝ユ墽琛岋紝闇�瑕佸湪杩欓噷鏇存柊涓嬫暟鎹�
+                        deviceMapper.updateCustom(deviceInStore);
+                        redisCatchStorage.updateDevice(deviceInStore);
+                    });
+                }else {
+                    // 寮�鍚闃�
+                    deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
+                    addMobilePositionSubscribe(deviceInStore);
+                }
+
+            }else if (device.getSubscribeCycleForMobilePosition() == 0) {
                 // 鍙栨秷璁㈤槄
-                removeMobilePositionSubscribe(deviceInStore);
+                deviceInStore.setSubscribeCycleForCatalog(0);
+                removeCatalogSubscribe(deviceInStore, null);
             }
         }
         if (deviceInStore.getGeoCoordSys() != null) {
@@ -574,9 +590,8 @@
         //浣滀负娑堟伅閫氶亾
         deviceInStore.setAsMessageChannel(device.isAsMessageChannel());
         
-        // 鏇存柊redis
         deviceMapper.updateCustom(deviceInStore);
-        redisCatchStorage.removeDevice(deviceInStore.getDeviceId());
+        redisCatchStorage.updateDevice(deviceInStore);
     }
 
     @Override
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java
index 91c992f..05501cc 100755
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java
@@ -153,10 +153,7 @@
         Device device = storager.queryVideoDevice(deviceId);
         device.setSubscribeCycleForMobilePosition(Integer.parseInt(expires));
         device.setMobilePositionSubmissionInterval(Integer.parseInt(interval));
-        deviceService.updateDevice(device);
-        if (!deviceService.removeMobilePositionSubscribe(device)) {
-            throw new ControllerException(ErrorCode.ERROR100);
-        }
+        deviceService.updateCustomDevice(device);
     }
 
     /**
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
index f0af27f..852f1c4 100755
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
@@ -199,7 +199,7 @@
 					Runnable runnable = dynamicTask.get(key);
 					if (runnable instanceof ISubscribeTask) {
 						ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
-						subscribeTask.stop();
+						subscribeTask.stop(null);
 					}
 					dynamicTask.stop(key);
 				}

--
Gitblit v1.8.0