From 1fc2916c2b4b28fbf722c4401e559805f9578573 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期日, 28 四月 2024 22:25:58 +0800
Subject: [PATCH] Merge pull request #1432 from AlphaWu/Zafu-Dev-20240428
---
 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java |  204 +++++++++++++++++---------------------------------
 1 files changed, 71 insertions(+), 133 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index 5c6bca2..40de0d2 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1,6 +1,5 @@
 package com.genersoft.iot.vmp.service.impl;
 
-import com.alibaba.fastjson2.JSONObject;
 import com.baomidou.dynamic.datasource.annotation.DS;
 import com.genersoft.iot.vmp.common.*;
 import com.genersoft.iot.vmp.conf.DynamicTask;
@@ -25,7 +24,6 @@
 import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.service.*;
@@ -83,9 +81,6 @@
 
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
-
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
 
     @Autowired
     private IInviteStreamService inviteStreamService;
@@ -302,8 +297,7 @@
                 }
                 String mediaServerId = streamInfo.getMediaServerId();
                 MediaServer mediaInfo = mediaServerService.getOne(mediaServerId);
-
-                Boolean ready = zlmServerFactory.isStreamReady(mediaInfo, "rtp", streamId);
+                Boolean ready = mediaServerService.isStreamReady(mediaInfo, "rtp", streamId);
                 if (ready != null && ready) {
                     callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
                     inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
@@ -391,28 +385,15 @@
             }
         }, userSetting.getPlayTimeout());
 
-        Map<String, Object> param = new HashMap<>(12);
-        param.put("vhost","__defaultVhost__");
-        param.put("app", sendRtpItem.getApp());
-        param.put("stream", sendRtpItem.getStream());
-        param.put("ssrc", sendRtpItem.getSsrc());
-        param.put("src_port", sendRtpItem.getLocalPort());
-        param.put("pt", sendRtpItem.getPt());
-        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
-        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
-        param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1");
-        param.put("recv_stream_id", sendRtpItem.getReceiveStream());
-        param.put("close_delay_ms", userSetting.getPlayTimeout() * 1000);
-
-        zlmServerFactory.startSendRtpPassive(mediaServerItem, param, jsonObject -> {
-            if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
-                mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
-                logger.info("[璇煶瀵硅]澶辫触 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
-                audioEvent.call("澶辫触, " + jsonObject.getString("msg"));
-                // 鏌ョ湅鏄惁宸茬粡寤虹珛浜嗛�氶亾锛屽瓨鍦ㄥ垯鍙戦�乥ye
-                stopTalk(device, channelId);
-            }
-        });
+        try {
+            mediaServerService.startSendRtpPassive(mediaServerItem, null, sendRtpItem, userSetting.getPlayTimeout() * 1000);
+        }catch (ControllerException e) {
+            mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
+            logger.info("[璇煶瀵硅]澶辫触 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
+            audioEvent.call("澶辫触, " + e.getMessage());
+            // 鏌ョ湅鏄惁宸茬粡寤虹珛浜嗛�氶亾锛屽瓨鍦ㄥ垯鍙戦�乥ye
+            stopTalk(device, channelId);
+        }
 
 
         // 鏌ョ湅璁惧鏄惁宸茬粡鍦ㄦ帹娴�
@@ -581,8 +562,7 @@
 
                 streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
 
-                callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
-                        String.format("鐐规挱澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg), null);
+                callback.run(event.statusCode, event.msg, null);
                 inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
                         InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
                         String.format("鐐规挱澶辫触锛� 閿欒鐮侊細 %s, %s", event.statusCode, event.msg), null);
@@ -1064,9 +1044,7 @@
                         };
                         Hook hook = Hook.getInstance(HookType.on_record_mp4, "rtp", ssrcInfo.getStream(), mediaServerItem.getId());
                         // 璁剧疆杩囨湡鏃堕棿锛屼笅杞藉け璐ユ椂鑷姩澶勭悊璁㈤槄鏁版嵁
-//                        long difference = DateUtil.getDifference(startTime, endTime)/1000;
-//                        Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(difference * 2));
-//                        hookSubscribe.setExpires(expiresInstant);
+                        hook.setExpireTime(System.currentTimeMillis() + 24 * 60 * 60 * 1000);
                         subscribe.addSubscribe(hook, hookEventForRecord);
                     });
         } catch (InvalidArgumentException | SipException | ParseException e) {
@@ -1239,7 +1217,7 @@
             SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
             if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                 // 鏌ヨ娴佹槸鍚﹀瓨鍦紝涓嶅瓨鍦ㄥ垯璁や负鏄紓甯哥姸鎬�
-                Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
+                Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
                 if (streamReady) {
                     logger.warn("璇煶骞挎挱宸茬粡寮�鍚細 {}", channelId);
                     event.call("璇煶骞挎挱宸茬粡寮�鍚�");
@@ -1249,18 +1227,6 @@
                 }
             }
         }
