From 764d04b497356ba6bcbb75fd42b51eca750f7223 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期三, 29 五月 2024 15:02:51 +0800
Subject: [PATCH] 调整上级观看消息的发送

---
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java |  300 ++++++++++++++++-------------------------------------------
 1 files changed, 84 insertions(+), 216 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 702cc0d..738d3f8 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -2,38 +2,29 @@
 
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.common.InviteInfo;
-import com.genersoft.iot.vmp.common.InviteSessionType;
 import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
-import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
-import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
+import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
-import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
-import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
-import com.genersoft.iot.vmp.media.event.MediaNotFoundEvent;
+import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
+import com.genersoft.iot.vmp.media.event.media.*;
+import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.media.zlm.dto.HookType;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
 import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerKeepaliveEvent;
 import com.genersoft.iot.vmp.media.zlm.event.HookZlmServerStartEvent;
 import com.genersoft.iot.vmp.service.*;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
-import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.utils.DateUtil;
-import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
+import com.genersoft.iot.vmp.utils.MediaServerUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -43,16 +34,9 @@
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.util.ObjectUtils;
 import org.springframework.web.bind.annotation.*;
-import org.springframework.web.context.request.async.DeferredResult;
 
 import javax.servlet.http.HttpServletRequest;
-import javax.sip.InvalidArgumentException;
-import javax.sip.SipException;
-import java.text.ParseException;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
 /**
  * @description:閽堝 ZLMediaServer鐨刪ook浜嬩欢鐩戝惉
@@ -83,6 +67,10 @@
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
 
+
+    @Autowired
+    private IRedisRpcService redisRpcService;
+
     @Autowired
     private IInviteStreamService inviteStreamService;
 
@@ -105,10 +93,7 @@
     private EventPublisher eventPublisher;
 
     @Autowired
-    private ZLMMediaListManager zlmMediaListManager;
-
-    @Autowired
-    private ZlmHttpHookSubscribe subscribe;
+    private HookSubscribe subscribe;
 
     @Autowired
     private UserSetting userSetting;
@@ -135,6 +120,9 @@
     @Autowired
     private ApplicationEventPublisher applicationEventPublisher;
 
+    @Autowired
+    private IStreamPushService streamPushService;
+
     /**
      * 鏈嶅姟鍣ㄥ畾鏃朵笂鎶ユ椂闂达紝涓婃姤闂撮殧鍙厤缃紝榛樿10s涓婃姤涓�娆�
      */
@@ -160,16 +148,15 @@
     @ResponseBody
     @PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8")
     public HookResult onPlay(@RequestBody OnPlayHookParam param) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("[ZLM HOOK] 鎾斁閴存潈锛歿}->{}", param.getMediaServerId(), param);
-        }
-        Map<String, String> paramMap = urlParamToMap(param.getParams());
+
+        Map<String, String> paramMap = MediaServerUtils.urlParamToMap(param.getParams());
         // 瀵逛簬鎾斁娴佽繘琛岄壌鏉�
         boolean authenticateResult = mediaService.authenticatePlay(param.getApp(), param.getStream(), paramMap.get("callId"));
         if (!authenticateResult) {
+            logger.info("[ZLM HOOK] 鎾斁閴存潈 澶辫触锛歿}->{}", param.getMediaServerId(), param);
             return new HookResult(401, "Unauthorized");
         }
-
+        logger.info("[ZLM HOOK] 鎾斁閴存潈鎴愬姛锛歿}->{}", param.getMediaServerId(), param);
         return HookResult.SUCCESS();
     }
 
@@ -188,15 +175,8 @@
         String mediaServerId = json.getString("mediaServerId");
         MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
         if (mediaServer == null) {
-            return new HookResultForOnPublish(200, "success");
+            return new HookResultForOnPublish(0, "success");
         }
