From 1fc2916c2b4b28fbf722c4401e559805f9578573 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期日, 28 四月 2024 22:25:58 +0800
Subject: [PATCH] Merge pull request #1432 from AlphaWu/Zafu-Dev-20240428

---
 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java |  389 ++++++++++++++++++++++++++++---------------------------
 1 files changed, 200 insertions(+), 189 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index 65cf693..40de0d2 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1,6 +1,5 @@
 package com.genersoft.iot.vmp.service.impl;
 
-import com.alibaba.fastjson2.JSONObject;
 import com.baomidou.dynamic.datasource.annotation.DS;
 import com.genersoft.iot.vmp.common.*;
 import com.genersoft.iot.vmp.conf.DynamicTask;
@@ -17,17 +16,16 @@
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
-import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
-import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
+import com.genersoft.iot.vmp.media.bean.RecordInfo;
+import com.genersoft.iot.vmp.media.event.hook.Hook;
+import com.genersoft.iot.vmp.media.event.hook.HookType;
+import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
+import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.*;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
+import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
+import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.*;
 import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
@@ -85,19 +83,13 @@
     private IRedisCatchStorage redisCatchStorage;
 
     @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
-    @Autowired
     private IInviteStreamService inviteStreamService;
 
     @Autowired
-    private ZlmHttpHookSubscribe subscribe;
+    private HookSubscribe subscribe;
 
     @Autowired
     private SendRtpPortManager sendRtpPortManager;
-
-    @Autowired
-    private IMediaService mediaService;
 
     @Autowired
     private IMediaServerService mediaServerService;
@@ -172,6 +164,33 @@
     @Async("taskExecutor")
     @EventListener
     public void onApplicationEvent(MediaDepartureEvent event) {
+        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
+        if (!sendRtpItems.isEmpty()) {
+            for (SendRtpItem sendRtpItem : sendRtpItems) {
+                if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) {
+                    String platformId = sendRtpItem.getPlatformId();
+                    Device device = deviceService.getDevice(platformId);
+                    try {
+                        if (device != null) {
+                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), event.getStream(), sendRtpItem.getCallId());
+                            if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
+                                    || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
+                                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                if (audioBroadcastCatch != null) {
+                                    // 鏉ヨ嚜涓婄骇骞冲彴鐨勫仠姝㈠璁�
+                                    logger.info("[鍋滄瀵硅] 鏉ヨ嚜涓婄骇锛屽钩鍙帮細{}, 閫氶亾锛歿}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+                                }
+                            }
+                        }
+                    } catch (SipException | InvalidArgumentException | ParseException |
+                             SsrcTransactionNotFoundException e) {
+                        logger.error("[鍛戒护鍙戦�佸け璐 鍙戦�丅YE: {}", e.getMessage());
+                    }
+                }
+            }
+        }
+
         if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) {
             if (event.getStream().indexOf("_") > 0) {
                 String[] streamArray = event.getStream().split("_");
@@ -188,9 +207,55 @@
                     }else if ("talk".equals(event.getApp())) {
                         stopTalk(device, channelId, false);
                     }
-
                 }
             }
