From 9c6765d44ef2ccb06fdaf525a06e564a331ab892 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 16 四月 2024 22:10:35 +0800
Subject: [PATCH] 重构多wvp国标级联机制

---
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java |   85 ++++++++++++++++++++++++------------------
 1 files changed, 48 insertions(+), 37 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 5feb306..32bf76a 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -18,10 +18,7 @@
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
-import com.genersoft.iot.vmp.media.zlm.dto.HookType;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
+import com.genersoft.iot.vmp.media.zlm.dto.*;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -103,9 +100,6 @@
     private EventPublisher eventPublisher;
 
     @Autowired
-    private ZLMMediaListManager zlmMediaListManager;
-
-    @Autowired
     private ZlmHttpHookSubscribe subscribe;
 
     @Autowired
@@ -129,6 +123,9 @@
 
     @Autowired
     private RedisTemplate<Object, Object> redisTemplate;
+
+    @Autowired
+    private IStreamPushService streamPushService;
 
     /**
      * 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆�
@@ -236,10 +233,7 @@
                 // 閴存潈閫氳繃
                 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
             }
-        } else {
-            zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
         }
-
 
         HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
         result.setEnable_audio(true);
@@ -260,7 +254,6 @@
         if ("rtp".equals(param.getApp())) {
 
             InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
-
             // 鍗曠鍙fā寮忎笅淇敼娴� ID
             if (!mediaInfo.isRtpEnable() && inviteInfo == null) {
                 String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16));
@@ -268,6 +261,8 @@
                 if (inviteInfo != null) {
                     result.setStream_replace(inviteInfo.getStream());
                     logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈 stream: {} 鏇挎崲涓� {}", param.getStream(), inviteInfo.getStream());
+                    // 鍗曠鍙fā寮忎笅淇敼娴両D涓虹洰鏍囨祦ID锛屼笉鐒跺叾浠栧湴鏂瑰彲鑳介兘鏃犳硶瀵瑰簲
+                    param.setStream(inviteInfo.getStream());
                 }
             }
 
@@ -464,8 +459,7 @@
                                     || param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
                                     || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
                                 param.setSeverId(userSetting.getServerId());
-                                zlmMediaListManager.addPush(param);
-
+                                streamPushService.updatePush(param);
                                 // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤
                                 redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param);
                             }
@@ -482,10 +476,13 @@
                                 }
                             }
                             GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
-                            if (gbStream != null) {
-//									eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+                            // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎
+                            if (gbStream == null) {
+                                storager.removeMedia(param.getApp(), param.getStream());
+                            }else {
+//                                eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+                                storager.mediaOffline(param.getApp(), param.getStream());
                             }
-                            zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
                         }
                         GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
                         if (gbStream != null) {
@@ -509,32 +506,46 @@
                     List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
                     if (!sendRtpItems.isEmpty()) {
                         for (SendRtpItem sendRtpItem : sendRtpItems) {
-                            if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) {
-                                String platformId = sendRtpItem.getPlatformId();
-                                ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
-                                Device device = deviceService.getDevice(platformId);
+                            if (sendRtpItem == null) {
+                                continue;
+                            }
 
-                                try {
-                                    if (platform != null) {
-                                        commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
-                                        redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
-                                                sendRtpItem.getCallId(), sendRtpItem.getStream());
-                                    } else {
-                                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
-                                        if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
-                                                || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
-                                            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                                            if (audioBroadcastCatch != null) {
-                                                // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
-                                                logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                                                audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                            if (sendRtpItem.getApp().equals(param.getApp())) {
+                                logger.info(sendRtpItem.toString());
+                                if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
+                                    MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+                                            sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+                                            sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId());
+                                    // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦
+                                    redisCatchStorage.sendPushStreamClose(messageForPushChannel);
+                                }else {
+                                    String platformId = sendRtpItem.getPlatformId();
+                                    ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
+                                    Device device = deviceService.getDevice(platformId);
+
+                                    try {
+                                        if (platform != null) {
+                                            commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
+                                            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
+                                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
+                                        } else {
+                                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
+                                            if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
+                                                    || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
+                                                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                if (audioBroadcastCatch != null) {
+                                                    // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
+                                                    logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                }
                                             }
                                         }
+                                    } catch (SipException | InvalidArgumentException | ParseException |
+                                             SsrcTransactionNotFoundException e) {
+                                        logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
                                     }
-                                } catch (SipException | InvalidArgumentException | ParseException |
-                                         SsrcTransactionNotFoundException e) {
-                                    logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
                                 }
+
                             }
                         }
                     }

--
Gitblit v1.8.0