From d70bfb53dd5d92d0405f7ce3d2c9bbabce26184b Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期一, 20 五月 2024 11:37:07 +0800
Subject: [PATCH] 修复兼容zlm-pro转码时无人管看自动移除失败的BUG

---
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java |  122 ++++++++++++++++++++++------------------
 1 files changed, 68 insertions(+), 54 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 9346eb8..6968236 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -10,8 +10,8 @@
 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
-import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
+import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
@@ -24,8 +24,8 @@
 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
 import com.genersoft.iot.vmp.service.*;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -73,9 +73,6 @@
     private AudioBroadcastManager audioBroadcastManager;
 
     @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
-    @Autowired
     private IPlayService playService;
 
     @Autowired
@@ -83,6 +80,10 @@
 
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
+
+
+    @Autowired
+    private IRedisRpcService redisRpcService;
 
     @Autowired
     private IInviteStreamService inviteStreamService;
@@ -106,9 +107,6 @@
     private EventPublisher eventPublisher;
 
     @Autowired
-    private ZLMMediaListManager zlmMediaListManager;
-
-    @Autowired
     private ZlmHttpHookSubscribe subscribe;
 
     @Autowired
@@ -124,9 +122,6 @@
     private VideoStreamSessionManager sessionManager;
 
     @Autowired
-    private AssistRESTfulUtils assistRESTfulUtils;
-
-    @Autowired
     private SSRCFactory ssrcFactory;
 
     @Qualifier("taskExecutor")
@@ -135,6 +130,9 @@
 
     @Autowired
     private RedisTemplate<Object, Object> redisTemplate;
