From 29f7a6b6eba350f2c49b744f110d1aa033f77d02 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期三, 10 一月 2024 18:55:14 +0800
Subject: [PATCH] 修复多wvp模式推流时信息存储错误

---
 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java |   58 ++++++++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
old mode 100644
new mode 100755
index 9f04950..3196613
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -6,13 +6,14 @@
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.*;
+import com.genersoft.iot.vmp.utils.redis.RedisUtil;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,6 +27,7 @@
 
 import java.text.ParseException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -70,7 +72,7 @@
     private RedisTemplate<Object, Object> redisTemplate;
 
     @Autowired
-    private ZLMRTPServerFactory zlmrtpServerFactory;
+    private ZLMServerFactory zlmServerFactory;
 
     @Autowired
     private IMediaServerService mediaServerService;
@@ -111,8 +113,8 @@
                 while (!taskQueue.isEmpty()) {
                     Message msg = taskQueue.poll();
                     try {
-                        JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class);
-                        WvpRedisMsg wvpRedisMsg = JSON.to(WvpRedisMsg.class, msgJSON);
+                        WvpRedisMsg wvpRedisMsg = JSON.parseObject(msg.getBody(), WvpRedisMsg.class);
+                        logger.info("[鏀跺埌REDIS閫氱煡] 娑堟伅锛� {}", JSON.toJSONString(wvpRedisMsg));
                         if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
                             continue;
                         }
@@ -121,12 +123,13 @@
 
                             switch (wvpRedisMsg.getCmd()){
                                 case WvpRedisMsgCmd.GET_SEND_ITEM:
-                                    RequestSendItemMsg content = JSON.to(RequestSendItemMsg.class, wvpRedisMsg.getContent());
+                                    RequestSendItemMsg content = JSON.parseObject(wvpRedisMsg.getContent(), RequestSendItemMsg.class);
                                     requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
                                     break;
                                 case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
                                     RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
                                     requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
+
                                     break;
                                 default:
                                     break;
@@ -227,7 +230,7 @@
         param.put("pt", requestPushStreamMsg.getPt());
         param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0");
         param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0");
-        JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
+        JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaInfo, param);
         // 鍥炲娑堟伅
         responsePushStream(jsonObject, fromId, serial);
     }
@@ -239,7 +242,7 @@
         result.setData(content);
 
         WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
-                WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result);
+                WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result));
         JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
         redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
     }
@@ -257,14 +260,14 @@
             result.setMsg("娴佸獟浣撲笉瀛樺湪");
 
             WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
-                    WvpRedisMsgCmd.GET_SEND_ITEM, serial, result);
+                    WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result));
 
             JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
             redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
             return;
         }
         // 纭畾娴佹槸鍚﹀湪绾�
-        Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream());
+        Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream());
         if (streamReady != null && streamReady) {
             logger.info("[鍥炲鎺ㄦ祦淇℃伅]  {}/{}", content.getApp(), content.getStream());
             responseSendItem(mediaServerItem, content, toId, serial);
@@ -280,7 +283,7 @@
                 WVPResult<SendRtpItem> result = new WVPResult<>();
                 result.setCode(ERROR_CODE_TIMEOUT);
                 WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
-                        userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result
+                        userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
                 );
                 JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
                 redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
@@ -308,7 +311,7 @@
      * 灏嗚幏鍙栧埌鐨剆endItem鍙戦�佸嚭鍘�
      */
     private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) {
-        SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
+        SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
                 content.getPort(), content.getSsrc(), content.getPlatformId(),
                 content.getApp(), content.getStream(), content.getChannelId(),
                 content.getTcp(), content.getRtcp());
@@ -321,7 +324,7 @@
         result.setData(responseSendItemMsg);
 
         WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
-                userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result
+                userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
         );
         JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
         redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
@@ -347,7 +350,7 @@
         requestSendItemMsg.setServerId(serverId);
         String key = UUID.randomUUID().toString();
         WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM,
-                key, requestSendItemMsg);
+                key, JSON.toJSONString(requestSendItemMsg));
 
         JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
         logger.info("[璇锋眰鎺ㄦ祦SendItem] {}: {}", serverId, jsonObject);
@@ -372,7 +375,7 @@
     public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) {
         String key = UUID.randomUUID().toString();
         WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
-                WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, param);
+                WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, JSON.toJSONString(param));
 
         JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
         logger.info("[REDIS 璇锋眰鍏朵粬骞冲彴鎺ㄦ祦] {}: {}", serverId, jsonObject);
@@ -388,4 +391,31 @@
         });
         redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
     }
+
+    private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
+        if (platformGbId == null) {
+            platformGbId = "*";
+        }
+        if (channelId == null) {
+            channelId = "*";
+        }
+        if (streamId == null) {
+            streamId = "*";
+        }
+        if (callId == null) {
+            callId = "*";
+        }
+        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
+                + userSetting.getServerId() + "_*_"
+                + platformGbId + "_"
+                + channelId + "_"
+                + streamId + "_"
+                + callId;
+        List<Object> scan = RedisUtil.scan(redisTemplate, key);
+        if (scan.size() > 0) {
+            return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0));
+        }else {
+            return null;
+        }
+    }
 }

--
Gitblit v1.8.0