From 6b8ecd1f9d2abe1e6ac0af858487755a58a2643a Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 17 十一月 2022 18:09:28 +0800
Subject: [PATCH] 优化级联注册稳定性

---
 src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java                    |   10 +---
 src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java                                       |    7 ---
 src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java                               |   37 +++++++++---------
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java                                      |    1 
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java |    2 
 src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java                                         |    3 -
 src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java                                               |    6 ++
 7 files changed, 29 insertions(+), 37 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
index 9ca936c..041d738 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
@@ -127,11 +127,15 @@
     public void execute(){
         if (futureMap.size() > 0) {
             for (String key : futureMap.keySet()) {
-                if (futureMap.get(key).isDone()) {
+                if (futureMap.get(key).isDone() || futureMap.get(key).isCancelled()) {
                     futureMap.remove(key);
                     runnableMap.remove(key);
                 }
             }
         }
     }
+
+    public boolean isAlive(String key) {
+        return futureMap.get(key) != null && !futureMap.get(key).isDone() && !futureMap.get(key).isCancelled();
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java
index 0dfa968..15e38ae 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java
@@ -2,7 +2,6 @@
 
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
-import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.service.IPlatformService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -47,7 +46,7 @@
             parentPlatformCatch.setId(parentPlatform.getServerGBId());
             redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
             // 璁剧疆鎵�鏈夊钩鍙扮绾�
-            platformService.offline(parentPlatform);
+            platformService.offline(parentPlatform, true);
             // 鍙栨秷璁㈤槄
             sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{
                 platformService.login(parentPlatform);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java
index aeca07a..5c2abb2 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java
@@ -5,11 +5,9 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationListener;
 import org.springframework.stereotype.Component;
-import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
-import java.util.HashMap;
-import java.util.Hashtable;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @description: 褰曞儚鏌ヨ缁撴潫浜嬩欢
@@ -22,13 +20,12 @@
 
     private final static Logger logger = LoggerFactory.getLogger(RecordEndEventListener.class);
 
-    private static Map<String, SseEmitter> sseEmitters = new Hashtable<>();
-
     public interface RecordEndEventHandler{
         void  handler(RecordInfo recordInfo);
     }
 
-    private Map<String, RecordEndEventHandler> handlerMap = new HashMap<>();
+    private Map<String, RecordEndEventHandler> handlerMap = new ConcurrentHashMap<>();
+
     @Override
     public void onApplicationEvent(RecordEndEvent event) {
         logger.info("褰曞儚鏌ヨ瀹屾垚浜嬩欢瑙﹀彂锛宒eviceId锛歿}, channelId: {}, 褰曞儚鏁伴噺{}鏉�", event.getRecordInfo().getDeviceId(),
@@ -38,7 +35,6 @@
                 recordEndEventHandler.handler(event.getRecordInfo());
             }
         }
-
     }
 
     public void addEndEventHandler(String device, String channelId, RecordEndEventHandler recordEndEventHandler) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
index d0e1583..14d1f84 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
@@ -100,7 +100,7 @@
 			if (platformRegisterInfo.isRegister()) {
 				platformService.online(parentPlatform);
 			}else {
-				platformService.offline(parentPlatform);
+				platformService.offline(parentPlatform, false);
 			}
 
 			// 娉ㄥ唽/娉ㄩ攢鎴愬姛绉婚櫎缂撳瓨鐨勪俊鎭�
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
index 50f0113..7d3510f 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
@@ -169,7 +169,6 @@
                     .build();
             Response response = client.newCall(request).execute();
             if (response.isSuccessful()) {
-                logger.info("response body contentType: " + Objects.requireNonNull(response.body()).contentType());
                 if (targetPath != null) {
                     File snapFolder = new File(targetPath);
                     if (!snapFolder.exists()) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java
index ddc91eb..17f8b37 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java
@@ -1,12 +1,7 @@
 package com.genersoft.iot.vmp.service;
 
-import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
-import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.github.pagehelper.PageInfo;
-
-import java.util.List;
 
 /**
  * 鍥芥爣骞冲彴鐨勪笟鍔$被
@@ -40,7 +35,7 @@
      * 骞冲彴绂荤嚎
      * @param parentPlatform 骞冲彴淇℃伅
      */
-    void offline(ParentPlatform parentPlatform);
+    void offline(ParentPlatform parentPlatform, boolean stopRegisterTask);
 
     /**
      * 鍚戜笂绾у钩鍙板彂璧锋敞鍐�
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
index fe67ede..fbc507a 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -22,7 +22,6 @@
 
 import javax.sip.InvalidArgumentException;
 import javax.sip.SipException;
-import javax.sip.TimeoutEvent;
 import java.text.ParseException;
 import java.util.HashMap;
 import java.util.List;
@@ -131,20 +130,23 @@
         }
 
         final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
-        if (dynamicTask.contains(registerTaskKey)) {
-            dynamicTask.stop(registerTaskKey);
-        }
-        // 娣诲姞娉ㄥ唽浠诲姟
-        dynamicTask.startDelay(registerTaskKey,
+        if (!dynamicTask.isAlive(registerTaskKey)) {
+            // 娣诲姞娉ㄥ唽浠诲姟
+            dynamicTask.startCron(registerTaskKey,
                 // 娉ㄥ唽澶辫触锛堟敞鍐屾垚鍔熸椂鐢辩▼搴忕洿鎺ヨ皟鐢ㄤ簡online鏂规硶锛�
                 ()-> {
                     try {
-                        commanderForPlatform.register(parentPlatform, eventResult -> offline(parentPlatform),null);
+                        logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛岄噸鏂版敞鍐�", parentPlatform.getServerGBId());
+                        commanderForPlatform.register(parentPlatform, eventResult -> {
+                            offline(parentPlatform, false);
+                        },null);
                     } catch (InvalidArgumentException | ParseException | SipException e) {
                         logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈瀹氭椂娉ㄥ唽: {}", e.getMessage());
                     }
                 },
                 (parentPlatform.getExpires() - 10) *1000);
+        }
+
 
         final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
         if (!dynamicTask.contains(keepaliveTaskKey)) {
@@ -160,16 +162,11 @@
                                     // 姝ゆ椂鏄涓夋蹇冭烦瓒呮椂锛� 骞冲彴绂荤嚎
                                     if (platformCatch.getKeepAliveReply()  == 2) {
                                         // 璁剧疆骞冲彴绂荤嚎锛屽苟閲嶆柊娉ㄥ唽
-                                        offline(parentPlatform);
                                         logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽", parentPlatform.getServerGBId());
                                         try {
                                             commanderForPlatform.register(parentPlatform, eventResult1 -> {
                                                 logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽浠嶇劧澶辫触锛屽紑濮嬪畾鏃跺彂璧锋敞鍐岋紝闂撮殧涓�1鍒嗛挓", parentPlatform.getServerGBId());
-                                                // 娣诲姞娉ㄥ唽浠诲姟
-                                                dynamicTask.startCron(registerTaskKey,
-                                                        // 娉ㄥ唽澶辫触锛堟敞鍐屾垚鍔熸椂鐢辩▼搴忕洿鎺ヨ皟鐢ㄤ簡online鏂规硶锛�
-                                                        ()->logger.info("[鍥芥爣绾ц仈] {},骞冲彴绂荤嚎鍚庢寔缁彂璧锋敞鍐岋紝澶辫触", parentPlatform.getServerGBId()),
-                                                        60*1000);
+                                                offline(parentPlatform, false);
                                             }, null);
                                         } catch (InvalidArgumentException | ParseException | SipException e) {
                                             logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 娉ㄥ唽: {}", e.getMessage());
@@ -198,7 +195,7 @@
     }
 
     @Override
-    public void offline(ParentPlatform parentPlatform) {
+    public void offline(ParentPlatform parentPlatform, boolean stopRegister) {
         logger.info("[骞冲彴绂荤嚎]锛歿}", parentPlatform.getServerGBId());
         ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
         parentPlatformCatch.setKeepAliveReply(0);
@@ -212,11 +209,13 @@
         // 鍋滄鎵�鏈夋帹娴�
         logger.info("[骞冲彴绂荤嚎] {}, 鍋滄鎵�鏈夋帹娴�", parentPlatform.getServerGBId());
         stopAllPush(parentPlatform.getServerGBId());
-        // 娓呴櫎娉ㄥ唽瀹氭椂
-        logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId());
-        final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
-        if (dynamicTask.contains(registerTaskKey)) {
-            dynamicTask.stop(registerTaskKey);
+        if (stopRegister) {
+            // 娓呴櫎娉ㄥ唽瀹氭椂
+            logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId());
+            final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
+            if (dynamicTask.contains(registerTaskKey)) {
+                dynamicTask.stop(registerTaskKey);
+            }
         }
         // 娓呴櫎蹇冭烦瀹氭椂
         logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂鍙戦�佸績璺充换鍔�", parentPlatform.getServerGBId());

--
Gitblit v1.8.0