From 42a2772d1aa7493bcc4fac3e24ee8eda4eebc23d Mon Sep 17 00:00:00 2001
From: xubinbin <1323875150@qq.com>
Date: 星期二, 12 十二月 2023 17:09:04 +0800
Subject: [PATCH] bugfix:请求头带token, SecurityUtils 获取用户id 一直为0 #1195

---
 src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java |  157 +++++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 130 insertions(+), 27 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
old mode 100644
new mode 100755
index 29eb6e9..347d0e6
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -11,12 +11,10 @@
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
-import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
-import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
-import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
+import com.genersoft.iot.vmp.media.zlm.*;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData;
+import com.genersoft.iot.vmp.service.IInviteStreamService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -26,23 +24,30 @@
 import com.genersoft.iot.vmp.utils.JsonUtil;
 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
+import com.genersoft.iot.vmp.vmanager.bean.RecordFile;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.TransactionStatus;
+import org.springframework.util.Assert;
 import org.springframework.util.ObjectUtils;
 
 import java.io.File;
 import java.time.LocalDateTime;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 /**
  * 濯掍綋鏈嶅姟鍣ㄨ妭鐐圭鐞�
@@ -70,6 +75,9 @@
     private UserSetting userSetting;
 
     @Autowired
+    private SendRtpPortManager sendRtpPortManager;
+
+    @Autowired
     private AssistRESTfulUtils assistRESTfulUtils;
 
     @Autowired
@@ -86,7 +94,7 @@
 
 
     @Autowired
-    private ZLMRTPServerFactory zlmrtpServerFactory;
+    private ZLMServerFactory zlmServerFactory;
 
     @Autowired
     private EventPublisher publisher;
@@ -98,7 +106,15 @@
     private IRedisCatchStorage redisCatchStorage;
 
     @Autowired
+    private IInviteStreamService inviteStreamService;
+
+    @Autowired
     private RedisTemplate<Object, Object> redisTemplate;
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
 
 
     /**
@@ -112,7 +128,7 @@
                 continue;
             }
             // 鏇存柊
-            if (ssrcFactory.hasMediaServerSSRC(mediaServerItem.getId())) {
+            if (!ssrcFactory.hasMediaServerSSRC(mediaServerItem.getId())) {
                 ssrcFactory.initMediaServerSSRC(mediaServerItem.getId(), null);
             }
             // 鏌ヨredis鏄惁瀛樺湪姝ediaServer
@@ -121,17 +137,13 @@
             if (hasKey != null && ! hasKey) {
                 redisTemplate.opsForValue().set(key, mediaServerItem);
             }
-
         }
     }
 
-    @Override
-    public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean ssrcCheck, boolean isPlayback) {
-        return openRTPServer(mediaServerItem, streamId, null, ssrcCheck,isPlayback);
-    }
 
     @Override
-    public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck, boolean isPlayback, Integer port) {
+    public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck,
+                                  boolean isPlayback, Integer port, Boolean reUsePort, Integer tcpMode) {
         if (mediaServerItem == null || mediaServerItem.getId() == null) {
             logger.info("[openRTPServer] 澶辫触, mediaServerItem == null || mediaServerItem.getId() == null");
             return null;
@@ -149,11 +161,16 @@
         }
 
         if (streamId == null) {
-            streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase();
+            streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();
+        }
+        int ssrcCheckParam = 0;
+        if (ssrcCheck && tcpMode > 1) {
+            // 鐩墠zlm涓嶆敮鎸� tcp妯″紡鏇存柊ssrc锛屾殏鏃跺叧闂璼src鏍¢獙
+            logger.warn("[openRTPServer] TCP琚姩/TCP涓诲姩鏀舵祦鏃讹紝榛樿鍏抽棴ssrc妫�楠�");
         }
         int rtpServerPort;
         if (mediaServerItem.isRtpEnable()) {
-            rtpServerPort = zlmrtpServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck?Integer.parseInt(ssrc):0, port);
+            rtpServerPort = zlmServerFactory.createRTPServer(mediaServerItem, streamId, (ssrcCheck && tcpMode == 0) ? Long.parseLong(ssrc) : 0, port, reUsePort, tcpMode);
         } else {
             rtpServerPort = mediaServerItem.getRtpProxyPort();
         }
@@ -161,16 +178,11 @@
     }
 
     @Override
-    public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback) {
-        return openRTPServer(mediaServerItem, streamId, ssrc, ssrcCheck, isPlayback, null);
-    }
-
-    @Override
     public void closeRTPServer(MediaServerItem mediaServerItem, String streamId) {
         if (mediaServerItem == null) {
             return;
         }
-        zlmrtpServerFactory.closeRtpServer(mediaServerItem, streamId);
+        zlmServerFactory.closeRtpServer(mediaServerItem, streamId);
     }
 
     @Override
@@ -179,13 +191,18 @@
             callback.run(false);
             return;
         }
-        zlmrtpServerFactory.closeRtpServer(mediaServerItem, streamId, callback);
+        zlmServerFactory.closeRtpServer(mediaServerItem, streamId, callback);
     }
 
     @Override
     public void closeRTPServer(String mediaServerId, String streamId) {
         MediaServerItem mediaServerItem = this.getOne(mediaServerId);
         closeRTPServer(mediaServerItem, streamId);
+    }
+
+    @Override
+    public Boolean updateRtpServerSSRC(MediaServerItem mediaServerItem, String streamId, String ssrc) {
+        return zlmServerFactory.updateRtpServerSSRC(mediaServerItem, streamId, ssrc);
     }
 
     @Override
@@ -212,7 +229,7 @@
         mediaServerMapper.update(mediaSerItem);
         MediaServerItem mediaServerItemInRedis = getOne(mediaSerItem.getId());
         MediaServerItem mediaServerItemInDataBase = mediaServerMapper.queryOne(mediaSerItem.getId());
-        if (mediaServerItemInRedis == null || ssrcFactory.hasMediaServerSSRC(mediaSerItem.getId())) {
+        if (mediaServerItemInRedis == null || !ssrcFactory.hasMediaServerSSRC(mediaSerItem.getId())) {
             ssrcFactory.initMediaServerSSRC(mediaServerItemInDataBase.getId(),null);
         }
         String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItemInDataBase.getId();
@@ -394,7 +411,7 @@
         }
         mediaServerMapper.update(serverItem);
         String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + zlmServerConfig.getGeneralMediaServerId();
-        if (ssrcFactory.hasMediaServerSSRC(serverItem.getId())) {
+        if (!ssrcFactory.hasMediaServerSSRC(serverItem.getId())) {
             ssrcFactory.initMediaServerSSRC(zlmServerConfig.getGeneralMediaServerId(), null);
         }
         redisTemplate.opsForValue().set(key, serverItem);
@@ -417,7 +434,7 @@
         }
         final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + serverItem.getId();
         dynamicTask.stop(zlmKeepaliveKey);
-        dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (Math.getExponent(serverItem.getHookAliveInterval()) + 5) * 1000);
+        dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (serverItem.getHookAliveInterval().intValue() + 5) * 1000);
         publisher.zlmOnlineEventPublish(serverItem.getId());
 
         logger.info("[ZLM] 杩炴帴鎴愬姛 {} - {}:{} ",
@@ -552,7 +569,7 @@
         Map<String, Object> param = new HashMap<>();
         param.put("api.secret",mediaServerItem.getSecret()); // -profile:v Baseline
         if (mediaServerItem.getRtspPort() != 0) {
-            param.put("ffmpeg.snap", "%s -rtsp_transport tcp -i %s -y -f mjpeg -t 0.001 %s");
+            param.put("ffmpeg.snap", "%s -rtsp_transport tcp -i %s -y -f mjpeg -frames:v 1 %s");
         }
         param.put("hook.enable","1");
         param.put("hook.on_flow_report","");
@@ -688,7 +705,7 @@
             // 缂撳瓨涓嶅瓨鍦紝浠庢暟鎹簱鏌ヨ锛屽鏋滄暟鎹簱涓嶅瓨鍦ㄥ垯鏄敊璇殑
             mediaServerItem = getOneFromDatabase(mediaServerId);
             if (mediaServerItem == null) {
-                logger.warn("[鏇存柊ZLM 淇濇椿淇℃伅]澶辫触锛屾湭鎵惧埌娴佸獟浣撲俊鎭�");
+                logger.warn("[鏇存柊ZLM 淇濇椿淇℃伅] 娴佸獟浣搟}灏氭湭鍔犲叆浣跨敤,璇锋鏌ヨ妭鐐逛腑鏄惁鍚湁姝ゆ祦濯掍綋 ", mediaServerId);
                 return;
             }
             // zlm杩炴帴閲嶈瘯
@@ -738,9 +755,95 @@
         result.setId(mediaServerItem.getId());
         result.setPush(redisCatchStorage.getPushStreamCount(mediaServerItem.getId()));
         result.setProxy(redisCatchStorage.getProxyStreamCount(mediaServerItem.getId()));
-        result.setGbReceive(redisCatchStorage.getGbReceiveCount(mediaServerItem.getId()));
+
+        result.setGbReceive(inviteStreamService.getStreamInfoCount(mediaServerItem.getId()));
         result.setGbSend(redisCatchStorage.getGbSendCount(mediaServerItem.getId()));
         return result;
     }
 
+    @Override
+    public List<RecordFile> getRecords(String app, String stream, String startTime, String endTime, List<MediaServerItem> mediaServerItems) {
+        Assert.notNull(app, "app涓嶅瓨鍦�");
+        Assert.notNull(stream, "stream涓嶅瓨鍦�");
+        Assert.notNull(startTime, "startTime涓嶅瓨鍦�");
+        Assert.notNull(endTime, "endTime涓嶅瓨鍦�");
+        Assert.notEmpty(mediaServerItems, "娴佸獟浣撳垪琛ㄤ负绌�");
+
+        CompletableFuture[] completableFutures = new CompletableFuture[mediaServerItems.size()];
+        for (int i = 0; i < mediaServerItems.size(); i++) {
+            completableFutures[i] = getRecordFilesForOne(app, stream, startTime, endTime, mediaServerItems.get(i));
+        }
+        List<RecordFile> result = new ArrayList<>();
+        for (int i = 0; i < completableFutures.length; i++) {
+            try {
+                List<RecordFile> list = (List<RecordFile>) completableFutures[i].get();
+                if (!list.isEmpty()) {
+                    for (int g = 0; g < list.size(); g++) {
+                        list.get(g).setMediaServerId(mediaServerItems.get(i).getId());
+                    }
+                    result.addAll(list);
+                }
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        Comparator<RecordFile> comparator = Comparator.comparing(RecordFile::getFileName);
+        result.sort(comparator);
+        return result;
+    }
+
+    @Override
+    public List<String> getRecordDates(String app, String stream, int year, int month, List<MediaServerItem> mediaServerItems) {
+        Assert.notNull(app, "app涓嶅瓨鍦�");
+        Assert.notNull(stream, "stream涓嶅瓨鍦�");
+        Assert.notEmpty(mediaServerItems, "娴佸獟浣撳垪琛ㄤ负绌�");
+        CompletableFuture[] completableFutures = new CompletableFuture[mediaServerItems.size()];
+
+        for (int i = 0; i < mediaServerItems.size(); i++) {
+            completableFutures[i] = getRecordDatesForOne(app, stream, year, month, mediaServerItems.get(i));
+        }
+        List<String> result = new ArrayList<>();
+        CompletableFuture.allOf(completableFutures).join();
+        for (CompletableFuture completableFuture : completableFutures) {
+            try {
+                List<String> list = (List<String>) completableFuture.get();
+                result.addAll(list);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        Collections.sort(result);
+        return result;
+    }
+
+    @Async
+    public CompletableFuture<List<String>> getRecordDatesForOne(String app, String stream, int year, int month, MediaServerItem mediaServerItem) {
+        JSONObject fileListJson = assistRESTfulUtils.getDateList(mediaServerItem, app, stream, year, month);
+        if (fileListJson != null && !fileListJson.isEmpty()) {
+            if (fileListJson.getString("code") != null && fileListJson.getInteger("code") == 0) {
+                JSONArray data = fileListJson.getJSONArray("data");
+                return CompletableFuture.completedFuture(data.toJavaList(String.class));
+            }
+        }
+        return CompletableFuture.completedFuture(new ArrayList<>());
+    }
+
+    @Async
+    public CompletableFuture<List<RecordFile>> getRecordFilesForOne(String app, String stream, String startTime, String endTime, MediaServerItem mediaServerItem) {
+        JSONObject fileListJson = assistRESTfulUtils.getFileList(mediaServerItem, 1, 100000000, app, stream, startTime, endTime);
+        if (fileListJson != null && !fileListJson.isEmpty()) {
+            if (fileListJson.getString("code") != null && fileListJson.getInteger("code") == 0) {
+                JSONObject data = fileListJson.getJSONObject("data");
+                JSONArray list = data.getJSONArray("list");
+                if (list != null) {
+                    return CompletableFuture.completedFuture(list.toJavaList(RecordFile.class));
+                }
+            }
+        }
+        return CompletableFuture.completedFuture(new ArrayList<>());
+    }
 }

--
Gitblit v1.8.0