From fc77b3f819b3143387b90a4d631725e7c6513ecd Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期五, 01 十二月 2023 15:49:18 +0800
Subject: [PATCH] 支持重新接入zlm的时候检查拉流代理数据是否异常,异常数据自动移除

---
 src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java |  243 +++++++++++++++++++++++++++++++++++++-----------
 1 files changed, 187 insertions(+), 56 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
old mode 100644
new mode 100755
index fe67ede..524c85b
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -4,29 +4,35 @@
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
-import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
+import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IPlatformService;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
+import com.genersoft.iot.vmp.storager.dao.*;
+import com.genersoft.iot.vmp.utils.DateUtil;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
+import gov.nist.javax.sip.message.SIPRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import javax.sip.InvalidArgumentException;
+import javax.sip.PeerUnavailableException;
 import javax.sip.SipException;
-import javax.sip.TimeoutEvent;
+import javax.sip.SipFactory;
+import javax.sip.address.Address;
+import javax.sip.address.SipURI;
+import javax.sip.header.*;
+import javax.sip.message.Request;
 import java.text.ParseException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * @author lin
@@ -35,6 +41,8 @@
 public class PlatformServiceImpl implements IPlatformService {
 
     private final static String REGISTER_KEY_PREFIX = "platform_register_";
+
+    private final static String REGISTER_FAIL_AGAIN_KEY_PREFIX = "platform_register_fail_again_";
     private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_";
 
     private final static Logger logger = LoggerFactory.getLogger(PlatformServiceImpl.class);
@@ -43,7 +51,19 @@
     private ParentPlatformMapper platformMapper;
 
     @Autowired
+    private PlatformCatalogMapper catalogMapper;
+
+    @Autowired
+    private PlatformChannelMapper platformChannelMapper;
+
+    @Autowired
+    private PlatformGbStreamMapper platformGbStreamMapper;
+
+    @Autowired
     private IRedisCatchStorage redisCatchStorage;
+
+    @Autowired
+    private SSRCFactory ssrcFactory;
 
     @Autowired
     private IMediaServerService mediaServerService;
@@ -55,7 +75,7 @@
     private DynamicTask dynamicTask;
 
     @Autowired
-    private ZLMRTPServerFactory zlmrtpServerFactory;
+    private ZLMServerFactory zlmServerFactory;
 
     @Autowired
     private SubscribeHolder subscribeHolder;
@@ -114,91 +134,188 @@
     }
 
     @Override
-    public void online(ParentPlatform parentPlatform) {
-        logger.info("[鍥芥爣绾ц仈]锛歿}, 骞冲彴涓婄嚎/鏇存柊娉ㄥ唽", parentPlatform.getServerGBId());
+    public boolean update(ParentPlatform parentPlatform) {
+        logger.info("[鍥芥爣绾ц仈]鏇存柊骞冲彴 {}", parentPlatform.getDeviceGBId());
+        parentPlatform.setCharacterSet(parentPlatform.getCharacterSet().toUpperCase());
+        ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId());
+        ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatformOld.getServerGBId());
+        parentPlatform.setUpdateTime(DateUtil.getNow());
+
+        // 鍋滄蹇冭烦瀹氭椂
+        final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatformOld.getServerGBId();
+        dynamicTask.stop(keepaliveTaskKey);
+        // 鍋滄娉ㄥ唽瀹氭椂
+        final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatformOld.getServerGBId();
+        dynamicTask.stop(registerTaskKey);
+        // 娉ㄩ攢鏃х殑
+        try {
+            if (parentPlatformOld.isStatus()) {
+                logger.info("淇濆瓨骞冲彴{}鏃跺彂鐜版棫骞冲彴鍦ㄧ嚎锛屽彂閫佹敞閿�鍛戒护", parentPlatformOld.getServerGBId());
+                commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> {
+                    logger.info("[鍥芥爣绾ц仈] 娉ㄩ攢鎴愬姛锛� 骞冲彴锛歿}", parentPlatformOld.getServerGBId());
+                });
+            }
+        } catch (InvalidArgumentException | ParseException | SipException e) {
+            logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 娉ㄩ攢: {}", e.getMessage());
+        }
+
+        // 鏇存柊鏁版嵁搴�
+        if (parentPlatform.getCatalogGroup() == 0) {
+            parentPlatform.setCatalogGroup(1);
+        }
+        if (parentPlatform.getAdministrativeDivision() == null) {
+            parentPlatform.setAdministrativeDivision(parentPlatform.getAdministrativeDivision());
+        }
+
+        platformMapper.updateParentPlatform(parentPlatform);
+        // 鏇存柊redis
+        redisCatchStorage.delPlatformCatchInfo(parentPlatformOld.getServerGBId());
+        ParentPlatformCatch parentPlatformCatch = new ParentPlatformCatch();
+        parentPlatformCatch.setParentPlatform(parentPlatform);
+        parentPlatformCatch.setId(parentPlatform.getServerGBId());
+        redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
+        // 娉ㄥ唽
+        if (parentPlatform.isEnable()) {
+            // 淇濆瓨鏃跺惎鐢ㄥ氨鍙戦�佹敞鍐�
+            // 娉ㄥ唽鎴愬姛鏃剁敱绋嬪簭鐩存帴璋冪敤浜唎nline鏂规硶
+            try {
+                logger.info("[鍥芥爣绾ц仈] 骞冲彴娉ㄥ唽 {}", parentPlatform.getDeviceGBId());
+                commanderForPlatform.register(parentPlatform, eventResult -> {
+                    logger.info("[鍥芥爣绾ц仈] {},娣诲姞鍚戜笂绾ф敞鍐屽け璐ワ紝璇风‘瀹氫笂绾у钩鍙板彲鐢ㄦ椂閲嶆柊淇濆瓨", parentPlatform.getServerGBId());
+                }, null);
+            } catch (InvalidArgumentException | ParseException | SipException e) {
+                logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈: {}", e.getMessage());
+            }
+        }
+
+
+        return false;
+    }
+
+
+    @Override
+    public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) {
+        logger.info("[鍥芥爣绾ц仈]锛歿}, 骞冲彴涓婄嚎", parentPlatform.getServerGBId());
+        final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId();
+        dynamicTask.stop(registerFailAgainTaskKey);
+
         platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true);
         ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
