From 4548695a0b79cc6a3cc940d698cdf1d0b535d570 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期日, 31 三月 2024 00:28:45 +0800 Subject: [PATCH] hook优化 --- src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java | 189 +++++++++++++++-------------------------------- 1 files changed, 61 insertions(+), 128 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java index f5dac01..64b8ffe 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java @@ -1,172 +1,105 @@ package com.genersoft.iot.vmp.media.event.hook; -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; -import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.media.MediaEvent; +import com.genersoft.iot.vmp.media.event.media.MediaPublishEvent; +import org.mybatis.logging.Logger; +import org.mybatis.logging.LoggerFactory; +import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; import java.time.Instant; -import java.util.*; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; /** - * ZLMediaServer鐨刪ook浜嬩欢璁㈤槄 + * zlm hook浜嬩欢鐨勫弬鏁� * @author lin */ @Component public class HookSubscribe { - private final static Logger logger = LoggerFactory.getLogger(HookSubscribe.class); + /** + * 璁㈤槄鏁版嵁杩囨湡鏃堕棿 + */ + private final long subscribeExpire = 5 * 1000; @FunctionalInterface public interface Event{ - void response(MediaServer mediaServerItem, HookParam hookParam); + void response(HookData data); } /** * 娴佸埌鏉ョ殑澶勭悊 */ @Async("taskExecutor") - @org.springframework.context.event.EventListener + @EventListener public void onApplicationEvent(MediaArrivalEvent event) { - for (HookType hookType : allSubscribes.keySet()) { - if (hookType.equals(HookType.on_stream_changed)) { - - } - } - } - - private Map<HookType, Map<IHookSubscribe, HookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>(); - - public void addSubscribe(IHookSubscribe hookSubscribe, HookSubscribe.Event event) { - if (hookSubscribe.getExpires() == null) { - // 榛樿5鍒嗛挓杩囨湡 - Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5)); - hookSubscribe.setExpires(expiresInstant); - } - allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event); - } - - public HookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) { - HookSubscribe.Event event= null; - Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type); - if (eventMap == null) { - return null; - } - for (IHookSubscribe key : eventMap.keySet()) { - Boolean result = null; - - for (String s : key.getContent().keySet()) { - if (result == null) { - result = key.getContent().getString(s).equals(hookResponse.getString(s)); - }else { - if (key.getContent().getString(s) == null) { - continue; - } - result = result && key.getContent().getString(s).equals(hookResponse.getString(s)); - } - } - if (null != result && result) { - event = eventMap.get(key); - } - } - return event; - } - - public void removeSubscribe(IHookSubscribe hookSubscribe) { - Map<IHookSubscribe, Event> eventMap = allSubscribes.get(hookSubscribe.getHookType()); - if (eventMap == null) { - return; + if ("rtsp".equals(event.getSchema())) { + System.out.println("娴佸埌鏉ョ殑澶勭悊: " + allSubscribes.size()); + sendNotify(HookType.on_media_arrival, event); } - Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet(); - if (entries.size() > 0) { - List<Map.Entry<IHookSubscribe, HookSubscribe.Event>> entriesToRemove = new ArrayList<>(); - for (Map.Entry<IHookSubscribe, HookSubscribe.Event> entry : entries) { - JSONObject content = entry.getKey().getContent(); - if (content == null || content.size() == 0) { - entriesToRemove.add(entry); - continue; - } - Boolean result = null; - for (String s : content.keySet()) { - if (result == null) { - result = content.getString(s).equals(hookSubscribe.getContent().getString(s)); - }else { - if (content.getString(s) == null) { - continue; - } - result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s)); - } - } - if (result){ - entriesToRemove.add(entry); - } - } - - if (!CollectionUtils.isEmpty(entriesToRemove)) { - for (Map.Entry<IHookSubscribe, HookSubscribe.Event> entry : entriesToRemove) { - eventMap.remove(entry.getKey()); - } - if (eventMap.size() == 0) { - allSubscribes.remove(hookSubscribe.getHookType()); - } - } - - } } /** - * 鑾峰彇鏌愪釜绫诲瀷鐨勬墍鏈夌殑璁㈤槄 - * @param type - * @return + * 娴佺粨鏉熶簨浠� */ - public List<HookSubscribe.Event> getSubscribes(HookType type) { - Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type); - if (eventMap == null) { - return null; + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaDepartureEvent event) { + if ("rtsp".equals(event.getSchema())) { + sendNotify(HookType.on_media_departure, event); } - List<HookSubscribe.Event> result = new ArrayList<>(); - for (IHookSubscribe key : eventMap.keySet()) { - result.add(eventMap.get(key)); - } - return result; + + } + /** + * 鎺ㄦ祦閴存潈浜嬩欢 + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaPublishEvent event) { + sendNotify(HookType.on_publish, event); } - public List<IHookSubscribe> getAll(){ - ArrayList<IHookSubscribe> result = new ArrayList<>(); - Collection<Map<IHookSubscribe, Event>> values = allSubscribes.values(); - for (Map<IHookSubscribe, Event> value : values) { - result.addAll(value.keySet()); + private final Map<String, Event> allSubscribes = new ConcurrentHashMap<>(); + private final Map<String, Hook> allHook = new ConcurrentHashMap<>(); + + private void sendNotify(HookType hookType, MediaEvent event) { + Hook paramHook = Hook.getInstance(hookType, event.getApp(), event.getStream(), event.getMediaServer().getId()); + Event hookSubscribeEvent = allSubscribes.get(paramHook.toString()); + if (hookSubscribeEvent != null) { + HookData data = HookData.getInstance(event); + hookSubscribeEvent.response(data); } - return result; + } + + public void addSubscribe(Hook hook, HookSubscribe.Event event) { + if (hook.getCreateTime() == null) { + hook.setCreateTime(System.currentTimeMillis()); + } + allSubscribes.put(hook.toString(), event); + allHook.put(hook.toString(), hook); + } + + public void removeSubscribe(Hook hook) { + allSubscribes.remove(hook.toString()); + allHook.remove(hook.toString()); } /** * 瀵硅闃呮暟鎹繘琛岃繃鏈熸竻鐞� */ -// @Scheduled(cron="0 0/5 * * * ?") //姣�5鍒嗛挓鎵ц涓�娆� - @Scheduled(fixedRate = 2 * 1000) + @Scheduled(fixedRate=subscribeExpire) //姣�5鍒嗛挓鎵ц涓�娆� public void execute(){ - Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5)); - int total = 0; - for (HookType hookType : allSubscribes.keySet()) { - Map<IHookSubscribe, Event> hookSubscribeEventMap = allSubscribes.get(hookType); - if (hookSubscribeEventMap.size() > 0) { - for (IHookSubscribe hookSubscribe : hookSubscribeEventMap.keySet()) { - if (hookSubscribe.getExpires().isBefore(instant)) { - // 杩囨湡鐨� - hookSubscribeEventMap.remove(hookSubscribe); - total ++; - } - } + long expireTime = System.currentTimeMillis() - subscribeExpire; + for (Hook hook : allHook.values()) { + if (hook.getCreateTime() < expireTime) { + allSubscribes.remove(hook.toString()); + allHook.remove(hook.toString()); } } } -- Gitblit v1.8.0