From 66eda32ab97d6e94e9f274d6faa4df586c452dfb Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期日, 25 六月 2023 10:18:29 +0800
Subject: [PATCH] 优化端口预占用

---
 src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java |  175 ++++++++++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 139 insertions(+), 36 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
index a1f3d55..9ebe789 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,12 +1,14 @@
 package com.genersoft.iot.vmp.service.impl;
 
 import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.common.InviteInfo;
+import com.genersoft.iot.vmp.common.InviteSessionType;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
@@ -14,6 +16,7 @@
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.service.IInviteStreamService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IPlatformService;
 import com.genersoft.iot.vmp.service.IPlayService;
@@ -21,8 +24,8 @@
 import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
+import com.genersoft.iot.vmp.storager.dao.*;
+import com.genersoft.iot.vmp.utils.DateUtil;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
 import org.slf4j.Logger;
@@ -54,7 +57,19 @@
     private ParentPlatformMapper platformMapper;
 
     @Autowired
+    private PlatformCatalogMapper catalogMapper;
+
+    @Autowired
+    private PlatformChannelMapper platformChannelMapper;
+
+    @Autowired
+    private PlatformGbStreamMapper platformGbStreamMapper;
+
+    @Autowired
     private IRedisCatchStorage redisCatchStorage;
+
+    @Autowired
+    private SSRCFactory ssrcFactory;
 
     @Autowired
     private IMediaServerService mediaServerService;
@@ -87,6 +102,8 @@
     @Autowired
     private IPlayService playService;
 
+    @Autowired
+    private IInviteStreamService inviteStreamService;
 
 
     @Override
@@ -135,36 +152,107 @@
     }
 
     @Override
-    public void online(ParentPlatform parentPlatform) {
-        logger.info("[鍥芥爣绾ц仈]锛歿}, 骞冲彴涓婄嚎/鏇存柊娉ㄥ唽", parentPlatform.getServerGBId());
+    public boolean update(ParentPlatform parentPlatform) {
+        logger.info("[鍥芥爣绾ц仈]鏇存柊骞冲彴 {}", parentPlatform.getDeviceGBId());
+        parentPlatform.setCharacterSet(parentPlatform.getCharacterSet().toUpperCase());
+        ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId());
+        ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatformOld.getServerGBId());
+        parentPlatform.setUpdateTime(DateUtil.getNow());
+        if (!parentPlatformOld.getTreeType().equals(parentPlatform.getTreeType())) {
+            // 鐩綍缁撴瀯鍙戠敓鍙樺寲锛屾竻绌轰箣鍓嶇殑鍏宠仈鍏崇郴
+            logger.info("淇濆瓨骞冲彴{}鏃跺彂鐜扮洰褰曠粨鏋勫彉鍖栵紝娓呯┖鍏宠仈鍏崇郴", parentPlatform.getDeviceGBId());
+            catalogMapper.delByPlatformId(parentPlatformOld.getServerGBId());
+            platformChannelMapper.delByPlatformId(parentPlatformOld.getServerGBId());
+            platformGbStreamMapper.delByPlatformId(parentPlatformOld.getServerGBId());
+        }
+
+
+        // 鍋滄蹇冭烦瀹氭椂
+        final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatformOld.getServerGBId();
+        dynamicTask.stop(keepaliveTaskKey);
+        // 鍋滄娉ㄥ唽瀹氭椂
+        final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatformOld.getServerGBId();
+        dynamicTask.stop(registerTaskKey);
+        // 娉ㄩ攢鏃х殑
+        try {
+            if (parentPlatformOld.isStatus()) {
+                logger.info("淇濆瓨骞冲彴{}鏃跺彂鐜版晳骞冲彴鍦ㄧ嚎锛屽彂閫佹敞閿�鍛戒护", parentPlatformOld.getServerGBId());
+                commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> {
+                    logger.info("[鍥芥爣绾ц仈] 娉ㄩ攢鎴愬姛锛� 骞冲彴锛歿}", parentPlatformOld.getServerGBId());
+                });
+            }
+
+        } catch (InvalidArgumentException | ParseException | SipException e) {
+            logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 娉ㄩ攢: {}", e.getMessage());
+        }
+
+        // 鏇存柊鏁版嵁搴�
+        if (parentPlatform.getCatalogGroup() == 0) {
+            parentPlatform.setCatalogGroup(1);
+        }
+        if (parentPlatform.getAdministrativeDivision() == null) {
+            parentPlatform.setAdministrativeDivision(parentPlatform.getAdministrativeDivision());
+        }
+
+        platformMapper.updateParentPlatform(parentPlatform);
+        // 鏇存柊redis
+        redisCatchStorage.delPlatformCatchInfo(parentPlatformOld.getServerGBId());
+        ParentPlatformCatch parentPlatformCatch = new ParentPlatformCatch();
+        parentPlatformCatch.setParentPlatform(parentPlatform);
+        parentPlatformCatch.setId(parentPlatform.getServerGBId());
+        redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
+        // 娉ㄥ唽
+        if (parentPlatform.isEnable()) {
+            // 淇濆瓨鏃跺惎鐢ㄥ氨鍙戦�佹敞鍐�
+            // 娉ㄥ唽鎴愬姛鏃剁敱绋嬪簭鐩存帴璋冪敤浜唎nline鏂规硶
+            try {
+                logger.info("[鍥芥爣绾ц仈] 骞冲彴娉ㄥ唽 {}", parentPlatform.getDeviceGBId());
+                commanderForPlatform.register(parentPlatform, eventResult -> {
+                    logger.info("[鍥芥爣绾ц仈] {},娣诲姞鍚戜笂绾ф敞鍐屽け璐ワ紝璇风‘瀹氫笂绾у钩鍙板彲鐢ㄦ椂閲嶆柊淇濆瓨", parentPlatform.getServerGBId());
+                }, null);
+            } catch (InvalidArgumentException | ParseException | SipException e) {
+                logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈: {}", e.getMessage());
+            }
+        }
+        // 閲嶆柊寮�鍚畾鏃舵敞鍐岋紝 浣跨敤缁娑堟伅
+        // 閲嶆柊寮�濮嬪績璺充繚娲�
+
+
+        return false;
+    }
+
+
+    @Override
+    public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) {
+        logger.info("[鍥芥爣绾ц仈]锛歿}, 骞冲彴涓婄嚎", parentPlatform.getServerGBId());
         platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true);
         ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
