From 150e7a31997f590eba879c3515f21821e9e68eb6 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期五, 22 三月 2024 16:45:18 +0800
Subject: [PATCH] 调整节点管理代码结构

---
 src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java |  164 +++++++++++++++++++++++-------------------------------
 1 files changed, 71 insertions(+), 93 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
index ea56055..c5b7f58 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -1,21 +1,22 @@
 package com.genersoft.iot.vmp.service.impl;
 
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
-import com.alibaba.fastjson2.TypeReference;
 import com.baomidou.dynamic.datasource.annotation.DS;
+import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.MediaConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.gb28181.bean.GbStream;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
-import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
-import com.genersoft.iot.vmp.media.zlm.dto.*;
+import com.genersoft.iot.vmp.media.service.IMediaServerService;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
 import com.genersoft.iot.vmp.service.IGbStreamService;
-import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IStreamPushService;
 import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -67,9 +68,6 @@
     private EventPublisher eventPublisher;
 
     @Autowired
-    private ZLMRESTfulUtils zlmresTfulUtils;
-
-    @Autowired
     private IRedisCatchStorage redisCatchStorage;
 
     @Autowired
@@ -88,32 +86,27 @@
     private MediaConfig mediaConfig;
 
 
-    @Override
-    public List<StreamPushItem> handleJSON(String jsonData, MediaServer mediaServerItem) {
-        if (jsonData == null) {
+    private List<StreamPushItem> handleJSON(List<StreamInfo> streamInfoList) {
+        if (streamInfoList == null || streamInfoList.isEmpty()) {
             return null;
         }
-
         Map<String, StreamPushItem> result = new HashMap<>();
-
-        List<OnStreamChangedHookParam> onStreamChangedHookParams = JSON.parseObject(jsonData, new TypeReference<List<OnStreamChangedHookParam>>() {});
-        for (OnStreamChangedHookParam item : onStreamChangedHookParams) {
-
+        for (StreamInfo streamInfo : streamInfoList) {
             // 涓嶄繚瀛樺浗鏍囨帹鐞嗕互鍙婃媺娴佷唬鐞嗙殑娴�
-            if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
-                    || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
-                    || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
-                String key = item.getApp() + "_" + item.getStream();
+            if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal()
+                    || streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal()
+                    || streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
+                String key = streamInfo.getApp() + "_" + streamInfo.getStream();
                 StreamPushItem streamPushItem = result.get(key);
                 if (streamPushItem == null) {
-                    streamPushItem = transform(item);
+                    streamPushItem = streamPushItem.instance(streamInfo);
                     result.put(key, streamPushItem);
                 }
             }
         }
-
         return new ArrayList<>(result.values());
     }
+
     @Override
     public StreamPushItem transform(OnStreamChangedHookParam item) {
         StreamPushItem streamPushItem = new StreamPushItem();
@@ -165,14 +158,9 @@
         platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
         int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
         MediaServer mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
-        JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
-        if (mediaList != null) {
-            if (mediaList.getInteger("code") == 0) {
-                JSONArray data = mediaList.getJSONArray("data");
-                if (data == null) {
-                    streamPushMapper.del(stream.getApp(), stream.getStream());
-                }
-            }
+        List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaInfo, stream.getApp(), stream.getStream(), null);
+        if (mediaList != null && mediaList.isEmpty()) {
+            streamPushMapper.del(stream.getApp(), stream.getStream());
         }
         return del > 0;
     }
@@ -196,7 +184,7 @@
         int delStream = streamPushMapper.del(app, streamId);
         if (delStream > 0) {
             MediaServer mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
-            zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
+            mediaServerService.closeStreams(mediaServerItem,app, streamId);
         }
         return true;
     }
@@ -232,71 +220,61 @@
         for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) {
             streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo);
         }
