From 16b7e4a7ef473a6af29ec78aeb2f471fa398efdd Mon Sep 17 00:00:00 2001
From: leesam <leesam@leesam.cn>
Date: 星期三, 10 四月 2024 20:49:44 +0800
Subject: [PATCH] Merge branch 'refs/heads/master' into develop-add-api-key

---
 src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java |  129 +++++++++++++++++++++++++++---------------
 1 files changed, 82 insertions(+), 47 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
index 06c621e..aa39f41 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,10 +1,7 @@
 package com.genersoft.iot.vmp.service.impl;
 
-import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.common.InviteInfo;
-import com.genersoft.iot.vmp.common.InviteSessionStatus;
-import com.genersoft.iot.vmp.common.InviteSessionType;
 import com.baomidou.dynamic.datasource.annotation.DS;
+import com.genersoft.iot.vmp.common.*;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
@@ -12,46 +9,38 @@
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
-import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
+import com.genersoft.iot.vmp.media.event.hook.HookData;
+import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
+import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
+import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
+import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
+import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.service.IInviteStreamService;
-import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IPlatformService;
 import com.genersoft.iot.vmp.service.IPlayService;
 import com.genersoft.iot.vmp.service.bean.*;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.dao.*;
+import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
+import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
-import gov.nist.javax.sip.message.SIPRequest;
 import gov.nist.javax.sip.message.SIPResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
 import javax.sdp.*;
 import javax.sip.InvalidArgumentException;
 import javax.sip.ResponseEvent;