-        if (parentPlatformCatch != null) {
-            parentPlatformCatch.getParentPlatform().setStatus(true);
-            redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
-        }else {
+        if (parentPlatformCatch == null) {
             parentPlatformCatch = new ParentPlatformCatch();
             parentPlatformCatch.setParentPlatform(parentPlatform);
             parentPlatformCatch.setId(parentPlatform.getServerGBId());
             parentPlatform.setStatus(true);
             parentPlatformCatch.setParentPlatform(parentPlatform);
-            redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
         }
+
+        parentPlatformCatch.getParentPlatform().setStatus(true);
+        parentPlatformCatch.setSipTransactionInfo(sipTransactionInfo);
+        redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
 
         final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
         if (!dynamicTask.isAlive(registerTaskKey)) {
+            logger.info("[鍥芥爣绾ц仈]锛歿}, 娣诲姞瀹氭椂娉ㄥ唽浠诲姟", parentPlatform.getServerGBId());
             // 娣诲姞娉ㄥ唽浠诲姟
             dynamicTask.startCron(registerTaskKey,
                 // 娉ㄥ唽澶辫触锛堟敞鍐屾垚鍔熸椂鐢辩▼搴忕洿鎺ヨ皟鐢ㄤ簡online鏂规硶锛�
-                ()-> {
-                    registerTask(parentPlatform);
-                },
-                (parentPlatform.getExpires() - 10) *1000);
+                ()-> registerTask(parentPlatform, sipTransactionInfo),
+                    parentPlatform.getExpires() * 1000);
         }
 
 
         final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
         if (!dynamicTask.contains(keepaliveTaskKey)) {
+            logger.info("[鍥芥爣绾ц仈]锛歿}, 娣诲姞瀹氭椂蹇冭烦浠诲姟", parentPlatform.getServerGBId());
             // 娣诲姞蹇冭烦浠诲姟
             dynamicTask.startCron(keepaliveTaskKey,
                     ()-> {
@@ -196,7 +284,7 @@
                                 // 蹇冭烦鎴愬姛
                                 // 娓呯┖涔嬪墠鐨勫績璺宠秴鏃惰鏁�
                                 ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
-                                if (platformCatch.getKeepAliveReply() > 0) {
+                                if (platformCatch != null && platformCatch.getKeepAliveReply() > 0) {
                                     platformCatch.setKeepAliveReply(0);
                                     redisCatchStorage.updatePlatformCatchInfo(platformCatch);
                                 }
@@ -205,11 +293,11 @@
                             logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�佸績璺�: {}", e.getMessage());
                         }
                     },
-                    (parentPlatform.getKeepTimeout() - 10)*1000);
+                    (parentPlatform.getKeepTimeout())*1000);
         }
     }
 
