From 9c6765d44ef2ccb06fdaf525a06e564a331ab892 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 16 四月 2024 22:10:35 +0800
Subject: [PATCH] 重构多wvp国标级联机制

---
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java |  107 +++++++++++++++++++++++++----------------------------
 1 files changed, 50 insertions(+), 57 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 3ee8ac1..32bf76a 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -18,10 +18,7 @@
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
-import com.genersoft.iot.vmp.media.zlm.dto.HookType;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
+import com.genersoft.iot.vmp.media.zlm.dto.*;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -73,9 +70,6 @@
     private AudioBroadcastManager audioBroadcastManager;
 
     @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
-    @Autowired
     private IPlayService playService;
 
     @Autowired
@@ -106,9 +100,6 @@
     private EventPublisher eventPublisher;
 
     @Autowired
-    private ZLMMediaListManager zlmMediaListManager;
-
-    @Autowired
     private ZlmHttpHookSubscribe subscribe;
 
     @Autowired
@@ -124,9 +115,6 @@
     private VideoStreamSessionManager sessionManager;
 
     @Autowired
-    private AssistRESTfulUtils assistRESTfulUtils;
-
-    @Autowired
     private SSRCFactory ssrcFactory;
 
     @Qualifier("taskExecutor")
@@ -135,6 +123,9 @@
 
     @Autowired
     private RedisTemplate<Object, Object> redisTemplate;
