From 6b8ecd1f9d2abe1e6ac0af858487755a58a2643a Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 17 十一月 2022 18:09:28 +0800
Subject: [PATCH] 优化级联注册稳定性
---
src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java | 10 +---
src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java | 7 ---
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java | 37 +++++++++---------
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java | 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java | 2
src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java | 3 -
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java | 6 ++
7 files changed, 29 insertions(+), 37 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
index 9ca936c..041d738 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
@@ -127,11 +127,15 @@
public void execute(){
if (futureMap.size() > 0) {
for (String key : futureMap.keySet()) {
- if (futureMap.get(key).isDone()) {
+ if (futureMap.get(key).isDone() || futureMap.get(key).isCancelled()) {
futureMap.remove(key);
runnableMap.remove(key);
}
}
}
}
+
+ public boolean isAlive(String key) {
+ return futureMap.get(key) != null && !futureMap.get(key).isDone() && !futureMap.get(key).isCancelled();
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java
index 0dfa968..15e38ae 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java
@@ -2,7 +2,6 @@
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
-import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -47,7 +46,7 @@
parentPlatformCatch.setId(parentPlatform.getServerGBId());
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
// 璁剧疆鎵�鏈夊钩鍙扮绾�
- platformService.offline(parentPlatform);
+ platformService.offline(parentPlatform, true);
// 鍙栨秷璁㈤槄
sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{
platformService.login(parentPlatform);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java
index aeca07a..5c2abb2 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java
@@ -5,11 +5,9 @@
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
-import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
-import java.util.HashMap;
-import java.util.Hashtable;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* @description: 褰曞儚鏌ヨ缁撴潫浜嬩欢
@@ -22,13 +20,12 @@
private final static Logger logger = LoggerFactory.getLogger(RecordEndEventListener.class);
- private static Map<String, SseEmitter> sseEmitters = new Hashtable<>();
-
public interface RecordEndEventHandler{
void handler(RecordInfo recordInfo);
}
- private Map<String, RecordEndEventHandler> handlerMap = new HashMap<>();
+ private Map<String, RecordEndEventHandler> handlerMap = new ConcurrentHashMap<>();
+
@Override
public void onApplicationEvent(RecordEndEvent event) {
logger.info("褰曞儚鏌ヨ瀹屾垚浜嬩欢瑙﹀彂锛宒eviceId锛歿}, channelId: {}, 褰曞儚鏁伴噺{}鏉�", event.getRecordInfo().getDeviceId(),
@@ -38,7 +35,6 @@
recordEndEventHandler.handler(event.getRecordInfo());
}
}
-
}
public void addEndEventHandler(String device, String channelId, RecordEndEventHandler recordEndEventHandler) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
index d0e1583..14d1f84 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
@@ -100,7 +100,7 @@
if (platformRegisterInfo.isRegister()) {
platformService.online(parentPlatform);
}else {
- platformService.offline(parentPlatform);
+ platformService.offline(parentPlatform, false);
}
// 娉ㄥ唽/娉ㄩ攢鎴愬姛绉婚櫎缂撳瓨鐨勪俊鎭�
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
index 50f0113..7d3510f 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
@@ -169,7 +169,6 @@
.build();
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
- logger.info("response body contentType: " + Objects.requireNonNull(response.body()).contentType());
if (targetPath != null) {
File snapFolder = new File(targetPath);
if (!snapFolder.exists()) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java
index ddc91eb..17f8b37 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java
@@ -1,12 +1,7 @@
package com.genersoft.iot.vmp.service;
-import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
-import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.github.pagehelper.PageInfo;
-
-import java.util.List;
/**
* 鍥芥爣骞冲彴鐨勪笟鍔$被
@@ -40,7 +35,7 @@
* 骞冲彴绂荤嚎
* @param parentPlatform 骞冲彴淇℃伅
*/
- void offline(ParentPlatform parentPlatform);
+ void offline(ParentPlatform parentPlatform, boolean stopRegisterTask);
/**
* 鍚戜笂绾у钩鍙板彂璧锋敞鍐�
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
index fe67ede..fbc507a 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -22,7 +22,6 @@
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
-import javax.sip.TimeoutEvent;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
@@ -131,20 +130,23 @@
}
final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
- if (dynamicTask.contains(registerTaskKey)) {
- dynamicTask.stop(registerTaskKey);
- }
- // 娣诲姞娉ㄥ唽浠诲姟
- dynamicTask.startDelay(registerTaskKey,
+ if (!dynamicTask.isAlive(registerTaskKey)) {
+ // 娣诲姞娉ㄥ唽浠诲姟
+ dynamicTask.startCron(registerTaskKey,
// 娉ㄥ唽澶辫触锛堟敞鍐屾垚鍔熸椂鐢辩▼搴忕洿鎺ヨ皟鐢ㄤ簡online鏂规硶锛�
()-> {
try {
- commanderForPlatform.register(parentPlatform, eventResult -> offline(parentPlatform),null);
+ logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛岄噸鏂版敞鍐�", parentPlatform.getServerGBId());
+ commanderForPlatform.register(parentPlatform, eventResult -> {
+ offline(parentPlatform, false);
+ },null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈瀹氭椂娉ㄥ唽: {}", e.getMessage());
}
},
(parentPlatform.getExpires() - 10) *1000);
+ }
+
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
if (!dynamicTask.contains(keepaliveTaskKey)) {
@@ -160,16 +162,11 @@
// 姝ゆ椂鏄涓夋蹇冭烦瓒呮椂锛� 骞冲彴绂荤嚎
if (platformCatch.getKeepAliveReply() == 2) {
// 璁剧疆骞冲彴绂荤嚎锛屽苟閲嶆柊娉ㄥ唽
- offline(parentPlatform);
logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽", parentPlatform.getServerGBId());
try {
commanderForPlatform.register(parentPlatform, eventResult1 -> {
logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽浠嶇劧澶辫触锛屽紑濮嬪畾鏃跺彂璧锋敞鍐岋紝闂撮殧涓�1鍒嗛挓", parentPlatform.getServerGBId());
- // 娣诲姞娉ㄥ唽浠诲姟
- dynamicTask.startCron(registerTaskKey,
- // 娉ㄥ唽澶辫触锛堟敞鍐屾垚鍔熸椂鐢辩▼搴忕洿鎺ヨ皟鐢ㄤ簡online鏂规硶锛�
- ()->logger.info("[鍥芥爣绾ц仈] {},骞冲彴绂荤嚎鍚庢寔缁彂璧锋敞鍐岋紝澶辫触", parentPlatform.getServerGBId()),
- 60*1000);
+ offline(parentPlatform, false);
}, null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 娉ㄥ唽: {}", e.getMessage());
@@ -198,7 +195,7 @@
}
@Override
- public void offline(ParentPlatform parentPlatform) {
+ public void offline(ParentPlatform parentPlatform, boolean stopRegister) {
logger.info("[骞冲彴绂荤嚎]锛歿}", parentPlatform.getServerGBId());
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
parentPlatformCatch.setKeepAliveReply(0);
@@ -212,11 +209,13 @@
// 鍋滄鎵�鏈夋帹娴�
logger.info("[骞冲彴绂荤嚎] {}, 鍋滄鎵�鏈夋帹娴�", parentPlatform.getServerGBId());
stopAllPush(parentPlatform.getServerGBId());
- // 娓呴櫎娉ㄥ唽瀹氭椂
- logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId());
- final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
- if (dynamicTask.contains(registerTaskKey)) {
- dynamicTask.stop(registerTaskKey);
+ if (stopRegister) {
+ // 娓呴櫎娉ㄥ唽瀹氭椂
+ logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId());
+ final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
+ if (dynamicTask.contains(registerTaskKey)) {
+ dynamicTask.stop(registerTaskKey);
+ }
}
// 娓呴櫎蹇冭烦瀹氭椂
logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂鍙戦�佸績璺充换鍔�", parentPlatform.getServerGBId());
--
Gitblit v1.8.0