From ab81136765f1b641223b982b2baef13e06307fe4 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期三, 08 十二月 2021 16:45:50 +0800
Subject: [PATCH] 优化适配zlm的hook保活

---
 src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java         |   98 +++++++++-
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java                        |   20 +
 src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEventListener.java     |   65 +++++++
 src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java                |    6 
 src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEvent.java            |   11 +
 src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java              |   12 +
 src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java               |    8 
 src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java                           |    2 
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java              |   73 ++++---
 src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java        |   16 +
 src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java                |    6 
 src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java             |    5 
 src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java |   52 +++++
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java              |    1 
 src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java                |   14 +
 src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEvent.java             |   11 +
 src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java        |   28 ++-
 src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java        |   42 +++
 src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEventListener.java    |   44 ++++
 src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java                 |    6 
 20 files changed, 441 insertions(+), 79 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
index f3d542c..2f62287 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -1,9 +1,7 @@
 package com.genersoft.iot.vmp.gb28181;
 
 import com.genersoft.iot.vmp.conf.SipConfig;
-import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.gb28181.transmit.ISIPProcessorObserver;
-import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
 import gov.nist.javax.sip.SipProviderImpl;
 import gov.nist.javax.sip.SipStackImpl;
 import org.slf4j.Logger;
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
index fd0cfdc..9495e9d 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
@@ -5,6 +5,7 @@
 import com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire.PlatformKeepaliveExpireEvent;
 import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent;
 import com.genersoft.iot.vmp.media.zlm.event.ZLMOfflineEvent;
+import com.genersoft.iot.vmp.media.zlm.event.ZLMOnlineEvent;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.stereotype.Component;
@@ -73,5 +74,10 @@
 		outEvent.setMediaServerId(mediaServerId);
 		applicationEventPublisher.publishEvent(outEvent);
 	}
-	
+
+	public void zlmOnlineEventPublish(String mediaServerId) {
+		ZLMOnlineEvent outEvent = new ZLMOnlineEvent(this);
+		outEvent.setMediaServerId(mediaServerId);
+		applicationEventPublisher.publishEvent(outEvent);
+	}
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 7065558..8c1239e 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -179,29 +179,33 @@
 	public ResponseEntity<String> onPublish(@RequestBody JSONObject json) {
 
 		logger.debug("[ ZLM HOOK ]on_publish API璋冪敤锛屽弬鏁帮細" + json.toString());
-
+		JSONObject ret = new JSONObject();
+		ret.put("code", 0);
+		ret.put("msg", "success");
+		ret.put("enableHls", true);
+		ret.put("enableMP4", userSetup.isRecordPushLive());
 		String mediaServerId = json.getString("mediaServerId");
 		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json);
 		if (subscribe != null) {
 			MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
 			if (mediaInfo != null) {
 				subscribe.response(mediaInfo, json);
+			}else {
+				ret.put("code", 1);
+				ret.put("msg", "zlm not register");
 			}
 		}
 	 	String app = json.getString("app");
 	 	String stream = json.getString("stream");
 		StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(stream);
-		JSONObject ret = new JSONObject();
+
 		// 褰曞儚鍥炴斁鏃朵笉杩涜褰曞儚涓嬭浇
 		if (streamInfo != null) {
 			ret.put("enableMP4", false);
 		}else {
 			ret.put("enableMP4", userSetup.isRecordPushLive());
 		}
-		ret.put("code", 0);
-		ret.put("msg", "success");
-		ret.put("enableHls", true);
-		ret.put("enableMP4", userSetup.isRecordPushLive());
+
 		return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
 	}
 	