+
+    @Autowired
+    private IStreamPushService streamPushService;
 
     /**
      * 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆�
@@ -147,7 +138,7 @@
 
         taskExecutor.execute(() -> {
             List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
-            if (subscribes != null && subscribes.size() > 0) {
+            if (subscribes != null && !subscribes.isEmpty()) {
                 for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
                     subscribe.response(null, param);
                 }
@@ -166,7 +157,7 @@
     @PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8")
     public HookResult onPlay(@RequestBody OnPlayHookParam param) {
         if (logger.isDebugEnabled()) {
-            logger.debug("[ZLM HOOK] 鎾斁閴存潈锛歿}->{}" + param.getMediaServerId(), param);
+            logger.debug("[ZLM HOOK] 鎾斁閴存潈锛歿}->{}", param.getMediaServerId(), param);
         }
         String mediaServerId = param.getMediaServerId();
 
@@ -242,21 +233,14 @@
                 // 閴存潈閫氳繃
                 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
             }
-        } else {
-            zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
         }
-
 
         HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
         result.setEnable_audio(true);
         taskExecutor.execute(() -> {
             ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
             if (subscribe != null) {
-                if (mediaInfo != null) {
-                    subscribe.response(mediaInfo, param);
-                } else {
-                    new HookResultForOnPublish(1, "zlm not register");
-                }
+                subscribe.response(mediaInfo, param);
             }
         });
 
@@ -267,10 +251,9 @@
             result.setEnable_mp4(userSetting.isRecordPushLive());
         }
         // 鍥芥爣娴�
-        if ("rtp".equals(param.getApp()) ) {
+        if ("rtp".equals(param.getApp())) {
 
             InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
-
             // 鍗曠鍙fā寮忎笅淇敼娴� ID
             if (!mediaInfo.isRtpEnable() && inviteInfo == null) {
                 String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16));
@@ -278,6 +261,8 @@
                 if (inviteInfo != null) {
                     result.setStream_replace(inviteInfo.getStream());
                     logger.info("[ZLM HOOK]鎺ㄦ祦閴存潈 stream: {} 鏇挎崲涓� {}", param.getStream(), inviteInfo.getStream());
+                    // 鍗曠鍙fā寮忎笅淇敼娴両D涓虹洰鏍囨祦ID锛屼笉鐒跺叾浠栧湴鏂瑰彲鑳介兘鏃犳硶瀵瑰簲
+                    param.setStream(inviteInfo.getStream());
                 }
             }
 
@@ -318,18 +303,17 @@
                     result.setEnable_audio(true);
                 }
             }
-        }
-        else if (param.getApp().equals("broadcast")) {
+        } else if (param.getApp().equals("broadcast")) {
             result.setEnable_audio(true);
-        }else if (param.getApp().equals("talk")) {
+        } else if (param.getApp().equals("talk")) {
             result.setEnable_audio(true);
         }
         if (param.getApp().equalsIgnoreCase("rtp")) {
             String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + param.getStream();
-            OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(receiveKey);
+            OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey);
 
             String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + param.getStream();
-            OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(receiveKeyForPS);
+            OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo) redisTemplate.opsForValue().get(receiveKeyForPS);
             if (otherRtpSendInfo != null || otherPsSendInfo != null) {
                 result.setEnable_mp4(true);
             }
@@ -352,13 +336,10 @@
             logger.info("[ZLM HOOK] 娴佹敞閿�, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
         }
 
-        JSONObject ret = new JSONObject();
-        ret.put("code", 0);
-        ret.put("msg", "success");
-        MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
         JSONObject json = (JSONObject) JSON.toJSON(param);
         taskExecutor.execute(() -> {
             ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
+            MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
             if (mediaInfo == null) {
                 logger.info("[ZLM HOOK] 娴佸彉鍖栨湭鎵惧埌ZLM, {}", param.getMediaServerId());
                 return;
@@ -384,7 +365,6 @@
                     redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
                 }
             }
-
             if ("rtsp".equals(param.getSchema())) {
                 logger.info("娴佸彉鍖栵細娉ㄥ唽->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream());
                 if (param.isRegist()) {
@@ -479,8 +459,7 @@
                                     || param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
                                     || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
                                 param.setSeverId(userSetting.getServerId());
-                                zlmMediaListManager.addPush(param);
-
+                                streamPushService.updatePush(param);
                                 // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤
                                 redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param);
                             }
@@ -497,15 +476,18 @@
                                 }
                             }
                             GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
-                            if (gbStream != null) {
-//									eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+                            // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎
+                            if (gbStream == null) {
+                                storager.removeMedia(param.getApp(), param.getStream());
+                            }else {
+//                                eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+                                storager.mediaOffline(param.getApp(), param.getStream());
                             }
-                            zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
                         }
                         GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
                         if (gbStream != null) {
                             if (userSetting.isUsePushingAsStatus()) {
-                                eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF);
+                                eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist() ? CatalogEvent.ON : CatalogEvent.OFF);
                             }
                         }
                         if (type != null) {
@@ -522,12 +504,24 @@
                 }
                 if (!param.isRegist()) {
                     List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
-                    if (sendRtpItems.size() > 0) {
+                    if (!sendRtpItems.isEmpty()) {
                         for (SendRtpItem sendRtpItem : sendRtpItems) {
-                            if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) {
-                                String platformId = sendRtpItem.getPlatformId();
-                                ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
-                                Device device = deviceService.getDevice(platformId);
+                            if (sendRtpItem == null) {
+                                continue;
+                            }
+
+                            if (sendRtpItem.getApp().equals(param.getApp())) {
+                                logger.info(sendRtpItem.toString());
+                                if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
+                                    MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
+                                            sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
+                                            sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId());
+                                    // 閫氱煡鍏朵粬wvp鍋滄鍙戞祦
+                                    redisCatchStorage.sendPushStreamClose(messageForPushChannel);
+                                }else {
+                                    String platformId = sendRtpItem.getPlatformId();
+                                    ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
+                                    Device device = deviceService.getDevice(platformId);
 
                                     try {
                                         if (platform != null) {
@@ -551,11 +545,11 @@
                                         logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
                                     }
                                 }
+
                             }
                         }
                     }
                 }
-
             }
         });
         return HookResult.SUCCESS();
@@ -586,9 +580,9 @@
                 }
                 // 鏀跺埌鏃犱汉瑙傜湅璇存槑娴佷篃娌℃湁鍦ㄥ線涓婄骇鎺ㄩ��
                 if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
-                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
+                    List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
                             inviteInfo.getChannelId());
-                    if (sendRtpItems.size() > 0) {
+                    if (!sendRtpItems.isEmpty()) {
                         for (SendRtpItem sendRtpItem : sendRtpItems) {
                             ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
                             try {
@@ -617,14 +611,14 @@
                         if (info != null) {
                             cmder.streamByeCmd(device, inviteInfo.getChannelId(),
                                     inviteInfo.getStream(), null);
-                        }else {
+                        } else {
                             logger.info("[鏃犱汉瑙傜湅] 鏈壘鍒拌澶囩殑鐐规挱淇℃伅锛� {}锛� 娴侊細{}", inviteInfo.getDeviceId(), param.getStream());
                         }
                     } catch (InvalidArgumentException | ParseException | SipException |
                              SsrcTransactionNotFoundException e) {
                         logger.error("[鏃犱汉瑙傜湅]鐐规挱锛� 鍙戦�丅YE澶辫触 {}", e.getMessage());
                     }
-                }else {
+                } else {
                     logger.info("[鏃犱汉瑙傜湅] 鏈壘鍒拌澶囷細 {}锛屾祦锛歿}", inviteInfo.getDeviceId(), param.getStream());
                 }
 
@@ -734,7 +728,7 @@
                     });
                 }
                 return result;
-            }else if(s.length == 4){
+            } else if (s.length == 4) {
                 // 姝ゆ椂涓哄綍鍍忓洖鏀撅紝 褰曞儚鍥炴斁鏍煎紡涓�> 璁惧ID_閫氶亾ID_寮�濮嬫椂闂確缁撴潫鏃堕棿
                 String startTimeStr = s[2];
                 String endTimeStr = s[3];
@@ -768,14 +762,14 @@
 
                 if (!exist) {
                     SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaInfo, param.getStream(), null,
-                            device.isSsrcCheck(),  true, 0, false, false, device.getStreamModeForParam());
+                            device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam());
                     playService.playBack(mediaInfo, ssrcInfo, deviceId, channelId, startTime, endTime, (code, message, data) -> {
                         msg.setData(new HookResult(code, message));
                         resultHolder.invokeResult(msg);
                     });
                 }
                 return result;
-            }else {
+            } else {
                 defaultResult.setResult(HookResult.SUCCESS());
                 return defaultResult;
             }
@@ -805,7 +799,7 @@
         logger.info("[ZLM HOOK] zlm 鍚姩 " + zlmServerConfig.getGeneralMediaServerId());
         taskExecutor.execute(() -> {
             List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
-            if (subscribes != null && subscribes.size() > 0) {
+            if (subscribes != null && !subscribes.isEmpty()) {
                 for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
                     subscribe.response(null, zlmServerConfig);
                 }
@@ -854,12 +848,11 @@
      */
     @ResponseBody
     @PostMapping(value = "/on_rtp_server_timeout", produces = "application/json;charset=UTF-8")
-    public HookResult onRtpServerTimeout(HttpServletRequest request, @RequestBody OnRtpServerTimeoutHookParam
+    public HookResult onRtpServerTimeout(@RequestBody OnRtpServerTimeoutHookParam
             param) {
         logger.info("[ZLM HOOK] rtpServer鏀舵祦瓒呮椂锛歿}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc());
 
         taskExecutor.execute(() -> {
-            JSONObject json = (JSONObject) JSON.toJSON(param);
             List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_rtp_server_timeout);
             if (subscribes != null && !subscribes.isEmpty()) {
                 for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {

--
Gitblit v1.8.0