-    private void registerTask(ParentPlatform parentPlatform){
+    private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){
         try {
             // 璁剧疆瓒呮椂閲嶅彂锛� 鍚庣画浠庡簳灞傛敮鎸佹秷鎭噸鍙�
             String key = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId() + "_timeout";
@@ -217,10 +305,10 @@
                 return;
             }
             dynamicTask.startDelay(key, ()->{
-                registerTask(parentPlatform);
+                registerTask(parentPlatform, sipTransactionInfo);
             }, 1000);
-            logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛岄噸鏂版敞鍐�", parentPlatform.getServerGBId());
-            commanderForPlatform.register(parentPlatform, eventResult -> {
+            logger.info("[鍥芥爣绾ц仈] 骞冲彴锛歿}娉ㄥ唽鍗冲皢鍒版湡锛屽紑濮嬬画璁�", parentPlatform.getServerGBId());
+            commanderForPlatform.register(parentPlatform, sipTransactionInfo,  eventResult -> {
                 dynamicTask.stop(key);
                 offline(parentPlatform, false);
             },eventResult -> {
@@ -270,12 +358,13 @@
         List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(platformId);
         if (sendRtpItems != null && sendRtpItems.size() > 0) {
             for (SendRtpItem sendRtpItem : sendRtpItems) {
+                ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
                 redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
                 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                 Map<String, Object> param = new HashMap<>(3);
                 param.put("vhost", "__defaultVhost__");
                 param.put("app", sendRtpItem.getApp());
-                param.put("stream", sendRtpItem.getStreamId());
+                param.put("stream", sendRtpItem.getStream());
                 zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
             }
         }
@@ -341,20 +430,22 @@
             logger.info("[鍥芥爣绾ц仈] 璇煶鍠婅瘽鏈壘鍒板彲鐢ㄧ殑zlm. platform: {}", platform.getServerGBId());
             return;
         }
-        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(platform.getServerGBId(), channelId);
-        if (streamInfo != null) {
+        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platform.getServerGBId(), channelId);
+
+
+        if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
             // 濡傛灉zlm涓嶅瓨鍦ㄨ繖涓祦锛屽垯鍒犻櫎鏁版嵁鍗冲彲
-            MediaServerItem mediaServerItemForStreamInfo = mediaServerService.getOne(streamInfo.getMediaServerId());
+            MediaServerItem mediaServerItemForStreamInfo = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
             if (mediaServerItemForStreamInfo != null) {
-                Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemForStreamInfo, streamInfo.getApp(), streamInfo.getStream());
+                Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemForStreamInfo, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream());
                 if (!ready) {
                     // 閿欒瀛樺湪浜巖edis涓殑鏁版嵁
-                    redisCatchStorage.stopPlay(streamInfo);
+                    inviteStreamService.removeInviteInfo(inviteInfo);
                 }else {
                     // 娴佺‘瀹炲皻鍦ㄦ帹娴侊紝鐩存帴鍥炶皟缁撴灉
                     JSONObject json = new JSONObject();
-                    json.put("app", streamInfo.getApp());
-                    json.put("stream", streamInfo.getStream());
+                    json.put("app", inviteInfo.getStreamInfo().getApp());
+                    json.put("stream", inviteInfo.getStreamInfo().getStream());
                     hookEvent.response(mediaServerItemForStreamInfo, json);
                     return;
                 }
@@ -367,19 +458,32 @@
         }
         // 榛樿涓嶈繘琛孲SRC鏍¢獙锛� TODO 鍚庣画鍙敼涓洪厤缃�
         boolean ssrcCheck = false;
-        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true);
+        int tcpMode;
+        if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-PASSIVE")) {
+            tcpMode = 1;
+        }else if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-ACTIVE")) {
+            tcpMode = 2;
+        } else {
+            tcpMode = 0;
+        }
+        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true, false, tcpMode);
         if (ssrcInfo == null || ssrcInfo.getPort() < 0) {
             logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 寮�鍚鍙g洃鍚け璐ワ紝 platform: {}, channel锛� {}", platform.getServerGBId(), channelId);
-            errorEvent.response(new SipSubscribe.EventResult(-1, "绔彛鐩戝惉澶辫触"));
+            SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
+            eventResult.statusCode = -1;
+            eventResult.msg = "绔彛鐩戝惉澶辫触";
+            eventResult.type = SipSubscribe.EventResultType.failedToGetPort;
+            errorEvent.response(eventResult);
             return;
         }
