From d9cfe061b9b501511f5d769f751c8ff6bbcb1bf9 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 17 八月 2023 15:20:25 +0800
Subject: [PATCH] 优化对讲释放流程

---
 src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java |  160 ++++++++++++++++++++++++++---------------------------
 1 files changed, 78 insertions(+), 82 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
index 9ebe789..dbde373 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,6 +1,5 @@
 package com.genersoft.iot.vmp.service.impl;
 
-import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.common.InviteInfo;
 import com.genersoft.iot.vmp.common.InviteSessionType;
 import com.genersoft.iot.vmp.conf.DynamicTask;
@@ -11,11 +10,12 @@
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
-import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import com.genersoft.iot.vmp.service.IInviteStreamService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IPlatformService;
@@ -49,21 +49,14 @@
 public class PlatformServiceImpl implements IPlatformService {
 
     private final static String REGISTER_KEY_PREFIX = "platform_register_";
+
+    private final static String REGISTER_FAIL_AGAIN_KEY_PREFIX = "platform_register_fail_again_";
     private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_";
 
     private final static Logger logger = LoggerFactory.getLogger(PlatformServiceImpl.class);
 
     @Autowired
     private ParentPlatformMapper platformMapper;
-
-    @Autowired
-    private PlatformCatalogMapper catalogMapper;
-
-    @Autowired
-    private PlatformChannelMapper platformChannelMapper;
-
-    @Autowired
-    private PlatformGbStreamMapper platformGbStreamMapper;
 
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
@@ -81,7 +74,7 @@
     private DynamicTask dynamicTask;
 
     @Autowired
-    private ZLMRTPServerFactory zlmrtpServerFactory;
+    private ZLMServerFactory zlmServerFactory;
 
     @Autowired
     private SubscribeHolder subscribeHolder;
@@ -158,14 +151,6 @@
         ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId());
         ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatformOld.getServerGBId());
         parentPlatform.setUpdateTime(DateUtil.getNow());
-        if (!parentPlatformOld.getTreeType().equals(parentPlatform.getTreeType())) {
-            // 鐩綍缁撴瀯鍙戠敓鍙樺寲锛屾竻绌轰箣鍓嶇殑鍏宠仈鍏崇郴
-            logger.info("淇濆瓨骞冲彴{}鏃跺彂鐜扮洰褰曠粨鏋勫彉鍖栵紝娓呯┖鍏宠仈鍏崇郴", parentPlatform.getDeviceGBId());
-            catalogMapper.delByPlatformId(parentPlatformOld.getServerGBId());
-            platformChannelMapper.delByPlatformId(parentPlatformOld.getServerGBId());
-            platformGbStreamMapper.delByPlatformId(parentPlatformOld.getServerGBId());
-        }
-
 
         // 鍋滄蹇冭烦瀹氭椂
         final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatformOld.getServerGBId();
@@ -176,12 +161,11 @@
         // 娉ㄩ攢鏃х殑
         try {
             if (parentPlatformOld.isStatus()) {
-                logger.info("淇濆瓨骞冲彴{}鏃跺彂鐜版晳骞冲彴鍦ㄧ嚎锛屽彂閫佹敞閿�鍛戒护", parentPlatformOld.getServerGBId());
+                logger.info("淇濆瓨骞冲彴{}鏃跺彂鐜版棫骞冲彴鍦ㄧ嚎锛屽彂閫佹敞閿�鍛戒护", parentPlatformOld.getServerGBId());
                 commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> {
                     logger.info("[鍥芥爣绾ц仈] 娉ㄩ攢鎴愬姛锛� 骞冲彴锛歿}", parentPlatformOld.getServerGBId());
                 });
             }
-
         } catch (InvalidArgumentException | ParseException | SipException e) {
             logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 娉ㄩ攢: {}", e.getMessage());
         }
@@ -214,9 +198,6 @@
                 logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈: {}", e.getMessage());
             }
         }
