From b4168c02cba462571dd3f5bdc1d0b1ffddbc938a Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 16 四月 2024 00:10:38 +0800
Subject: [PATCH] 优化多wvp国标级联推流

---
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java |   85 +++++++++++++++++++++++++-----------------
 1 files changed, 50 insertions(+), 35 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 5feb306..346b4a2 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -18,14 +18,12 @@
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
-import com.genersoft.iot.vmp.media.zlm.dto.HookType;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
+import com.genersoft.iot.vmp.media.zlm.dto.*;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -103,7 +101,7 @@
     private EventPublisher eventPublisher;
 
     @Autowired
-    private ZLMMediaListManager zlmMediaListManager;
+    private RedisPlatformPushStreamOnlineLister zlmMediaListManager;
 
     @Autowired
     private ZlmHttpHookSubscribe subscribe;
@@ -129,6 +127,9 @@
 
     @Autowired
     private RedisTemplate<Object, Object> redisTemplate;
+
+    @Autowired
+    private IStreamPushService streamPushService;
 
     /**
      * 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆�
@@ -236,10 +237,7 @@
                 // 閴存潈閫氳繃
                 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
             }
-        } else {
-            zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
         }
-
 
         HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
         result.setEnable_audio(true);
@@ -260,7 +258,6 @@
         if ("rtp".equals(param.getApp())) {
 
             InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
-
             // 鍗曠鍙fā寮忎笅淇敼娴� ID
             if (!mediaInfo.isRtpEnable() && inviteInfo == null) {
                 String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16));
@@ -268,6 +265,8 @@
                 if (inviteInfo != null) {
                     result.setStream_replace(inviteInfo.getStream());
                     logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈 stream: {} 鏇挎崲涓� {}", param.getStream(), inviteInfo.getStream());
+                    // 鍗曠鍙fā寮忎笅淇敼娴両D涓虹洰鏍囨祦ID锛屼笉鐒跺叾浠栧湴鏂瑰彲鑳介兘鏃犳硶瀵瑰簲
+                    param.setStream(inviteInfo.getStream());
                 }
             }
 
@@ -464,8 +463,7 @@
                                     || param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
                                     || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
                                 param.setSeverId(userSetting.getServerId());
-                                zlmMediaListManager.addPush(param);
-
+                                streamPushService.updatePush(param);
                                 // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤
                                 redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param);
                             }
@@ -482,10 +480,13 @@
                                 }
                             }
                             GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
-                            if (gbStream != null) {
-//									eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+                            // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎
+                            if (gbStream == null) {
+                                storager.removeMedia(param.getApp(), param.getStream());
+                            }else {
+//                                eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+                                storager.mediaOffline(param.getApp(), param.getStream());
                             }
-                            zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
                         }
                         GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
                         if (gbStream != null) {
@@ -509,32 +510,46 @@
                     List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
                     if (!sendRtpItems.isEmpty()) {
                         for (SendRtpItem sendRtpItem : sendRtpItems) {
-                            if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) {
-                                String platformId = sendRtpItem.getPlatformId();
-                                ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
-                                Device device = deviceService.getDevice(platformId);
+                            if (sendRtpItem == null) {
+                                continue;
+                            }
 
-                                try {
-                                    if (platform != null) {
-                                        commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
-                                        redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
-                                                sendRtpItem.getCallId(), sendRtpItem.getStream());
-                                    } else {
-                                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
-                                        if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
-                                                || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
-                                            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                                            if (audioBroadcastCatch != null) {
-                                                // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
-                                                logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                                                audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                            if (sendRtpItem.getApp().equals(param.getApp())) {
+                                logger.info(sendRtpItem.toString());
+                                if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
+                                    MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+                                            sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+                                            sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId());
+                                    // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦
+                                    redisCatchStorage.sendPushStreamClose(messageForPushChannel);
+                                }else {
+                                    String platformId = sendRtpItem.getPlatformId();
+                                    ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
+                                    Device device = deviceService.getDevice(platformId);
+
+                                    try {
+                                        if (platform != null) {
+                                            commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
+                                            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
+                                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
+                                        } else {
+                                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
+                                            if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
+                                                    || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
+                                                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                if (audioBroadcastCatch != null) {
+                                                    // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
+                                                    logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                                }
                                             }
                                         }
+                                    } catch (SipException | InvalidArgumentException | ParseException |
+                                             SsrcTransactionNotFoundException e) {
+                                        logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
                                     }
-                                } catch (SipException | InvalidArgumentException | ParseException |
-                                         SsrcTransactionNotFoundException e) {
-                                    logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
                                 }
+
                             }
                         }
                     }

--
Gitblit v1.8.0