From 6b8ecd1f9d2abe1e6ac0af858487755a58a2643a Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 17 十一月 2022 18:09:28 +0800 Subject: [PATCH] 优化级联注册稳定性 --- src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java | 10 +--- src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java | 7 --- src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java | 37 +++++++++--------- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java | 1 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java | 2 src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java | 3 - src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java | 6 ++ 7 files changed, 29 insertions(+), 37 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java index 9ca936c..041d738 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -127,11 +127,15 @@ public void execute(){ if (futureMap.size() > 0) { for (String key : futureMap.keySet()) { - if (futureMap.get(key).isDone()) { + if (futureMap.get(key).isDone() || futureMap.get(key).isCancelled()) { futureMap.remove(key); runnableMap.remove(key); } } } } + + public boolean isAlive(String key) { + return futureMap.get(key) != null && !futureMap.get(key).isDone() && !futureMap.get(key).isCancelled(); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java index 0dfa968..15e38ae 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java @@ -2,7 +2,6 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.service.IPlatformService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -47,7 +46,7 @@ parentPlatformCatch.setId(parentPlatform.getServerGBId()); redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); // 璁剧疆鎵�鏈夊钩鍙扮绾� - platformService.offline(parentPlatform); + platformService.offline(parentPlatform, true); // 鍙栨秷璁㈤槄 sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{ platformService.login(parentPlatform); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java index aeca07a..5c2abb2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java @@ -5,11 +5,9 @@ import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; -import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; -import java.util.HashMap; -import java.util.Hashtable; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * @description: 褰曞儚鏌ヨ缁撴潫浜嬩欢 @@ -22,13 +20,12 @@ private final static Logger logger = LoggerFactory.getLogger(RecordEndEventListener.class); - private static Map<String, SseEmitter> sseEmitters = new Hashtable<>(); - public interface RecordEndEventHandler{ void handler(RecordInfo recordInfo); } - private Map<String, RecordEndEventHandler> handlerMap = new HashMap<>(); + private Map<String, RecordEndEventHandler> handlerMap = new ConcurrentHashMap<>(); + @Override public void onApplicationEvent(RecordEndEvent event) { logger.info("褰曞儚鏌ヨ瀹屾垚浜嬩欢瑙﹀彂锛宒eviceId锛歿}, channelId: {}, 褰曞儚鏁伴噺{}鏉�", event.getRecordInfo().getDeviceId(), @@ -38,7 +35,6 @@ recordEndEventHandler.handler(event.getRecordInfo()); } } - } public void addEndEventHandler(String device, String channelId, RecordEndEventHandler recordEndEventHandler) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java index d0e1583..14d1f84 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java @@ -100,7 +100,7 @@ if (platformRegisterInfo.isRegister()) { platformService.online(parentPlatform); }else { - platformService.offline(parentPlatform); + platformService.offline(parentPlatform, false); } // 娉ㄥ唽/娉ㄩ攢鎴愬姛绉婚櫎缂撳瓨鐨勪俊鎭� diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 50f0113..7d3510f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -169,7 +169,6 @@ .build(); Response response = client.newCall(request).execute(); if (response.isSuccessful()) { - logger.info("response body contentType: " + Objects.requireNonNull(response.body()).contentType()); if (targetPath != null) { File snapFolder = new File(targetPath); if (!snapFolder.exists()) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java index ddc91eb..17f8b37 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java @@ -1,12 +1,7 @@ package com.genersoft.iot.vmp.service; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.github.pagehelper.PageInfo; - -import java.util.List; /** * 鍥芥爣骞冲彴鐨勪笟鍔$被 @@ -40,7 +35,7 @@ * 骞冲彴绂荤嚎 * @param parentPlatform 骞冲彴淇℃伅 */ - void offline(ParentPlatform parentPlatform); + void offline(ParentPlatform parentPlatform, boolean stopRegisterTask); /** * 鍚戜笂绾у钩鍙板彂璧锋敞鍐� diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index fe67ede..fbc507a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -22,7 +22,6 @@ import javax.sip.InvalidArgumentException; import javax.sip.SipException; -import javax.sip.TimeoutEvent; import java.text.ParseException; import java.util.HashMap; import java.util.List; @@ -131,20 +130,23 @@ } final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); - if (dynamicTask.contains(registerTaskKey)) { - dynamicTask.stop(registerTaskKey); - } - // 娣诲姞娉ㄥ唽浠诲姟 - dynamicTask.startDelay(registerTaskKey, + if (!dynamicTask.isAlive(registerTaskKey)) { + // 娣诲姞娉ㄥ唽浠诲姟 + dynamicTask.startCron(registerTaskKey, // 娉ㄥ唽澶辫触锛堟敞鍐屾垚鍔熸椂鐢辩▼搴忕洿鎺ヨ皟鐢ㄤ簡online鏂规硶锛� ()-> { try { - commanderForPlatform.register(parentPlatform, eventResult -> offline(parentPlatform),null); + logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛岄噸鏂版敞鍐�", parentPlatform.getServerGBId()); + commanderForPlatform.register(parentPlatform, eventResult -> { + offline(parentPlatform, false); + },null); } catch (InvalidArgumentException | ParseException | SipException e) { logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈瀹氭椂娉ㄥ唽: {}", e.getMessage()); } }, (parentPlatform.getExpires() - 10) *1000); + } + final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId(); if (!dynamicTask.contains(keepaliveTaskKey)) { @@ -160,16 +162,11 @@ // 姝ゆ椂鏄涓夋蹇冭烦瓒呮椂锛� 骞冲彴绂荤嚎 if (platformCatch.getKeepAliveReply() == 2) { // 璁剧疆骞冲彴绂荤嚎锛屽苟閲嶆柊娉ㄥ唽 - offline(parentPlatform); logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽", parentPlatform.getServerGBId()); try { commanderForPlatform.register(parentPlatform, eventResult1 -> { logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽浠嶇劧澶辫触锛屽紑濮嬪畾鏃跺彂璧锋敞鍐岋紝闂撮殧涓�1鍒嗛挓", parentPlatform.getServerGBId()); - // 娣诲姞娉ㄥ唽浠诲姟 - dynamicTask.startCron(registerTaskKey, - // 娉ㄥ唽澶辫触锛堟敞鍐屾垚鍔熸椂鐢辩▼搴忕洿鎺ヨ皟鐢ㄤ簡online鏂规硶锛� - ()->logger.info("[鍥芥爣绾ц仈] {},骞冲彴绂荤嚎鍚庢寔缁彂璧锋敞鍐岋紝澶辫触", parentPlatform.getServerGBId()), - 60*1000); + offline(parentPlatform, false); }, null); } catch (InvalidArgumentException | ParseException | SipException e) { logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 娉ㄥ唽: {}", e.getMessage()); @@ -198,7 +195,7 @@ } @Override - public void offline(ParentPlatform parentPlatform) { + public void offline(ParentPlatform parentPlatform, boolean stopRegister) { logger.info("[骞冲彴绂荤嚎]锛歿}", parentPlatform.getServerGBId()); ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); parentPlatformCatch.setKeepAliveReply(0); @@ -212,11 +209,13 @@ // 鍋滄鎵�鏈夋帹娴� logger.info("[骞冲彴绂荤嚎] {}, 鍋滄鎵�鏈夋帹娴�", parentPlatform.getServerGBId()); stopAllPush(parentPlatform.getServerGBId()); - // 娓呴櫎娉ㄥ唽瀹氭椂 - logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId()); - final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); - if (dynamicTask.contains(registerTaskKey)) { - dynamicTask.stop(registerTaskKey); + if (stopRegister) { + // 娓呴櫎娉ㄥ唽瀹氭椂 + logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId()); + final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); + if (dynamicTask.contains(registerTaskKey)) { + dynamicTask.stop(registerTaskKey); + } } // 娓呴櫎蹇冭烦瀹氭椂 logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂鍙戦�佸績璺充换鍔�", parentPlatform.getServerGBId()); -- Gitblit v1.8.0