From f7e39141db04ae209f436f359e781cea6a978051 Mon Sep 17 00:00:00 2001
From: 648540858 <456panlinlin>
Date: 星期三, 06 四月 2022 11:07:42 +0800
Subject: [PATCH] 修复重启服务后拉流代理回复失败的问题

---
 src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java                      |    3 -
 src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java              |    4 +-
 src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java             |    3 -
 src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java                     |    4 +-
 src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java                |   47 +++++++++++++++++++++++
 src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java |    1 
 6 files changed, 52 insertions(+), 10 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
index 95578ef..f9546f0 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -175,3 +175,4 @@
         }
     }
 }
+ 
\ No newline at end of file
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
index aca3082..b30a5b2 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -1,5 +1,6 @@
 package com.genersoft.iot.vmp.service.impl;
 
+import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.SipConfig;
@@ -285,9 +286,12 @@
         }
         streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
 
+        // 绉婚櫎鎷夋祦浠g悊鐢熸垚鐨勬祦淇℃伅
+//        syncPullStream(mediaServerId);
+
         // 鎭㈠娴佷唬鐞�, 鍙煡鎵捐繖涓繖涓祦濯掍綋
         List<StreamProxyItem> streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer(
-                mediaServerId, true, false);
+                mediaServerId, true);
         for (StreamProxyItem streamProxyDto : streamProxyListForEnable) {
             logger.info("鎭㈠娴佷唬鐞嗭紝" + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
             JSONObject jsonObject = addStreamProxyToZlm(streamProxyDto);
@@ -339,4 +343,45 @@
     public int updateStatus(boolean status, String app, String stream) {
         return streamProxyMapper.updateStatus(status, app, stream);
     }
+
+    private void syncPullStream(String mediaServerId){
+        MediaServerItem mediaServer = mediaServerService.getOne(mediaServerId);
+        if (mediaServer != null) {
+            List<MediaItem> allPullStream = redisCatchStorage.getStreams(mediaServerId, "PULL");
+            if (allPullStream.size() > 0) {
+                zlmresTfulUtils.getMediaList(mediaServer, jsonObject->{
+                    Map<String, StreamInfo> stringStreamInfoMap = new HashMap<>();
+                    if (jsonObject.getInteger("code") == 0) {
+                        JSONArray data = jsonObject.getJSONArray("data");
+                        if(data != null && data.size() > 0) {
+                            for (int i = 0; i < data.size(); i++) {
+                                JSONObject streamJSONObj = data.getJSONObject(i);
+                                if ("rtmp".equals(streamJSONObj.getString("schema"))) {
+                                    StreamInfo streamInfo = new StreamInfo();
+                                    String app = streamJSONObj.getString("app");
+                                    String stream = streamJSONObj.getString("stream");
+                                    streamInfo.setApp(app);
+                                    streamInfo.setStream(stream);
+                                    stringStreamInfoMap.put(app+stream, streamInfo);
+                                }
+                            }
+                        }
+                    }
+                    if (stringStreamInfoMap.size() == 0) {
+                        redisCatchStorage.removeStream(mediaServerId, "PULL");
+                    }else {
+                        for (String key : stringStreamInfoMap.keySet()) {
+                            StreamInfo streamInfo = stringStreamInfoMap.get(key);
+                            if (stringStreamInfoMap.get(streamInfo.getApp() + streamInfo.getStream()) == null) {
+                                redisCatchStorage.removeStream(mediaServerId, "PULL", streamInfo.getApp(),
+                                        streamInfo.getStream());
+                            }
+                        }
+                    }
+                });
+            }
+
+        }
+
+    }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java
index 11d8b84..9d12eb5 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java
@@ -401,10 +401,9 @@
 	 * 鏍规嵁濯掍綋ID鑾峰彇鍚敤/涓嶅惎鐢ㄧ殑浠g悊鍒楄〃
 	 * @param id 濯掍綋ID
 	 * @param enable 鍚敤/涓嶅惎鐢�
-	 * @param status 鐘舵��
 	 * @return
 	 */
-	List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id,  boolean enable, boolean status);
+	List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id,  boolean enable);
 
 	/**
 	 * 鏍规嵁閫氶亾ID鑾峰彇鍏舵墍鍦ㄨ澶�
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
index 2447ad9..29c0b01 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
@@ -51,8 +51,8 @@
 
     @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st " +
             "LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " +
-            "WHERE st.enable=${enable} and st.status=${status} and st.mediaServerId = '${id}' order by st.createTime desc")
-    List<StreamProxyItem> selectForEnableInMediaServer(String id, boolean enable, boolean status);
+            "WHERE st.enable=${enable} and st.mediaServerId = #{id} order by st.createTime desc")
+    List<StreamProxyItem> selectForEnableInMediaServer(String id, boolean enable);
 
     @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st " +
             "LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " +
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
index 1665573..439ecd8 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -911,8 +911,8 @@
 	}
 
 	@Override
-	public List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean enable, boolean status) {
-		return streamProxyMapper.selectForEnableInMediaServer(id, enable, status);
+	public List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean enable) {
+		return streamProxyMapper.selectForEnableInMediaServer(id, enable);
 	}
 
 
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java
index a2619fa..d4995a0 100644
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java
@@ -65,7 +65,4 @@
         }
         return result;
     }
-
-
-
 }

--
Gitblit v1.8.0