From 1fc2916c2b4b28fbf722c4401e559805f9578573 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期日, 28 四月 2024 22:25:58 +0800
Subject: [PATCH] Merge pull request #1432 from AlphaWu/Zafu-Dev-20240428
---
 src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java |  581 +++++++++++++++++++++++++++++++++++++++++++++++++--------
 1 files changed, 498 insertions(+), 83 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
old mode 100644
new mode 100755
index a173557..6554817
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,39 +1,61 @@
 package com.genersoft.iot.vmp.service.impl;
 
+import com.baomidou.dynamic.datasource.annotation.DS;
+import com.genersoft.iot.vmp.common.InviteInfo;
+import com.genersoft.iot.vmp.common.InviteSessionStatus;
+import com.genersoft.iot.vmp.common.InviteSessionType;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
-import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
+import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
+import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
+import com.genersoft.iot.vmp.media.bean.MediaServer;
+import com.genersoft.iot.vmp.media.event.hook.HookData;
+import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
+import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
+import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
+import com.genersoft.iot.vmp.media.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IInviteStreamService;
 import com.genersoft.iot.vmp.service.IPlatformService;
-import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
+import com.genersoft.iot.vmp.service.IPlayService;
+import com.genersoft.iot.vmp.service.bean.*;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.dao.*;
+import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
+import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
+import gov.nist.javax.sip.message.SIPResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
+import javax.sdp.*;
 import javax.sip.InvalidArgumentException;
+import javax.sip.ResponseEvent;
 import javax.sip.SipException;
 import java.text.ParseException;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.UUID;
+import java.util.Vector;
 
 /**
  * @author lin
  */
 @Service