-        logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 deviceId: {}, channelId: {},鏀舵祦绔彛锛� {}, 鏀舵祦妯″紡锛歿}, SSRC: {}, SSRC鏍¢獙锛歿}",
+        logger.info("[鍥芥爣绾ц仈] 璇煶鍠婅瘽锛屽彂璧稩nvite娑堟伅 deviceId: {}, channelId: {},鏀舵祦绔彛锛� {}, 鏀舵祦妯″紡锛歿}, SSRC: {}, SSRC鏍¢獙锛歿}",
                 platform.getServerGBId(), channelId, ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck);
 
         String timeOutTaskKey = UUID.randomUUID().toString();
         dynamicTask.startDelay(timeOutTaskKey, () -> {
             // 鎵ц瓒呮椂浠诲姟鏃舵煡璇㈡槸鍚﹀凡缁忔垚鍔燂紝鎴愬姛浜嗗垯涓嶆墽琛岃秴鏃朵换鍔★紝闃叉瓒呮椂浠诲姟鍙栨秷澶辫触鐨勬儏鍐�
-            if (redisCatchStorage.queryPlayByDevice(platform.getServerGBId(), channelId) == null) {
+            InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, platform.getServerGBId(), channelId, null);
+            if (inviteInfoForBroadcast == null) {
                 logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 鏀舵祦瓒呮椂 deviceId: {}, channelId: {}锛岀鍙o細{}, SSRC: {}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
                 // 鐐规挱瓒呮椂鍥炲BYE 鍚屾椂閲婃斁ssrc浠ュ強姝ゆ鐐规挱鐨勮祫婧�
                 try {
@@ -396,6 +500,7 @@
             }
         }, userSetting.getPlayTimeout());
         commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{
+            logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 鏀跺埌涓婄骇鎺ㄦ祦 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId);
             dynamicTask.stop(timeOutTaskKey);
             // hook鍝嶅簲
             playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId);
@@ -421,7 +526,7 @@
                 if (!mediaServerItem.isRtpEnable()) {
                     logger.info("[鐐规挱娑堟伅] SSRC淇 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
 
-                    if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
+                    if (!ssrcFactory.checkSsrc(mediaServerItem.getId(), ssrcInResponse)) {
                         // ssrc 涓嶅彲鐢�
                         // 閲婃斁ssrc
                         mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
@@ -449,9 +554,7 @@
                     // 鍏抽棴rtp server
                     mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                     // 閲嶆柊寮�鍚痵src server
-                    mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, false, false, ssrcInfo.getPort(), true);
-
-
+                    mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, false, false, ssrcInfo.getPort(), true, false, tcpMode);
                 }
             }
         }, eventResult -> {

--
Gitblit v1.8.0