From b4168c02cba462571dd3f5bdc1d0b1ffddbc938a Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 16 四月 2024 00:10:38 +0800
Subject: [PATCH] 优化多wvp国标级联推流

---
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java |   77 +++++++++++++++++++++++++++++---------
 1 files changed, 59 insertions(+), 18 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
index 14a96e8..25dd334 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
@@ -1,12 +1,16 @@
 package com.genersoft.iot.vmp.service.redisMsg;
 
 import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
+import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,6 +22,8 @@
 import org.springframework.stereotype.Component;
 import org.springframework.util.ObjectUtils;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -32,10 +38,10 @@
     private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
 
     @Autowired
-    private UserSetting userSetting;
+    private ZLMServerFactory zlmServerFactory;
 
     @Autowired
-    private ZlmHttpHookSubscribe hookSubscribe;
+    private IMediaServerService mediaServerService;
 
     @Qualifier("taskExecutor")
     @Autowired
@@ -52,23 +58,14 @@
                 while (!taskQueue.isEmpty()) {
                     Message msg = taskQueue.poll();
                     try {
-                        MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class);
-                        if (messageForPushChannel == null
-                                || ObjectUtils.isEmpty(messageForPushChannel.getApp())
-                                || ObjectUtils.isEmpty(messageForPushChannel.getStream())
-                        || userSetting.getServerId().equals(messageForPushChannel.getServerId())){
-                            continue;
+                        SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class);
+                        sendRtpItem.getMediaServerId();
+                        MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+                        if (mediaServer == null) {
+                            return;
                         }
-
-                        // 鐩戝惉娴佷笂绾裤�� 娴佷笂绾跨洿鎺ュ彂閫乻endRtpItem娑堟伅缁欏疄闄呯殑淇′护澶勭悊鑰�
-                        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
-                                messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp",
-                                null);
-                        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
-                            // 璇诲彇redis涓殑涓婄骇鐐规挱淇℃伅锛岀敓鎴恠endRtpItm鍙戦�佸嚭鍘�
-
-                        });
-
+                        Map<String, Object> sendRtpParam = getSendRtpParam(sendRtpItem);
+                        sendRtp(sendRtpItem, mediaServer, sendRtpParam);
 
                     }catch (Exception e) {
                         logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message));
@@ -78,4 +75,48 @@
             });
         }
     }
+
+    private Map<String, Object> getSendRtpParam(SendRtpItem sendRtpItem) {
+        String isUdp = sendRtpItem.isTcp() ? "0" : "1";
+        Map<String, Object> param = new HashMap<>(12);
+        param.put("vhost","__defaultVhost__");
+        param.put("app",sendRtpItem.getApp());
+        param.put("stream",sendRtpItem.getStream());
+        param.put("ssrc", sendRtpItem.getSsrc());
+        param.put("dst_url",sendRtpItem.getIp());
+        param.put("dst_port", sendRtpItem.getPort());
+        param.put("src_port", sendRtpItem.getLocalPort());
+        param.put("pt", sendRtpItem.getPt());
+        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
+        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
+        param.put("is_udp", isUdp);
+        if (!sendRtpItem.isTcp()) {
+            // udp妯″紡涓嬪紑鍚痳tcp淇濇椿
+            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
+        }
+        return param;
+    }
+
+    private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map<String, Object> param){
+        JSONObject startSendRtpStreamResult = null;
+        if (sendRtpItem.getLocalPort() != 0) {
+            if (sendRtpItem.isTcpActive()) {
+                startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
+            }else {
+                param.put("dst_url", sendRtpItem.getIp());
+                param.put("dst_port", sendRtpItem.getPort());
+                startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
+            }
+        }else {
+            if (sendRtpItem.isTcpActive()) {
+                startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
+            }else {
+                param.put("dst_url", sendRtpItem.getIp());
+                param.put("dst_port", sendRtpItem.getPort());
+                startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
+            }
+        }
+        return startSendRtpStreamResult;
+
+    }
 }

--
Gitblit v1.8.0