-        // 閲嶆柊寮�鍚畾鏃舵敞鍐岋紝 浣跨敤缁娑堟伅
-        // 閲嶆柊寮�濮嬪績璺充繚娲�
-
 
         return false;
     }
@@ -225,6 +206,9 @@
     @Override
     public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) {
         logger.info("[鍥芥爣绾ц仈]锛歿}, 骞冲彴涓婄嚎", parentPlatform.getServerGBId());
+        final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId();
+        dynamicTask.stop(registerFailAgainTaskKey);
+
         platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true);
         ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
         if (parentPlatformCatch == null) {
@@ -265,15 +249,9 @@
                                     // 姝ゆ椂鏄涓夋蹇冭烦瓒呮椂锛� 骞冲彴绂荤嚎
                                     if (platformCatch.getKeepAliveReply()  == 2) {
                                         // 璁剧疆骞冲彴绂荤嚎锛屽苟閲嶆柊娉ㄥ唽
-                                        logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽", parentPlatform.getServerGBId());
-                                        try {
-                                            commanderForPlatform.register(parentPlatform, eventResult1 -> {
-                                                logger.info("[鍥芥爣绾ц仈] {}锛屼笁娆″績璺宠秴鏃跺悗鍐嶆鍙戣捣娉ㄥ唽浠嶇劧澶辫触锛屽紑濮嬪畾鏃跺彂璧锋敞鍐岋紝闂撮殧涓�1鍒嗛挓", parentPlatform.getServerGBId());
-                                                offline(parentPlatform, false);
-                                            }, null);
-                                        } catch (InvalidArgumentException | ParseException | SipException e) {
-                                            logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 娉ㄥ唽: {}", e.getMessage());
-                                        }
+                                        logger.info("[鍥芥爣绾ц仈] 涓夋蹇冭烦瓒呮椂, 骞冲彴{}({})绂荤嚎", parentPlatform.getName(), parentPlatform.getServerGBId());
+                                        offline(parentPlatform, false);
+
                                     }
 
                                 }else {
@@ -299,21 +277,22 @@
 
     private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){
         try {
-            // 璁剧疆瓒呮椂閲嶅彂锛� 鍚庣画浠庡簳灞傛敮鎸佹秷鎭噸鍙�
-            String key = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId() + "_timeout";
-            if (dynamicTask.isAlive(key)) {
-                return;
+            // 涓嶅湪鍚屼竴涓細璇濅腑缁鍒欐瘡娆″叏鏂版敞鍐�
+            if (!userSetting.isRegisterKeepIntDialog()) {
+                sipTransactionInfo = null;
             }
-            dynamicTask.startDelay(key, ()->{
-                registerTask(parentPlatform, sipTransactionInfo);
-            }, 1000);
-            logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛屽紑濮嬬画璁�", parentPlatform.getServerGBId());
+
+            if (sipTransactionInfo == null) {
+                logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛屽紑濮嬮噸鏂版敞鍐�", parentPlatform.getServerGBId());
+            }else {
+                logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛屽紑濮嬬画璁�", parentPlatform.getServerGBId());
+            }
+
             commanderForPlatform.register(parentPlatform, sipTransactionInfo,  eventResult -> {
-                dynamicTask.stop(key);
+                logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽澶辫触锛寋}:{}", parentPlatform.getServerGBId(),
+                        eventResult.statusCode, eventResult.msg);
                 offline(parentPlatform, false);
-            },eventResult -> {
-                dynamicTask.stop(key);
-            });
+            }, null);
         } catch (InvalidArgumentException | ParseException | SipException e) {
             logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈瀹氭椂娉ㄥ唽: {}", e.getMessage());
         }
@@ -334,24 +313,35 @@
         // 鍋滄鎵�鏈夋帹娴�
         logger.info("[骞冲彴绂荤嚎] {}, 鍋滄鎵�鏈夋帹娴�", parentPlatform.getServerGBId());
         stopAllPush(parentPlatform.getServerGBId());