-        zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
-            if (mediaList == null) {
-                return;
+        List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServerItem, null, null, null);
+        if (mediaList == null) {
+            return;
+        }
+        List<StreamPushItem> streamPushItems = handleJSON(mediaList);
+        if (streamPushItems != null) {
+            for (StreamPushItem streamPushItem : streamPushItems) {
+                pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
+                streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
+                streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
             }
-            String dataStr = mediaList.getString("data");
-
-            Integer code = mediaList.getInteger("code");
-            List<StreamPushItem> streamPushItems = null;
-            if (code == 0 ) {
-                if (dataStr != null) {
-                    streamPushItems = handleJSON(dataStr, mediaServerItem);
-                }
-            }
-
-            if (streamPushItems != null) {
-                for (StreamPushItem streamPushItem : streamPushItems) {
-                    pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
-                    streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
-                    streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
-                }
-            }
-            List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
-            if (offlinePushItems.size() > 0) {
-                String type = "PUSH";
-                int runLimit = 300;
-                if (offlinePushItems.size() > runLimit) {
-                    for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
-                        int toIndex = i + runLimit;
-                        if (i + runLimit > offlinePushItems.size()) {
-                            toIndex = offlinePushItems.size();
-                        }
-                        List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
-                        streamPushMapper.delAll(streamPushItemsSub);
+        }
+        List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
+        if (offlinePushItems.size() > 0) {
+            String type = "PUSH";
+            int runLimit = 300;
+            if (offlinePushItems.size() > runLimit) {
+                for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
+                    int toIndex = i + runLimit;
+                    if (i + runLimit > offlinePushItems.size()) {
+                        toIndex = offlinePushItems.size();
                     }
-                }else {
-                    streamPushMapper.delAll(offlinePushItems);
+                    List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
+                    streamPushMapper.delAll(streamPushItemsSub);
                 }
-
-            }
-            Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values();
-            if (offlineOnStreamChangedHookParamList.size() > 0) {
-                String type = "PUSH";
-                for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) {
-                    JSONObject jsonObject = new JSONObject();
-                    jsonObject.put("serverId", userSetting.getServerId());
-                    jsonObject.put("app", offlineOnStreamChangedHookParam.getApp());
-                    jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream());
-                    jsonObject.put("register", false);
-                    jsonObject.put("mediaServerId", mediaServerId);
-                    redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
-                    // 绉婚櫎redis鍐呮祦鐨勪俊鎭�
-                    redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream());
-                    // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤
-                    redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId());
-                }
+            }else {
+                streamPushMapper.delAll(offlinePushItems);
             }
 
-            Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
-            if (streamAuthorityInfos.size() > 0) {
-                for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) {
-                    // 绉婚櫎redis鍐呮祦鐨勪俊鎭�
-                    redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream());
-                }
+        }
+        Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values();
+        if (offlineOnStreamChangedHookParamList.size() > 0) {
+            String type = "PUSH";
+            for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) {
+                JSONObject jsonObject = new JSONObject();
+                jsonObject.put("serverId", userSetting.getServerId());
+                jsonObject.put("app", offlineOnStreamChangedHookParam.getApp());
+                jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream());
+                jsonObject.put("register", false);
+                jsonObject.put("mediaServerId", mediaServerId);
+                redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
+                // 绉婚櫎redis鍐呮祦鐨勪俊鎭�
+                redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream());
+                // 鍐椾綑鏁版嵁锛岃嚜宸辩郴缁熶腑鑷敤
+                redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId());
             }
-        }));
+        }
+
+        Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
+        if (streamAuthorityInfos.size() > 0) {
+            for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) {
+                // 绉婚櫎redis鍐呮祦鐨勪俊鎭�
+                redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream());
+            }
+        }
     }
 
     @Override
@@ -471,7 +449,7 @@
         if (delStream > 0) {
             for (GbStream gbStream : gbStreams) {
                 MediaServer mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
-                zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
+                mediaServerService.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
             }
 
         }

--
Gitblit v1.8.0