From a9ab5c28e9fd52c1d936a245ac46c9e556f6bc3e Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 20 二月 2024 11:12:42 +0800 Subject: [PATCH] 优化订阅机制,需要重新订阅时,取消命令发送后再发送订阅命令 #1273 --- src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java | 5 + src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java | 15 +++-- src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java | 73 ++++++++++++++--------- src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java | 15 ++-- src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java | 4 src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java | 2 src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java | 15 ---- src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java | 4 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java | 5 + src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java | 5 - 10 files changed, 75 insertions(+), 68 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index d932a20..e7b7ab8 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -52,7 +52,7 @@ Runnable runnable = dynamicTask.get(taskOverdueKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); + subscribeTask.stop(null); } // 娣诲姞浠诲姟澶勭悊璁㈤槄杩囨湡 dynamicTask.stop(taskOverdueKey); @@ -87,7 +87,7 @@ Runnable runnable = dynamicTask.get(taskOverdueKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); + subscribeTask.stop(null); } // 娣诲姞浠诲姟澶勭悊璁㈤槄杩囨湡 dynamicTask.stop(taskOverdueKey); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java index a4e711d..8d1c7d2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java @@ -1,10 +1,10 @@ package com.genersoft.iot.vmp.gb28181.task; -import javax.sip.DialogState; +import com.genersoft.iot.vmp.common.CommonCallback; /** * @author lin */ public interface ISubscribeTask extends Runnable{ - void stop(); + void stop(CommonCallback<Boolean> callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java index 2ffbfe4..d9270bb 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.task.impl; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; @@ -7,14 +8,13 @@ import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.scheduling.annotation.Async; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import javax.sip.*; +import javax.sip.DialogState; +import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; +import javax.sip.SipException; import javax.sip.header.ToHeader; import java.text.ParseException; -import java.util.Timer; -import java.util.TimerTask; /** * 鐩綍璁㈤槄浠诲姟 @@ -71,7 +71,7 @@ } @Override - public void stop() { + public void stop(CommonCallback<Boolean> callback) { /** * dialog 鐨勫悇涓姸鎬� * EARLY-> Early state鐘舵��-鍒濆璇锋眰鍙戦�佷互鍚庯紝鏀跺埌浜嗕竴涓复鏃跺搷搴旀秷鎭� @@ -94,6 +94,9 @@ // 鎴愬姛 logger.info("[鍙栨秷鐩綍璁㈤槄]鎴愬姛锛� {}", device.getDeviceId()); } + if (callback != null) { + callback.run(event.getResponse().getRawContent() != null); + } },eventResult -> { // 澶辫触 logger.warn("[鍙栨秷鐩綍璁㈤槄]澶辫触锛屼俊浠ゅ彂閫佸け璐ワ細 {}-{} ", device.getDeviceId(), eventResult.msg); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java index 2e792c1..a4512f3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java @@ -1,20 +1,9 @@ package com.genersoft.iot.vmp.gb28181.task.impl; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.service.IPlatformService; -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.SpringBeanFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.scheduling.annotation.Async; - -import javax.sip.DialogState; -import java.util.List; /** * 鍚戝凡缁忚闃�(绉诲姩浣嶇疆)鐨勪笂绾у彂閫丮obilePosition娑堟伅 @@ -38,7 +27,7 @@ } @Override - public void stop() { + public void stop(CommonCallback<Boolean> callback) { } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java index 0abd3ca..9fed079 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java @@ -1,21 +1,19 @@ package com.genersoft.iot.vmp.gb28181.task.impl; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import gov.nist.javax.sip.message.SIPRequest; -import gov.nist.javax.sip.message.SIPResponse; -import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.scheduling.annotation.Async; -import javax.sip.*; +import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; +import javax.sip.SipException; import javax.sip.header.ToHeader; import java.text.ParseException; -import java.util.Timer; -import java.util.TimerTask; /** * 绉诲姩浣嶇疆璁㈤槄鐨勫畾鏃舵洿鏂� @@ -70,7 +68,7 @@ } @Override - public void stop() { + public void stop(CommonCallback<Boolean> callback) { /** * dialog 鐨勫悇涓姸鎬� * EARLY-> Early state鐘舵��-鍒濆璇锋眰鍙戦�佷互鍚庯紝鏀跺埌浜嗕竴涓复鏃跺搷搴旀秷鎭� @@ -92,6 +90,9 @@ // 鎴愬姛 logger.info("[鍙栨秷绉诲姩浣嶇疆璁㈤槄]鎴愬姛锛� {}", device.getDeviceId()); } + if (callback != null) { + callback.run(event.getResponse().getRawContent() != null); + } },eventResult -> { // 澶辫触 logger.warn("[鍙栨秷绉诲姩浣嶇疆璁㈤槄]澶辫触锛屼俊浠ゅ彂閫佸け璐ワ細 {}-{} ", device.getDeviceId(), eventResult.msg); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 22c34a6..034e24f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -82,8 +82,9 @@ device.setIp(remoteAddressInfo.getIp()); // 璁惧鍦板潃鍙樺寲浼氬紩璧风洰褰曡闃呬换鍔″け鏁堬紝闇�瑕侀噸鏂版坊鍔� if (device.getSubscribeCycleForCatalog() > 0) { - deviceService.removeCatalogSubscribe(device); - deviceService.addCatalogSubscribe(device); + deviceService.removeCatalogSubscribe(device, result->{ + deviceService.addCatalogSubscribe(device); + }); } } if (device.getKeepaliveTime() == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java index e20c3bf..afa0044 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.service; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; @@ -39,7 +40,7 @@ * @param device 璁惧淇℃伅 * @return 甯冨皵 */ - boolean removeCatalogSubscribe(Device device); + boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback); /** * 娣诲姞绉诲姩浣嶇疆璁㈤槄 @@ -53,7 +54,7 @@ * @param device 璁惧淇℃伅 * @return 甯冨皵 */ - boolean removeMobilePositionSubscribe(Device device); + boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback); /** * 绉婚櫎绉诲姩浣嶇疆璁㈤槄 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 1df9ee4..8665e05 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service.impl; import com.baomidou.dynamic.datasource.annotation.DS; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; @@ -231,8 +232,8 @@ } } // 绉婚櫎璁㈤槄 - removeCatalogSubscribe(device); - removeMobilePositionSubscribe(device); + removeCatalogSubscribe(device, null); + removeMobilePositionSubscribe(device, null); } @Override @@ -251,7 +252,7 @@ } @Override - public boolean removeCatalogSubscribe(Device device) { + public boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback) { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } @@ -261,7 +262,7 @@ Runnable runnable = dynamicTask.get(taskKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); + subscribeTask.stop(callback); } } dynamicTask.stop(taskKey); @@ -284,7 +285,7 @@ } @Override - public boolean removeMobilePositionSubscribe(Device device) { + public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } @@ -294,7 +295,7 @@ Runnable runnable = dynamicTask.get(taskKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); + subscribeTask.stop(callback); } } dynamicTask.stop(taskKey); @@ -522,39 +523,54 @@ if (!ObjectUtils.isEmpty(device.getStreamMode())) { deviceInStore.setStreamMode(device.getStreamMode()); } - - // 鐩綍璁㈤槄鐩稿叧鐨勪俊鎭� if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) { if (device.getSubscribeCycleForCatalog() > 0) { // 鑻ュ凡寮�鍚闃咃紝浣嗚闃呭懆鏈熶笉鍚岋紝鍒欏厛鍙栨秷 if (deviceInStore.getSubscribeCycleForCatalog() != 0) { - removeCatalogSubscribe(deviceInStore); + removeCatalogSubscribe(deviceInStore, result->{ + // 寮�鍚闃� + deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); + addCatalogSubscribe(deviceInStore); + // 鍥犱负鏄紓姝ユ墽琛岋紝闇�瑕佸湪杩欓噷鏇存柊涓嬫暟鎹� + deviceMapper.updateCustom(deviceInStore); + redisCatchStorage.updateDevice(deviceInStore); + }); + }else { + // 寮�鍚闃� + deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); + addCatalogSubscribe(deviceInStore); } - // 寮�鍚闃� - deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); - addCatalogSubscribe(deviceInStore); + }else if (device.getSubscribeCycleForCatalog() == 0) { // 鍙栨秷璁㈤槄 - deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); - removeCatalogSubscribe(deviceInStore); + deviceInStore.setSubscribeCycleForCatalog(0); + removeCatalogSubscribe(deviceInStore, null); } } - // 绉诲姩浣嶇疆璁㈤槄鐩稿叧鐨勪俊鎭� - if (device.getSubscribeCycleForMobilePosition() > 0) { - if (deviceInStore.getSubscribeCycleForMobilePosition() == 0 || deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) { - deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval()); - deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); - // 寮�鍚闃� - addMobilePositionSubscribe(deviceInStore); - } - }else if (device.getSubscribeCycleForMobilePosition() == 0) { - if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) { - deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval()); - deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); + if (deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) { + if (device.getSubscribeCycleForMobilePosition() > 0) { + // 鑻ュ凡寮�鍚闃咃紝浣嗚闃呭懆鏈熶笉鍚岋紝鍒欏厛鍙栨秷 + if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) { + removeMobilePositionSubscribe(deviceInStore, result->{ + // 寮�鍚闃� + deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); + addMobilePositionSubscribe(deviceInStore); + // 鍥犱负鏄紓姝ユ墽琛岋紝闇�瑕佸湪杩欓噷鏇存柊涓嬫暟鎹� + deviceMapper.updateCustom(deviceInStore); + redisCatchStorage.updateDevice(deviceInStore); + }); + }else { + // 寮�鍚闃� + deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); + addMobilePositionSubscribe(deviceInStore); + } + + }else if (device.getSubscribeCycleForMobilePosition() == 0) { // 鍙栨秷璁㈤槄 - removeMobilePositionSubscribe(deviceInStore); + deviceInStore.setSubscribeCycleForCatalog(0); + removeCatalogSubscribe(deviceInStore, null); } } if (deviceInStore.getGeoCoordSys() != null) { @@ -574,9 +590,8 @@ //浣滀负娑堟伅閫氶亾 deviceInStore.setAsMessageChannel(device.isAsMessageChannel()); - // 鏇存柊redis deviceMapper.updateCustom(deviceInStore); - redisCatchStorage.removeDevice(deviceInStore.getDeviceId()); + redisCatchStorage.updateDevice(deviceInStore); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java index 91c992f..05501cc 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java @@ -153,10 +153,7 @@ Device device = storager.queryVideoDevice(deviceId); device.setSubscribeCycleForMobilePosition(Integer.parseInt(expires)); device.setMobilePositionSubmissionInterval(Integer.parseInt(interval)); - deviceService.updateDevice(device); - if (!deviceService.removeMobilePositionSubscribe(device)) { - throw new ControllerException(ErrorCode.ERROR100); - } + deviceService.updateCustomDevice(device); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java index f0af27f..852f1c4 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java @@ -199,7 +199,7 @@ Runnable runnable = dynamicTask.get(key); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); + subscribeTask.stop(null); } dynamicTask.stop(key); } -- Gitblit v1.8.0