-        if (parentPlatformCatch != null) {
-            parentPlatformCatch.getParentPlatform().setStatus(true);
-            redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
-        }else {
+        if (parentPlatformCatch == null) {
             parentPlatformCatch = new ParentPlatformCatch();
             parentPlatformCatch.setParentPlatform(parentPlatform);
             parentPlatformCatch.setId(parentPlatform.getServerGBId());
             parentPlatform.setStatus(true);
             parentPlatformCatch.setParentPlatform(parentPlatform);
-            redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
         }
 
+        parentPlatformCatch.getParentPlatform().setStatus(true);
+        parentPlatformCatch.setSipTransactionInfo(sipTransactionInfo);
+        redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
+
         final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
-        if (dynamicTask.contains(registerTaskKey)) {
-            dynamicTask.stop(registerTaskKey);
-        }
-        // 娣诲姞娉ㄥ唽浠诲姟
-        dynamicTask.startDelay(registerTaskKey,
+        if (!dynamicTask.isAlive(registerTaskKey)) {
+            logger.info("[鍥芥爣绾ц仈]锛歿}, 娣诲姞瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId());
+            // 娣诲姞娉ㄥ唽浠诲姟
+            dynamicTask.startCron(registerTaskKey,
                 // 娉ㄥ唽澶辫触锛堟敞鍐屾垚鍔熸椂鐢辩▼搴忕洿鎺ヨ皟鐢ㄤ簡online鏂规硶锛�
-                ()-> {
-                    try {
-                        commanderForPlatform.register(parentPlatform, eventResult -> offline(parentPlatform),null);
-                    } catch (InvalidArgumentException | ParseException | SipException e) {
-                        logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈瀹氭椂娉ㄥ唽: {}", e.getMessage());
-                    }
-                },
-                (parentPlatform.getExpires() - 10) *1000);
+                ()-> registerTask(parentPlatform, sipTransactionInfo),
+                    parentPlatform.getExpires() * 1000);
+        }
+
 
         final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
         if (!dynamicTask.contains(keepaliveTaskKey)) {
+            logger.info("[鍥芥爣绾ц仈]锛歿}, 娣诲姞瀹氭椂蹇冭烦浠诲姟", parentPlatform.getServerGBId());
             // 娣诲姞蹇冭烦浠诲姟
             dynamicTask.startCron(keepaliveTaskKey,
                     ()-> {
                         try {
                             commanderForPlatform.keepalive(parentPlatform, eventResult -> {
                                 // 蹇冭烦澶辫触
-                                if (eventResult.type == SipSubscribe.EventResultType.timeout) {
-                                    // 蹇冭烦瓒呮椂
-                                    ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
-                                    // 姝ゆ椂鏄涓夋蹇冭烦瓒呮椂锛� 骞冲彴绂荤嚎
-                                    if (platformCatch.getKeepAliveReply()  == 2) {
-                                        // 璁剧疆骞冲彴绂荤嚎锛屽苟閲嶆柊娉ㄥ唽
-                                        offline(parentPlatform);
-                                        logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽", parentPlatform.getServerGBId());
-                                        try {
-                                            commanderForPlatform.register(parentPlatform, eventResult1 -> {
-                                                logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽浠嶇劧澶辫触锛屽紑濮嬪畾鏃跺彂璧锋敞鍐岋紝闂撮殧涓�1鍒嗛挓", parentPlatform.getServerGBId());
-                                                // 娣诲姞娉ㄥ唽浠诲姟
-                                                dynamicTask.startCron(registerTaskKey,
-                                                        // 娉ㄥ唽澶辫触锛堟敞鍐屾垚鍔熸椂鐢辩▼搴忕洿鎺ヨ皟鐢ㄤ簡online鏂规硶锛�
-                                                        ()->logger.info("[鍥芥爣绾ц仈] {},骞冲彴绂荤嚎鍚庢寔缁彂璧锋敞鍐岋紝澶辫触", parentPlatform.getServerGBId()),
-                                                        60*1000);
-                                            }, null);
-                                        } catch (InvalidArgumentException | ParseException | SipException e) {
-                                            logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 娉ㄥ唽: {}", e.getMessage());
-                                        }
-                                    }
-
-                                }else {
+                                if (eventResult.type != SipSubscribe.EventResultType.timeout) {
                                     logger.warn("[鍥芥爣绾ц仈]鍙戦�佸績璺虫敹鍒伴敊璇紝code锛� {}, msg: {}", eventResult.statusCode, eventResult.msg);
+                                }
+                                // 蹇冭烦澶辫触
+                                ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
+                                // 姝ゆ椂鏄涓夋蹇冭烦瓒呮椂锛� 骞冲彴绂荤嚎
+                                if (platformCatch.getKeepAliveReply()  == 2) {
+                                    // 璁剧疆骞冲彴绂荤嚎锛屽苟閲嶆柊娉ㄥ唽
+                                    logger.info("[鍥芥爣绾ц仈] 涓夋蹇冭烦澶辫触, 骞冲彴{}({})绂荤嚎", parentPlatform.getName(), parentPlatform.getServerGBId());
+                                    offline(parentPlatform, false);
+                                }else {
+                                    platformCatch.setKeepAliveReply(platformCatch.getKeepAliveReply() + 1);
+                                    redisCatchStorage.updatePlatformCatchInfo(platformCatch);
                                 }
 
                             }, eventResult -> {
                                 // 蹇冭烦鎴愬姛
                                 // 娓呯┖涔嬪墠鐨勫績璺宠秴鏃惰鏁�
                                 ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
-                                if (platformCatch.getKeepAliveReply() > 0) {
+                                if (platformCatch != null && platformCatch.getKeepAliveReply() > 0) {
                                     platformCatch.setKeepAliveReply(0);
                                     redisCatchStorage.updatePlatformCatchInfo(platformCatch);
                                 }
+                                logger.info("[鍙戦�佸績璺砞 鍥芥爣绾ц仈 鍙戦�佸績璺�, code锛� {}, msg: {}", eventResult.statusCode, eventResult.msg);
                             });
                         } catch (SipException | InvalidArgumentException | ParseException e) {
                             logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�佸績璺�: {}", e.getMessage());
                         }
                     },
-                    (parentPlatform.getKeepTimeout() - 10)*1000);
+                    (parentPlatform.getKeepTimeout())*1000);
+        }
+        if (parentPlatform.isAutoPushChannel()) {
+            if (subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()) == null) {
+                addSimulatedSubscribeInfo(parentPlatform);
+            }
+        }else {
+            SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId());
+            if (catalogSubscribe != null && catalogSubscribe.getExpires() == -1) {
+                subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId());
+            }
         }
     }
 
     @Override