@@ -340,37 +344,38 @@
 				if (!"rtp".equals(app)){
 					String type = OriginType.values()[item.getOriginType()].getType();
 					MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
-					if (regist) {
-						StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);
-						redisCatchStorage.addStream(mediaServerItem, type, app, streamId, streamInfo);
-						if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
-								|| item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
-								|| item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
-							zlmMediaListManager.addPush(item);
-						}
-					}else {
-						// 鍏煎娴佹敞閿�鏃剁被鍨嬮敊璇殑闂锛岀瓑zlm鏇存柊鍚庡垹闄�
-						StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);
-						if (streamPushItem != null) {
-							type = "PUSH";
-						}else {
-							StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
-							if (streamProxyByAppAndStream != null) {
-								type = "PULL";
+					if (mediaServerItem != null){
+						if (regist) {
+							StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);
+							redisCatchStorage.addStream(mediaServerItem, type, app, streamId, streamInfo);
+							if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
+									|| item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
+									|| item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
+								zlmMediaListManager.addPush(item);
 							}
+						}else {
+							// 鍏煎娴佹敞閿�鏃剁被鍨嬮敊璇殑闂锛岀瓑zlm鏇存柊鍚庡垹闄�
+							StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);
+							if (streamPushItem != null) {
+								type = "PUSH";
+							}else {
+								StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
+								if (streamProxyByAppAndStream != null) {
+									type = "PULL";
+								}
+							}
+							zlmMediaListManager.removeMedia(app, streamId);
+							redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId);
 						}
-						zlmMediaListManager.removeMedia(app, streamId);
-						redisCatchStorage.removeStream(mediaServerItem, type, app, streamId);
+						// 鍙戦�佹祦鍙樺寲redis娑堟伅
+						JSONObject jsonObject = new JSONObject();
+						jsonObject.put("serverId", userSetup.getServerId());
+						jsonObject.put("app", app);
+						jsonObject.put("stream", streamId);
+						jsonObject.put("register", regist);
+						jsonObject.put("mediaServerId", mediaServerId);
+						redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
 					}
-
-					// 鍙戦�佹祦鍙樺寲redis娑堟伅
-					JSONObject jsonObject = new JSONObject();
-					jsonObject.put("serverId", userSetup.getServerId());
-					jsonObject.put("app", app);
-					jsonObject.put("stream", streamId);
-					jsonObject.put("register", regist);
-					jsonObject.put("mediaServerId", mediaServerId);
-					redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
 				}
 			}
 		}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
index f185d82..5b7ba1c 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
@@ -141,7 +141,6 @@
             }else {
                 gbStreamMapper.add(transform);
             }
-
         }
     }
 
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
index 5555617..4315c8d 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
@@ -4,6 +4,7 @@
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.conf.MediaConfig;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IStreamProxyService;
@@ -17,6 +18,7 @@
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
 
 import java.util.*;
 
@@ -36,6 +38,9 @@
 
     @Autowired
     private IStreamProxyService streamProxyService;
+
+    @Autowired
+    private EventPublisher publisher;
 
     @Autowired
     private IMediaServerService mediaServerService;
@@ -117,7 +122,7 @@
 
     @Async
     public void connectZlmServer(MediaServerItem mediaServerItem){
-        ZLMServerConfig zlmServerConfig = getMediaServerConfig(mediaServerItem);
+        ZLMServerConfig zlmServerConfig = getMediaServerConfig(mediaServerItem, 1);
         if (zlmServerConfig != null) {
             zlmServerConfig.setIp(mediaServerItem.getIp());
             zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort());
@@ -126,7 +131,7 @@
         }
     }
 
