From fd091e545ba174adc36a9d3370e6d4c040ad33fd Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 28 七月 2022 16:18:41 +0800
Subject: [PATCH] 优化hook订阅机制

---
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java              |   85 ++++++-----
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java                         |   33 ++-
 src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java         |   14 -
 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java      |   52 ++----
 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java               |   12 
 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java                |   17 +-
 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java                      |   23 +++
 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java  |   43 ++++++
 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java |   44 ++++++
 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java          |   33 ++++
 src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java            |   16 ++
 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java                |   36 +++++
 12 files changed, 296 insertions(+), 112 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
index 0097ce0..feb66b4 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -10,6 +10,9 @@
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@@ -348,25 +351,19 @@
 	@Override
 	public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
 							  ZLMHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
-		String streamId = ssrcInfo.getStream();
+		String stream = ssrcInfo.getStream();
 		try {
 			if (device == null) {
 				return;
 			}
 			String streamMode = device.getStreamMode().toUpperCase();
 
-			logger.info("{} 鍒嗛厤鐨刏LM涓�: {} [{}:{}]", streamId, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
-			// 娣诲姞璁㈤槄
-			JSONObject subscribeKey = new JSONObject();
-			subscribeKey.put("app", "rtp");
-			subscribeKey.put("stream", streamId);
-			subscribeKey.put("regist", true);
-			subscribeKey.put("schema", "rtmp");
-			subscribeKey.put("mediaServerId", mediaServerItem.getId());
-			subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
-					(MediaServerItem mediaServerItemInUse, JSONObject json)->{
+			logger.info("{} 鍒嗛厤鐨刏LM涓�: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
+			HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
+			subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
 				if (event != null) {
 					event.response(mediaServerItemInUse, json);
+					subscribe.removeSubscribe(hookSubscribe);
 				}
 			});
 			//
@@ -440,7 +437,7 @@
 				errorEvent.response(e);
 			}), e ->{
 				// 杩欓噷涓轰緥閬垮厤涓�涓�氶亾鐨勭偣鎾彧鏈変竴涓猚allID杩欎釜鍙傛暟浣跨敤涓�涓浐瀹氬��
-				streamSession.put(device.getDeviceId(), channelId ,"play", streamId, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play);
+				streamSession.put(device.getDeviceId(), channelId ,"play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play);
 				streamSession.put(device.getDeviceId(), channelId ,"play", e.dialog);
 				okEvent.response(e);
 			});
@@ -530,21 +527,14 @@
 
 			CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
 					: udpSipProvider.getNewCallId();
-
+			HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtmp", mediaServerItem.getId());
 			// 娣诲姞璁㈤槄
-			JSONObject subscribeKey = new JSONObject();
-			subscribeKey.put("app", "rtp");
-			subscribeKey.put("stream", ssrcInfo.getStream());
-			subscribeKey.put("regist", true);
-			subscribeKey.put("schema", "rtmp");
-			subscribeKey.put("mediaServerId", mediaServerItem.getId());
-			logger.debug("褰曞儚鍥炴斁娣诲姞璁㈤槄锛岃闃呭唴瀹癸細" + subscribeKey);
-			subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
-					(MediaServerItem mediaServerItemInUse, JSONObject json)->{
+			subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
 						if (hookEvent != null) {
 							InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream());
 							hookEvent.call(inviteStreamInfo);
 						}
+						subscribe.removeSubscribe(hookSubscribe);
 					});
 	        Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
 
@@ -643,21 +633,15 @@
 			CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
 					: udpSipProvider.getNewCallId();
 
+			HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId());
 			// 娣诲姞璁㈤槄
