From fd091e545ba174adc36a9d3370e6d4c040ad33fd Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 28 七月 2022 16:18:41 +0800 Subject: [PATCH] 优化hook订阅机制 --- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java | 85 ++++++----- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java | 33 ++- src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java | 14 - src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java | 52 ++---- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 12 src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java | 17 +- src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java | 23 +++ src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java | 43 ++++++ src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java | 44 ++++++ src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java | 33 ++++ src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java | 16 ++ src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java | 36 +++++ 12 files changed, 296 insertions(+), 112 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 0097ce0..feb66b4 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -10,6 +10,9 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.HookType; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; @@ -348,25 +351,19 @@ @Override public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) { - String streamId = ssrcInfo.getStream(); + String stream = ssrcInfo.getStream(); try { if (device == null) { return; } String streamMode = device.getStreamMode().toUpperCase(); - logger.info("{} 鍒嗛厤鐨刏LM涓�: {} [{}:{}]", streamId, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); - // 娣诲姞璁㈤槄 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", streamId); - subscribeKey.put("regist", true); - subscribeKey.put("schema", "rtmp"); - subscribeKey.put("mediaServerId", mediaServerItem.getId()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + logger.info("{} 鍒嗛厤鐨刏LM涓�: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId()); + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ if (event != null) { event.response(mediaServerItemInUse, json); + subscribe.removeSubscribe(hookSubscribe); } }); // @@ -440,7 +437,7 @@ errorEvent.response(e); }), e ->{ // 杩欓噷涓轰緥閬垮厤涓�涓�氶亾鐨勭偣鎾彧鏈変竴涓猚allID杩欎釜鍙傛暟浣跨敤涓�涓浐瀹氬�� - streamSession.put(device.getDeviceId(), channelId ,"play", streamId, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play); + streamSession.put(device.getDeviceId(), channelId ,"play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play); streamSession.put(device.getDeviceId(), channelId ,"play", e.dialog); okEvent.response(e); }); @@ -530,21 +527,14 @@ CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); - + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtmp", mediaServerItem.getId()); // 娣诲姞璁㈤槄 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", ssrcInfo.getStream()); - subscribeKey.put("regist", true); - subscribeKey.put("schema", "rtmp"); - subscribeKey.put("mediaServerId", mediaServerItem.getId()); - logger.debug("褰曞儚鍥炴斁娣诲姞璁㈤槄锛岃闃呭唴瀹癸細" + subscribeKey); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ if (hookEvent != null) { InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()); hookEvent.call(inviteStreamInfo); } + subscribe.removeSubscribe(hookSubscribe); }); Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc()); @@ -643,21 +633,15 @@ CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId()); // 娣诲姞璁㈤槄 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", ssrcInfo.getStream()); - subscribeKey.put("regist", true); - subscribeKey.put("mediaServerId", mediaServerItem.getId()); - logger.debug("褰曞儚鍥炴斁娣诲姞璁㈤槄锛岃闃呭唴瀹癸細" + subscribeKey.toString()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream())); - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); - subscribeKey.put("regist", false); - subscribeKey.put("schema", "rtmp"); + subscribe.removeSubscribe(hookSubscribe); + hookSubscribe.getContent().put("regist", false); + hookSubscribe.getContent().put("schema", "rtmp"); // 娣诲姞娴佹敞閿�鐨勮闃咃紝娉ㄩ攢浜嗗悗鍚戣澶囧彂閫乥ye - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd)->{ ClientTransaction transaction = streamSession.getTransaction(device.getDeviceId(), channelId, ssrcInfo.getStream(), callIdHeader.getCallId()); if (transaction != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index b1f0fec..18654dd 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -102,7 +102,7 @@ logger.debug("[ ZLM HOOK ] on_server_keepalive API璋冪敤锛屽弬鏁帮細" + json.toString()); } String mediaServerId = json.getString("mediaServerId"); - List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_keepalive); + List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive); if (subscribes != null && subscribes.size() > 0) { for (ZLMHttpHookSubscribe.Event subscribe : subscribes) { subscribe.response(null, json); @@ -168,7 +168,7 @@ logger.debug("[ ZLM HOOK ]on_play API璋冪敤锛屽弬鏁帮細" + JSON.toJSONString(param)); } String mediaServerId = param.getMediaServerId(); - ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json); + ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json); if (subscribe != null ) { MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); if (mediaInfo != null) { @@ -253,7 +253,7 @@ } - ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json); + ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); if (subscribe != null) { if (mediaInfo != null) { subscribe.response(mediaInfo, json); @@ -377,7 +377,7 @@ logger.debug("[ ZLM HOOK ]on_shell_login API璋冪敤锛屽弬鏁帮細" + json.toString()); } String mediaServerId = json.getString("mediaServerId"); - ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_shell_login, json); + ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json); if (subscribe != null ) { MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); if (mediaInfo != null) { @@ -403,7 +403,7 @@ logger.info("[ ZLM HOOK ]on_stream_changed API璋冪敤锛屽弬鏁帮細" + JSONObject.toJSONString(item)); String mediaServerId = item.getMediaServerId(); JSONObject json = (JSONObject) JSON.toJSON(item); - ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json); + ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); if (subscribe != null ) { MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); if (mediaInfo != null) { @@ -614,7 +614,7 @@ } String remoteAddr = request.getRemoteAddr(); jsonObject.put("ip", remoteAddr); - List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_started); + List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started); if (subscribes != null && subscribes.size() > 0) { for (ZLMHttpHookSubscribe.Event subscribe : subscribes) { subscribe.response(null, jsonObject); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java index ffd8ec9..a8286a8 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java @@ -1,12 +1,16 @@ package com.genersoft.iot.vmp.media.zlm; import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.media.zlm.dto.HookType; +import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import java.time.Instant; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; /** * @description:閽堝 ZLMediaServer鐨刪ook浜嬩欢璁㈤槄 @@ -16,49 +20,38 @@ @Component public class ZLMHttpHookSubscribe { - public enum HookType{ - on_flow_report, - on_http_access, - on_play, - on_publish, - on_record_mp4, - on_rtsp_auth, - on_rtsp_realm, - on_shell_login, - on_stream_changed, - on_stream_none_reader, - on_stream_not_found, - on_server_started, - on_server_keepalive - } - @FunctionalInterface public interface Event{ void response(MediaServerItem mediaServerItem, JSONObject response); } - private Map<HookType, Map<JSONObject, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>(); + private Map<HookType, Map<IHookSubscribe, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>(); - public void addSubscribe(HookType type, JSONObject hookResponse, ZLMHttpHookSubscribe.Event event) { - allSubscribes.computeIfAbsent(type, k -> new ConcurrentHashMap<>()).put(hookResponse, event); + public void addSubscribe(IHookSubscribe hookSubscribe, ZLMHttpHookSubscribe.Event event) { + if (hookSubscribe.getExpires() == null) { + // 榛樿5鍒嗛挓杩囨湡 + Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5)); + hookSubscribe.setExpires(expiresInstant); + } + allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event); } - public ZLMHttpHookSubscribe.Event getSubscribe(HookType type, JSONObject hookResponse) { + public ZLMHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) { ZLMHttpHookSubscribe.Event event= null; - Map<JSONObject, Event> eventMap = allSubscribes.get(type); + Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type); if (eventMap == null) { return null; } - for (JSONObject key : eventMap.keySet()) { + for (IHookSubscribe key : eventMap.keySet()) { Boolean result = null; - for (String s : key.keySet()) { + for (String s : key.getContent().keySet()) { if (result == null) { - result = key.getString(s).equals(hookResponse.getString(s)); + result = key.getContent().getString(s).equals(hookResponse.getString(s)); }else { - if (key.getString(s) == null) { + if (key.getContent().getString(s) == null) { continue; } - result = result && key.getString(s).equals(hookResponse.getString(s)); + result = result && key.getContent().getString(s).equals(hookResponse.getString(s)); } } @@ -69,26 +62,30 @@ return event; } - public void removeSubscribe(HookType type, JSONObject hookResponse) { - Map<JSONObject, Event> eventMap = allSubscribes.get(type); + public void removeSubscribe(IHookSubscribe hookSubscribe) { + Map<IHookSubscribe, Event> eventMap = allSubscribes.get(hookSubscribe.getHookType()); if (eventMap == null) { return; } - Set<Map.Entry<JSONObject, Event>> entries = eventMap.entrySet(); + Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet(); if (entries.size() > 0) { - List<Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>(); - for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entries) { - JSONObject key = entry.getKey(); + List<Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>(); + for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entries) { + JSONObject content = entry.getKey().getContent(); + if (content == null || content.size() == 0) { + entriesToRemove.add(entry); + continue; + } Boolean result = null; - for (String s : key.keySet()) { + for (String s : content.keySet()) { if (result == null) { - result = key.getString(s).equals(hookResponse.getString(s)); + result = content.getString(s).equals(hookSubscribe.getContent().getString(s)); }else { - if (key.getString(s) == null) { + if (content.getString(s) == null) { continue; } - result = result && key.getString(s).equals(hookResponse.getString(s)); + result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s)); } } if (null != result && result){ @@ -97,7 +94,7 @@ } if (!CollectionUtils.isEmpty(entriesToRemove)) { - for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) { + for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) { entries.remove(entry); } } @@ -111,17 +108,25 @@ * @return */ public List<ZLMHttpHookSubscribe.Event> getSubscribes(HookType type) { - // ZLMHttpHookSubscribe.Event event= null; - Map<JSONObject, Event> eventMap = allSubscribes.get(type); + Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type); if (eventMap == null) { return null; } List<ZLMHttpHookSubscribe.Event> result = new ArrayList<>(); - for (JSONObject key : eventMap.keySet()) { + for (IHookSubscribe key : eventMap.keySet()) { result.add(eventMap.get(key)); } return result; } + public List<IHookSubscribe> getAll(){ + ArrayList<IHookSubscribe> result = new ArrayList<>(); + Collection<Map<IHookSubscribe, Event>> values = allSubscribes.values(); + for (Map<IHookSubscribe, Event> value : values) { + result.addAll(value.keySet()); + } + return result; + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java index 1bfb730..b24d0a1 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java @@ -6,22 +6,22 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IStreamProxyService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.scheduling.annotation.Async; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; +import java.time.Instant; import java.util.*; +import java.util.concurrent.TimeUnit; @Component @Order(value=1) @@ -38,16 +38,10 @@ private ZLMHttpHookSubscribe hookSubscribe; @Autowired - private IStreamProxyService streamProxyService; - - @Autowired private EventPublisher publisher; @Autowired private IMediaServerService mediaServerService; - - @Autowired - private IRedisCatchStorage redisCatchStorage; @Autowired private MediaConfig mediaConfig; @@ -67,16 +61,24 @@ mediaServerService.updateToDatabase(mediaSerItem); } mediaServerService.syncCatchFromDatabase(); + HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started(); +// Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.SECONDS.toSeconds(60)); +// hookSubscribeForStreamChange.setExpires(expiresInstant); // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� - hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,new JSONObject(), + hookSubscribe.addSubscribe(hookSubscribeForServerStarted, (MediaServerItem mediaServerItem, JSONObject response)->{ ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(response, ZLMServerConfig.class); if (zlmServerConfig !=null ) { if (startGetMedia != null) { startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId()); + if (startGetMedia.size() == 0) { + hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started()); + } } } }); + + // 鑾峰彇zlm淇℃伅 logger.info("[zlm] 绛夊緟榛樿zlm涓�..."); @@ -103,7 +105,6 @@ } startGetMedia = null; } - hookSubscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started, new JSONObject()); // TODO 娓呯悊鏁版嵁搴撲腑涓巖edis涓嶅尮閰嶇殑zlm }, 60 * 1000 ); } @@ -116,6 +117,9 @@ zlmServerConfigFirst.setIp(mediaServerItem.getIp()); zlmServerConfigFirst.setHttpPort(mediaServerItem.getHttpPort()); startGetMedia.remove(mediaServerItem.getId()); + if (startGetMedia.size() == 0) { + hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started()); + } mediaServerService.zlmServerOnline(zlmServerConfigFirst); }else { logger.info("[ {} ]-[ {}:{} ]涓诲姩杩炴帴澶辫触, 娓呯悊鐩稿叧璧勬簮锛� 寮�濮嬪皾璇曢噸璇曡繛鎺�", @@ -130,6 +134,9 @@ zlmServerConfig.setIp(mediaServerItem.getIp()); zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort()); startGetMedia.remove(mediaServerItem.getId()); + if (startGetMedia.size() == 0) { + hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started()); + } mediaServerService.zlmServerOnline(zlmServerConfig); } }, 2000); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java new file mode 100644 index 0000000..92172f3 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java @@ -0,0 +1,33 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + + +import com.alibaba.fastjson.JSONObject; + +/** + * hook 璁㈤槄宸ュ巶 + * @author lin + */ +public class HookSubscribeFactory { + + public static HookSubscribeForStreamChange on_stream_changed(String app, String stream, boolean regist, String scheam, String mediaServerId) { + HookSubscribeForStreamChange hookSubscribe = new HookSubscribeForStreamChange(); + JSONObject subscribeKey = new com.alibaba.fastjson.JSONObject(); + subscribeKey.put("app", app); + subscribeKey.put("stream", stream); + subscribeKey.put("regist", regist); + if (scheam != null) { + subscribeKey.put("schema", scheam); + } + subscribeKey.put("mediaServerId", mediaServerId); + hookSubscribe.setContent(subscribeKey); + + return hookSubscribe; + } + + public static HookSubscribeForServerStarted on_server_started() { + HookSubscribeForServerStarted hookSubscribe = new HookSubscribeForServerStarted(); + hookSubscribe.setContent(new JSONObject()); + + return hookSubscribe; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java new file mode 100644 index 0000000..0b781e6 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java @@ -0,0 +1,44 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.annotation.JSONField; + +import java.time.Instant; + +/** + * hook璁㈤槄-娴佸彉鍖� + * @author lin + */ +public class HookSubscribeForServerStarted implements IHookSubscribe{ + + private HookType hookType = HookType.on_server_started; + + private JSONObject content; + + @JSONField(format="yyyy-MM-dd HH:mm:ss") + private Instant expires; + + @Override + public HookType getHookType() { + return hookType; + } + + @Override + public JSONObject getContent() { + return content; + } + + public void setContent(JSONObject content) { + this.content = content; + } + + @Override + public Instant getExpires() { + return expires; + } + + @Override + public void setExpires(Instant expires) { + this.expires = expires; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java new file mode 100644 index 0000000..d5b2fb8 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java @@ -0,0 +1,43 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.annotation.JSONField; + +import java.time.Instant; + +/** + * hook璁㈤槄-娴佸彉鍖� + * @author lin + */ +public class HookSubscribeForStreamChange implements IHookSubscribe{ + + private HookType hookType = HookType.on_stream_changed; + + private JSONObject content; + + private Instant expires; + + @Override + public HookType getHookType() { + return hookType; + } + + @Override + public JSONObject getContent() { + return content; + } + + public void setContent(JSONObject content) { + this.content = content; + } + + @Override + public Instant getExpires() { + return expires; + } + + @Override + public void setExpires(Instant expires) { + this.expires = expires; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java new file mode 100644 index 0000000..797ab81 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java @@ -0,0 +1,23 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + +/** + * hook绫诲瀷 + * @author lin + */ + +public enum HookType { + + on_flow_report, + on_http_access, + on_play, + on_publish, + on_record_mp4, + on_rtsp_auth, + on_rtsp_realm, + on_shell_login, + on_stream_changed, + on_stream_none_reader, + on_stream_not_found, + on_server_started, + on_server_keepalive +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java new file mode 100644 index 0000000..5f2ca33 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java @@ -0,0 +1,36 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + +import com.alibaba.fastjson.JSONObject; + +import java.time.Instant; + +/** + * zlm hook浜嬩欢鐨勫弬鏁� + * @author lin + */ +public interface IHookSubscribe { + + /** + * 鑾峰彇hook绫诲瀷 + * @return hook绫诲瀷 + */ + HookType getHookType(); + + /** + * 鑾峰彇hook鐨勫叿浣撳唴瀹� + * @return hook鐨勫叿浣撳唴瀹� + */ + JSONObject getContent(); + + /** + * 璁剧疆杩囨湡鏃堕棿 + * @param instant 杩囨湡鏃堕棿 + */ + void setExpires(Instant instant); + + /** + * 鑾峰彇杩囨湡鏃堕棿 + * @return 杩囨湡鏃堕棿 + */ + Instant getExpires(); +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index ddfbc79..93f5dd0 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -13,6 +13,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.HookType; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; @@ -296,16 +299,10 @@ // 鍗曠鍙fā寮弒treamId涔熸湁鍙樺寲锛岄渶瑕侀噸鏂拌缃洃鍚� if (!mediaServerItem.isRtpEnable()) { // 娣诲姞璁㈤槄 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", stream); - subscribeKey.put("regist", true); - subscribeKey.put("schema", "rtmp"); - subscribeKey.put("mediaServerId", mediaServerItem.getId()); - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed,subscribeKey); - subscribeKey.put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject response)->{ + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId()); + subscribe.removeSubscribe(hookSubscribe); + hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{ logger.info("[ZLM HOOK] ssrc淇鍚庢敹鍒拌闃呮秷鎭細 " + response.toJSONString()); dynamicTask.stop(timeOutTaskKey); // hook鍝嶅簲 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java index 638ea41..a4fa635 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java @@ -8,6 +8,9 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.HookType; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.*; @@ -270,14 +273,9 @@ }, userSetting.getPlatformPlayTimeout()); // 娣诲姞璁㈤槄 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", content.getApp()); - subscribeKey.put("stream", content.getStream()); - subscribeKey.put("regist", true); - subscribeKey.put("schema", "rtmp"); - subscribeKey.put("mediaServerId", mediaServerItem.getId()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtmp", mediaServerItem.getId()); + + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ dynamicTask.stop(taskKey); responseSendItem(mediaServerItem, content, toId, serial); }); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java index faed2c8..2311d4b 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java @@ -8,6 +8,8 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.VersionInfo; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.utils.SpringBeanFactory; @@ -38,7 +40,7 @@ public class ServerController { @Autowired - private ConfigurableApplicationContext context; + private ZLMHttpHookSubscribe zlmHttpHookSubscribe; @Autowired private IMediaServerService mediaServerService; @@ -254,6 +256,18 @@ return result; } + @ApiOperation("鑾峰彇褰撳墠鎵�鏈塰ook") + @GetMapping(value = "/hooks") + @ResponseBody + public WVPResult<List<IHookSubscribe>> getHooks(){ + WVPResult<List<IHookSubscribe>> result = new WVPResult<>(); + result.setCode(0); + result.setMsg("success"); + List<IHookSubscribe> all = zlmHttpHookSubscribe.getAll(); + result.setData(all); + return result; + } + // @ApiOperation("褰撳墠杩涜涓殑鍔ㄦ�佷换鍔�") // @GetMapping(value = "/dynamicTask") // @ResponseBody -- Gitblit v1.8.0