-    public void offline(ParentPlatform parentPlatform) {
+    public void addSimulatedSubscribeInfo(ParentPlatform parentPlatform) {
+        // 鑷姩娣诲姞涓�鏉℃ā鎷熺殑璁㈤槄淇℃伅
+        SubscribeInfo subscribeInfo = new SubscribeInfo();
+        subscribeInfo.setId(parentPlatform.getServerGBId());
+        subscribeInfo.setExpires(-1);
+        subscribeInfo.setEventType("Catalog");
+        int random = (int) Math.floor(Math.random() * 10000);
+        subscribeInfo.setEventId(random + "");
+        subscribeInfo.setSimulatedCallId(UUID.randomUUID().toString().replace("-", "") + "@" + parentPlatform.getServerIP());
+        subscribeInfo.setSimulatedFromTag(UUID.randomUUID().toString().replace("-", ""));
+        subscribeInfo.setSimulatedToTag(UUID.randomUUID().toString().replace("-", ""));
+        subscribeHolder.putCatalogSubscribe(parentPlatform.getServerGBId(), subscribeInfo);
+    }
+
+    private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){
+        try {
+            // 涓嶅湪鍚屼竴涓細璇濅腑缁鍒欐瘡娆″叏鏂版敞鍐�
+            if (!userSetting.isRegisterKeepIntDialog()) {
+                sipTransactionInfo = null;
+            }
+
+            if (sipTransactionInfo == null) {
+                logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛屽紑濮嬮噸鏂版敞鍐�", parentPlatform.getServerGBId());
+            }else {
+                logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛屽紑濮嬬画璁�", parentPlatform.getServerGBId());
+            }
+
+            commanderForPlatform.register(parentPlatform, sipTransactionInfo,  eventResult -> {
+                logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽澶辫触锛寋}:{}", parentPlatform.getServerGBId(),
+                        eventResult.statusCode, eventResult.msg);
+                offline(parentPlatform, false);
+            }, null);
+        } catch (Exception e) {
+            logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈瀹氭椂娉ㄥ唽: {}", e.getMessage());
+        }
+    }
+
+    @Override
+    public void offline(ParentPlatform parentPlatform, boolean stopRegister) {
         logger.info("[骞冲彴绂荤嚎]锛歿}", parentPlatform.getServerGBId());
         ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
         parentPlatformCatch.setKeepAliveReply(0);
@@ -212,6 +329,7 @@
         // 鍋滄鎵�鏈夋帹娴�
         logger.info("[骞冲彴绂荤嚎] {}, 鍋滄鎵�鏈夋帹娴�", parentPlatform.getServerGBId());
         stopAllPush(parentPlatform.getServerGBId());
+
         // 娓呴櫎娉ㄥ唽瀹氭椂
         logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId());
         final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
