package com.genersoft.iot.vmp.media.event.hook; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.time.Instant; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** * ZLMediaServer的hook事件订阅 * @author lin */ @Component public class HookSubscribe { private final static Logger logger = LoggerFactory.getLogger(HookSubscribe.class); @FunctionalInterface public interface Event{ void response(MediaServer mediaServerItem, HookParam hookParam); } /** * 流到来的处理 */ @Async("taskExecutor") @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { for (HookType hookType : allSubscribes.keySet()) { if (hookType.equals(HookType.on_stream_changed)) { } } } private Map> allSubscribes = new ConcurrentHashMap<>(); public void addSubscribe(IHookSubscribe hookSubscribe, HookSubscribe.Event event) { if (hookSubscribe.getExpires() == null) { // 默认5分钟过期 Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5)); hookSubscribe.setExpires(expiresInstant); } allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event); } public HookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) { HookSubscribe.Event event= null; Map eventMap = allSubscribes.get(type); if (eventMap == null) { return null; } for (IHookSubscribe key : eventMap.keySet()) { Boolean result = null; for (String s : key.getContent().keySet()) { if (result == null) { result = key.getContent().getString(s).equals(hookResponse.getString(s)); }else { if (key.getContent().getString(s) == null) { continue; } result = result && key.getContent().getString(s).equals(hookResponse.getString(s)); } } if (null != result && result) { event = eventMap.get(key); } } return event; } public void removeSubscribe(IHookSubscribe hookSubscribe) { Map eventMap = allSubscribes.get(hookSubscribe.getHookType()); if (eventMap == null) { return; } Set> entries = eventMap.entrySet(); if (entries.size() > 0) { List> entriesToRemove = new ArrayList<>(); for (Map.Entry entry : entries) { JSONObject content = entry.getKey().getContent(); if (content == null || content.size() == 0) { entriesToRemove.add(entry); continue; } Boolean result = null; for (String s : content.keySet()) { if (result == null) { result = content.getString(s).equals(hookSubscribe.getContent().getString(s)); }else { if (content.getString(s) == null) { continue; } result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s)); } } if (result){ entriesToRemove.add(entry); } } if (!CollectionUtils.isEmpty(entriesToRemove)) { for (Map.Entry entry : entriesToRemove) { eventMap.remove(entry.getKey()); } if (eventMap.size() == 0) { allSubscribes.remove(hookSubscribe.getHookType()); } } } } /** * 获取某个类型的所有的订阅 * @param type * @return */ public List getSubscribes(HookType type) { Map eventMap = allSubscribes.get(type); if (eventMap == null) { return null; } List result = new ArrayList<>(); for (IHookSubscribe key : eventMap.keySet()) { result.add(eventMap.get(key)); } return result; } public List getAll(){ ArrayList result = new ArrayList<>(); Collection> values = allSubscribes.values(); for (Map value : values) { result.addAll(value.keySet()); } return result; } /** * 对订阅数据进行过期清理 */ // @Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次 @Scheduled(fixedRate = 2 * 1000) public void execute(){ Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5)); int total = 0; for (HookType hookType : allSubscribes.keySet()) { Map hookSubscribeEventMap = allSubscribes.get(hookType); if (hookSubscribeEventMap.size() > 0) { for (IHookSubscribe hookSubscribe : hookSubscribeEventMap.keySet()) { if (hookSubscribe.getExpires().isBefore(instant)) { // 过期的 hookSubscribeEventMap.remove(hookSubscribe); total ++; } } } } } }