-			JSONObject subscribeKey = new JSONObject();
-			subscribeKey.put("app", "rtp");
-			subscribeKey.put("stream", ssrcInfo.getStream());
-			subscribeKey.put("regist", true);
-			subscribeKey.put("mediaServerId", mediaServerItem.getId());
-			logger.debug("褰曞儚鍥炴斁娣诲姞璁㈤槄锛岃闃呭唴瀹癸細" + subscribeKey.toString());
-			subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
-					(MediaServerItem mediaServerItemInUse, JSONObject json)->{
+			subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
 						hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
-						subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
-						subscribeKey.put("regist", false);
-						subscribeKey.put("schema", "rtmp");
+						subscribe.removeSubscribe(hookSubscribe);
+						hookSubscribe.getContent().put("regist", false);
+						hookSubscribe.getContent().put("schema", "rtmp");
 						// 娣诲姞娴佹敞閿�鐨勮闃咃紝娉ㄩ攢浜嗗悗鍚戣澶囧彂閫乥ye
-						subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
+						subscribe.addSubscribe(hookSubscribe,
 								(MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd)->{
 									ClientTransaction transaction = streamSession.getTransaction(device.getDeviceId(), channelId, ssrcInfo.getStream(), callIdHeader.getCallId());
 									if (transaction != null) {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index b1f0fec..18654dd 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -102,7 +102,7 @@
 			logger.debug("[ ZLM HOOK ] on_server_keepalive API璋冪敤锛屽弬鏁帮細" + json.toString());
 		}
 		String mediaServerId = json.getString("mediaServerId");
-		List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_keepalive);
+		List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
 		if (subscribes != null  && subscribes.size() > 0) {
 			for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
 				subscribe.response(null, json);
@@ -168,7 +168,7 @@
 			logger.debug("[ ZLM HOOK ]on_play API璋冪敤锛屽弬鏁帮細" + JSON.toJSONString(param));
 		}
 		String mediaServerId = param.getMediaServerId();
-		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json);
+		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json);
 		if (subscribe != null ) {
 			MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
 			if (mediaInfo != null) {
@@ -253,7 +253,7 @@
 		}
 
 
-		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json);
+		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
 		if (subscribe != null) {
 			if (mediaInfo != null) {
 				subscribe.response(mediaInfo, json);
@@ -377,7 +377,7 @@
 			logger.debug("[ ZLM HOOK ]on_shell_login API璋冪敤锛屽弬鏁帮細" + json.toString());
 		}
 		String mediaServerId = json.getString("mediaServerId");
-		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_shell_login, json);
+		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json);
 		if (subscribe != null ) {
 			MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
 			if (mediaInfo != null) {
@@ -403,7 +403,7 @@
 		logger.info("[ ZLM HOOK ]on_stream_changed API璋冪敤锛屽弬鏁帮細" + JSONObject.toJSONString(item));
 		String mediaServerId = item.getMediaServerId();
 		JSONObject json = (JSONObject) JSON.toJSON(item);
-		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json);
+		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
 		if (subscribe != null ) {
 			MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
 			if (mediaInfo != null) {
@@ -614,7 +614,7 @@
 		}
 		String remoteAddr = request.getRemoteAddr();
 		jsonObject.put("ip", remoteAddr);
-		List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_started);
+		List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
 		if (subscribes != null  && subscribes.size() > 0) {
 			for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
 				subscribe.response(null, jsonObject);
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java
index ffd8ec9..a8286a8 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java
@@ -1,12 +1,16 @@
 package com.genersoft.iot.vmp.media.zlm;
 
 import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
+import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @description:閽堝 ZLMediaServer鐨刪ook浜嬩欢璁㈤槄
@@ -16,49 +20,38 @@
 @Component
 public class ZLMHttpHookSubscribe {
 
-    public enum HookType{
-        on_flow_report,
-        on_http_access,
-        on_play,
-        on_publish,
-        on_record_mp4,
-        on_rtsp_auth,
-        on_rtsp_realm,
-        on_shell_login,
-        on_stream_changed,
-        on_stream_none_reader,
-        on_stream_not_found,
-        on_server_started,
-        on_server_keepalive
-    }
-
     @FunctionalInterface
     public interface Event{
         void response(MediaServerItem mediaServerItem, JSONObject response);
     }
 
-    private Map<HookType, Map<JSONObject, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
+    private Map<HookType, Map<IHookSubscribe, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
 
-    public void addSubscribe(HookType type, JSONObject hookResponse, ZLMHttpHookSubscribe.Event event) {
-        allSubscribes.computeIfAbsent(type, k -> new ConcurrentHashMap<>()).put(hookResponse, event);
+    public void addSubscribe(IHookSubscribe hookSubscribe, ZLMHttpHookSubscribe.Event event) {
+        if (hookSubscribe.getExpires() == null) {
+            // 榛樿5鍒嗛挓杩囨湡
+            Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5));
+            hookSubscribe.setExpires(expiresInstant);
+        }
+        allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
     }
 
-    public ZLMHttpHookSubscribe.Event getSubscribe(HookType type, JSONObject hookResponse) {
+    public ZLMHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
         ZLMHttpHookSubscribe.Event event= null;
-        Map<JSONObject, Event> eventMap = allSubscribes.get(type);
+        Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
         if (eventMap == null) {
             return null;
         }
-        for (JSONObject key : eventMap.keySet()) {
+        for (IHookSubscribe key : eventMap.keySet()) {
             Boolean result = null;
-            for (String s : key.keySet()) {
+            for (String s : key.getContent().keySet()) {
                 if (result == null) {
-                    result = key.getString(s).equals(hookResponse.getString(s));
+                    result = key.getContent().getString(s).equals(hookResponse.getString(s));
                 }else {
-                    if (key.getString(s) == null) {
+                    if (key.getContent().getString(s) == null) {
                         continue;
                     }
-                    result = result && key.getString(s).equals(hookResponse.getString(s));
+                    result = result && key.getContent().getString(s).equals(hookResponse.getString(s));
                 }
 
             }
@@ -69,26 +62,30 @@
         return event;
     }
 
-    public void removeSubscribe(HookType type, JSONObject hookResponse) {
-        Map<JSONObject, Event> eventMap = allSubscribes.get(type);
+    public void removeSubscribe(IHookSubscribe hookSubscribe) {
+        Map<IHookSubscribe, Event> eventMap = allSubscribes.get(hookSubscribe.getHookType());
         if (eventMap == null) {
             return;
         }
 
-        Set<Map.Entry<JSONObject, Event>> entries = eventMap.entrySet();
+        Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet();
         if (entries.size() > 0) {
-            List<Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
-            for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entries) {
-                JSONObject key = entry.getKey();
+            List<Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
+            for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entries) {
+                JSONObject content = entry.getKey().getContent();
+                if (content == null || content.size() == 0) {
+                    entriesToRemove.add(entry);
+                    continue;
+                }
                 Boolean result = null;
-                for (String s : key.keySet()) {
+                for (String s : content.keySet()) {
                     if (result == null) {
-                        result = key.getString(s).equals(hookResponse.getString(s));
+                        result = content.getString(s).equals(hookSubscribe.getContent().getString(s));
                     }else {
-                        if (key.getString(s) == null) {
+                        if (content.getString(s) == null) {
                             continue;
                         }
-                        result = result && key.getString(s).equals(hookResponse.getString(s));
+                        result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s));
                     }
                 }
                 if (null != result && result){
@@ -97,7 +94,7 @@
             }
 
             if (!CollectionUtils.isEmpty(entriesToRemove)) {
-                for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) {
+                for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) {
                     entries.remove(entry);
                 }
             }
@@ -111,17 +108,25 @@
      * @return
      */
     public List<ZLMHttpHookSubscribe.Event> getSubscribes(HookType type) {
-        // ZLMHttpHookSubscribe.Event event= null;
-        Map<JSONObject, Event> eventMap = allSubscribes.get(type);
+        Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
         if (eventMap == null) {
             return null;
         }
         List<ZLMHttpHookSubscribe.Event> result = new ArrayList<>();
-        for (JSONObject key : eventMap.keySet()) {
+        for (IHookSubscribe key : eventMap.keySet()) {
             result.add(eventMap.get(key));
         }
         return result;
     }
 
+    public List<IHookSubscribe> getAll(){
+        ArrayList<IHookSubscribe> result = new ArrayList<>();
+        Collection<Map<IHookSubscribe, Event>> values = allSubscribes.values();
+        for (Map<IHookSubscribe, Event> value : values) {
+            result.addAll(value.keySet());
+        }
+        return result;
+    }
+
 
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
index 1bfb730..b24d0a1 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
@@ -6,22 +6,22 @@
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.MediaConfig;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamProxyService;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.core.annotation.Order;
 import org.springframework.scheduling.annotation.Async;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
-import org.springframework.util.StringUtils;
 
+import java.time.Instant;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 @Component
 @Order(value=1)
@@ -38,16 +38,10 @@
     private ZLMHttpHookSubscribe hookSubscribe;
 
     @Autowired
-    private IStreamProxyService streamProxyService;
-
-    @Autowired
     private EventPublisher publisher;
 
     @Autowired
     private IMediaServerService mediaServerService;
-
-    @Autowired
-    private IRedisCatchStorage redisCatchStorage;
 
     @Autowired
     private MediaConfig mediaConfig;
@@ -67,16 +61,24 @@
             mediaServerService.updateToDatabase(mediaSerItem);
         }
         mediaServerService.syncCatchFromDatabase();
+        HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started();
+//        Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.SECONDS.toSeconds(60));
+//        hookSubscribeForStreamChange.setExpires(expiresInstant);
         // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁�
-        hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,new JSONObject(),
+        hookSubscribe.addSubscribe(hookSubscribeForServerStarted,
                 (MediaServerItem mediaServerItem, JSONObject response)->{
             ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(response, ZLMServerConfig.class);
             if (zlmServerConfig !=null ) {
                 if (startGetMedia != null) {
                     startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId());
+                    if (startGetMedia.size() == 0) {
+                        hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
+                    }
                 }
             }
         });
+
+
 
         // 鑾峰彇zlm淇℃伅
         logger.info("[zlm] 绛夊緟榛樿zlm涓�...");
@@ -103,7 +105,6 @@
                 }
                 startGetMedia = null;
             }
-            hookSubscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started, new JSONObject());
         //  TODO 娓呯悊鏁版嵁搴撲腑涓巖edis涓嶅尮閰嶇殑zlm
         }, 60 * 1000 );
     }
@@ -116,6 +117,9 @@
             zlmServerConfigFirst.setIp(mediaServerItem.getIp());
             zlmServerConfigFirst.setHttpPort(mediaServerItem.getHttpPort());
             startGetMedia.remove(mediaServerItem.getId());
+            if (startGetMedia.size() == 0) {
+                hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
+            }
             mediaServerService.zlmServerOnline(zlmServerConfigFirst);
         }else {
             logger.info("[ {} ]-[ {}:{} ]涓诲姩杩炴帴澶辫触, 娓呯悊鐩稿叧璧勬簮锛� 寮�濮嬪皾璇曢噸璇曡繛鎺�",
@@ -130,6 +134,9 @@
                 zlmServerConfig.setIp(mediaServerItem.getIp());
                 zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort());
                 startGetMedia.remove(mediaServerItem.getId());
+                if (startGetMedia.size() == 0) {
+                    hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
+                }
                 mediaServerService.zlmServerOnline(zlmServerConfig);
             }
         }, 2000);
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java
new file mode 100644
index 0000000..92172f3
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java
@@ -0,0 +1,33 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+
+import com.alibaba.fastjson.JSONObject;
+
+/**
+ * hook 璁㈤槄宸ュ巶
+ * @author lin
+ */
+public class HookSubscribeFactory {
+
+    public static HookSubscribeForStreamChange on_stream_changed(String app, String stream, boolean regist, String scheam, String mediaServerId) {
+        HookSubscribeForStreamChange hookSubscribe = new HookSubscribeForStreamChange();
+        JSONObject subscribeKey = new com.alibaba.fastjson.JSONObject();
+        subscribeKey.put("app", app);
+        subscribeKey.put("stream", stream);
+        subscribeKey.put("regist", regist);
+        if (scheam != null) {
+            subscribeKey.put("schema", scheam);
+        }
+        subscribeKey.put("mediaServerId", mediaServerId);
+        hookSubscribe.setContent(subscribeKey);
+
+        return hookSubscribe;
+    }
+
+    public static HookSubscribeForServerStarted on_server_started() {
+        HookSubscribeForServerStarted hookSubscribe = new HookSubscribeForServerStarted();
+        hookSubscribe.setContent(new JSONObject());
+
+        return hookSubscribe;
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java
new file mode 100644
index 0000000..0b781e6
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java
@@ -0,0 +1,44 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.annotation.JSONField;
+
+import java.time.Instant;
+
+/**
+ * hook璁㈤槄-娴佸彉鍖�
+ * @author lin
+ */
+public class HookSubscribeForServerStarted implements IHookSubscribe{
+
+    private HookType hookType = HookType.on_server_started;
+
+    private JSONObject content;
+
+    @JSONField(format="yyyy-MM-dd HH:mm:ss")
+    private Instant expires;
+
+    @Override
+    public HookType getHookType() {
+        return hookType;
+    }
+
+    @Override
+    public JSONObject getContent() {
+        return content;
+    }
+
+    public void setContent(JSONObject content) {
+        this.content = content;
+    }
+
+    @Override
+    public Instant getExpires() {
+        return expires;
+    }
+
+    @Override
+    public void setExpires(Instant expires) {
+        this.expires = expires;
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java
new file mode 100644
index 0000000..d5b2fb8
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java
@@ -0,0 +1,43 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.annotation.JSONField;
+
+import java.time.Instant;
+
+/**
+ * hook璁㈤槄-娴佸彉鍖�
+ * @author lin
+ */
+public class HookSubscribeForStreamChange implements IHookSubscribe{
+
+    private HookType hookType = HookType.on_stream_changed;
+
+    private JSONObject content;
+
+    private Instant expires;
+
+    @Override
+    public HookType getHookType() {
+        return hookType;
+    }
+
+    @Override
+    public JSONObject getContent() {
+        return content;
+    }
+
+    public void setContent(JSONObject content) {
+        this.content = content;
+    }
+
+    @Override
+    public Instant getExpires() {
+        return expires;
+    }
+
+    @Override
+    public void setExpires(Instant expires) {
+        this.expires = expires;
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java
new file mode 100644
index 0000000..797ab81
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java
@@ -0,0 +1,23 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+/**
+ * hook绫诲瀷
+ * @author lin
+ */
+
+public enum HookType {
+
+    on_flow_report,
+    on_http_access,
+    on_play,
+    on_publish,
+    on_record_mp4,
+    on_rtsp_auth,
+    on_rtsp_realm,
+    on_shell_login,
+    on_stream_changed,
+    on_stream_none_reader,
+    on_stream_not_found,
+    on_server_started,
+    on_server_keepalive
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java
new file mode 100644
index 0000000..5f2ca33
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java
@@ -0,0 +1,36 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+import com.alibaba.fastjson.JSONObject;
+
+import java.time.Instant;
+
+/**
+ * zlm hook浜嬩欢鐨勫弬鏁�
+ * @author lin
+ */
+public interface IHookSubscribe {
+
+    /**
+     * 鑾峰彇hook绫诲瀷
+     * @return hook绫诲瀷
+     */
+    HookType getHookType();
+
+    /**
+     * 鑾峰彇hook鐨勫叿浣撳唴瀹�
+     * @return hook鐨勫叿浣撳唴瀹�
+     */
+    JSONObject getContent();
+
+    /**
+     * 璁剧疆杩囨湡鏃堕棿
+     * @param instant 杩囨湡鏃堕棿
+     */
+    void setExpires(Instant instant);
+
+    /**
+     * 鑾峰彇杩囨湡鏃堕棿
+     * @return 杩囨湡鏃堕棿
+     */
+    Instant getExpires();
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index ddfbc79..93f5dd0 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -13,6 +13,9 @@
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@@ -296,16 +299,10 @@
                     // 鍗曠鍙fā寮弒treamId涔熸湁鍙樺寲锛岄渶瑕侀噸鏂拌缃洃鍚�
                     if (!mediaServerItem.isRtpEnable()) {
                         // 娣诲姞璁㈤槄
-                        JSONObject subscribeKey = new JSONObject();
-                        subscribeKey.put("app", "rtp");
-                        subscribeKey.put("stream", stream);
-                        subscribeKey.put("regist", true);
-                        subscribeKey.put("schema", "rtmp");
-                        subscribeKey.put("mediaServerId", mediaServerItem.getId());
-                        subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed,subscribeKey);
-                        subscribeKey.put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
-                        subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
-                                (MediaServerItem mediaServerItemInUse, JSONObject response)->{
+                        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
+                        subscribe.removeSubscribe(hookSubscribe);
+                        hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
+                        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
                                     logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString());
                                     dynamicTask.stop(timeOutTaskKey);
                                     // hook鍝嶅簲
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java
index 638ea41..a4fa635 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java
@@ -8,6 +8,9 @@
 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.*;
@@ -270,14 +273,9 @@
             }, userSetting.getPlatformPlayTimeout());
 
             // 娣诲姞璁㈤槄
-            JSONObject subscribeKey = new JSONObject();
-            subscribeKey.put("app", content.getApp());
-            subscribeKey.put("stream", content.getStream());
-            subscribeKey.put("regist", true);
-            subscribeKey.put("schema", "rtmp");
-            subscribeKey.put("mediaServerId", mediaServerItem.getId());
-            subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
-                    (MediaServerItem mediaServerItemInUse, JSONObject json)->{
+            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtmp", mediaServerItem.getId());
+
+            subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                         dynamicTask.stop(taskKey);
                         responseSendItem(mediaServerItem, content, toId, serial);
                     });
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java
index faed2c8..2311d4b 100644
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java
@@ -8,6 +8,8 @@
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.VersionInfo;
+import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.utils.SpringBeanFactory;
@@ -38,7 +40,7 @@
 public class ServerController {
 
     @Autowired
-    private ConfigurableApplicationContext context;
+    private ZLMHttpHookSubscribe zlmHttpHookSubscribe;
 
     @Autowired
     private IMediaServerService mediaServerService;
@@ -254,6 +256,18 @@
         return result;
     }
 
+    @ApiOperation("鑾峰彇褰撳墠鎵�鏈塰ook")
+    @GetMapping(value = "/hooks")
+    @ResponseBody
+    public WVPResult<List<IHookSubscribe>> getHooks(){
+        WVPResult<List<IHookSubscribe>> result = new WVPResult<>();
+        result.setCode(0);
+        result.setMsg("success");
+        List<IHookSubscribe> all = zlmHttpHookSubscribe.getAll();
+        result.setData(all);
+        return result;
+    }
+
 //    @ApiOperation("褰撳墠杩涜涓殑鍔ㄦ�佷换鍔�")
 //    @GetMapping(value = "/dynamicTask")
 //    @ResponseBody

--
Gitblit v1.8.0