-
-        taskExecutor.execute(() -> {
-            ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
-            if (subscribe != null) {
-                subscribe.response(mediaServer, param);
-            }
-        });
 
         ResultForOnPublish resultForOnPublish = mediaService.authenticatePublish(mediaServer, param.getApp(), param.getStream(), param.getParams());
         if (resultForOnPublish != null) {
@@ -219,16 +199,26 @@
     public HookResult onStreamChanged(@RequestBody OnStreamChangedHookParam param) {
 
         MediaServer mediaServer = mediaServerService.getOne(param.getMediaServerId());
-
-        if (param.isRegist()) {
-            logger.info("[ZLM HOOK] 娴佹敞鍐�, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
-            MediaArrivalEvent mediaArrivalEvent = MediaArrivalEvent.getInstance(this, param, mediaServer);
-            applicationEventPublisher.publishEvent(mediaArrivalEvent);
-        } else {
-            logger.info("[ZLM HOOK] 娴佹敞閿�, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
-            MediaDepartureEvent mediaArrivalEvent = MediaDepartureEvent.getInstance(this, param, mediaServer);
-            applicationEventPublisher.publishEvent(mediaArrivalEvent);
+        if (mediaServer == null) {
+            return HookResult.SUCCESS();
         }
+        if (!ObjectUtils.isEmpty(mediaServer.getTranscodeSuffix())
+                && !"null".equalsIgnoreCase(mediaServer.getTranscodeSuffix())
+                && param.getStream().endsWith(mediaServer.getTranscodeSuffix())  ) {
+            return HookResult.SUCCESS();
+        }
+        if (param.getSchema().equalsIgnoreCase("rtsp")) {
+            if (param.isRegist()) {
+                logger.info("[ZLM HOOK] 娴佹敞鍐�, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
+                MediaArrivalEvent mediaArrivalEvent = MediaArrivalEvent.getInstance(this, param, mediaServer);
+                applicationEventPublisher.publishEvent(mediaArrivalEvent);
+            } else {
+                logger.info("[ZLM HOOK] 娴佹敞閿�, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
+                MediaDepartureEvent mediaDepartureEvent = MediaDepartureEvent.getInstance(this, param, mediaServer);
+                applicationEventPublisher.publishEvent(mediaDepartureEvent);
+            }
+        }
+
         return HookResult.SUCCESS();
     }
 
@@ -241,10 +231,23 @@
 
         logger.info("[ZLM HOOK]娴佹棤浜鸿鐪嬶細{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(),
                 param.getApp(), param.getStream());
-        JSONObject ret = new JSONObject();
 
+        MediaServer mediaInfo = mediaServerService.getOne(param.getMediaServerId());
+        if (mediaInfo == null) {
+            JSONObject ret = new JSONObject();
+            ret.put("code", 0);
+            return ret;
+        }
+        if (!ObjectUtils.isEmpty(mediaInfo.getTranscodeSuffix())
+                && !"null".equalsIgnoreCase(mediaInfo.getTranscodeSuffix())
+                && param.getStream().endsWith(mediaInfo.getTranscodeSuffix())  ) {
+            param.setStream(param.getStream().substring(0, param.getStream().lastIndexOf(mediaInfo.getTranscodeSuffix()) -1 ));
+        }
+
+        JSONObject ret = new JSONObject();
         boolean close = mediaService.closeStreamOnNoneReader(param.getMediaServerId(), param.getApp(), param.getStream(), param.getSchema());
-        ret.put("code", close);
+        ret.put("code", 0);
+        ret.put("close", close);
         return ret;
     }
 
@@ -253,123 +256,17 @@
      */
     @ResponseBody
     @PostMapping(value = "/on_stream_not_found", produces = "application/json;charset=UTF-8")
-    public DeferredResult<HookResult> onStreamNotFound(@RequestBody OnStreamNotFoundHookParam param) {
+    public HookResult onStreamNotFound(@RequestBody OnStreamNotFoundHookParam param) {
         logger.info("[ZLM HOOK] 娴佹湭鎵惧埌锛歿}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
 
-        DeferredResult<HookResult> defaultResult = new DeferredResult<>();
 
         MediaServer mediaServer = mediaServerService.getOne(param.getMediaServerId());
         if (!userSetting.isAutoApplyPlay() || mediaServer == null) {
-            defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg()));
-            return defaultResult;
+            return HookResult.SUCCESS();
         }
         MediaNotFoundEvent mediaNotFoundEvent = MediaNotFoundEvent.getInstance(this, param, mediaServer);
         applicationEventPublisher.publishEvent(mediaNotFoundEvent);
-
-
-
-        if ("rtp".equals(param.getApp())) {
-            String[] s = param.getStream().split("_");
-            if ((s.length != 2 && s.length != 4)) {
-                defaultResult.setResult(HookResult.SUCCESS());
-                return defaultResult;
-            }
-            String deviceId = s[0];
-            String channelId = s[1];
-            Device device = redisCatchStorage.getDevice(deviceId);
-            if (device == null || !device.isOnLine()) {
-                defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg()));
-                return defaultResult;
-            }
-            DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
-            if (deviceChannel == null) {
-                defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg()));
-                return defaultResult;
-            }
-            if (s.length == 2) {
-                logger.info("[ZLM HOOK] 棰勮娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
-
-                RequestMessage msg = new RequestMessage();
-                String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
-                boolean exist = resultHolder.exist(key, null);
-                msg.setKey(key);
-                String uuid = UUID.randomUUID().toString();
-                msg.setId(uuid);
-                DeferredResult<HookResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
-
-                result.onTimeout(() -> {
-                    logger.info("[ZLM HOOK] 棰勮娴佽嚜鍔ㄧ偣鎾�, 绛夊緟瓒呮椂");
-                    msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "鐐规挱瓒呮椂"));
-                    resultHolder.invokeAllResult(msg);
-                    inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
-                    storager.stopPlay(deviceId, channelId);
-                });
-
-                resultHolder.put(key, uuid, result);
-
-                if (!exist) {
-                    playService.play(mediaServer, deviceId, channelId, null, (code, message, data) -> {
-                        msg.setData(new HookResult(code, message));
-                        resultHolder.invokeResult(msg);
-                    });
-                }
-                return result;
-            } else if (s.length == 4) {
-                // 姝ゆ椂涓哄綍鍍忓洖鏀撅紝 褰曞儚鍥炴斁鏍煎紡涓�> 璁惧ID_閫氶亾ID_寮�濮嬫椂闂確缁撴潫鏃堕棿
-                String startTimeStr = s[2];
-                String endTimeStr = s[3];
-                if (startTimeStr == null || endTimeStr == null || startTimeStr.length() != 14 || endTimeStr.length() != 14) {
-                    defaultResult.setResult(HookResult.SUCCESS());
-                    return defaultResult;
-                }
-                String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr);
-                String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr);
-                logger.info("[ZLM HOOK] 鍥炴斁娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}-{}-{}",
-                        param.getMediaServerId(), param.getSchema(),
-                        param.getApp(), param.getStream(),
-                        startTime, endTime
-                );
-                RequestMessage msg = new RequestMessage();
-                String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId;
-                boolean exist = resultHolder.exist(key, null);
-                msg.setKey(key);
-                String uuid = UUID.randomUUID().toString();
-                msg.setId(uuid);
-                DeferredResult<HookResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
-
-                result.onTimeout(() -> {
-                    logger.info("[ZLM HOOK] 鍥炴斁娴佽嚜鍔ㄧ偣鎾�, 绛夊緟瓒呮椂");
-                    // 閲婃斁rtpserver
-                    msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "鐐规挱瓒呮椂"));
-                    resultHolder.invokeResult(msg);
-                });
-
-                resultHolder.put(key, uuid, result);
-
-                if (!exist) {
-                    SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, param.getStream(), null,
-                            device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam());
-                    playService.playBack(mediaServer, ssrcInfo, deviceId, channelId, startTime, endTime, (code, message, data) -> {
-                        msg.setData(new HookResult(code, message));
-                        resultHolder.invokeResult(msg);
-                    });
-                }
-                return result;
-            } else {
-                defaultResult.setResult(HookResult.SUCCESS());
-                return defaultResult;
-            }
-
-        } else {
-            // 鎷夋祦浠g悊
-            StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
-            if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) {
-                streamProxyService.start(param.getApp(), param.getStream());
-            }
-            DeferredResult<HookResult> result = new DeferredResult<>();
-            result.setResult(HookResult.SUCCESS());
-            return result;
-        }
+        return HookResult.SUCCESS();
     }
 
     /**
@@ -383,14 +280,6 @@
         ZLMServerConfig zlmServerConfig = JSON.to(ZLMServerConfig.class, jsonObject);
         zlmServerConfig.setIp(request.getRemoteAddr());
         logger.info("[ZLM HOOK] zlm 鍚姩 " + zlmServerConfig.getGeneralMediaServerId());
-        taskExecutor.execute(() -> {
-            List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
-            if (subscribes != null && !subscribes.isEmpty()) {
-                for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
-                    subscribe.response(null, zlmServerConfig);
-                }
-            }
-        });
         try {
             HookZlmServerStartEvent event = new HookZlmServerStartEvent(this);
             MediaServer mediaServerItem = mediaServerService.getOne(zlmServerConfig.getMediaServerId());
@@ -418,22 +307,16 @@
         if (!"rtp".equals(param.getApp())) {
             return HookResult.SUCCESS();
         }
-        taskExecutor.execute(() -> {
-            List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
-            if (sendRtpItems.size() > 0) {
-                for (SendRtpItem sendRtpItem : sendRtpItems) {
-                    ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
-                    ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
-                    try {
-                        commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
-                    } catch (SipException | InvalidArgumentException | ParseException e) {
-                        logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
-                    }
-                    redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
-                            sendRtpItem.getCallId(), sendRtpItem.getStream());
-                }
+        try {
+            MediaSendRtpStoppedEvent event = new MediaSendRtpStoppedEvent(this);
+            MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
+            if (mediaServerItem != null) {
+                event.setMediaServer(mediaServerItem);
+                applicationEventPublisher.publishEvent(event);
             }
-        });
+        }catch (Exception e) {
+            logger.info("[ZLM-HOOK-rtp鍙戦�佸叧闂璢 鍙戦�侀�氱煡澶辫触 ", e);
+        }
 
         return HookResult.SUCCESS();
     }
@@ -447,14 +330,17 @@
             param) {
         logger.info("[ZLM HOOK] rtpServer鏀舵祦瓒呮椂锛歿}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc());
 
-        taskExecutor.execute(() -> {
-            List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_rtp_server_timeout);
-            if (subscribes != null && !subscribes.isEmpty()) {
-                for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
-                    subscribe.response(null, param);
-                }
+        try {
+            MediaRtpServerTimeoutEvent event = new MediaRtpServerTimeoutEvent(this);
+            MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
+            if (mediaServerItem != null) {
+                event.setMediaServer(mediaServerItem);
+                event.setApp("rtp");
+                applicationEventPublisher.publishEvent(event);
             }
-        });
+        }catch (Exception e) {
+            logger.info("[ZLM-HOOK-rtpServer鏀舵祦瓒呮椂] 鍙戦�侀�氱煡澶辫触 ", e);
+        }
 
         return HookResult.SUCCESS();
     }
@@ -467,35 +353,17 @@
     public HookResult onRecordMp4(HttpServletRequest request, @RequestBody OnRecordMp4HookParam param) {
         logger.info("[ZLM HOOK] 褰曞儚瀹屾垚浜嬩欢锛歿}->{}", param.getMediaServerId(), param.getFile_path());
 
-        taskExecutor.execute(() -> {
-            List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_record_mp4);
-            if (subscribes != null && !subscribes.isEmpty()) {
-                for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
-                    subscribe.response(null, param);
-                }
+        try {
+            MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
+            if (mediaServerItem != null) {
+                MediaRecordMp4Event event = MediaRecordMp4Event.getInstance(this, param, mediaServerItem);
+                event.setMediaServer(mediaServerItem);
+                applicationEventPublisher.publishEvent(event);
             }
-            cloudRecordService.addRecord(param);
-
-        });
+        }catch (Exception e) {
+            logger.info("[ZLM-HOOK-rtpServer鏀舵祦瓒呮椂] 鍙戦�侀�氱煡澶辫触 ", e);
+        }
 
         return HookResult.SUCCESS();
-    }
-
-    private Map<String, String> urlParamToMap(String params) {
-        HashMap<String, String> map = new HashMap<>();
-        if (ObjectUtils.isEmpty(params)) {
-            return map;
-        }
-        String[] paramsArray = params.split("&");
-        if (paramsArray.length == 0) {
-            return map;
-        }
-        for (String param : paramsArray) {
-            String[] paramArray = param.split("=");
-            if (paramArray.length == 2) {
-                map.put(paramArray[0], paramArray[1]);
-            }
-        }
-        return map;
     }
 }

--
Gitblit v1.8.0