+        }
+    }
+
+    /**
+     * 娴佹湭鎵惧埌鐨勫鐞�
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaNotFoundEvent event) {
+        if (!"rtp".equals(event.getApp())) {
+            return;
+        }
+        String[] s = event.getStream().split("_");
+        if ((s.length != 2 && s.length != 4)) {
+            return;
+        }
+        String deviceId = s[0];
+        String channelId = s[1];
+        Device device = redisCatchStorage.getDevice(deviceId);
+        if (device == null || !device.isOnLine()) {
+            return;
+        }
+        DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
+        if (deviceChannel == null) {
+            return;
+        }
+        if (s.length == 2) {
+            logger.info("[ZLM HOOK] 棰勮娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}", event.getMediaServer().getId(), event.getSchema(), event.getApp(), event.getStream());
+            play(event.getMediaServer(), deviceId, channelId, null, null);
+        } else if (s.length == 4) {
+            // 姝ゆ椂涓哄綍鍍忓洖鏀撅紝 褰曞儚鍥炴斁鏍煎紡涓�> 璁惧ID_閫氶亾ID_寮�濮嬫椂闂確缁撴潫鏃堕棿
+            String startTimeStr = s[2];
+            String endTimeStr = s[3];
+            if (startTimeStr == null || endTimeStr == null || startTimeStr.length() != 14 || endTimeStr.length() != 14) {
+                return;
+            }
+            String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr);
+            String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr);
+            logger.info("[ZLM HOOK] 鍥炴斁娴佹湭鎵惧埌, 鍙戣捣鑷姩鐐规挱锛歿}->{}->{}/{}-{}-{}",
+                    event.getMediaServer().getId(), event.getSchema(),
+                    event.getApp(), event.getStream(),
+                    startTime, endTime
+            );
+
+            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(event.getMediaServer(), event.getStream(), null,
+                    device.isSsrcCheck(), true, 0, false, !deviceChannel.isHasAudio(), false, device.getStreamModeForParam());
+            playBack(event.getMediaServer(), ssrcInfo, deviceId, channelId, startTime, endTime, null);
         }
     }
 
@@ -232,8 +297,7 @@
                 }
                 String mediaServerId = streamInfo.getMediaServerId();
                 MediaServer mediaInfo = mediaServerService.getOne(mediaServerId);
-
-                Boolean ready = zlmServerFactory.isStreamReady(mediaInfo, "rtp", streamId);
+                Boolean ready = mediaServerService.isStreamReady(mediaInfo, "rtp", streamId);
                 if (ready != null && ready) {
                     callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
                     inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
@@ -251,7 +315,7 @@
             }
         }
         String streamId = String.format("%s_%s", device.getDeviceId(), channelId);
-        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(),  false, 0, false, false, device.getStreamModeForParam());
+        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(),  false, 0, false, !channel.isHasAudio(), false, device.getStreamModeForParam());
         if (ssrcInfo == null) {
             callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
             inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
@@ -265,7 +329,7 @@
     }
 
     private void talk(MediaServer mediaServerItem, Device device, String channelId, String stream,
-                      ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
+                      HookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                       Runnable timeoutCallback, AudioBroadcastEvent audioEvent) {
 
         String playSsrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
@@ -321,38 +385,25 @@
             }
         }, userSetting.getPlayTimeout());
 
-        Map<String, Object> param = new HashMap<>(12);
-        param.put("vhost","__defaultVhost__");
-        param.put("app", sendRtpItem.getApp());
-        param.put("stream", sendRtpItem.getStream());
-        param.put("ssrc", sendRtpItem.getSsrc());
-        param.put("src_port", sendRtpItem.getLocalPort());
-        param.put("pt", sendRtpItem.getPt());
-        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
-        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
-        param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1");
-        param.put("recv_stream_id", sendRtpItem.getReceiveStream());
-        param.put("close_delay_ms", userSetting.getPlayTimeout() * 1000);
-
-        zlmServerFactory.startSendRtpPassive(mediaServerItem, param, jsonObject -> {
-            if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
-                mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
-                logger.info("[璇煶瀵硅]澶辫触 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
-                audioEvent.call("澶辫触, " + jsonObject.getString("msg"));
-                // 鏌ョ湅鏄惁宸茬粡寤虹珛浜嗛�氶亾锛屽瓨鍦ㄥ垯鍙戦�乥ye
-                stopTalk(device, channelId);
-            }
-        });
+        try {
+            mediaServerService.startSendRtpPassive(mediaServerItem, null, sendRtpItem, userSetting.getPlayTimeout() * 1000);
+        }catch (ControllerException e) {
+            mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
+            logger.info("[璇煶瀵硅]澶辫触 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
+            audioEvent.call("澶辫触, " + e.getMessage());
+            // 鏌ョ湅鏄惁宸茬粡寤虹珛浜嗛�氶亾锛屽瓨鍦ㄥ垯鍙戦�乥ye
+            stopTalk(device, channelId);
+        }
 
 
         // 鏌ョ湅璁惧鏄惁宸茬粡鍦ㄦ帹娴�
         try {
-            cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (mediaServerItemInuse, hookParam) -> {
-                logger.info("[璇煶瀵硅] 娴佸凡鐢熸垚锛� 寮�濮嬫帹娴侊細 " + hookParam);
+            cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (hookData) -> {
+                logger.info("[璇煶瀵硅] 娴佸凡鐢熸垚锛� 寮�濮嬫帹娴侊細 " + hookData);
                 dynamicTask.stop(timeOutTaskKey);
                 // TODO 鏆備笉鍋氬鐞�
-            }, (mediaServerItemInuse, hookParam) -> {
-                logger.info("[璇煶瀵硅] 璁惧寮�濮嬫帹娴侊細 " + hookParam);
+            }, (hookData) -> {
+                logger.info("[璇煶瀵硅] 璁惧寮�濮嬫帹娴侊細 " + hookData);
                 dynamicTask.stop(timeOutTaskKey);
 
             }, (event) -> {
@@ -462,8 +513,7 @@
                     streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
                     mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                     // 鍙栨秷璁㈤槄娑堟伅鐩戝惉
-                    HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
-                    subscribe.removeSubscribe(hookSubscribe);
+                    subscribe.removeSubscribe(Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()));
                 }
             }else {
                 logger.info("[鐐规挱瓒呮椂] 鏀舵祦瓒呮椂 deviceId: {}, channelId: {},鐮佹祦锛歿}锛岀鍙o細{}, SSRC: {}",
@@ -478,11 +528,11 @@
         }, userSetting.getPlayTimeout());
 
         try {
-            cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (mediaServerItemInuse, hookParam ) -> {
-                logger.info("鏀跺埌璁㈤槄娑堟伅锛� " + hookParam);
+            cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (hookData ) -> {
+                logger.info("鏀跺埌璁㈤槄娑堟伅锛� " + hookData);
                 dynamicTask.stop(timeOutTaskKey);
                 // hook鍝嶅簲
-                StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channel.getChannelId());
+                StreamInfo streamInfo = onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), device.getDeviceId(), channel.getChannelId());
                 if (streamInfo == null){
                     callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
                             InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
@@ -498,7 +548,7 @@
                         streamInfo);
                 logger.info("[鐐规挱鎴愬姛] deviceId: {}, channelId:{}, 鐮佹祦绫诲瀷锛歿}", device.getDeviceId(), channel.getChannelId(),
                         channel.getStreamIdentification());
-                snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
+                snapOnPlay(hookData.getMediaServer(), device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
             }, (eventResult) -> {
                 // 澶勭悊鏀跺埌200ok鍚庣殑TCP涓诲姩杩炴帴浠ュ強SSRC涓嶄竴鑷寸殑闂
                 InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel.getChannelId(),
@@ -512,8 +562,7 @@
 
                 streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
 
-                callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
-                        String.format("鐐规挱澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg), null);
+                callback.run(event.statusCode, event.msg, null);
                 inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
                         InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
                         String.format("鐐规挱澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg), null);
@@ -624,11 +673,10 @@
         mediaServerService.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
     }
 
-    public StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, HookParam hookParam, String deviceId, String channelId) {
+    public StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId) {
         StreamInfo streamInfo = null;
         Device device = redisCatchStorage.getDevice(deviceId);
-        OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
-        streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
+        streamInfo = onPublishHandler(mediaServerItem, mediaInfo, deviceId, channelId);
         if (streamInfo != null) {
             DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
             if (deviceChannel != null) {
@@ -646,9 +694,8 @@
 
     }
 
-    private StreamInfo onPublishHandlerForPlayback(MediaServer mediaServerItem, HookParam param, String deviceId, String channelId, String startTime, String endTime) {
-        OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) param;
-        StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
+    private StreamInfo onPublishHandlerForPlayback(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId, String startTime, String endTime) {
+        StreamInfo streamInfo = onPublishHandler(mediaServerItem, mediaInfo, deviceId, channelId);
         if (streamInfo != null) {
             streamInfo.setStartTime(startTime);
             streamInfo.setEndTime(endTime);
@@ -657,7 +704,7 @@
                 deviceChannel.setStreamId(streamInfo.getStream());
                 storager.startPlay(deviceId, channelId, streamInfo.getStream());
             }
-            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, ((OnStreamChangedHookParam) param).getStream());
+            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, mediaInfo.getStream());
             if (inviteInfo != null) {
                 inviteInfo.setStatus(InviteSessionStatus.ok);
 
@@ -695,6 +742,12 @@
             throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈壘鍒拌澶囷細" + deviceId);
         }
 
+        DeviceChannel channel = channelService.getOne(deviceId, channelId);
+        if (channel == null) {
+            logger.warn("[褰曞儚鍥炴斁] 鏈壘鍒伴�氶亾 deviceId: {},channelId:{}", deviceId, channelId);
+            throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈壘鍒伴�氶亾锛�" + channelId);
+        }
+
         MediaServer newMediaServerItem = getNewMediaServerItem(device);
         if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && ! newMediaServerItem.isRtpEnable()) {
             logger.warn("[褰曞儚鍥炴斁] 鍗曠鍙f敹娴佹椂涓嶆敮鎸乀CP涓诲姩鏂瑰紡鏀舵祦 deviceId: {},channelId:{}", deviceId, channelId);
@@ -707,7 +760,7 @@
                 .replace(":", "")
                 .replace(" ", "");
         String stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr;
-        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(),  true, 0, false,  false, device.getStreamModeForParam());
+        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(),  true, 0, false,  !channel.isHasAudio(),  false, device.getStreamModeForParam());
         playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback);
     }
 
@@ -763,10 +816,10 @@
             inviteStreamService.removeInviteInfo(inviteInfo);
         };
 
-        ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
-            logger.info("鏀跺埌鍥炴斁璁㈤槄娑堟伅锛� " + hookParam);
+        HookSubscribe.Event hookEvent = (hookData) -> {
+            logger.info("鏀跺埌鍥炴斁璁㈤槄娑堟伅锛� " + hookData);
             dynamicTask.stop(playBackTimeOutTaskKey);
-            StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
+            StreamInfo streamInfo = onPublishHandlerForPlayback(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime);
             if (streamInfo == null) {
                 logger.warn("璁惧鍥炴斁API璋冪敤澶辫触锛�");
                 callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
@@ -892,6 +945,10 @@
         if (device == null) {
             return;
         }
+        DeviceChannel channel = channelService.getOne(deviceId, channelId);
+        if (channel == null) {
+            return;
+        }
         MediaServer newMediaServerItem = this.getNewMediaServerItem(device);
         if (newMediaServerItem == null) {
             callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(),
@@ -900,7 +957,7 @@
             return;
         }
         // 褰曞儚涓嬭浇涓嶄娇鐢ㄥ浐瀹氭祦鍦板潃锛屽浐瀹氭祦鍦板潃浼氬鑷村鏋滃紑濮嬫椂闂翠笌缁撴潫鏃堕棿涓�鑷存椂鏂囦欢閿欒鐨勫彔鍔犲湪涓�璧�
-        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(),  true, 0, false,false, device.getStreamModeForParam());
+        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(),  true, 0, false,!channel.isHasAudio(), false, device.getStreamModeForParam());
         download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, callback);
     }
 
@@ -952,10 +1009,10 @@
             streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
             inviteStreamService.removeInviteInfo(inviteInfo);
         };
-        ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
-            logger.info("[褰曞儚涓嬭浇]鏀跺埌璁㈤槄娑堟伅锛� " + hookParam);
+        HookSubscribe.Event hookEvent = (hookData) -> {
+            logger.info("[褰曞儚涓嬭浇]鏀跺埌璁㈤槄娑堟伅锛� " + hookData);
             dynamicTask.stop(downLoadTimeOutTaskKey);
-            StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
+            StreamInfo streamInfo = onPublishHandlerForDownload(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime);
             if (streamInfo == null) {
                 logger.warn("[褰曞儚涓嬭浇] 鑾峰彇娴佸湴鍧�淇℃伅澶辫触");
                 callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
@@ -973,26 +1030,22 @@
                                 downLoadTimeOutTaskKey, callback, inviteInfo, InviteSessionType.DOWNLOAD);
 
                         // 娉ㄥ唽褰曞儚鍥炶皟浜嬩欢锛屽綍鍍忎笅杞界粨鏉熷悗鍐欏叆涓嬭浇鍦板潃
-                        ZlmHttpHookSubscribe.Event hookEventForRecord = (mediaServerItemInuse, hookParam) -> {
+                        HookSubscribe.Event hookEventForRecord = (hookData) -> {
                             logger.info("[褰曞儚涓嬭浇] 鏀跺埌褰曞儚鍐欏叆纾佺洏娑堟伅锛� 锛� {}/{}-{}",
                                     inviteInfo.getDeviceId(), inviteInfo.getChannelId(), ssrcInfo.getStream());
-                            logger.info("[褰曞儚涓嬭浇] 鏀跺埌褰曞儚鍐欏叆纾佺洏娑堟伅鍐呭锛� " + hookParam);
-                            OnRecordMp4HookParam recordMp4HookParam = (OnRecordMp4HookParam)hookParam;
-                            String filePath = recordMp4HookParam.getFile_path();
+                            logger.info("[褰曞儚涓嬭浇] 鏀跺埌褰曞儚鍐欏叆纾佺洏娑堟伅鍐呭锛� " + hookData);
+                            RecordInfo recordInfo = hookData.getRecordInfo();
+                            String filePath = recordInfo.getFilePath();
                             DownloadFileInfo downloadFileInfo = CloudRecordUtils.getDownloadFilePath(mediaServerItem, filePath);
                             InviteInfo inviteInfoForNew = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId()
                                     , inviteInfo.getChannelId(), inviteInfo.getStream());
                             inviteInfoForNew.getStreamInfo().setDownLoadFilePath(downloadFileInfo);
                             inviteStreamService.updateInviteInfo(inviteInfoForNew);
                         };
-                        HookSubscribeForRecordMp4 hookSubscribe = HookSubscribeFactory.on_record_mp4(
-                                mediaServerItem.getId(), "rtp", ssrcInfo.getStream());
-
+                        Hook hook = Hook.getInstance(HookType.on_record_mp4, "rtp", ssrcInfo.getStream(), mediaServerItem.getId());
                         // 璁剧疆杩囨湡鏃堕棿锛屼笅杞藉け璐ユ椂鑷姩澶勭悊璁㈤槄鏁版嵁
-//                        long difference = DateUtil.getDifference(startTime, endTime)/1000;
-//                        Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(difference * 2));
-//                        hookSubscribe.setExpires(expiresInstant);
-                        subscribe.addSubscribe(hookSubscribe, hookEventForRecord);
+                        hook.setExpireTime(System.currentTimeMillis() + 24 * 60 * 60 * 1000);
+                        subscribe.addSubscribe(hook, hookEventForRecord);
                     });
         } catch (InvalidArgumentException | SipException | ParseException e) {
             logger.error("[鍛戒护鍙戦�佸け璐 褰曞儚涓嬭浇: {}", e.getMessage());
@@ -1058,9 +1111,8 @@
         return inviteInfo.getStreamInfo();
     }
 
-    private StreamInfo onPublishHandlerForDownload(MediaServer mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) {
-        OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
-        StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, streamChangedHookParam, deviceId, channelId);
+    private StreamInfo onPublishHandlerForDownload(MediaServer mediaServerItemInuse, MediaInfo mediaInfo, String deviceId, String channelId, String startTime, String endTime) {
+        StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, mediaInfo, deviceId, channelId);
         if (streamInfo != null) {
             streamInfo.setProgress(0);
             streamInfo.setStartTime(startTime);
@@ -1077,9 +1129,8 @@
     }
 
 
-    public StreamInfo onPublishHandler(MediaServer mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) {
-        MediaInfo mediaInfo = MediaInfo.getInstance(hookParam);
-        StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), mediaInfo, null);
+    public StreamInfo onPublishHandler(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId) {
+        StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", mediaInfo.getStream(), mediaInfo, null);
         streamInfo.setDeviceID(deviceId);
         streamInfo.setChannelId(channelId);
         return streamInfo;
@@ -1144,7 +1195,7 @@
         AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult();
         audioBroadcastResult.setApp(app);
         audioBroadcastResult.setStream(stream);
-        audioBroadcastResult.setStreamInfo(new StreamContent(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false)));
+        audioBroadcastResult.setStreamInfo(new StreamContent(mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false)));
         audioBroadcastResult.setCodec("G.711");
         return audioBroadcastResult;
     }
@@ -1166,7 +1217,7 @@
             SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
             if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                 // 鏌ヨ娴佹槸鍚﹀瓨鍦紝涓嶅瓨鍦ㄥ垯璁や负鏄紓甯哥姸鎬�
-                Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
+                Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
                 if (streamReady) {
                     logger.warn("璇煶骞挎挱宸茬粡寮�鍚細 {}", channelId);
                     event.call("璇煶骞挎挱宸茬粡寮�鍚�");
@@ -1176,18 +1227,6 @@
                 }
             }
         }
-//        SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
-//        if (sendRtpItem != null) {
-//            MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-//            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
-//            if (streamReady) {
-//                logger.warn("[璇煶瀵硅] 杩涜涓細 {}", channelId);
-//                event.call("璇煶瀵硅杩涜涓�");
-//                return false;
-//            } else {
-//                stopTalk(device, channelId);
-//            }
-//        }
 
         // 鍙戦�侀�氱煡
         cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> {
@@ -1202,7 +1241,7 @@
             dynamicTask.startDelay(key, ()->{
                 logger.info("[璇煶骞挎挱]绛夊緟invite娑堟伅瓒呮椂锛歿}/{}", device.getDeviceId(), channelId);
                 stopAudioBroadcast(device.getDeviceId(), channelId);
-            }, 2000);
+            }, 10*1000);
         }, eventResultForError -> {
             // 鍙戦�佸け璐�
             logger.error("璇煶骞挎挱鍙戦�佸け璐ワ細 {}:{}", channelId, eventResultForError.msg);
@@ -1219,7 +1258,7 @@
             if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                 // 鏌ヨ娴佹槸鍚﹀瓨鍦紝涓嶅瓨鍦ㄥ垯璁や负鏄紓甯哥姸鎬�
                 MediaServer mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStream());
+                Boolean streamReady = mediaServerService.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStream());
                 if (streamReady) {
                     logger.warn("璇煶骞挎挱閫氶亾浣跨敤涓細 {}", channelId);
                     return true;
@@ -1375,100 +1414,55 @@
     @Override
     public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader) {
         // 寮�濮嬪彂娴�
-        String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
         MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-        logger.info("[寮�濮嬫帹娴乚 rtp/{}, 鐩爣={}:{}锛孲SRC={}, RTCP={}", sendRtpItem.getStream(),
-                sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
-        Map<String, Object> param = new HashMap<>(12);
-        param.put("vhost", "__defaultVhost__");
-        param.put("app", sendRtpItem.getApp());
-        param.put("stream", sendRtpItem.getStream());
-        param.put("ssrc", sendRtpItem.getSsrc());
-        param.put("src_port", sendRtpItem.getLocalPort());
-        param.put("pt", sendRtpItem.getPt());
-        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
-        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
-        param.put("is_udp", is_Udp);
-        if (!sendRtpItem.isTcp()) {
-            // udp妯″紡涓嬪紑鍚痳tcp淇濇椿
-            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0");
-        }
 
         if (mediaInfo == null) {
-            RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
-                    sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
-                    sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
-                    sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
-            redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
-                startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader);
+            RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
+            redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
+                startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
             });
         } else {
-            // 濡傛灉鏄弗鏍兼ā寮忥紝闇�瑕佸叧闂鍙e崰鐢�
-            JSONObject startSendRtpStreamResult = null;
-            if (sendRtpItem.getLocalPort() != 0) {
+            try {
                 if (sendRtpItem.isTcpActive()) {
-                    startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
+                    mediaServerService.startSendRtpPassive(mediaInfo, platform, sendRtpItem, null);
                 } else {
-                    param.put("dst_url", sendRtpItem.getIp());
-                    param.put("dst_port", sendRtpItem.getPort());
-                    startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
+                    mediaServerService.startSendRtp(mediaInfo, platform, sendRtpItem);
                 }
-            } else {
-                if (sendRtpItem.isTcpActive()) {
-                    startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
-                } else {
-                    param.put("dst_url", sendRtpItem.getIp());
-                    param.put("dst_port", sendRtpItem.getPort());
-                    startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
-                }
+            }catch (ControllerException e) {
+                logger.error("RTP鎺ㄦ祦澶辫触: {}", e.getMessage());
+                startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
+                return;
             }
-            if (startSendRtpStreamResult != null) {
-                startSendRtpStreamHand(sendRtpItem, platform, startSendRtpStreamResult, param, callIdHeader);
-            }
+
+            logger.info("RTP鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}, ", sendRtpItem.getApp(), sendRtpItem.getStream(),
+                    sendRtpItem.isTcpActive()?"琚姩鍙戞祦": sendRtpItem.getIp() + ":" + sendRtpItem.getPort());
+
         }
     }
 
     @Override
-    public void startSendRtpStreamHand(SendRtpItem sendRtpItem, Object correlationInfo,
-                                       JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
-        if (jsonObject == null) {
-            logger.error("RTP鎺ㄦ祦澶辫触: 璇锋鏌LM鏈嶅姟");
-        } else if (jsonObject.getInteger("code") == 0) {
-            logger.info("璋冪敤ZLM鎺ㄦ祦鎺ュ彛, 缁撴灉锛� {}", jsonObject);
-            logger.info("RTP鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}->{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"),
-                    sendRtpItem.isTcpActive()?"琚姩鍙戞祦": param.get("dst_url") + ":" + param.get("dst_port"));
-            if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && correlationInfo instanceof ParentPlatform) {
-                ParentPlatform platform = (ParentPlatform)correlationInfo;
-                MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
-                        sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(),
-                        sendRtpItem.getMediaServerId());
-                messageForPushChannel.setPlatFormIndex(platform.getId());
-                redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
+    public void startSendRtpStreamFailHand(SendRtpItem sendRtpItem, ParentPlatform platform, CallIdHeader callIdHeader) {
+        if (sendRtpItem.isOnlyAudio()) {
+            Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
+            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+            if (audioBroadcastCatch != null) {
+                try {
+                    cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
+                } catch (SipException | ParseException | InvalidArgumentException |
+                         SsrcTransactionNotFoundException exception) {
+                    logger.error("[鍛戒护鍙戦�佸け璐 鍋滄璇煶瀵硅: {}", exception.getMessage());
+                }
             }
         } else {
-            logger.error("RTP鎺ㄦ祦澶辫触: {}, 鍙傛暟锛歿}", jsonObject.getString("msg"), JSONObject.toJSONString(param));
-            if (sendRtpItem.isOnlyAudio()) {
-                Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
-                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                if (audioBroadcastCatch != null) {
-                    try {
-                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
-                    } catch (SipException | ParseException | InvalidArgumentException |
-                             SsrcTransactionNotFoundException e) {
-                        logger.error("[鍛戒护鍙戦�佸け璐 鍋滄璇煶瀵硅: {}", e.getMessage());
-                    }
-                }
-            } else {
+            if (platform != null) {
                 // 鍚戜笂绾у钩鍙�
-                if (correlationInfo instanceof ParentPlatform) {
-                    try {
-                        ParentPlatform parentPlatform = (ParentPlatform)correlationInfo;
-                        commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
-                    } catch (SipException | InvalidArgumentException | ParseException e) {
-                        logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
-                    }
+                try {
+                    commanderForPlatform.streamByeCmd(platform, callIdHeader.getCallId());
+                } catch (SipException | InvalidArgumentException | ParseException e) {
+                    logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
                 }
             }
+
         }
     }
 
@@ -1491,7 +1485,7 @@
             if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                 // 鏌ヨ娴佹槸鍚﹀瓨鍦紝涓嶅瓨鍦ㄥ垯璁や负鏄紓甯哥姸鎬�
                 MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
+                Boolean streamReady = mediaServerService.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
                 if (streamReady) {
                     logger.warn("[璇煶瀵硅] 姝e湪璇煶骞挎挱锛屾棤娉曞紑鍚闊抽�氳瘽锛� {}", channelId);
                     event.call("姝e湪璇煶骞挎挱");
@@ -1505,7 +1499,7 @@
         SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, stream, null);
         if (sendRtpItem != null) {
             MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
+            Boolean streamReady = mediaServerService.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
             if (streamReady) {
                 logger.warn("[璇煶瀵硅] 杩涜涓細 {}", channelId);
                 event.call("璇煶瀵硅杩涜涓�");
@@ -1515,7 +1509,7 @@
             }
         }
 
-        talk(mediaServerItem, device, channelId, stream, (mediaServerItem1, hookParam) -> {
+        talk(mediaServerItem, device, channelId, stream, (hookData) -> {
             logger.info("[璇煶瀵硅] 鏀跺埌璁惧鍙戞潵鐨勬祦");
         }, eventResult -> {
             logger.warn("[璇煶瀵硅] 澶辫触锛寋}/{}, 閿欒鐮� {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg);
@@ -1552,12 +1546,7 @@
         MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
 
         if (streamIsReady == null || streamIsReady) {
-            Map<String, Object> param = new HashMap<>();
-            param.put("vhost", "__defaultVhost__");
-            param.put("app", sendRtpItem.getApp());
-            param.put("stream", sendRtpItem.getStream());
-            param.put("ssrc", sendRtpItem.getSsrc());
-            zlmServerFactory.stopSendRtpStream(mediaServer, param);
+            mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
         }
 
         ssrcFactory.releaseSsrc(mediaServerId, sendRtpItem.getSsrc());
@@ -1620,4 +1609,26 @@
         });
     }
 
+    @Override
+    public void stopPlay(Device device, String channelId) {
+        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
+        if (inviteInfo == null) {
+            throw new ControllerException(ErrorCode.ERROR100.getCode(), "鐐规挱鏈壘鍒�");
+        }
+        if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
+            try {
+                logger.info("[鍋滄鐐规挱] {}/{}", device.getDeviceId(), channelId);
+                cmder.streamByeCmd(device, channelId, inviteInfo.getStream(), null, null);
+            } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
+                logger.error("[鍛戒护鍙戦�佸け璐 鍋滄鐐规挱锛� 鍙戦�丅YE: {}", e.getMessage());
+                throw new ControllerException(ErrorCode.ERROR100.getCode(), "鍛戒护鍙戦�佸け璐�: " + e.getMessage());
+            }
+        }
+        inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
+        storager.stopPlay(device.getDeviceId(), channelId);
+        channelService.stopPlay(device.getDeviceId(), channelId);
+        if (inviteInfo.getStreamInfo() != null) {
+            mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());
+        }
+    }
 }

--
Gitblit v1.8.0