-        if (stopRegister) {
-            // 娓呴櫎娉ㄥ唽瀹氭椂
-            logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId());
-            final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
-            if (dynamicTask.contains(registerTaskKey)) {
-                dynamicTask.stop(registerTaskKey);
-            }
+
+        // 娓呴櫎娉ㄥ唽瀹氭椂
+        logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId());
+        final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
+        if (dynamicTask.contains(registerTaskKey)) {
+            dynamicTask.stop(registerTaskKey);
         }
         // 娓呴櫎蹇冭烦瀹氭椂
         logger.info("[骞冲彴绂荤嚎] {}, 鍋滄瀹氭椂鍙戦�佸績璺充换鍔�", parentPlatform.getServerGBId());
         final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
         if (dynamicTask.contains(keepaliveTaskKey)) {
-            // 娣诲姞蹇冭烦浠诲姟
+            // 娓呴櫎蹇冭烦浠诲姟
             dynamicTask.stop(keepaliveTaskKey);
         }
         // 鍋滄鐩綍璁㈤槄鍥炲
         logger.info("[骞冲彴绂荤嚎] {}, 鍋滄璁㈤槄鍥炲", parentPlatform.getServerGBId());
         subscribeHolder.removeAllSubscribe(parentPlatform.getServerGBId());
+        // 鍙戣捣瀹氭椂鑷姩閲嶆柊娉ㄥ唽
+        if (!stopRegister) {
+            // 璁剧疆涓�60绉掕嚜鍔ㄥ皾璇曢噸鏂版敞鍐�
+            final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId();
+            ParentPlatform platform = platformMapper.getParentPlatById(parentPlatform.getId());
+            if (platform.isEnable()) {
+                dynamicTask.startCron(registerFailAgainTaskKey,
+                        ()-> registerTask(platform, null),
+                        userSetting.getRegisterAgainAfterTime() * 1000);
+            }
+
+        }
     }
 
     private void stopAllPush(String platformId) {
@@ -365,7 +355,7 @@
                 param.put("vhost", "__defaultVhost__");
                 param.put("app", sendRtpItem.getApp());
                 param.put("stream", sendRtpItem.getStream());
-                zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
+                zlmServerFactory.stopSendRtpStream(mediaInfo, param);
             }
         }
     }
@@ -432,21 +422,21 @@
         }
         InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platform.getServerGBId(), channelId);
 
-
         if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
             // 濡傛灉zlm涓嶅瓨鍦ㄨ繖涓祦锛屽垯鍒犻櫎鏁版嵁鍗冲彲
             MediaServerItem mediaServerItemForStreamInfo = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
             if (mediaServerItemForStreamInfo != null) {
-                Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemForStreamInfo, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream());
+                Boolean ready = zlmServerFactory.isStreamReady(mediaServerItemForStreamInfo, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream());
                 if (!ready) {
                     // 閿欒瀛樺湪浜巖edis涓殑鏁版嵁
                     inviteStreamService.removeInviteInfo(inviteInfo);
                 }else {
                     // 娴佺‘瀹炲皻鍦ㄦ帹娴侊紝鐩存帴鍥炶皟缁撴灉
-                    JSONObject json = new JSONObject();
-                    json.put("app", inviteInfo.getStreamInfo().getApp());
-                    json.put("stream", inviteInfo.getStreamInfo().getStream());
-                    hookEvent.response(mediaServerItemForStreamInfo, json);
+                    OnStreamChangedHookParam hookParam = new OnStreamChangedHookParam();
+                    hookParam.setApp(inviteInfo.getStreamInfo().getApp());
+                    hookParam.setStream(inviteInfo.getStreamInfo().getStream());
+
+                    hookEvent.response(mediaServerItemForStreamInfo, hookParam);
                     return;
                 }
             }
@@ -499,14 +489,14 @@
                 }
             }
         }, userSetting.getPlayTimeout());
-        commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{
+        commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, hookParam)->{
             logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 鏀跺埌涓婄骇鎺ㄦ祦 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId);
             dynamicTask.stop(timeOutTaskKey);
             // hook鍝嶅簲