-import javax.sip.PeerUnavailableException;
 import javax.sip.SipException;
 import java.text.ParseException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
 import java.util.*;
 
 /**
@@ -81,7 +70,7 @@
     private IMediaServerService mediaServerService;
 
     @Autowired
-    private SIPCommanderFroPlatform commanderForPlatform;
+    private ISIPCommanderForPlatform commanderForPlatform;
 
     @Autowired
     private DynamicTask dynamicTask;
@@ -99,11 +88,10 @@
     private UserSetting userSetting;
 
     @Autowired
-    private ZlmHttpHookSubscribe subscribe;
+    private HookSubscribe subscribe;
 
     @Autowired
     private VideoStreamSessionManager streamSession;
-
 
     @Autowired
     private IPlayService playService;
@@ -111,8 +99,56 @@
     @Autowired
     private IInviteStreamService inviteStreamService;
 
-    @Autowired
-    private ZLMRESTfulUtils zlmresTfulUtils;
+
+    /**
+     * 娴佺寮�鐨勫鐞�
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaDepartureEvent event) {
+        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
+        if (!sendRtpItems.isEmpty()) {
+            for (SendRtpItem sendRtpItem : sendRtpItems) {
+                if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) {
+                    String platformId = sendRtpItem.getPlatformId();
+                    ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformId);
+
+                    try {
+                        if (platform != null) {
+                            commanderForPlatform.streamByeCmd(platform, sendRtpItem);
+                            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
+                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
+                        }
+                    } catch (SipException | InvalidArgumentException | ParseException e) {
+                        logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
+                    }
+                }
+            }
+        }
+    }
+
+
+    /**
+     * 鍙戞祦鍋滄
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaSendRtpStoppedEvent event) {
+        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
+        if (sendRtpItems != null && !sendRtpItems.isEmpty()) {
+            for (SendRtpItem sendRtpItem : sendRtpItems) {
+                ParentPlatform parentPlatform = platformMapper.getParentPlatByServerGBId(sendRtpItem.getPlatformId());
+                ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
+                try {
+                    commanderForPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
+                } catch (SipException | InvalidArgumentException | ParseException e) {
+                    logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
+                }
+                redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
+                        sendRtpItem.getCallId(), sendRtpItem.getStream());
+            }
+        }
+    }
 
 
     @Override
@@ -400,7 +436,7 @@
             for (SendRtpItem sendRtpItem : sendRtpItems) {
                 ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
                 redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
-                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+                MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                 Map<String, Object> param = new HashMap<>(3);
                 param.put("vhost", "__defaultVhost__");
                 param.put("app", sendRtpItem.getApp());
@@ -463,7 +499,7 @@
     }
 
     @Override
-    public void broadcastInvite(ParentPlatform platform, String channelId, MediaServerItem mediaServerItem, ZlmHttpHookSubscribe.Event hookEvent,
+    public void broadcastInvite(ParentPlatform platform, String channelId, MediaServer mediaServerItem, HookSubscribe.Event hookEvent,
                                 SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException {
 
         if (mediaServerItem == null) {
@@ -474,19 +510,19 @@
 
         if (inviteInfoForOld != null && inviteInfoForOld.getStreamInfo() != null) {
             // 濡傛灉zlm涓嶅瓨鍦ㄨ繖涓祦锛屽垯鍒犻櫎鏁版嵁鍗冲彲
-            MediaServerItem mediaServerItemForStreamInfo = mediaServerService.getOne(inviteInfoForOld.getStreamInfo().getMediaServerId());
+            MediaServer mediaServerItemForStreamInfo = mediaServerService.getOne(inviteInfoForOld.getStreamInfo().getMediaServerId());
             if (mediaServerItemForStreamInfo != null) {
-                Boolean ready = zlmServerFactory.isStreamReady(mediaServerItemForStreamInfo, inviteInfoForOld.getStreamInfo().getApp(), inviteInfoForOld.getStreamInfo().getStream());
+                Boolean ready = mediaServerService.isStreamReady(mediaServerItemForStreamInfo, inviteInfoForOld.getStreamInfo().getApp(), inviteInfoForOld.getStreamInfo().getStream());
                 if (!ready) {
                     // 閿欒瀛樺湪浜巖edis涓殑鏁版嵁
                     inviteStreamService.removeInviteInfo(inviteInfoForOld);
                 }else {
                     // 娴佺‘瀹炲皻鍦ㄦ帹娴侊紝鐩存帴鍥炶皟缁撴灉
-                    OnStreamChangedHookParam hookParam = new OnStreamChangedHookParam();
-                    hookParam.setApp(inviteInfoForOld.getStreamInfo().getApp());
-                    hookParam.setStream(inviteInfoForOld.getStreamInfo().getStream());
-
-                    hookEvent.response(mediaServerItemForStreamInfo, hookParam);
+                    HookData hookData = new HookData();
+                    hookData.setApp(inviteInfoForOld.getStreamInfo().getApp());
+                    hookData.setStream(inviteInfoForOld.getStreamInfo().getStream());
+                    hookData.setMediaServer(mediaServerItemForStreamInfo);
+                    hookEvent.response(hookData);
                     return;
                 }
             }
@@ -506,7 +542,7 @@
         } else {
             tcpMode = 0;
         }
-        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true, false, tcpMode);
+        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true, false, false, tcpMode);
         if (ssrcInfo == null || ssrcInfo.getPort() < 0) {
             logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 寮�鍚鍙g洃鍚け璐ワ紝 platform: {}, channel锛� {}", platform.getServerGBId(), channelId);
             SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
@@ -544,14 +580,14 @@
                 }
             }
         }, userSetting.getPlayTimeout());
-        commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, hookParam)->{
+        commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (hookData)->{
             logger.info("[鍥芥爣绾ц仈] 鍙戣捣璇煶鍠婅瘽 鏀跺埌涓婄骇鎺ㄦ祦 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId);
             dynamicTask.stop(timeOutTaskKey);
             // hook鍝嶅簲
-            playService.onPublishHandlerForPlay(mediaServerItemForInvite, hookParam, platform.getServerGBId(), channelId);
+            playService.onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), platform.getServerGBId(), channelId);
             // 鏀跺埌娴�
             if (hookEvent != null) {
-                hookEvent.response(mediaServerItem, hookParam);
+                hookEvent.response(hookData);
             }
         }, event -> {
 
@@ -604,13 +640,12 @@
         });
     }
 
-    private void inviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, int tcpMode, boolean ssrcCheck, MediaServerItem mediaServerItem,
+    private void inviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, int tcpMode, boolean ssrcCheck, MediaServer mediaServerItem,
                                  ParentPlatform platform, String channelId, String timeOutTaskKey, ErrorCallback<Object> callback,
                                  InviteInfo inviteInfo, InviteSessionType inviteSessionType){
         inviteInfo.setStatus(InviteSessionStatus.ok);
         ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
         String contentString = new String(responseEvent.getResponse().getRawContent());
-        System.out.println(1111);
         System.out.println(contentString);
         String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString);
         // 鍏煎鍥炲鐨勬秷鎭腑缂哄皯ssrc(y瀛楁)鐨勬儏鍐�
@@ -709,7 +744,7 @@
 
 
     private void tcpActiveHandler(ParentPlatform platform, String channelId, String contentString,
-                                  MediaServerItem mediaServerItem, int tcpMode, boolean ssrcCheck,
+                                  MediaServer mediaServerItem, int tcpMode, boolean ssrcCheck,
                                   String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback<Object> callback){
         if (tcpMode != 2) {
             return;
@@ -737,8 +772,8 @@
             }
             logger.info("[TCP涓诲姩杩炴帴瀵规柟] serverGbId: {}, channelId: {}, 杩炴帴瀵规柟鐨勫湴鍧�锛歿}:{}, SSRC: {}, SSRC鏍¢獙锛歿}",
                     platform.getServerGBId(), channelId, sdp.getConnection().getAddress(), port, ssrcInfo.getSsrc(), ssrcCheck);
-            JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
-            logger.info("[TCP涓诲姩杩炴帴瀵规柟] 缁撴灉锛� {}", jsonObject);
+            Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
+            logger.info("[TCP涓诲姩杩炴帴瀵规柟] 缁撴灉锛� {}", result);
         } catch (SdpException e) {
             logger.error("[TCP涓诲姩杩炴帴瀵规柟] serverGbId: {}, channelId: {}, 瑙f瀽200OK鐨凷DP淇℃伅澶辫触", platform.getServerGBId(), channelId, e);
             dynamicTask.stop(timeOutTaskKey);
@@ -757,7 +792,7 @@
     }
 
     @Override
-    public void stopBroadcast(ParentPlatform platform, DeviceChannel channel, String stream, boolean sendBye, MediaServerItem mediaServerItem) {
+    public void stopBroadcast(ParentPlatform platform, DeviceChannel channel, String stream, boolean sendBye, MediaServer mediaServerItem) {
 
         try {
             if (sendBye) {

--
Gitblit v1.8.0