-//        SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
-//        if (sendRtpItem != null) {
-//            MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-//            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
-//            if (streamReady) {
-//                logger.warn("[璇煶瀵硅] 杩涜涓細 {}", channelId);
-//                event.call("璇煶瀵硅杩涜涓�");
-//                return false;
-//            } else {
-//                stopTalk(device, channelId);
-//            }
-//        }
 
         // 鍙戦�侀�氱煡
         cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> {
@@ -1275,7 +1241,7 @@
             dynamicTask.startDelay(key, ()->{
                 logger.info("[璇煶骞挎挱]绛夊緟invite娑堟伅瓒呮椂锛歿}/{}", device.getDeviceId(), channelId);
                 stopAudioBroadcast(device.getDeviceId(), channelId);
-            }, 2000);
+            }, 10*1000);
         }, eventResultForError -> {
             // 鍙戦�佸け璐�
             logger.error("璇煶骞挎挱鍙戦�佸け璐ワ細 {}:{}", channelId, eventResultForError.msg);
@@ -1292,7 +1258,7 @@
             if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                 // 鏌ヨ娴佹槸鍚﹀瓨鍦紝涓嶅瓨鍦ㄥ垯璁や负鏄紓甯哥姸鎬�
                 MediaServer mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStream());
+                Boolean streamReady = mediaServerService.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStream());
                 if (streamReady) {
                     logger.warn("璇煶骞挎挱閫氶亾浣跨敤涓細 {}", channelId);
                     return true;
@@ -1448,100 +1414,55 @@
     @Override
     public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader) {
         // 寮�濮嬪彂娴�
-        String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
         MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-        logger.info("[寮�濮嬫帹娴乚 rtp/{}, 鐩爣={}:{}锛孲SRC={}, RTCP={}", sendRtpItem.getStream(),
-                sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
-        Map<String, Object> param = new HashMap<>(12);
-        param.put("vhost", "__defaultVhost__");
-        param.put("app", sendRtpItem.getApp());
-        param.put("stream", sendRtpItem.getStream());
-        param.put("ssrc", sendRtpItem.getSsrc());
-        param.put("src_port", sendRtpItem.getLocalPort());
-        param.put("pt", sendRtpItem.getPt());
-        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
-        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
-        param.put("is_udp", is_Udp);
-        if (!sendRtpItem.isTcp()) {
-            // udp妯″紡涓嬪紑鍚痳tcp淇濇椿
-            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0");
-        }
 
         if (mediaInfo == null) {
-            RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
-                    sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
-                    sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
-                    sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
-            redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
-                startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader);
+            RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
+            redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
+                startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
             });
         } else {
-            // 濡傛灉鏄弗鏍兼ā寮忥紝闇�瑕佸叧闂鍙e崰鐢�
-            JSONObject startSendRtpStreamResult = null;
-            if (sendRtpItem.getLocalPort() != 0) {
+            try {
                 if (sendRtpItem.isTcpActive()) {
-                    startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
+                    mediaServerService.startSendRtpPassive(mediaInfo, platform, sendRtpItem, null);
                 } else {
-                    param.put("dst_url", sendRtpItem.getIp());
-                    param.put("dst_port", sendRtpItem.getPort());
-                    startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
+                    mediaServerService.startSendRtp(mediaInfo, platform, sendRtpItem);
                 }
-            } else {
-                if (sendRtpItem.isTcpActive()) {
-                    startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
-                } else {
-                    param.put("dst_url", sendRtpItem.getIp());
-                    param.put("dst_port", sendRtpItem.getPort());
-                    startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
-                }
+            }catch (ControllerException e) {
+                logger.error("RTP鎺ㄦ祦澶辫触: {}", e.getMessage());
+                startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
+                return;
             }
-            if (startSendRtpStreamResult != null) {
-                startSendRtpStreamHand(sendRtpItem, platform, startSendRtpStreamResult, param, callIdHeader);
-            }
+
+            logger.info("RTP鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}, ", sendRtpItem.getApp(), sendRtpItem.getStream(),
+                    sendRtpItem.isTcpActive()?"琚姩鍙戞祦": sendRtpItem.getIp() + ":" + sendRtpItem.getPort());
+
         }
     }
 
     @Override
