From 0c2180c07fd26c04f8d49aa7c7968a09b9c06d46 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 26 三月 2024 23:36:55 +0800
Subject: [PATCH] 支持处理流没找到事件

---
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java       |  112 ---------------------------
 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java        |   52 +++++++++++++
 src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java |   19 ++++
 3 files changed, 74 insertions(+), 109 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 702cc0d..71ceb2e 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -253,123 +253,17 @@
      */
     @ResponseBody
     @PostMapping(value = "/on_stream_not_found", produces = "application/json;charset=UTF-8")
-    public DeferredResult<HookResult> onStreamNotFound(@RequestBody OnStreamNotFoundHookParam param) {
+    public HookResult onStreamNotFound(@RequestBody OnStreamNotFoundHookParam param) {
         logger.info("[ZLM HOOK] 娴佹湭鎵惧埌锛歿}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
 
-        DeferredResult<HookResult> defaultResult = new DeferredResult<>();
 
         MediaServer mediaServer = mediaServerService.getOne(param.getMediaServerId());
         if (!userSetting.isAutoApplyPlay() || mediaServer == null) {
-            defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg()));
-            return defaultResult;
+            return HookResult.SUCCESS();
         }
         MediaNotFoundEvent mediaNotFoundEvent = MediaNotFoundEvent.getInstance(this, param, mediaServer);
         applicationEventPublisher.publishEvent(mediaNotFoundEvent);
-
-
-
-        if ("rtp".equals(param.getApp())) {
-            String[] s = param.getStream().split("_");
-            if ((s.length != 2 && s.length != 4)) {
-                defaultResult.setResult(HookResult.SUCCESS());
-                return defaultResult;
-            }
-            String deviceId = s[0];
-            String channelId = s[1];
-            Device device = redisCatchStorage.getDevice(deviceId);
-            if (device == null || !device.isOnLine()) {
-                defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg()));
-                return defaultResult;
-            }
-            DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
-            if (deviceChannel == null) {
-                defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg()));
-                return defaultResult;
-            }
-            if (s.length == 2) {
-                logger.info("[ZLM HOOK] 棰勮娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
-
-                RequestMessage msg = new RequestMessage();
-                String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
-                boolean exist = resultHolder.exist(key, null);
-                msg.setKey(key);
-                String uuid = UUID.randomUUID().toString();
-                msg.setId(uuid);
-                DeferredResult<HookResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
-
-                result.onTimeout(() -> {
-                    logger.info("[ZLM HOOK] 棰勮娴佽嚜鍔ㄧ偣鎾�, 绛夊緟瓒呮椂");
-                    msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "鐐规挱瓒呮椂"));
-                    resultHolder.invokeAllResult(msg);
-                    inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
-                    storager.stopPlay(deviceId, channelId);
-                });
-
-                resultHolder.put(key, uuid, result);
-
-                if (!exist) {
-                    playService.play(mediaServer, deviceId, channelId, null, (code, message, data) -> {
-                        msg.setData(new HookResult(code, message));
-                        resultHolder.invokeResult(msg);
-                    });
-                }
-                return result;
-            } else if (s.length == 4) {
-                // 姝ゆ椂涓哄綍鍍忓洖鏀撅紝 褰曞儚鍥炴斁鏍煎紡涓�> 璁惧ID_閫氶亾ID_寮�濮嬫椂闂確缁撴潫鏃堕棿
-                String startTimeStr = s[2];
-                String endTimeStr = s[3];
-                if (startTimeStr == null || endTimeStr == null || startTimeStr.length() != 14 || endTimeStr.length() != 14) {
-                    defaultResult.setResult(HookResult.SUCCESS());
-                    return defaultResult;
-                }
-                String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr);
-                String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr);
-                logger.info("[ZLM HOOK] 鍥炴斁娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}-{}-{}",
-                        param.getMediaServerId(), param.getSchema(),
-                        param.getApp(), param.getStream(),
-                        startTime, endTime
-                );
-                RequestMessage msg = new RequestMessage();
-                String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId;
-                boolean exist = resultHolder.exist(key, null);
-                msg.setKey(key);
-                String uuid = UUID.randomUUID().toString();
-                msg.setId(uuid);
-                DeferredResult<HookResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
-
-                result.onTimeout(() -> {
-                    logger.info("[ZLM HOOK] 鍥炴斁娴佽嚜鍔ㄧ偣鎾�, 绛夊緟瓒呮椂");
-                    // 閲婃斁rtpserver
-                    msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "鐐规挱瓒呮椂"));
-                    resultHolder.invokeResult(msg);
-                });
-
-                resultHolder.put(key, uuid, result);
-
-                if (!exist) {
-                    SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, param.getStream(), null,
-                            device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam());
-                    playService.playBack(mediaServer, ssrcInfo, deviceId, channelId, startTime, endTime, (code, message, data) -> {
-                        msg.setData(new HookResult(code, message));
-                        resultHolder.invokeResult(msg);
-                    });
-                }
-                return result;
-            } else {
-                defaultResult.setResult(HookResult.SUCCESS());
-                return defaultResult;
-            }
-
-        } else {
-            // 鎷夋祦浠g悊
-            StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
-            if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) {
-                streamProxyService.start(param.getApp(), param.getStream());
-            }
-            DeferredResult<HookResult> result = new DeferredResult<>();
-            result.setResult(HookResult.SUCCESS());
-            return result;
-        }
+        return HookResult.SUCCESS();
     }
 
     /**
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index 722139e..31a43c9 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -13,12 +13,15 @@
 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
+import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
+import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
 import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
 import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
+import com.genersoft.iot.vmp.media.event.MediaNotFoundEvent;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
@@ -28,6 +31,7 @@
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.HookResult;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import com.genersoft.iot.vmp.service.*;
@@ -49,6 +53,7 @@
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.util.ObjectUtils;
+import org.springframework.web.context.request.async.DeferredResult;
 
 import javax.sdp.*;
 import javax.sip.InvalidArgumentException;
@@ -195,6 +200,53 @@
         }
     }
 
+    /**
+     * 娴佹湭鎵惧埌鐨勫鐞�
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaNotFoundEvent event) {
+        if (!"rtp".equals(event.getApp())) {
+            return;
+        }
+        String[] s = event.getStream().split("_");
+        if ((s.length != 2 && s.length != 4)) {
+            return;
+        }
+        String deviceId = s[0];
+        String channelId = s[1];
+        Device device = redisCatchStorage.getDevice(deviceId);
+        if (device == null || !device.isOnLine()) {
+            return;
+        }
+        DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
+        if (deviceChannel == null) {
+            return;
+        }
+        if (s.length == 2) {
+            logger.info("[ZLM HOOK] 棰勮娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}", event.getMediaServer().getId(), event.getSchema(), event.getApp(), event.getStream());
+            play(event.getMediaServer(), deviceId, channelId, null, null);
+        } else if (s.length == 4) {
+            // 姝ゆ椂涓哄綍鍍忓洖鏀撅紝 褰曞儚鍥炴斁鏍煎紡涓�> 璁惧ID_閫氶亾ID_寮�濮嬫椂闂確缁撴潫鏃堕棿
+            String startTimeStr = s[2];
+            String endTimeStr = s[3];
+            if (startTimeStr == null || endTimeStr == null || startTimeStr.length() != 14 || endTimeStr.length() != 14) {
+                return;
+            }
+            String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr);
+            String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr);
+            logger.info("[ZLM HOOK] 鍥炴斁娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}-{}-{}",
+                    event.getMediaServer().getId(), event.getSchema(),
+                    event.getApp(), event.getStream(),
+                    startTime, endTime
+            );
+
+            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(event.getMediaServer(), event.getStream(), null,
+                    device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam());
+            playBack(event.getMediaServer(), ssrcInfo, deviceId, channelId, startTime, endTime, null);
+        }
+    }
+
 
     @Override
     public SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
index 02c5b30..0713ab3 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -11,6 +11,7 @@
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
 import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
 import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
+import com.genersoft.iot.vmp.media.event.MediaNotFoundEvent;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
@@ -18,6 +19,7 @@
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.HookResult;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import com.genersoft.iot.vmp.service.IGbStreamService;
 import com.genersoft.iot.vmp.service.IMediaService;
@@ -44,6 +46,7 @@
 import org.springframework.transaction.TransactionStatus;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.ObjectUtils;
+import org.springframework.web.context.request.async.DeferredResult;
 
 import java.util.HashMap;
 import java.util.List;
@@ -128,6 +131,22 @@
         }
     }
 
+    /**
+     * 娴佺寮�鐨勫鐞�
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaNotFoundEvent event) {
+        if ("rtp".equals(event.getApp())) {
+            return;
+        }
+        // 鎷夋祦浠g悊
+        StreamProxyItem streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream());
+        if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) {
+            start(event.getApp(), event.getStream());
+        }
+    }
+
 
     @Override
     public void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback) {

--
Gitblit v1.8.0