+@DS("master")
 public class PlatformServiceImpl implements IPlatformService {
 
     private final static String REGISTER_KEY_PREFIX = "platform_register_";
+
+    private final static String REGISTER_FAIL_AGAIN_KEY_PREFIX = "platform_register_fail_again_";
     private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_";
 
     private final static Logger logger = LoggerFactory.getLogger(PlatformServiceImpl.class);
@@ -42,28 +64,19 @@
     private ParentPlatformMapper platformMapper;
 
     @Autowired
-    private PlatformCatalogMapper catalogMapper;
-
-    @Autowired
-    private PlatformChannelMapper platformChannelMapper;
-
-    @Autowired
-    private PlatformGbStreamMapper platformGbStreamMapper;
-
-    @Autowired
     private IRedisCatchStorage redisCatchStorage;
+
+    @Autowired
+    private SSRCFactory ssrcFactory;
 
     @Autowired
     private IMediaServerService mediaServerService;
 
     @Autowired
-    private SIPCommanderFroPlatform commanderForPlatform;
+    private ISIPCommanderForPlatform commanderForPlatform;
 
     @Autowired
     private DynamicTask dynamicTask;
-
-    @Autowired
-    private ZLMRTPServerFactory zlmrtpServerFactory;
 
     @Autowired
     private SubscribeHolder subscribeHolder;
@@ -74,6 +87,65 @@
     @Autowired
     private UserSetting userSetting;
 
+    @Autowired
+    private VideoStreamSessionManager streamSession;
+
+    @Autowired
+    private IPlayService playService;
+
+    @Autowired
+    private IInviteStreamService inviteStreamService;
+
+
+    /**
+     * 娴佺寮�鐨勫鐞�
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaDepartureEvent event) {
+        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
+        if (!sendRtpItems.isEmpty()) {
+            for (SendRtpItem sendRtpItem : sendRtpItems) {
+                if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) {
+                    String platformId = sendRtpItem.getPlatformId();
+                    ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformId);
+
+                    try {
+                        if (platform != null) {
+                            commanderForPlatform.streamByeCmd(platform, sendRtpItem);
+                            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
+                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
+                        }
+                    } catch (SipException | InvalidArgumentException | ParseException e) {
+                        logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
+                    }
+                }
+            }
+        }
+    }
+
+
+    /**
+     * 鍙戞祦鍋滄
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaSendRtpStoppedEvent event) {
+        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
+        if (sendRtpItems != null && !sendRtpItems.isEmpty()) {
+            for (SendRtpItem sendRtpItem : sendRtpItems) {
+                ParentPlatform parentPlatform = platformMapper.getParentPlatByServerGBId(sendRtpItem.getPlatformId());
+                ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
+                try {
+                    commanderForPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
+                } catch (SipException | InvalidArgumentException | ParseException e) {
+                    logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
+                }
+                redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
+                        sendRtpItem.getCallId(), sendRtpItem.getStream());
+            }
+        }
+    }
 
 
     @Override
@@ -128,14 +200,6 @@
         ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId());
         ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatformOld.getServerGBId());
         parentPlatform.setUpdateTime(DateUtil.getNow());
-        if (!parentPlatformOld.getTreeType().equals(parentPlatform.getTreeType())) {
-            // 鐩綍缁撴瀯鍙戠敓鍙樺寲锛屾竻绌轰箣鍓嶇殑鍏宠仈鍏崇郴
-            logger.info("淇濆瓨骞冲彴{}鏃跺彂鐜扮洰褰曠粨鏋勫彉鍖栵紝娓呯┖鍏宠仈鍏崇郴", parentPlatform.getDeviceGBId());
-            catalogMapper.delByPlatformId(parentPlatformOld.getServerGBId());
-            platformChannelMapper.delByPlatformId(parentPlatformOld.getServerGBId());
-            platformGbStreamMapper.delByPlatformId(parentPlatformOld.getServerGBId());
-        }
-
 
         // 鍋滄蹇冭烦瀹氭椂
         final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatformOld.getServerGBId();
@@ -145,13 +209,12 @@
         dynamicTask.stop(registerTaskKey);
         // 娉ㄩ攢鏃х殑
         try {
-            if (parentPlatformOld.isStatus()) {
-                logger.info("淇濆瓨骞冲彴{}鏃跺彂鐜版晳骞冲彴鍦ㄧ嚎锛屽彂閫佹敞閿�鍛戒护", parentPlatform.getDeviceGBId());
+            if (parentPlatformOld.isStatus() && parentPlatformCatchOld != null) {
+                logger.info("淇濆瓨骞冲彴{}鏃跺彂鐜版棫骞冲彴鍦ㄧ嚎锛屽彂閫佹敞閿�鍛戒护", parentPlatformOld.getServerGBId());
                 commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> {
                     logger.info("[鍥芥爣绾ц仈] 娉ㄩ攢鎴愬姛锛� 骞冲彴锛歿}", parentPlatformOld.getServerGBId());
                 });
             }
-
         } catch (InvalidArgumentException | ParseException | SipException e) {
             logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 娉ㄩ攢: {}", e.getMessage());
         }
@@ -176,6 +239,7 @@
             // 淇濆瓨鏃跺惎鐢ㄥ氨鍙戦�佹敞鍐�
             // 娉ㄥ唽鎴愬姛鏃剁敱绋嬪簭鐩存帴璋冪敤浜唎nline鏂规硶
             try {
+                logger.info("[鍥芥爣绾ц仈] 骞冲彴娉ㄥ唽 {}", parentPlatform.getDeviceGBId());
                 commanderForPlatform.register(parentPlatform, eventResult -> {
                     logger.info("[鍥芥爣绾ц仈] {},娣诲姞鍚戜笂绾ф敞鍐屽け璐ワ紝璇风‘瀹氫笂绾у钩鍙板彲鐢ㄦ椂閲嶆柊淇濆瓨", parentPlatform.getServerGBId());
                 }, null);
@@ -183,8 +247,6 @@
                 logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈: {}", e.getMessage());
             }
         }
-        // 閲嶆柊寮�鍚畾鏃舵敞鍐岋紝 浣跨敤缁娑堟伅
-        // 閲嶆柊寮�濮嬪績璺充繚娲�
 
 
         return false;
@@ -194,6 +256,9 @@
     @Override
     public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) {
         logger.info("[鍥芥爣绾ц仈]锛歿}, 骞冲彴涓婄嚎", parentPlatform.getServerGBId());
+        final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId();
+        dynamicTask.stop(registerFailAgainTaskKey);
+
         platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true);
         ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
         if (parentPlatformCatch == null) {
@@ -228,35 +293,30 @@
                         try {
                             commanderForPlatform.keepalive(parentPlatform, eventResult -> {
                                 // 蹇冭烦澶辫触
-                                if (eventResult.type == SipSubscribe.EventResultType.timeout) {
-                                    // 蹇冭烦瓒呮椂
-                                    ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
-                                    // 姝ゆ椂鏄涓夋蹇冭烦瓒呮椂锛� 骞冲彴绂荤嚎
-                                    if (platformCatch.getKeepAliveReply()  == 2) {
-                                        // 璁剧疆骞冲彴绂荤嚎锛屽苟閲嶆柊娉ㄥ唽
-                                        logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽", parentPlatform.getServerGBId());
-                                        try {
-                                            commanderForPlatform.register(parentPlatform, eventResult1 -> {
-                                                logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽浠嶇劧澶辫触锛屽紑濮嬪畾鏃跺彂璧锋敞鍐岋紝闂撮殧涓�1鍒嗛挓", parentPlatform.getServerGBId());
-                                                offline(parentPlatform, false);
-                                            }, null);
-                                        } catch (InvalidArgumentException | ParseException | SipException e) {
-                                            logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 娉ㄥ唽: {}", e.getMessage());
-                                        }
-                                    }
-
-                                }else {
+                                if (eventResult.type != SipSubscribe.EventResultType.timeout) {
                                     logger.warn("[鍥芥爣绾ц仈]鍙戦�佸績璺虫敹鍒伴敊璇紝code锛� {}, msg: {}", eventResult.statusCode, eventResult.msg);
+                                }
+                                // 蹇冭烦澶辫触
+                                ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
+                                // 姝ゆ椂鏄涓夋蹇冭烦瓒呮椂锛� 骞冲彴绂荤嚎
+                                if (platformCatch.getKeepAliveReply()  == 2) {
+                                    // 璁剧疆骞冲彴绂荤嚎锛屽苟閲嶆柊娉ㄥ唽
+                                    logger.info("[鍥芥爣绾ц仈] 涓夋蹇冭烦澶辫触, 骞冲彴{}({})绂荤嚎", parentPlatform.getName(), parentPlatform.getServerGBId());
+                                    offline(parentPlatform, false);
+                                }else {
+                                    platformCatch.setKeepAliveReply(platformCatch.getKeepAliveReply() + 1);
+                                    redisCatchStorage.updatePlatformCatchInfo(platformCatch);
                                 }
 
                             }, eventResult -> {
                                 // 蹇冭烦鎴愬姛
                                 // 娓呯┖涔嬪墠鐨勫績璺宠秴鏃惰鏁�
                                 ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
-                                if (platformCatch.getKeepAliveReply() > 0) {
+                                if (platformCatch != null && platformCatch.getKeepAliveReply() > 0) {
                                     platformCatch.setKeepAliveReply(0);
                                     redisCatchStorage.updatePlatformCatchInfo(platformCatch);
                                 }
+                                logger.info("[鍙戦�佸績璺砞 鍥芥爣绾ц仈 鍙戦�佸績璺�, code锛� {}, msg: {}", eventResult.statusCode, eventResult.msg);
                             });
                         } catch (SipException | InvalidArgumentException | ParseException e) {
                             logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�佸績璺�: {}", e.getMessage());
@@ -264,26 +324,53 @@
                     },
                     (parentPlatform.getKeepTimeout())*1000);
         }
+        if (parentPlatform.isAutoPushChannel()) {
+            if (subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()) == null) {
+                logger.info("[鍥芥爣绾ц仈]锛歿}, 娣诲姞鑷姩閫氶亾鎺ㄩ�佹ā鎷熻闃呬俊鎭�", parentPlatform.getServerGBId());
+                addSimulatedSubscribeInfo(parentPlatform);
+            }
+        }else {
+            SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId());
+            if (catalogSubscribe != null && catalogSubscribe.getExpires() == -1) {
+                subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId());
+            }
+        }
+    }
+
+    @Override
+    public void addSimulatedSubscribeInfo(ParentPlatform parentPlatform) {
+        // 鑷姩娣诲姞涓�鏉℃ā鎷熺殑璁㈤槄淇℃伅
+        SubscribeInfo subscribeInfo = new SubscribeInfo();
+        subscribeInfo.setId(parentPlatform.getServerGBId());
+        subscribeInfo.setExpires(-1);
+        subscribeInfo.setEventType("Catalog");
+        int random = (int) Math.floor(Math.random() * 10000);
+        subscribeInfo.setEventId(random + "");
+        subscribeInfo.setSimulatedCallId(UUID.randomUUID().toString().replace("-", "") + "@" + parentPlatform.getServerIP());
+        subscribeInfo.setSimulatedFromTag(UUID.randomUUID().toString().replace("-", ""));
+        subscribeInfo.setSimulatedToTag(UUID.randomUUID().toString().replace("-", ""));
+        subscribeHolder.putCatalogSubscribe(parentPlatform.getServerGBId(), subscribeInfo);
     }
 
     private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){
         try {
-            // 璁剧疆瓒呮椂閲嶅彂锛� 鍚庣画浠庡簳灞傛敮鎸佹秷鎭噸鍙�
-            String key = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId() + "_timeout";
-            if (dynamicTask.isAlive(key)) {
-                return;
+            // 涓嶅湪鍚屼竴涓細璇濅腑缁鍒欐瘡娆″叏鏂版敞鍐�
+            if (!userSetting.isRegisterKeepIntDialog()) {
+                sipTransactionInfo = null;
             }
-            dynamicTask.startDelay(key, ()->{
-                registerTask(parentPlatform, sipTransactionInfo);
-            }, 1000);
-            logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛屽紑濮嬬画璁�", parentPlatform.getServerGBId());
+
+            if (sipTransactionInfo == null) {
+                logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛屽紑濮嬮噸鏂版敞鍐�", parentPlatform.getServerGBId());
+            }else {
+                logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛屽紑濮嬬画璁�", parentPlatform.getServerGBId());
+            }
+
             commanderForPlatform.register(parentPlatform, sipTransactionInfo,  eventResult -> {
-                dynamicTask.stop(key);
+                logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽澶辫触锛寋}:{}", parentPlatform.getServerGBId(),
+                        eventResult.statusCode, eventResult.msg);
                 offline(parentPlatform, false);
-            },eventResult -> {
-                dynamicTask.stop(key);
-            });
-        } catch (InvalidArgumentException | ParseException | SipException e) {
+            }, null);
+        } catch (Exception e) {
             logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈瀹氭椂娉ㄥ唽: {}", e.getMessage());
         }
     }
@@ -303,37 +390,51 @@
         // 鍋滄鎵�鏈夋帹娴�
         logger.info("[骞冲彴绂荤嚎] {}, 鍋滄鎵�鏈夋帹娴�", parentPlatform.getServerGBId());
         stopAllPush(parentPlatform.getServerGBId());
-        if (stopRegister) {
-            // 娓呴櫎娉ㄥ唽瀹氭椂
-            logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId());
-            final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
-            if (dynamicTask.contains(registerTaskKey)) {
-                dynamicTask.stop(registerTaskKey);
-            }
+
+        // 娓呴櫎娉ㄥ唽瀹氭椂
+        logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId());
+        final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
+        if (dynamicTask.contains(registerTaskKey)) {
+            dynamicTask.stop(registerTaskKey);
         }
         // 娓呴櫎蹇冭烦瀹氭椂
         logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂鍙戦�佸績璺充换鍔�", parentPlatform.getServerGBId());
         final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
         if (dynamicTask.contains(keepaliveTaskKey)) {
-            // 娣诲姞蹇冭烦浠诲姟
+            // 娓呴櫎蹇冭烦浠诲姟
             dynamicTask.stop(keepaliveTaskKey);
         }
-        // 鍋滄鐩綍璁㈤槄鍥炲
-        logger.info("[骞冲彴绂荤嚎] {}, 鍋滄璁㈤槄鍥炲", parentPlatform.getServerGBId());
-        subscribeHolder.removeAllSubscribe(parentPlatform.getServerGBId());
+        // 鍋滄璁㈤槄鍥炲
+        SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId());
+        if (catalogSubscribe != null) {
+            if (catalogSubscribe.getExpires() > 0) {
+                logger.info("[骞冲彴绂荤嚎] {}, 鍋滄鐩綍璁㈤槄鍥炲", parentPlatform.getServerGBId());
+                subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId());
+            }
+        }
+        logger.info("[骞冲彴绂荤嚎] {}, 鍋滄绉诲姩浣嶇疆璁㈤槄鍥炲", parentPlatform.getServerGBId());
+        subscribeHolder.removeMobilePositionSubscribe(parentPlatform.getServerGBId());
+        // 鍙戣捣瀹氭椂鑷姩閲嶆柊娉ㄥ唽
+        if (!stopRegister) {
+            // 璁剧疆涓�60绉掕嚜鍔ㄥ皾璇曢噸鏂版敞鍐�
+            final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId();
+            ParentPlatform platform = platformMapper.getParentPlatById(parentPlatform.getId());
+            if (platform.isEnable()) {
+                dynamicTask.startCron(registerFailAgainTaskKey,
+                        ()-> registerTask(platform, null),
+                        userSetting.getRegisterAgainAfterTime() * 1000);
+            }
+        }
     }
 
     private void stopAllPush(String platformId) {
         List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(platformId);
         if (sendRtpItems != null && sendRtpItems.size() > 0) {
             for (SendRtpItem sendRtpItem : sendRtpItems) {
+                ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
                 redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
-                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                Map<String, Object> param = new HashMap<>(3);
-                param.put("vhost", "__defaultVhost__");
-                param.put("app", sendRtpItem.getApp());
-                param.put("stream", sendRtpItem.getStreamId());
-                zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
+                MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+                mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
             }
         }
     }
@@ -389,4 +490,318 @@
             }
         }
     }
+
+    @Override
+    public void broadcastInvite(ParentPlatform platform, String channelId, MediaServer mediaServerItem, HookSubscribe.Event hookEvent,
+                                SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException {
+
+        if (mediaServerItem == null) {
+            logger.info("[鍥芥爣绾ц仈] 璇煶鍠婅瘽鏈壘鍒板彲鐢ㄧ殑zlm. platform: {}", platform.getServerGBId());
+            return;
+        }
+        InviteInfo inviteInfoForOld = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platform.getServerGBId(), channelId);
+
+        if (inviteInfoForOld != null && inviteInfoForOld.getStreamInfo() != null) {
+            // 濡傛灉zlm涓嶅瓨鍦ㄨ繖涓祦锛屽垯鍒犻櫎鏁版嵁鍗冲彲
+            MediaServer mediaServerItemForStreamInfo = mediaServerService.getOne(inviteInfoForOld.getStreamInfo().getMediaServerId());
+            if (mediaServerItemForStreamInfo != null) {
+                Boolean ready = mediaServerService.isStreamReady(mediaServerItemForStreamInfo, inviteInfoForOld.getStreamInfo().getApp(), inviteInfoForOld.getStreamInfo().getStream());
+                if (!ready) {
+                    // 閿欒瀛樺湪浜巖edis涓殑鏁版嵁
+                    inviteStreamService.removeInviteInfo(inviteInfoForOld);
+                }else {
+                    // 娴佺‘瀹炲皻鍦ㄦ帹娴侊紝鐩存帴鍥炶皟缁撴灉
+                    HookData hookData = new HookData();
+                    hookData.setApp(inviteInfoForOld.getStreamInfo().getApp());
+                    hookData.setStream(inviteInfoForOld.getStreamInfo().getStream());
+                    hookData.setMediaServer(mediaServerItemForStreamInfo);
+                    hookEvent.response(hookData);
+                    return;
+                }
+            }
+        }
+
+        String streamId = null;
+        if (mediaServerItem.isRtpEnable()) {
+            streamId = String.format("%s_%s", platform.getServerGBId(), channelId);
+        }
+        // 榛樿涓嶈繘琛孲SRC鏍¢獙锛� TODO 鍚庣画鍙敼涓洪厤缃�
+        boolean ssrcCheck = false;
+        int tcpMode;
+        if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-PASSIVE")) {
+            tcpMode = 1;
+        }else if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-ACTIVE")) {
+            tcpMode = 2;
+        } else {
+            tcpMode = 0;
+        }
+        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true, false, false, tcpMode);
+        if (ssrcInfo == null || ssrcInfo.getPort() < 0) {
+            logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 寮�鍚鍙g洃鍚け璐ワ紝 platform: {}, channel锛� {}", platform.getServerGBId(), channelId);
+            SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
+            eventResult.statusCode = -1;
+            eventResult.msg = "绔彛鐩戝惉澶辫触";
+            eventResult.type = SipSubscribe.EventResultType.failedToGetPort;
+            errorEvent.response(eventResult);
+            return;
+        }
+        logger.info("[鍥芥爣绾ц仈] 璇煶鍠婅瘽锛屽彂璧稩nvite娑堟伅 deviceId: {}, channelId: {},鏀舵祦绔彛锛� {}, 鏀舵祦妯″紡锛歿}, SSRC: {}, SSRC鏍¢獙锛歿}",
+                platform.getServerGBId(), channelId, ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck);
+
+        // 鍒濆鍖杛edis涓殑invite娑堟伅鐘舵��
+        InviteInfo inviteInfo = InviteInfo.getInviteInfo(platform.getServerGBId(), channelId, ssrcInfo.getStream(), ssrcInfo,
+                mediaServerItem.getSdpIp(), ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), InviteSessionType.BROADCAST,
+                InviteSessionStatus.ready);
+        inviteStreamService.updateInviteInfo(inviteInfo);
+        String timeOutTaskKey = UUID.randomUUID().toString();
+        dynamicTask.startDelay(timeOutTaskKey, () -> {
+            // 鎵ц瓒呮椂浠诲姟鏃舵煡璇㈡槸鍚﹀凡缁忔垚鍔燂紝鎴愬姛浜嗗垯涓嶆墽琛岃秴鏃朵换鍔★紝闃叉瓒呮椂浠诲姟鍙栨秷澶辫触鐨勬儏鍐�
+            InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, platform.getServerGBId(), channelId, null);
+            if (inviteInfoForBroadcast == null) {
+                logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 鏀舵祦瓒呮椂 deviceId: {}, channelId: {}锛岀鍙o細{}, SSRC: {}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
+                // 鐐规挱瓒呮椂鍥炲BYE 鍚屾椂閲婃斁ssrc浠ュ強姝ゆ鐐规挱鐨勮祫婧�
+                try {
+                    commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null);
+                } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
+                    logger.error("[鐐规挱瓒呮椂]锛� 鍙戦�丅YE澶辫触 {}", e.getMessage());
+                } finally {
+                    timeoutCallback.run(1, "鏀舵祦瓒呮椂");
+                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
+                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
+                    streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
+                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
+                }
+            }
+        }, userSetting.getPlayTimeout());
+        commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (hookData)->{
+            logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 鏀跺埌涓婄骇鎺ㄦ祦 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId);
+            dynamicTask.stop(timeOutTaskKey);
+            // hook鍝嶅簲
+            playService.onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), platform.getServerGBId(), channelId);
+            // 鏀跺埌娴�
+            if (hookEvent != null) {
+                hookEvent.response(hookData);
+            }
+        }, event -> {
+
+            inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channelId, timeOutTaskKey,
+                    null, inviteInfo, InviteSessionType.BROADCAST);
+//            // 鏀跺埌200OK 妫�娴媠src鏄惁鏈夊彉鍖栵紝闃叉涓婄骇鑷畾涔変簡ssrc
+//            ResponseEvent responseEvent = (ResponseEvent) event.event;
+//            String contentString = new String(responseEvent.getResponse().getRawContent());
+//            // 鑾峰彇ssrc
+//            int ssrcIndex = contentString.indexOf("y=");
+//            // 妫�鏌ユ槸鍚︽湁y瀛楁
+//            if (ssrcIndex >= 0) {
+//                //ssrc瑙勫畾闀垮害涓�10瀛楄妭锛屼笉鍙栦綑涓嬮暱搴︿互閬垮厤鍚庣画杩樻湁鈥渇=鈥濆瓧娈� TODO 鍚庣画瀵逛笉瑙勮寖鐨勯潪10浣峴src鍏煎
+//                String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
+//                // 鏌ヨ鍒皊src涓嶄竴鑷翠笖寮�鍚簡ssrc鏍¢獙鍒欓渶瑕侀拡瀵瑰鐞�
+//                if (ssrcInfo.getSsrc().equals(ssrcInResponse) || ssrcCheck) {
+//                    tcpActiveHandler(platform, )
+//                    return;
+//                }
+//                logger.info("[鐐规挱娑堟伅] 鏀跺埌invite 200, 鍙戠幇涓嬬骇鑷畾涔変簡ssrc: {}", ssrcInResponse);
+//                if (!mediaServerItem.isRtpEnable()) {
+//                    logger.info("[鐐规挱娑堟伅] SSRC淇 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
+//                    // 閲婃斁ssrc
+//                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
+//                    // 鍗曠鍙fā寮弒treamId涔熸湁鍙樺寲锛岄渶瑕侀噸鏂拌缃洃鍚�
+//                    if (!mediaServerItem.isRtpEnable()) {
+//                        // 娣诲姞璁㈤槄
+//                        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
+//                        subscribe.removeSubscribe(hookSubscribe);
+//                        hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
+//                        subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
+//                            logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + hookParam);
+//                            dynamicTask.stop(timeOutTaskKey);
+//                            // hook鍝嶅簲
+//                            playService.onPublishHandlerForPlay(mediaServerItemInUse, hookParam, platform.getServerGBId(), channelId);
+//                            hookEvent.response(mediaServerItemInUse, hookParam);
+//                        });
+//                    }
+//                    // 鍏抽棴rtp server
+//                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
+//                    // 閲嶆柊寮�鍚痵src server
+//                    mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, false, false, ssrcInfo.getPort(), true, false, tcpMode);
+//                }
+//            }
+        }, eventResult -> {
+            // 鏀跺埌閿欒鍥炲
+            if (errorEvent != null) {
+                errorEvent.response(eventResult);
+            }
+        });
+    }
+
+    private void inviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, int tcpMode, boolean ssrcCheck, MediaServer mediaServerItem,
+                                 ParentPlatform platform, String channelId, String timeOutTaskKey, ErrorCallback<Object> callback,
+                                 InviteInfo inviteInfo, InviteSessionType inviteSessionType){
+        inviteInfo.setStatus(InviteSessionStatus.ok);
+        ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
+        String contentString = new String(responseEvent.getResponse().getRawContent());
+        System.out.println(contentString);
+        String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString);
+        // 鍏煎鍥炲鐨勬秷鎭腑缂哄皯ssrc(y瀛楁)鐨勬儏鍐�
+        if (ssrcInResponse == null) {
+            ssrcInResponse = ssrcInfo.getSsrc();
+        }
+        if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
+            // ssrc 涓�鑷�
+            if (mediaServerItem.isRtpEnable()) {
+                // 澶氱鍙�
+                if (tcpMode == 2) {
+                    tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck,
+                            timeOutTaskKey, ssrcInfo, callback);
+                }
+            }else {
+                // 鍗曠鍙�
+                if (tcpMode == 2) {
+                    logger.warn("[Invite 200OK] 鍗曠鍙f敹娴佹ā寮忎笉鏀寔tcp涓诲姩妯″紡鏀舵祦");
+                }
+            }
+        }else {
+            logger.info("[Invite 200OK] 鏀跺埌invite 200, 鍙戠幇涓嬬骇鑷畾涔変簡ssrc: {}", ssrcInResponse);
+            // ssrc 涓嶄竴鑷�
+            if (mediaServerItem.isRtpEnable()) {
+                // 澶氱鍙�
+                if (ssrcCheck) {
+                    // ssrc妫�楠�
+                    // 鏇存柊ssrc
+                    logger.info("[Invite 200OK] SSRC淇 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
+                    // 閲婃斁ssrc
+                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
+                    Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
+                    if (!result) {
+                        try {
+                            logger.warn("[Invite 200OK] 鏇存柊ssrc澶辫触锛屽仠姝㈠枈璇� {}/{}", platform.getServerGBId(), channelId);
+                            commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null);
+                        } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
+                            logger.error("[鍛戒护鍙戦�佸け璐 鍋滄鎾斁锛� 鍙戦�丅YE: {}", e.getMessage());
+                        }
+
+                        dynamicTask.stop(timeOutTaskKey);
+                        // 閲婃斁ssrc
+                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
+
+                        streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
+
+                        callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
+                                "涓嬬骇鑷畾涔変簡ssrc,閲嶆柊璁剧疆鏀舵祦淇℃伅澶辫触", null);
+                        inviteStreamService.call(inviteSessionType, platform.getServerGBId(), channelId, null,
+                                InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
+                                "涓嬬骇鑷畾涔変簡ssrc,閲嶆柊璁剧疆鏀舵祦淇℃伅澶辫触", null);
+
+                    }else {
+                        ssrcInfo.setSsrc(ssrcInResponse);
+                        inviteInfo.setSsrcInfo(ssrcInfo);
+                        inviteInfo.setStream(ssrcInfo.getStream());
+                        if (tcpMode == 2) {
+                            if (mediaServerItem.isRtpEnable()) {
+                                tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck,
+                                        timeOutTaskKey, ssrcInfo, callback);
+                            }else {
+                                logger.warn("[Invite 200OK] 鍗曠鍙f敹娴佹ā寮忎笉鏀寔tcp涓诲姩妯″紡鏀舵祦");
+                            }
+                        }
+                        inviteStreamService.updateInviteInfo(inviteInfo);
+                    }
+                }else {
+                    ssrcInfo.setSsrc(ssrcInResponse);
+                    inviteInfo.setSsrcInfo(ssrcInfo);
+                    inviteInfo.setStream(ssrcInfo.getStream());
+                    if (tcpMode == 2) {
+                        if (mediaServerItem.isRtpEnable()) {
+                            tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck,
+                                    timeOutTaskKey, ssrcInfo, callback);
+                        }else {
+                            logger.warn("[Invite 200OK] 鍗曠鍙f敹娴佹ā寮忎笉鏀寔tcp涓诲姩妯″紡鏀舵祦");
+                        }
+                    }
+                    inviteStreamService.updateInviteInfo(inviteInfo);
+                }
+            }else {
+                if (ssrcInResponse != null) {
+                    // 鍗曠鍙�
+                    // 閲嶆柊璁㈤槄娴佷笂绾�
+                    SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getDeviceId(),
+                            inviteInfo.getChannelId(), null, inviteInfo.getStream());
+                    streamSession.remove(inviteInfo.getDeviceId(),
+                            inviteInfo.getChannelId(), inviteInfo.getStream());
+                    inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
+                    streamSession.put(platform.getServerGBId(), channelId, ssrcTransaction.getCallId(),
+                            inviteInfo.getStream(), ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType);
+                }
+            }
+        }
+    }
+
+
+    private void tcpActiveHandler(ParentPlatform platform, String channelId, String contentString,
+                                  MediaServer mediaServerItem, int tcpMode, boolean ssrcCheck,
+                                  String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback<Object> callback){
+        if (tcpMode != 2) {
+            return;
+        }
+
+        String substring;
+        if (contentString.indexOf("y=") > 0) {
+            substring = contentString.substring(0, contentString.indexOf("y="));
+        }else {
+            substring = contentString;
+        }
+        try {
+            SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
+            int port = -1;
+            Vector mediaDescriptions = sdp.getMediaDescriptions(true);
+            for (Object description : mediaDescriptions) {
+                MediaDescription mediaDescription = (MediaDescription) description;
+                Media media = mediaDescription.getMedia();
+
+                Vector mediaFormats = media.getMediaFormats(false);
+                if (mediaFormats.contains("8") || mediaFormats.contains("0")) {
+                    port = media.getMediaPort();
+                    break;
+                }
+            }
+            logger.info("[TCP涓诲姩杩炴帴瀵规柟] serverGbId: {}, channelId: {}, 杩炴帴瀵规柟鐨勫湴鍧�锛歿}:{}, SSRC: {}, SSRC鏍¢獙锛歿}",
+                    platform.getServerGBId(), channelId, sdp.getConnection().getAddress(), port, ssrcInfo.getSsrc(), ssrcCheck);
+            Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
+            logger.info("[TCP涓诲姩杩炴帴瀵规柟] 缁撴灉锛� {}", result);
+        } catch (SdpException e) {
+            logger.error("[TCP涓诲姩杩炴帴瀵规柟] serverGbId: {}, channelId: {}, 瑙f瀽200OK鐨凷DP淇℃伅澶辫触", platform.getServerGBId(), channelId, e);
+            dynamicTask.stop(timeOutTaskKey);
+            mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
+            // 閲婃斁ssrc
+            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
+
+            streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
+
+            callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
+                    InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
+            inviteStreamService.call(InviteSessionType.PLAY, platform.getServerGBId(), channelId, null,
+                    InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
+                    InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
+        }
+    }
+
+    @Override
+    public void stopBroadcast(ParentPlatform platform, DeviceChannel channel, String stream, boolean sendBye, MediaServer mediaServerItem) {
+
+        try {
+            if (sendBye) {
+                commanderForPlatform.streamByeCmd(platform, channel.getChannelId(), stream, null, null);
+            }
+        } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
+            logger.warn("[娑堟伅鍙戦�佸け璐 鍋滄璇煶瀵硅锛� 骞冲彴锛歿}锛岄�氶亾锛歿}", platform.getId(), channel.getChannelId() );
+        } finally {
+            mediaServerService.closeRTPServer(mediaServerItem, stream);
+            InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, platform.getServerGBId(), channel.getChannelId(), stream);
+            if (inviteInfo != null) {
+                // 閲婃斁ssrc
+                mediaServerService.releaseSsrc(mediaServerItem.getId(), inviteInfo.getSsrcInfo().getSsrc());
+                inviteStreamService.removeInviteInfo(inviteInfo);
+            }
+            streamSession.remove(platform.getServerGBId(), channel.getChannelId(), stream);
+        }
+    }
 }
--
Gitblit v1.8.0