-    public void startSendRtpStreamHand(SendRtpItem sendRtpItem, Object correlationInfo,
-                                       JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
-        if (jsonObject == null) {
-            logger.error("RTP鎺ㄦ祦澶辫触: 璇锋鏌LM鏈嶅姟");
-        } else if (jsonObject.getInteger("code") == 0) {
-            logger.info("璋冪敤ZLM鎺ㄦ祦鎺ュ彛, 缁撴灉锛� {}", jsonObject);
-            logger.info("RTP鎺ㄦ祦鎴愬姛[ {}/{} ]锛寋}->{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"),
-                    sendRtpItem.isTcpActive()?"琚姩鍙戞祦": param.get("dst_url") + ":" + param.get("dst_port"));
-            if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && correlationInfo instanceof ParentPlatform) {
-                ParentPlatform platform = (ParentPlatform)correlationInfo;
-                MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
-                        sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(),
-                        sendRtpItem.getMediaServerId());
-                messageForPushChannel.setPlatFormIndex(platform.getId());
-                redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
+    public void startSendRtpStreamFailHand(SendRtpItem sendRtpItem, ParentPlatform platform, CallIdHeader callIdHeader) {
+        if (sendRtpItem.isOnlyAudio()) {
+            Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
+            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+            if (audioBroadcastCatch != null) {
+                try {
+                    cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
+                } catch (SipException | ParseException | InvalidArgumentException |
+                         SsrcTransactionNotFoundException exception) {
+                    logger.error("[鍛戒护鍙戦�佸け璐 鍋滄璇煶瀵硅: {}", exception.getMessage());
+                }
             }
         } else {
-            logger.error("RTP鎺ㄦ祦澶辫触: {}, 鍙傛暟锛歿}", jsonObject.getString("msg"), JSONObject.toJSONString(param));
-            if (sendRtpItem.isOnlyAudio()) {
-                Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
-                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
-                if (audioBroadcastCatch != null) {
-                    try {
-                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
-                    } catch (SipException | ParseException | InvalidArgumentException |
-                             SsrcTransactionNotFoundException e) {
-                        logger.error("[鍛戒护鍙戦�佸け璐 鍋滄璇煶瀵硅: {}", e.getMessage());
-                    }
-                }
-            } else {
+            if (platform != null) {
                 // 鍚戜笂绾у钩鍙�
-                if (correlationInfo instanceof ParentPlatform) {
-                    try {
-                        ParentPlatform parentPlatform = (ParentPlatform)correlationInfo;
-                        commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
-                    } catch (SipException | InvalidArgumentException | ParseException e) {
-                        logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
-                    }
+                try {
+                    commanderForPlatform.streamByeCmd(platform, callIdHeader.getCallId());
+                } catch (SipException | InvalidArgumentException | ParseException e) {
+                    logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage());
                 }
             }
+
         }
     }
 
@@ -1564,7 +1485,7 @@
             if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                 // 鏌ヨ娴佹槸鍚﹀瓨鍦紝涓嶅瓨鍦ㄥ垯璁や负鏄紓甯哥姸鎬�
                 MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
+                Boolean streamReady = mediaServerService.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
                 if (streamReady) {
                     logger.warn("[璇煶瀵硅] 姝e湪璇煶骞挎挱锛屾棤娉曞紑鍚闊抽�氳瘽锛� {}", channelId);
                     event.call("姝e湪璇煶骞挎挱");
@@ -1578,7 +1499,7 @@
         SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, stream, null);
         if (sendRtpItem != null) {
             MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
+            Boolean streamReady = mediaServerService.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
             if (streamReady) {
                 logger.warn("[璇煶瀵硅] 杩涜涓細 {}", channelId);
                 event.call("璇煶瀵硅杩涜涓�");
@@ -1625,12 +1546,7 @@
         MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
 
         if (streamIsReady == null || streamIsReady) {
-            Map<String, Object> param = new HashMap<>();
-            param.put("vhost", "__defaultVhost__");
-            param.put("app", sendRtpItem.getApp());
-            param.put("stream", sendRtpItem.getStream());
-            param.put("ssrc", sendRtpItem.getSsrc());
-            zlmServerFactory.stopSendRtpStream(mediaServer, param);
+            mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
         }
 
         ssrcFactory.releaseSsrc(mediaServerId, sendRtpItem.getSsrc());
@@ -1693,4 +1609,26 @@
         });
     }
 
+    @Override
+    public void stopPlay(Device device, String channelId) {
+        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
+        if (inviteInfo == null) {
+            throw new ControllerException(ErrorCode.ERROR100.getCode(), "鐐规挱鏈壘鍒�");
+        }
+        if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
+            try {
+                logger.info("[鍋滄鐐规挱] {}/{}", device.getDeviceId(), channelId);
+                cmder.streamByeCmd(device, channelId, inviteInfo.getStream(), null, null);
+            } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
+                logger.error("[鍛戒护鍙戦�佸け璐 鍋滄鐐规挱锛� 鍙戦�丅YE: {}", e.getMessage());
+                throw new ControllerException(ErrorCode.ERROR100.getCode(), "鍛戒护鍙戦�佸け璐�: " + e.getMessage());
+            }
+        }
+        inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
+        storager.stopPlay(device.getDeviceId(), channelId);
+        channelService.stopPlay(device.getDeviceId(), channelId);
+        if (inviteInfo.getStreamInfo() != null) {
+            mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());
+        }
+    }
 }
--
Gitblit v1.8.0