-    public ZLMServerConfig getMediaServerConfig(MediaServerItem mediaServerItem) {
+    public ZLMServerConfig getMediaServerConfig(MediaServerItem mediaServerItem, int index) {
         if (startGetMedia == null) { return null;}
         if (!mediaServerItem.isDefaultServer() && mediaServerService.getOne(mediaServerItem.getId()) == null) {
             return null;
@@ -143,14 +148,19 @@
                 ZLMServerConfig.setIp(mediaServerItem.getIp());
             }
         } else {
-            logger.error("[ {} ]-[ {}:{} ]涓诲姩杩炴帴澶辫触澶辫触, 2s鍚庨噸璇�",
-                    mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
+            logger.error("[ {} ]-[ {}:{} ]绗瑊}娆′富鍔ㄨ繛鎺ュけ璐�, 2s鍚庨噸璇�",
+                    mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort(), index);
+            if (index == 1 && !StringUtils.isEmpty(mediaServerItem.getId())) {
+                logger.info("[ {} ]-[ {}:{} ]绗瑊}娆′富鍔ㄨ繛鎺ュけ璐�, 寮�濮嬫竻鐞嗙浉鍏宠祫婧�",
+                        mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort(), index);
+                publisher.zlmOfflineEventPublish(mediaServerItem.getId());
+            }
             try {
                 Thread.sleep(2000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
-            ZLMServerConfig = getMediaServerConfig(mediaServerItem);
+            ZLMServerConfig = getMediaServerConfig(mediaServerItem, index += 1);
         }
         return ZLMServerConfig;
 
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java
new file mode 100644
index 0000000..33a251c
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java
@@ -0,0 +1,52 @@
+package com.genersoft.iot.vmp.media.zlm.event;
+
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.stereotype.Component;
+
+/**    
+ * @description:璁惧蹇冭烦瓒呮椂鐩戝惉,鍊熷姪redis杩囨湡鐗规�э紝杩涜鐩戝惉锛岀洃鍚埌璇存槑璁惧蹇冭烦瓒呮椂锛屽彂閫佺绾夸簨浠�
+ * @author: swwheihei
+ * @date:   2020骞�5鏈�6鏃� 涓婂崍11:35:46     
+ */
+@Component
+public class ZLMKeepliveTimeoutListener extends KeyExpirationEventMessageListener {
+
+    private Logger logger = LoggerFactory.getLogger(ZLMKeepliveTimeoutListener.class);
+
+	@Autowired
+	private EventPublisher publisher;
+
+	@Autowired
+	private UserSetup userSetup;
+
+	public ZLMKeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer) {
+		super(listenerContainer);
+	}
+
+	/**
+     * 鐩戝惉澶辨晥鐨刱ey锛宬ey鏍煎紡涓簁eeplive_deviceId
+     * @param message
+     * @param pattern
+     */
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        //  鑾峰彇澶辨晥鐨刱ey
+        String expiredKey = message.toString();
+        String KEEPLIVEKEY_PREFIX = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetup.getServerId() + "_";
+        if(!expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){
+        	return;
+        }
+        
+        String mediaServerId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length());
+
+        publisher.zlmOfflineEventPublish(mediaServerId);
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEvent.java
new file mode 100644
index 0000000..8207bdd
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEvent.java
@@ -0,0 +1,11 @@
+package com.genersoft.iot.vmp.media.zlm.event;
+
+/**
+ * zlm绂荤嚎浜嬩欢绫�
+ */
+public class ZLMOfflineEvent extends ZLMEventAbstract {
+
+	public ZLMOfflineEvent(Object source) {
+		super(source);
+	}
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEventListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEventListener.java
new file mode 100644
index 0000000..b713552
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEventListener.java
@@ -0,0 +1,44 @@
+package com.genersoft.iot.vmp.media.zlm.event;
+
+import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamProxyService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import com.genersoft.iot.vmp.utils.redis.RedisUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+
+/**
+ *
+ */
+@Component
+public class ZLMOfflineEventListener implements ApplicationListener<ZLMOfflineEvent> {
+
+	private final static Logger logger = LoggerFactory.getLogger(ZLMOfflineEventListener.class);
+
+	@Autowired
+    private IMediaServerService mediaServerService;
+
+	@Autowired
+    private IStreamPushService streamPushService;
+
+	@Autowired
+    private IStreamProxyService streamProxyService;
+
+	@Override
+	public void onApplicationEvent(ZLMOfflineEvent event) {
+		
+		if (logger.isDebugEnabled()) {
+			logger.debug("ZLM绂荤嚎浜嬩欢瑙﹀彂锛孖D锛�" + event.getMediaServerId());
+		}
+		// 澶勭悊ZLM绂荤嚎
+		mediaServerService.zlmServerOffline(event.getMediaServerId());
+		streamProxyService.zlmServerOffline(event.getMediaServerId());
+		streamPushService.zlmServerOffline(event.getMediaServerId());
+		// TODO 澶勭悊瀵瑰浗鏍囩殑褰卞搷
+	}
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEvent.java
new file mode 100644
index 0000000..612ff9d
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEvent.java
@@ -0,0 +1,11 @@
+package com.genersoft.iot.vmp.media.zlm.event;
+
+/**
+ * zlm鍦ㄧ嚎浜嬩欢
+ */
+public class ZLMOnlineEvent extends ZLMEventAbstract {
+
+	public ZLMOnlineEvent(Object source) {
+		super(source);
+	}
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEventListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEventListener.java
new file mode 100644
index 0000000..5731ea0
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEventListener.java
@@ -0,0 +1,65 @@
+package com.genersoft.iot.vmp.media.zlm.event;
+
+import com.genersoft.iot.vmp.conf.SipConfig;
+import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamProxyService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import com.genersoft.iot.vmp.utils.redis.RedisUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+
+import java.text.SimpleDateFormat;
+
+/**
+ * @description: 鍦ㄧ嚎浜嬩欢鐩戝惉鍣紝鐩戝惉鍒扮绾垮悗锛屼慨鏀硅澶囩鍦ㄧ嚎鐘舵�併�� 璁惧鍦ㄧ嚎鏈変袱涓潵婧愶細
+ *               1銆佽澶囦富鍔ㄦ敞閿�锛屽彂閫佹敞閿�鎸囦护
+ *               2銆佽澶囨湭鐭ュ師鍥犵绾匡紝蹇冭烦瓒呮椂
+ * @author: swwheihei
+ * @date: 2020骞�5鏈�6鏃� 涓嬪崍1:51:23
+ */
+@Component
+public class ZLMOnlineEventListener implements ApplicationListener<ZLMOnlineEvent> {
+	
+	private final static Logger logger = LoggerFactory.getLogger(ZLMOnlineEventListener.class);
+
+	@Autowired
+	private IVideoManagerStorager storager;
+	
+	@Autowired
+    private RedisUtil redis;
+
+	@Autowired
+    private SipConfig sipConfig;
+
+	@Autowired
+    private UserSetup userSetup;
+
+	@Autowired
+	private IMediaServerService mediaServerService;
+
+	@Autowired
+	private IStreamPushService streamPushService;
+
+	@Autowired
+	private IStreamProxyService streamProxyService;
+
+	private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+	@Override
+	public void onApplicationEvent(ZLMOnlineEvent event) {
+		
+		if (logger.isDebugEnabled()) {
+			logger.debug("ZLM涓婄嚎浜嬩欢瑙﹀彂锛孖D锛�" + event.getMediaServerId());
+		}
+		streamPushService.zlmServerOnline(event.getMediaServerId());
+		streamProxyService.zlmServerOnline(event.getMediaServerId());
+
+
+
+	}
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
index 618b824..40b2c9a 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
@@ -78,10 +78,10 @@
 
     /**
      * 鏂扮殑鑺傜偣鍔犲叆
-     * @param zlmServerConfig
+     * @param mediaServerId
      * @return
      */
-    void zlmServerOnline(ZLMServerConfig zlmServerConfig);
+    void zlmServerOnline(String mediaServerId);
 
     /**
      * 鑺傜偣绂荤嚎
@@ -89,4 +89,6 @@
      * @return
      */
     void zlmServerOffline(String mediaServerId);
+
+    void clean();
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
index d8a4465..d228a1a 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -34,6 +34,7 @@
      * @return
      */
     PageInfo<StreamPushItem> getPushList(Integer page, Integer count);
+    List<StreamPushItem> getPushList(String mediaSererId);
 
     StreamPushItem transform(MediaItem item);
 
@@ -49,10 +50,10 @@
 
     /**
      * 鏂扮殑鑺傜偣鍔犲叆
-     * @param zlmServerConfig
+     * @param mediaServerId
      * @return
      */
-    void zlmServerOnline(ZLMServerConfig zlmServerConfig);
+    void zlmServerOnline(String mediaServerId);
 
     /**
      * 鑺傜偣绂荤嚎
@@ -61,4 +62,5 @@
      */
     void zlmServerOffline(String mediaServerId);
 
+    void clean();
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
index e02bd3f..7d9f748 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -4,10 +4,10 @@
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
-import com.genersoft.iot.vmp.conf.MediaConfig;
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetup;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.session.SsrcConfig;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
@@ -69,6 +69,9 @@
 
     @Autowired
     private RedisUtil redisUtil;
+
+    @Autowired
+    private EventPublisher publisher;
 
     @Autowired
     JedisUtil jedisUtil;
@@ -312,8 +315,6 @@
         return mediaServerMapper.update(mediaSerItem);
     }
 
-
-
     /**
      * 澶勭悊zlm涓婄嚎
      * @param zlmServerConfig zlm涓婄嚎鎼哄甫鐨勫弬鏁�
@@ -353,27 +354,30 @@
         if (serverItem.getRtpProxyPort() == 0) {
             serverItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
         }
-        if (StringUtils.isEmpty(serverItem.getId())) {
-            serverItem.setId(zlmServerConfig.getGeneralMediaServerId());
-        }
         serverItem.setStatus(true);
+
         if (StringUtils.isEmpty(serverItem.getId())) {
             serverItem.setId(zlmServerConfig.getGeneralMediaServerId());
             mediaServerMapper.updateByHostAndPort(serverItem);
         }else {
             mediaServerMapper.update(serverItem);
         }
-        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + serverItem.getId();
+        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + zlmServerConfig.getGeneralMediaServerId();
         if (redisUtil.get(key) == null) {
-            SsrcConfig ssrcConfig = new SsrcConfig(serverItem.getId(), null, sipConfig.getDomain());
+            SsrcConfig ssrcConfig = new SsrcConfig(zlmServerConfig.getGeneralMediaServerId(), null, sipConfig.getDomain());
             serverItem.setSsrcConfig(ssrcConfig);
-            redisUtil.set(key, serverItem);
+        }else {
+            MediaServerItem mediaServerItemInRedis = (MediaServerItem)redisUtil.get(key);
+            serverItem.setSsrcConfig(mediaServerItemInRedis.getSsrcConfig());
         }
-
+        redisUtil.set(key, serverItem);
         resetOnlineServerItem(serverItem);
         updateMediaServerKeepalive(serverItem.getId(), null);
         setZLMConfig(serverItem);
+        publisher.zlmOnlineEventPublish(serverItem.getId());
+
     }
+
 
     @Override
     public void zlmServerOffline(String mediaServerId) {
@@ -567,6 +571,10 @@
     @Override
     public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) {
         MediaServerItem mediaServerItem = getOne(mediaServerId);
+        if (mediaServerItem == null) {
+            logger.warn("[鏇存柊ZLM 淇濇椿淇℃伅]澶辫触锛屾湭鎵惧埌娴佸獟浣撲俊鎭�");
+            return;
+        }
         String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetup.getServerId() + "_" + mediaServerId;
         int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2;
         redisUtil.set(key, data, hookAliveInterval);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
index 907893d..19bf13a 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -2,6 +2,7 @@
 
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.conf.UserSetup;
 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
@@ -28,8 +29,7 @@
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 
 /**
  * 瑙嗛浠g悊涓氬姟
@@ -53,6 +53,9 @@
 
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
+
+    @Autowired
+    private UserSetup userSetup;
 
     @Autowired
     private GbStreamMapper gbStreamMapper;
@@ -160,6 +163,9 @@
         }else {
             mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
         }
+        if (mediaServerItem == null) {
+            return null;
+        }
         if ("default".equals(param.getType())){
             result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl(),
                     param.isEnable_hls(), param.isEnable_mp4(), param.getRtp_type());
@@ -244,7 +250,6 @@
                 }
             }
         }
-
         return result;
     }
 
@@ -255,18 +260,41 @@
     }
 
     @Override
-    public void zlmServerOnline(ZLMServerConfig zlmServerConfig) {
-
+    public void zlmServerOnline(String mediaServerId) {
+        zlmServerOffline(mediaServerId);
     }
 
     @Override
     public void zlmServerOffline(String mediaServerId) {
         // 绉婚櫎寮�鍚簡鏃犱汉瑙傜湅鑷姩绉婚櫎鐨勬祦
+        List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selecAutoRemoveItemByMediaServerId(mediaServerId);
+        if (streamProxyItemList.size() > 0) {
+            gbStreamMapper.batchDel(streamProxyItemList);
+        }
         streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
         // 鍏朵粬鐨勬祦璁剧疆鏈惎鐢�
         streamProxyMapper.updateStatus(false, mediaServerId);
-        // 绉婚櫎redis鍐呮祦鐨勪俊鎭�
-        redisCatchStorage.removeStream(mediaServerId, "PULL");
+        String type = "PULL";
+
+        // 鍙戦�乺edis娑堟伅
+        List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
+        if (streamInfoList.size() > 0) {
+            for (StreamInfo streamInfo : streamInfoList) {
+                JSONObject jsonObject = new JSONObject();
+                jsonObject.put("serverId", userSetup.getServerId());
+                jsonObject.put("app", streamInfo.getApp());
+                jsonObject.put("stream", streamInfo.getStreamId());
+                jsonObject.put("register", false);
+                jsonObject.put("mediaServerId", mediaServerId);
+                redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
+                // 绉婚櫎redis鍐呮祦鐨勪俊鎭�
+                redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
+            }
+        }
+    }
+
+    @Override
+    public void clean() {
 
     }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
index 7c17c2a..dcca0e5 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -3,11 +3,15 @@
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
+import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.conf.UserSetup;
 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
+import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.OriginType;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IStreamPushService;
@@ -20,10 +24,7 @@
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 @Service
 public class StreamPushServiceImpl implements IStreamPushService {
@@ -44,6 +45,9 @@
     private IRedisCatchStorage redisCatchStorage;
 
     @Autowired
+    private UserSetup userSetup;
+
+    @Autowired
     private IMediaServerService mediaServerService;
 
     @Override
@@ -56,7 +60,9 @@
         for (MediaItem item : mediaItems) {
 
             // 涓嶄繚瀛樺浗鏍囨帹鐞嗕互鍙婃媺娴佷唬鐞嗙殑娴�
-            if (item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8) {
+            if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
+                    || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
+                    || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
                 String key = item.getApp() + "_" + item.getStream();
                 StreamPushItem streamPushItem = result.get(key);
                 if (streamPushItem == null) {
@@ -95,6 +101,11 @@
         PageHelper.startPage(page, count);
         List<StreamPushItem> all = streamPushMapper.selectAll();
         return new PageInfo<>(all);
+    }
+
+    @Override
+    public List<StreamPushItem> getPushList(String mediaServerId) {
+        return streamPushMapper.selectAllByMediaServerId(mediaServerId);
     }
 
     @Override
@@ -137,17 +148,84 @@
     }
 
     @Override
-    public void zlmServerOnline(ZLMServerConfig zlmServerConfig) {
-        // 浼间箮娌″暐闇�瑕佸仛鐨�
+    public void zlmServerOnline(String mediaServerId) {
+        // 鍚屾zlm鎺ㄦ祦淇℃伅
+        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
+        if (mediaServerItem == null) {
+            return;
+        }
+        List<StreamPushItem> pushList = getPushList(mediaServerId);
+        if (pushList.size() > 0) {
+            Map<String, StreamPushItem> pushItemMap = new HashMap<>();
+            for (StreamPushItem streamPushItem : pushList) {
+                pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
+            }
+            zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
+                if (mediaList == null) return;
+                String dataStr = mediaList.getString("data");
+
+                Integer code = mediaList.getInteger("code");
+                List<StreamPushItem> streamPushItems = null;
+                if (code == 0 ) {
+                    if (dataStr != null) {
+                        streamPushItems = handleJSON(dataStr, mediaServerItem);
+                    }
+                }
+
+                if (streamPushItems != null) {
+                    for (StreamPushItem streamPushItem : streamPushItems) {
+                        pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
+                    }
+                }
+                Collection<StreamPushItem> offlinePushItems = pushItemMap.values();
+                if (offlinePushItems.size() > 0) {
+                    String type = "PUSH";
+                    streamPushMapper.delAll(new ArrayList<>(offlinePushItems));
+                    for (StreamPushItem offlinePushItem : offlinePushItems) {
+                        JSONObject jsonObject = new JSONObject();
+                        jsonObject.put("serverId", userSetup.getServerId());
+                        jsonObject.put("app", offlinePushItem.getApp());
+                        jsonObject.put("stream", offlinePushItem.getStream());
+                        jsonObject.put("register", false);
+                        jsonObject.put("mediaServerId", mediaServerId);
+                        redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
+                        // 绉婚櫎redis鍐呮祦鐨勪俊鎭�
+                        redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlinePushItem.getApp(), offlinePushItem.getStream());
+                    }
+                }
+            }));
+        }
     }
 
     @Override
     public void zlmServerOffline(String mediaServerId) {
-        // 绉婚櫎娌℃湁serverId鐨勬帹娴�
+        List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerId(mediaServerId);
+        // 绉婚櫎娌℃湁GBId鐨勬帹娴�
         streamPushMapper.deleteWithoutGBId(mediaServerId);
+        gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
         // 鍏朵粬鐨勬祦璁剧疆鏈惎鐢�
         gbStreamMapper.updateStatusByMediaServerId(mediaServerId, false);
-        // 绉婚櫎redis鍐呮祦鐨勪俊鎭�
-        redisCatchStorage.removeStream(mediaServerId, "PUSH");
+        // 鍙戦�佹祦鍋滄娑堟伅
+        String type = "PUSH";
+        // 鍙戦�乺edis娑堟伅
+        List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
+        if (streamInfoList.size() > 0) {
+            for (StreamInfo streamInfo : streamInfoList) {
+                JSONObject jsonObject = new JSONObject();
+                jsonObject.put("serverId", userSetup.getServerId());
+                jsonObject.put("app", streamInfo.getApp());
+                jsonObject.put("stream", streamInfo.getStreamId());
+                jsonObject.put("register", false);
+                jsonObject.put("mediaServerId", mediaServerId);
+                redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
+                // 绉婚櫎redis鍐呮祦鐨勪俊鎭�
+                redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
+            }
+        }
+    }
+
+    @Override
+    public void clean() {
+
     }
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
index 0803dd6..4f240d8 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -140,11 +140,11 @@
 
     /**
      * 绉婚櫎娴佷俊鎭粠redis
-     * @param mediaServerItem
+     * @param mediaServerId
      * @param app
      * @param streamId
      */
-    void removeStream(MediaServerItem mediaServerItem, String type, String app, String streamId);
+    void removeStream(String mediaServerId, String type, String app, String streamId);
 
 
     /**
@@ -167,4 +167,6 @@
      * @return
      */
     ThirdPartyGB queryMemberNoGBId(String queryKey);
+
+    List<StreamInfo> getStreams(String mediaServerId, String pull);
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
index f159925..9757b13 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -65,4 +65,18 @@
             "SET status=${status} " +
             "WHERE mediaServerId=#{mediaServerId} ")
     void updateStatusByMediaServerId(String mediaServerId, boolean status);
+
+    @Select("SELECT * FROM gb_stream WHERE mediaServerId=#{mediaServerId}")
+    void delByMediaServerId(String mediaServerId);
+
+    @Delete("DELETE FROM gb_stream WHERE streamType=#{type} AND gbId=NULL AND mediaServerId=#{mediaServerId}")
+    void deleteWithoutGBId(String type, String mediaServerId);
+
+    @Delete("<script> "+
+            "DELETE FROM gb_stream where " +
+            "<foreach collection='streamProxyItemList' item='item' separator='or'>" +
+            "(app=#{item.app} and stream=#{item.stream}) " +
+            "</foreach>" +
+            "</script>")
+    void batchDel(List<StreamProxyItem> streamProxyItemList);
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
index 82520ec..b6e1ba1 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
@@ -62,6 +62,9 @@
             "WHERE mediaServerId=#{mediaServerId}")
     void updateStatus(boolean status, String mediaServerId);
 
-    @Delete("DELETE FROM stream_proxy WHERE mediaServerId=#{mediaServerId}")
+    @Delete("DELETE FROM stream_proxy WHERE enable_remove_none_reader=true AND mediaServerId=#{mediaServerId}")
     void deleteAutoRemoveItemByMediaServerId(String mediaServerId);
+
+    @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable_remove_none_reader=true AND st.mediaServerId=#{mediaServerId} order by st.createTime desc")
+    List<StreamProxyItem> selecAutoRemoveItemByMediaServerId(String mediaServerId);
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
index 9fe6ebf..c5b22f8 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
@@ -4,6 +4,7 @@
 import org.apache.ibatis.annotations.*;
 import org.springframework.stereotype.Repository;
 
+import java.util.Collection;
 import java.util.List;
 
 @Mapper
@@ -31,6 +32,14 @@
     @Delete("DELETE FROM stream_push WHERE app=#{app} AND stream=#{stream}")
     int del(String app, String stream);
 
+    @Delete("<script> "+
+            "DELETE FROM stream_push where " +
+            "<foreach collection='streamPushItems' item='item' separator='or'>" +
+            "(app=#{item.app} and stream=#{item.stream}) " +
+            "</foreach>" +
+            "</script>")
+    int delAll(List<StreamPushItem> streamPushItems);
+
     @Select("SELECT st.*, pgs.gbId, pgs.status, pgs.name, pgs.longitude, pgs.latitude FROM stream_push st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream")
     List<StreamPushItem> selectAll();
 
@@ -56,4 +65,7 @@
     @Delete("DELETE FROM stream_push WHERE mediaServerId=#{mediaServerId}")
     void deleteWithoutGBId(String mediaServerId);
 
+    @Select("SELECT * FROM stream_push WHERE mediaServerId=#{mediaServerId}")
+    List<StreamPushItem> selectAllByMediaServerId(String mediaServerId);
+
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
index 6adc05d..af9a206 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -338,8 +338,8 @@
     }
 
     @Override
-    public void removeStream(MediaServerItem mediaServerItem, String type, String app, String streamId) {
-        String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_"  + app + "_" + streamId + "_" + mediaServerItem.getId();
+    public void removeStream(String mediaServerId, String type, String app, String streamId) {
+        String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_"  + app + "_" + streamId + "_" + mediaServerId;
         redis.del(key);
     }
 
@@ -365,4 +365,16 @@
             redis.del((String) stream);
         }
     }
+
+    @Override
+    public List<StreamInfo> getStreams(String mediaServerId, String type) {
+        List<StreamInfo> result = new ArrayList<>();
+        String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_*_*_" + mediaServerId;
+        List<Object> streams = redis.scan(key);
+        for (Object stream : streams) {
+            StreamInfo streamInfo = (StreamInfo)redis.get((String) stream);
+            result.add(streamInfo);
+        }
+        return result;
+    }
 }

--
Gitblit v1.8.0