@@ -222,25 +340,38 @@
         logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂鍙戦�佸績璺充换鍔�", parentPlatform.getServerGBId());
         final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
         if (dynamicTask.contains(keepaliveTaskKey)) {
-            // 娣诲姞蹇冭烦浠诲姟
+            // 娓呴櫎蹇冭烦浠诲姟
             dynamicTask.stop(keepaliveTaskKey);
         }
         // 鍋滄鐩綍璁㈤槄鍥炲
         logger.info("[骞冲彴绂荤嚎] {}, 鍋滄璁㈤槄鍥炲", parentPlatform.getServerGBId());
         subscribeHolder.removeAllSubscribe(parentPlatform.getServerGBId());
+        // 鍙戣捣瀹氭椂鑷姩閲嶆柊娉ㄥ唽
+        if (!stopRegister) {
+            // 璁剧疆涓�60绉掕嚜鍔ㄥ皾璇曢噸鏂版敞鍐�
+            final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId();
+            ParentPlatform platform = platformMapper.getParentPlatById(parentPlatform.getId());
+            if (platform.isEnable()) {
+                dynamicTask.startCron(registerFailAgainTaskKey,
+                        ()-> registerTask(platform, null),
+                        userSetting.getRegisterAgainAfterTime() * 1000);
+            }
+
+        }
     }
 
     private void stopAllPush(String platformId) {
         List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(platformId);
         if (sendRtpItems != null && sendRtpItems.size() > 0) {
             for (SendRtpItem sendRtpItem : sendRtpItems) {
+                ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
                 redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
                 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                 Map<String, Object> param = new HashMap<>(3);
                 param.put("vhost", "__defaultVhost__");
                 param.put("app", sendRtpItem.getApp());
                 param.put("stream", sendRtpItem.getStreamId());
-                zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
+                zlmServerFactory.stopSendRtpStream(mediaInfo, param);
             }
         }
     }

--
Gitblit v1.8.0