-            playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId);
+            playService.onPublishHandlerForPlay(mediaServerItemForInvite, hookParam, platform.getServerGBId(), channelId);
             // 鏀跺埌娴�
             if (hookEvent != null) {
-                hookEvent.response(mediaServerItem, response);
+                hookEvent.response(mediaServerItem, hookParam);
             }
         }, event -> {
             // 鏀跺埌200OK 妫�娴媠src鏄惁鏈夊彉鍖栵紝闃叉涓婄骇鑷畾涔変簡ssrc
@@ -525,30 +515,20 @@
                 logger.info("[鐐规挱娑堟伅] 鏀跺埌invite 200, 鍙戠幇涓嬬骇鑷畾涔変簡ssrc: {}", ssrcInResponse);
                 if (!mediaServerItem.isRtpEnable()) {
                     logger.info("[鐐规挱娑堟伅] SSRC淇 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
-
-                    if (!ssrcFactory.checkSsrc(mediaServerItem.getId(), ssrcInResponse)) {
-                        // ssrc 涓嶅彲鐢�
-                        // 閲婃斁ssrc
-                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
-                        streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
-                        event.msg = "涓嬬骇鑷畾涔変簡ssrc,浣嗘槸姝src涓嶅彲鐢�";
-                        event.statusCode = 400;
-                        errorEvent.response(event);
-                        return;
-                    }
-
+                    // 閲婃斁ssrc
+                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                     // 鍗曠鍙fā寮弒treamId涔熸湁鍙樺寲锛岄渶瑕侀噸鏂拌缃洃鍚�
                     if (!mediaServerItem.isRtpEnable()) {
                         // 娣诲姞璁㈤槄
                         HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
                         subscribe.removeSubscribe(hookSubscribe);
                         hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
-                        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
-                            logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString());
+                        subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
+                            logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + hookParam);
                             dynamicTask.stop(timeOutTaskKey);
                             // hook鍝嶅簲
-                            playService.onPublishHandlerForPlay(mediaServerItemInUse, response, platform.getServerGBId(), channelId);
-                            hookEvent.response(mediaServerItemInUse, response);
+                            playService.onPublishHandlerForPlay(mediaServerItemInUse, hookParam, platform.getServerGBId(), channelId);
+                            hookEvent.response(mediaServerItemInUse, hookParam);
                         });
                     }
                     // 鍏抽棴rtp server
@@ -566,7 +546,23 @@
     }
 
     @Override
-    public void stopBroadcast(ParentPlatform platform, String channelId, String stream) throws InvalidArgumentException, ParseException, SsrcTransactionNotFoundException, SipException {
-        commanderForPlatform.streamByeCmd(platform, channelId, stream, null, null);
+    public void stopBroadcast(ParentPlatform platform, DeviceChannel channel, String stream, boolean sendBye, MediaServerItem mediaServerItem) {
+
+        try {
+            if (sendBye) {
+                commanderForPlatform.streamByeCmd(platform, channel.getChannelId(), stream, null, null);
+            }
+        } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
+            logger.warn("[娑堟伅鍙戦�佸け璐 鍋滄璇煶瀵硅锛� 骞冲彴锛歿}锛岄�氶亾锛歿}", platform.getId(), channel.getChannelId() );
+        } finally {
+            mediaServerService.closeRTPServer(mediaServerItem, stream);
+            InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, platform.getServerGBId(), channel.getChannelId(), stream);
+            if (inviteInfo != null) {
+                // 閲婃斁ssrc
+                mediaServerService.releaseSsrc(mediaServerItem.getId(), inviteInfo.getSsrcInfo().getSsrc());
+                inviteStreamService.removeInviteInfo(inviteInfo);
+            }
+            streamSession.remove(platform.getServerGBId(), channel.getChannelId(), stream);
+        }
     }
 }

--
Gitblit v1.8.0