From fd091e545ba174adc36a9d3370e6d4c040ad33fd Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 28 七月 2022 16:18:41 +0800
Subject: [PATCH] 优化hook订阅机制
---
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java | 85 ++++++-----
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java | 33 ++-
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java | 14 -
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java | 52 ++----
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 12
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 17 +-
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java | 23 +++
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java | 43 ++++++
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java | 44 ++++++
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java | 33 ++++
src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java | 16 ++
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java | 36 +++++
12 files changed, 296 insertions(+), 112 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
index 0097ce0..feb66b4 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -10,6 +10,9 @@
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@@ -348,25 +351,19 @@
@Override
public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ZLMHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
- String streamId = ssrcInfo.getStream();
+ String stream = ssrcInfo.getStream();
try {
if (device == null) {
return;
}
String streamMode = device.getStreamMode().toUpperCase();
- logger.info("{} 鍒嗛厤鐨刏LM涓�: {} [{}:{}]", streamId, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
- // 娣诲姞璁㈤槄
- JSONObject subscribeKey = new JSONObject();
- subscribeKey.put("app", "rtp");
- subscribeKey.put("stream", streamId);
- subscribeKey.put("regist", true);
- subscribeKey.put("schema", "rtmp");
- subscribeKey.put("mediaServerId", mediaServerItem.getId());
- subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
- (MediaServerItem mediaServerItemInUse, JSONObject json)->{
+ logger.info("{} 鍒嗛厤鐨刏LM涓�: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
+ HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
+ subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
if (event != null) {
event.response(mediaServerItemInUse, json);
+ subscribe.removeSubscribe(hookSubscribe);
}
});
//
@@ -440,7 +437,7 @@
errorEvent.response(e);
}), e ->{
// 杩欓噷涓轰緥閬垮厤涓�涓�氶亾鐨勭偣鎾彧鏈変竴涓猚allID杩欎釜鍙傛暟浣跨敤涓�涓浐瀹氬��
- streamSession.put(device.getDeviceId(), channelId ,"play", streamId, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play);
+ streamSession.put(device.getDeviceId(), channelId ,"play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play);
streamSession.put(device.getDeviceId(), channelId ,"play", e.dialog);
okEvent.response(e);
});
@@ -530,21 +527,14 @@
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
-
+ HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtmp", mediaServerItem.getId());
// 娣诲姞璁㈤槄
- JSONObject subscribeKey = new JSONObject();
- subscribeKey.put("app", "rtp");
- subscribeKey.put("stream", ssrcInfo.getStream());
- subscribeKey.put("regist", true);
- subscribeKey.put("schema", "rtmp");
- subscribeKey.put("mediaServerId", mediaServerItem.getId());
- logger.debug("褰曞儚鍥炴斁娣诲姞璁㈤槄锛岃闃呭唴瀹癸細" + subscribeKey);
- subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
- (MediaServerItem mediaServerItemInUse, JSONObject json)->{
+ subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
if (hookEvent != null) {
InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream());
hookEvent.call(inviteStreamInfo);
}
+ subscribe.removeSubscribe(hookSubscribe);
});
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
@@ -643,21 +633,15 @@
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
+ HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId());
// 娣诲姞璁㈤槄
- JSONObject subscribeKey = new JSONObject();
- subscribeKey.put("app", "rtp");
- subscribeKey.put("stream", ssrcInfo.getStream());
- subscribeKey.put("regist", true);
- subscribeKey.put("mediaServerId", mediaServerItem.getId());
- logger.debug("褰曞儚鍥炴斁娣诲姞璁㈤槄锛岃闃呭唴瀹癸細" + subscribeKey.toString());
- subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
- (MediaServerItem mediaServerItemInUse, JSONObject json)->{
+ subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
- subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
- subscribeKey.put("regist", false);
- subscribeKey.put("schema", "rtmp");
+ subscribe.removeSubscribe(hookSubscribe);
+ hookSubscribe.getContent().put("regist", false);
+ hookSubscribe.getContent().put("schema", "rtmp");
// 娣诲姞娴佹敞閿�鐨勮闃咃紝娉ㄩ攢浜嗗悗鍚戣澶囧彂閫乥ye
- subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
+ subscribe.addSubscribe(hookSubscribe,
(MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd)->{
ClientTransaction transaction = streamSession.getTransaction(device.getDeviceId(), channelId, ssrcInfo.getStream(), callIdHeader.getCallId());
if (transaction != null) {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index b1f0fec..18654dd 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -102,7 +102,7 @@
logger.debug("[ ZLM HOOK ] on_server_keepalive API璋冪敤锛屽弬鏁帮細" + json.toString());
}
String mediaServerId = json.getString("mediaServerId");
- List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_keepalive);
+ List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
if (subscribes != null && subscribes.size() > 0) {
for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, json);
@@ -168,7 +168,7 @@
logger.debug("[ ZLM HOOK ]on_play API璋冪敤锛屽弬鏁帮細" + JSON.toJSONString(param));
}
String mediaServerId = param.getMediaServerId();
- ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json);
+ ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json);
if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) {
@@ -253,7 +253,7 @@
}
- ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json);
+ ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
if (subscribe != null) {
if (mediaInfo != null) {
subscribe.response(mediaInfo, json);
@@ -377,7 +377,7 @@
logger.debug("[ ZLM HOOK ]on_shell_login API璋冪敤锛屽弬鏁帮細" + json.toString());
}
String mediaServerId = json.getString("mediaServerId");
- ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_shell_login, json);
+ ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json);
if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) {
@@ -403,7 +403,7 @@
logger.info("[ ZLM HOOK ]on_stream_changed API璋冪敤锛屽弬鏁帮細" + JSONObject.toJSONString(item));
String mediaServerId = item.getMediaServerId();
JSONObject json = (JSONObject) JSON.toJSON(item);
- ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json);
+ ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) {
@@ -614,7 +614,7 @@
}
String remoteAddr = request.getRemoteAddr();
jsonObject.put("ip", remoteAddr);
- List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_started);
+ List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
if (subscribes != null && subscribes.size() > 0) {
for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, jsonObject);
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java
index ffd8ec9..a8286a8 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java
@@ -1,12 +1,16 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
+import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
+import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
/**
* @description:閽堝 ZLMediaServer鐨刪ook浜嬩欢璁㈤槄
@@ -16,49 +20,38 @@
@Component
public class ZLMHttpHookSubscribe {
- public enum HookType{
- on_flow_report,
- on_http_access,
- on_play,
- on_publish,
- on_record_mp4,
- on_rtsp_auth,
- on_rtsp_realm,
- on_shell_login,
- on_stream_changed,
- on_stream_none_reader,
- on_stream_not_found,
- on_server_started,
- on_server_keepalive
- }
-
@FunctionalInterface
public interface Event{
void response(MediaServerItem mediaServerItem, JSONObject response);
}
- private Map<HookType, Map<JSONObject, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
+ private Map<HookType, Map<IHookSubscribe, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
- public void addSubscribe(HookType type, JSONObject hookResponse, ZLMHttpHookSubscribe.Event event) {
- allSubscribes.computeIfAbsent(type, k -> new ConcurrentHashMap<>()).put(hookResponse, event);
+ public void addSubscribe(IHookSubscribe hookSubscribe, ZLMHttpHookSubscribe.Event event) {
+ if (hookSubscribe.getExpires() == null) {
+ // 榛樿5鍒嗛挓杩囨湡
+ Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5));
+ hookSubscribe.setExpires(expiresInstant);
+ }
+ allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
}
- public ZLMHttpHookSubscribe.Event getSubscribe(HookType type, JSONObject hookResponse) {
+ public ZLMHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
ZLMHttpHookSubscribe.Event event= null;
- Map<JSONObject, Event> eventMap = allSubscribes.get(type);
+ Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
if (eventMap == null) {
return null;
}
- for (JSONObject key : eventMap.keySet()) {
+ for (IHookSubscribe key : eventMap.keySet()) {
Boolean result = null;
- for (String s : key.keySet()) {
+ for (String s : key.getContent().keySet()) {
if (result == null) {
- result = key.getString(s).equals(hookResponse.getString(s));
+ result = key.getContent().getString(s).equals(hookResponse.getString(s));
}else {
- if (key.getString(s) == null) {
+ if (key.getContent().getString(s) == null) {
continue;
}
- result = result && key.getString(s).equals(hookResponse.getString(s));
+ result = result && key.getContent().getString(s).equals(hookResponse.getString(s));
}
}
@@ -69,26 +62,30 @@
return event;
}
- public void removeSubscribe(HookType type, JSONObject hookResponse) {
- Map<JSONObject, Event> eventMap = allSubscribes.get(type);
+ public void removeSubscribe(IHookSubscribe hookSubscribe) {
+ Map<IHookSubscribe, Event> eventMap = allSubscribes.get(hookSubscribe.getHookType());
if (eventMap == null) {
return;
}
- Set<Map.Entry<JSONObject, Event>> entries = eventMap.entrySet();
+ Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet();
if (entries.size() > 0) {
- List<Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
- for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entries) {
- JSONObject key = entry.getKey();
+ List<Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
+ for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entries) {
+ JSONObject content = entry.getKey().getContent();
+ if (content == null || content.size() == 0) {
+ entriesToRemove.add(entry);
+ continue;
+ }
Boolean result = null;
- for (String s : key.keySet()) {
+ for (String s : content.keySet()) {
if (result == null) {
- result = key.getString(s).equals(hookResponse.getString(s));
+ result = content.getString(s).equals(hookSubscribe.getContent().getString(s));
}else {
- if (key.getString(s) == null) {
+ if (content.getString(s) == null) {
continue;
}
- result = result && key.getString(s).equals(hookResponse.getString(s));
+ result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s));
}
}
if (null != result && result){
@@ -97,7 +94,7 @@
}
if (!CollectionUtils.isEmpty(entriesToRemove)) {
- for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) {
+ for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) {
entries.remove(entry);
}
}
@@ -111,17 +108,25 @@
* @return
*/
public List<ZLMHttpHookSubscribe.Event> getSubscribes(HookType type) {
- // ZLMHttpHookSubscribe.Event event= null;
- Map<JSONObject, Event> eventMap = allSubscribes.get(type);
+ Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
if (eventMap == null) {
return null;
}
List<ZLMHttpHookSubscribe.Event> result = new ArrayList<>();
- for (JSONObject key : eventMap.keySet()) {
+ for (IHookSubscribe key : eventMap.keySet()) {
result.add(eventMap.get(key));
}
return result;
}
+ public List<IHookSubscribe> getAll(){
+ ArrayList<IHookSubscribe> result = new ArrayList<>();
+ Collection<Map<IHookSubscribe, Event>> values = allSubscribes.values();
+ for (Map<IHookSubscribe, Event> value : values) {
+ result.addAll(value.keySet());
+ }
+ return result;
+ }
+
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
index 1bfb730..b24d0a1 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
@@ -6,22 +6,22 @@
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamProxyService;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
-import org.springframework.util.StringUtils;
+import java.time.Instant;
import java.util.*;
+import java.util.concurrent.TimeUnit;
@Component
@Order(value=1)
@@ -38,16 +38,10 @@
private ZLMHttpHookSubscribe hookSubscribe;
@Autowired
- private IStreamProxyService streamProxyService;
-
- @Autowired
private EventPublisher publisher;
@Autowired
private IMediaServerService mediaServerService;
-
- @Autowired
- private IRedisCatchStorage redisCatchStorage;
@Autowired
private MediaConfig mediaConfig;
@@ -67,16 +61,24 @@
mediaServerService.updateToDatabase(mediaSerItem);
}
mediaServerService.syncCatchFromDatabase();
+ HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started();
+// Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.SECONDS.toSeconds(60));
+// hookSubscribeForStreamChange.setExpires(expiresInstant);
// 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁�
- hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,new JSONObject(),
+ hookSubscribe.addSubscribe(hookSubscribeForServerStarted,
(MediaServerItem mediaServerItem, JSONObject response)->{
ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(response, ZLMServerConfig.class);
if (zlmServerConfig !=null ) {
if (startGetMedia != null) {
startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId());
+ if (startGetMedia.size() == 0) {
+ hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
+ }
}
}
});
+
+
// 鑾峰彇zlm淇℃伅
logger.info("[zlm] 绛夊緟榛樿zlm涓�...");
@@ -103,7 +105,6 @@
}
startGetMedia = null;
}
- hookSubscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started, new JSONObject());
// TODO 娓呯悊鏁版嵁搴撲腑涓巖edis涓嶅尮閰嶇殑zlm
}, 60 * 1000 );
}
@@ -116,6 +117,9 @@
zlmServerConfigFirst.setIp(mediaServerItem.getIp());
zlmServerConfigFirst.setHttpPort(mediaServerItem.getHttpPort());
startGetMedia.remove(mediaServerItem.getId());
+ if (startGetMedia.size() == 0) {
+ hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
+ }
mediaServerService.zlmServerOnline(zlmServerConfigFirst);
}else {
logger.info("[ {} ]-[ {}:{} ]涓诲姩杩炴帴澶辫触, 娓呯悊鐩稿叧璧勬簮锛� 寮�濮嬪皾璇曢噸璇曡繛鎺�",
@@ -130,6 +134,9 @@
zlmServerConfig.setIp(mediaServerItem.getIp());
zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort());
startGetMedia.remove(mediaServerItem.getId());
+ if (startGetMedia.size() == 0) {
+ hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
+ }
mediaServerService.zlmServerOnline(zlmServerConfig);
}
}, 2000);
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java
new file mode 100644
index 0000000..92172f3
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java
@@ -0,0 +1,33 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+
+import com.alibaba.fastjson.JSONObject;
+
+/**
+ * hook 璁㈤槄宸ュ巶
+ * @author lin
+ */
+public class HookSubscribeFactory {
+
+ public static HookSubscribeForStreamChange on_stream_changed(String app, String stream, boolean regist, String scheam, String mediaServerId) {
+ HookSubscribeForStreamChange hookSubscribe = new HookSubscribeForStreamChange();
+ JSONObject subscribeKey = new com.alibaba.fastjson.JSONObject();
+ subscribeKey.put("app", app);
+ subscribeKey.put("stream", stream);
+ subscribeKey.put("regist", regist);
+ if (scheam != null) {
+ subscribeKey.put("schema", scheam);
+ }
+ subscribeKey.put("mediaServerId", mediaServerId);
+ hookSubscribe.setContent(subscribeKey);
+
+ return hookSubscribe;
+ }
+
+ public static HookSubscribeForServerStarted on_server_started() {
+ HookSubscribeForServerStarted hookSubscribe = new HookSubscribeForServerStarted();
+ hookSubscribe.setContent(new JSONObject());
+
+ return hookSubscribe;
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java
new file mode 100644
index 0000000..0b781e6
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java
@@ -0,0 +1,44 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.annotation.JSONField;
+
+import java.time.Instant;
+
+/**
+ * hook璁㈤槄-娴佸彉鍖�
+ * @author lin
+ */
+public class HookSubscribeForServerStarted implements IHookSubscribe{
+
+ private HookType hookType = HookType.on_server_started;
+
+ private JSONObject content;
+
+ @JSONField(format="yyyy-MM-dd HH:mm:ss")
+ private Instant expires;
+
+ @Override
+ public HookType getHookType() {
+ return hookType;
+ }
+
+ @Override
+ public JSONObject getContent() {
+ return content;
+ }
+
+ public void setContent(JSONObject content) {
+ this.content = content;
+ }
+
+ @Override
+ public Instant getExpires() {
+ return expires;
+ }
+
+ @Override
+ public void setExpires(Instant expires) {
+ this.expires = expires;
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java
new file mode 100644
index 0000000..d5b2fb8
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java
@@ -0,0 +1,43 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.annotation.JSONField;
+
+import java.time.Instant;
+
+/**
+ * hook璁㈤槄-娴佸彉鍖�
+ * @author lin
+ */
+public class HookSubscribeForStreamChange implements IHookSubscribe{
+
+ private HookType hookType = HookType.on_stream_changed;
+
+ private JSONObject content;
+
+ private Instant expires;
+
+ @Override
+ public HookType getHookType() {
+ return hookType;
+ }
+
+ @Override
+ public JSONObject getContent() {
+ return content;
+ }
+
+ public void setContent(JSONObject content) {
+ this.content = content;
+ }
+
+ @Override
+ public Instant getExpires() {
+ return expires;
+ }
+
+ @Override
+ public void setExpires(Instant expires) {
+ this.expires = expires;
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java
new file mode 100644
index 0000000..797ab81
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java
@@ -0,0 +1,23 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+/**
+ * hook绫诲瀷
+ * @author lin
+ */
+
+public enum HookType {
+
+ on_flow_report,
+ on_http_access,
+ on_play,
+ on_publish,
+ on_record_mp4,
+ on_rtsp_auth,
+ on_rtsp_realm,
+ on_shell_login,
+ on_stream_changed,
+ on_stream_none_reader,
+ on_stream_not_found,
+ on_server_started,
+ on_server_keepalive
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java
new file mode 100644
index 0000000..5f2ca33
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java
@@ -0,0 +1,36 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+import com.alibaba.fastjson.JSONObject;
+
+import java.time.Instant;
+
+/**
+ * zlm hook浜嬩欢鐨勫弬鏁�
+ * @author lin
+ */
+public interface IHookSubscribe {
+
+ /**
+ * 鑾峰彇hook绫诲瀷
+ * @return hook绫诲瀷
+ */
+ HookType getHookType();
+
+ /**
+ * 鑾峰彇hook鐨勫叿浣撳唴瀹�
+ * @return hook鐨勫叿浣撳唴瀹�
+ */
+ JSONObject getContent();
+
+ /**
+ * 璁剧疆杩囨湡鏃堕棿
+ * @param instant 杩囨湡鏃堕棿
+ */
+ void setExpires(Instant instant);
+
+ /**
+ * 鑾峰彇杩囨湡鏃堕棿
+ * @return 杩囨湡鏃堕棿
+ */
+ Instant getExpires();
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index ddfbc79..93f5dd0 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -13,6 +13,9 @@
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@@ -296,16 +299,10 @@
// 鍗曠鍙fā寮弒treamId涔熸湁鍙樺寲锛岄渶瑕侀噸鏂拌缃洃鍚�
if (!mediaServerItem.isRtpEnable()) {
// 娣诲姞璁㈤槄
- JSONObject subscribeKey = new JSONObject();
- subscribeKey.put("app", "rtp");
- subscribeKey.put("stream", stream);
- subscribeKey.put("regist", true);
- subscribeKey.put("schema", "rtmp");
- subscribeKey.put("mediaServerId", mediaServerItem.getId());
- subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed,subscribeKey);
- subscribeKey.put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
- subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
- (MediaServerItem mediaServerItemInUse, JSONObject response)->{
+ HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
+ subscribe.removeSubscribe(hookSubscribe);
+ hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
+ subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString());
dynamicTask.stop(timeOutTaskKey);
// hook鍝嶅簲
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java
index 638ea41..a4fa635 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java
@@ -8,6 +8,9 @@
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.*;
@@ -270,14 +273,9 @@
}, userSetting.getPlatformPlayTimeout());
// 娣诲姞璁㈤槄
- JSONObject subscribeKey = new JSONObject();
- subscribeKey.put("app", content.getApp());
- subscribeKey.put("stream", content.getStream());
- subscribeKey.put("regist", true);
- subscribeKey.put("schema", "rtmp");
- subscribeKey.put("mediaServerId", mediaServerItem.getId());
- subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
- (MediaServerItem mediaServerItemInUse, JSONObject json)->{
+ HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtmp", mediaServerItem.getId());
+
+ subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
dynamicTask.stop(taskKey);
responseSendItem(mediaServerItem, content, toId, serial);
});
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java
index faed2c8..2311d4b 100644
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java
@@ -8,6 +8,8 @@
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.VersionInfo;
+import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.utils.SpringBeanFactory;
@@ -38,7 +40,7 @@
public class ServerController {
@Autowired
- private ConfigurableApplicationContext context;
+ private ZLMHttpHookSubscribe zlmHttpHookSubscribe;
@Autowired
private IMediaServerService mediaServerService;
@@ -254,6 +256,18 @@
return result;
}
+ @ApiOperation("鑾峰彇褰撳墠鎵�鏈塰ook")
+ @GetMapping(value = "/hooks")
+ @ResponseBody
+ public WVPResult<List<IHookSubscribe>> getHooks(){
+ WVPResult<List<IHookSubscribe>> result = new WVPResult<>();
+ result.setCode(0);
+ result.setMsg("success");
+ List<IHookSubscribe> all = zlmHttpHookSubscribe.getAll();
+ result.setData(all);
+ return result;
+ }
+
// @ApiOperation("褰撳墠杩涜涓殑鍔ㄦ�佷换鍔�")
// @GetMapping(value = "/dynamicTask")
// @ResponseBody
--
Gitblit v1.8.0