+
+    @Autowired
+    private IStreamPushService streamPushService;
 
     /**
      * 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆�
@@ -147,7 +145,7 @@
 
         taskExecutor.execute(() -> {
             List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
-            if (subscribes != null && subscribes.size() > 0) {
+            if (subscribes != null && !subscribes.isEmpty()) {
                 for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
                     subscribe.response(null, param);
                 }
@@ -166,7 +164,7 @@
     @PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8")
     public HookResult onPlay(@RequestBody OnPlayHookParam param) {
         if (logger.isDebugEnabled()) {
-            logger.debug("[ZLM HOOK] 鎾斁閴存潈锛歿}->{}" + param.getMediaServerId(), param);
+            logger.debug("[ZLM HOOK] 鎾斁閴存潈锛歿}->{}", param.getMediaServerId(), param);
         }
         String mediaServerId = param.getMediaServerId();
 
@@ -242,21 +240,14 @@
                 // 閴存潈閫氳繃
                 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
             }
-        } else {
-            zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
         }
-
 
         HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
         result.setEnable_audio(true);
         taskExecutor.execute(() -> {
             ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
             if (subscribe != null) {
-                if (mediaInfo != null) {
-                    subscribe.response(mediaInfo, param);
-                } else {
-                    new HookResultForOnPublish(1, "zlm not register");
-                }
+                subscribe.response(mediaInfo, param);
             }
         });
 
@@ -270,7 +261,6 @@
         if ("rtp".equals(param.getApp())) {
 
             InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
-
             // 鍗曠鍙fā寮忎笅淇敼娴� ID
             if (!mediaInfo.isRtpEnable() && inviteInfo == null) {
                 String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16));
@@ -278,6 +268,8 @@
                 if (inviteInfo != null) {
                     result.setStream_replace(inviteInfo.getStream());
                     logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈 stream: {} 鏇挎崲涓� {}", param.getStream(), inviteInfo.getStream());
+                    // 鍗曠鍙fā寮忎笅淇敼娴両D涓虹洰鏍囨祦ID锛屼笉鐒跺叾浠栧湴鏂瑰彲鑳介兘鏃犳硶瀵瑰簲
+                    param.setStream(inviteInfo.getStream());
                 }
             }
 
@@ -297,7 +289,7 @@
                 String channelId = ssrcTransactionForAll.get(0).getChannelId();
                 DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
                 if (deviceChannel != null) {
-                    result.setEnable_audio(deviceChannel.isHasAudio());
+                    result.setEnable_audio(deviceChannel.getHasAudio());
                 }
                 // 濡傛灉鏄綍鍍忎笅杞藉氨璁剧疆瑙嗛闂撮殧鍗佺
                 if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) {
@@ -357,6 +349,11 @@
             MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
             if (mediaInfo == null) {
                 logger.info("[ZLM HOOK] 娴佸彉鍖栨湭鎵惧埌ZLM, {}", param.getMediaServerId());
+                return;
+            }
+            if (!ObjectUtils.isEmpty(mediaInfo.getTranscodeSuffix())
+                    && !"null".equalsIgnoreCase(mediaInfo.getTranscodeSuffix())
+                    && param.getStream().endsWith(mediaInfo.getTranscodeSuffix())  ) {
                 return;
             }
             if (subscribe != null) {
@@ -474,8 +471,7 @@
                                     || param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
                                     || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
                                 param.setSeverId(userSetting.getServerId());
-                                zlmMediaListManager.addPush(param);
-
+                                streamPushService.updatePush(param);
                                 // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤
                                 redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param);
                             }
@@ -492,10 +488,13 @@
                                 }
                             }
                             GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
-                            if (gbStream != null) {
-//									eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+                            // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎
+                            if (gbStream == null) {
+                                storager.removeMedia(param.getApp(), param.getStream());
+                            }else {
+//                                eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+                                storager.mediaOffline(param.getApp(), param.getStream());
                             }
-                            zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
                         }
                         GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
                         if (gbStream != null) {
@@ -519,17 +518,20 @@
                     List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
                     if (!sendRtpItems.isEmpty()) {
                         for (SendRtpItem sendRtpItem : sendRtpItems) {
-                            if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) {
-                                String platformId = sendRtpItem.getPlatformId();
-                                ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
-                                Device device = deviceService.getDevice(platformId);
-
+                            if (sendRtpItem == null) {
+                                continue;
+                            }
+                            if (sendRtpItem.getApp().equals(param.getApp())) {
+                                // 鍦╤ook鏀跺埌杩欎釜娑堟伅锛岃鏄庡彂娴佷竴瀹氭槸鏈骇瀹屾垚鐨勩��
+                                ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
+                                ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
+                                Device device = deviceService.getDevice(sendRtpItem.getPlatformId());
                                 try {
                                     if (platform != null) {
                                         commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
-                                        redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
-                                                sendRtpItem.getCallId(), sendRtpItem.getStream());
-                                    } else {
+                                        redisCatchStorage.deleteSendRTPServer(sendRtpItem);
+                                        redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
+                                    } else if (device != null) {
                                         cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
                                         if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
                                                 || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
@@ -540,6 +542,9 @@
                                                 audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                             }
                                         }
+                                    }else {
+                                        // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦
+                                        redisRpcService.rtpSendStopped(sendRtpItem.getRedisKey());
                                     }
                                 } catch (SipException | InvalidArgumentException | ParseException |
                                          SsrcTransactionNotFoundException e) {
@@ -563,6 +568,19 @@
 
         logger.info("[ZLM HOOK]娴佹棤浜鸿鐪嬶細{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(),
                 param.getApp(), param.getStream());
+
+        MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
+        if (mediaInfo == null) {
+            JSONObject ret = new JSONObject();
+            ret.put("code", 0);
+            return ret;
+        }
+        if (!ObjectUtils.isEmpty(mediaInfo.getTranscodeSuffix())
+                && !"null".equalsIgnoreCase(mediaInfo.getTranscodeSuffix())
+                && param.getStream().endsWith(mediaInfo.getTranscodeSuffix())  ) {
+            param.setStream(param.getStream().substring(0, param.getStream().lastIndexOf(mediaInfo.getTranscodeSuffix()) -1 ));
+        }
+
         JSONObject ret = new JSONObject();
         ret.put("code", 0);
         // 鍥芥爣绫诲瀷鐨勬祦
@@ -589,14 +607,10 @@
                             } catch (SipException | InvalidArgumentException | ParseException e) {
                                 logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
                             }
-                            redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
-                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
+                            redisCatchStorage.deleteSendRTPServer(sendRtpItem);
+                            ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
                             if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
-                                MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
-                                        sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
-                                        sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
-                                messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
-                                redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
+                                redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, parentPlatform);
                             }
                         }
                     }
@@ -798,7 +812,7 @@
         logger.info("[ZLM HOOK] zlm 鍚姩 " + zlmServerConfig.getGeneralMediaServerId());
         taskExecutor.execute(() -> {
             List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
-            if (subscribes != null && subscribes.size() > 0) {
+            if (subscribes != null && !subscribes.isEmpty()) {
                 for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
                     subscribe.response(null, zlmServerConfig);
                 }
@@ -824,17 +838,18 @@
         }
         taskExecutor.execute(() -> {
             List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
-            if (sendRtpItems.size() > 0) {
+            if (!sendRtpItems.isEmpty()) {
                 for (SendRtpItem sendRtpItem : sendRtpItems) {
                     ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
-                    ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
-                    try {
-                        commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
-                    } catch (SipException | InvalidArgumentException | ParseException e) {
-                        logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
+                    if(parentPlatform != null) {
+                        try {
+                            commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
+                        } catch (SipException | InvalidArgumentException | ParseException e) {
+                            logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
+                        }
                     }
-                    redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
-                            sendRtpItem.getCallId(), sendRtpItem.getStream());
+                    ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
+                    redisCatchStorage.deleteSendRTPServer(sendRtpItem);
                 }
             }
         });
@@ -847,12 +862,11 @@
      */
     @ResponseBody
     @PostMapping(value = "/on_rtp_server_timeout", produces = "application/json;charset=UTF-8")
-    public HookResult onRtpServerTimeout(HttpServletRequest request, @RequestBody OnRtpServerTimeoutHookParam
+    public HookResult onRtpServerTimeout(@RequestBody OnRtpServerTimeoutHookParam
             param) {
         logger.info("[ZLM HOOK] rtpServer鏀舵祦瓒呮椂锛歿}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc());
 
         taskExecutor.execute(() -> {
-            JSONObject json = (JSONObject) JSON.toJSON(param);
             List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_rtp_server_timeout);
             if (subscribes != null && !subscribes.isEmpty()) {
                 for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {

--
Gitblit v1.8.0