| | |
| | | package com.genersoft.iot.vmp.media.event.hook; |
| | | |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.genersoft.iot.vmp.media.event.MediaArrivalEvent; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.MediaServer; |
| | | import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import com.genersoft.iot.vmp.media.event.media.*; |
| | | import org.springframework.context.event.EventListener; |
| | | import org.springframework.scheduling.annotation.Async; |
| | | import org.springframework.scheduling.annotation.Scheduled; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.CollectionUtils; |
| | | |
| | | import java.time.Instant; |
| | | import java.util.*; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
/**
 * Subscriptions to ZLMediaServer (ZLM) hook events,
 * and the parameters carried by those hook events.
 * @author lin
 */
| | | @Component |
| | | public class HookSubscribe { |
| | | |
| | | private final static Logger logger = LoggerFactory.getLogger(HookSubscribe.class); |
| | | /** |
| | | * 订阅数据过期时间 |
| | | */ |
| | | private final long subscribeExpire = 5 * 60 * 1000; |
| | | |
| | | @FunctionalInterface |
| | | public interface Event{ |
| | | void response(MediaServer mediaServerItem, HookParam hookParam); |
| | | void response(HookData data); |
| | | } |
| | | |
| | | /** |
| | | * 流到来的处理 |
| | | */ |
| | | @Async("taskExecutor") |
| | | @org.springframework.context.event.EventListener |
| | | @EventListener |
| | | public void onApplicationEvent(MediaArrivalEvent event) { |
| | | for (HookType hookType : allSubscribes.keySet()) { |
| | | if (hookType.equals(HookType.on_stream_changed)) { |
| | | |
| | | } |
| | | } |
| | | } |
| | | |
| | | private Map<HookType, Map<IHookSubscribe, HookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>(); |
| | | |
| | | public void addSubscribe(IHookSubscribe hookSubscribe, HookSubscribe.Event event) { |
| | | if (hookSubscribe.getExpires() == null) { |
| | | // 默认5分钟过期 |
| | | Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5)); |
| | | hookSubscribe.setExpires(expiresInstant); |
| | | } |
| | | allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event); |
| | | } |
| | | |
| | | public HookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) { |
| | | HookSubscribe.Event event= null; |
| | | Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type); |
| | | if (eventMap == null) { |
| | | return null; |
| | | } |
| | | for (IHookSubscribe key : eventMap.keySet()) { |
| | | Boolean result = null; |
| | | |
| | | for (String s : key.getContent().keySet()) { |
| | | if (result == null) { |
| | | result = key.getContent().getString(s).equals(hookResponse.getString(s)); |
| | | }else { |
| | | if (key.getContent().getString(s) == null) { |
| | | continue; |
| | | } |
| | | result = result && key.getContent().getString(s).equals(hookResponse.getString(s)); |
| | | } |
| | | } |
| | | if (null != result && result) { |
| | | event = eventMap.get(key); |
| | | } |
| | | } |
| | | return event; |
| | | } |
| | | |
| | | public void removeSubscribe(IHookSubscribe hookSubscribe) { |
| | | Map<IHookSubscribe, Event> eventMap = allSubscribes.get(hookSubscribe.getHookType()); |
| | | if (eventMap == null) { |
| | | return; |
| | | if (event.getSchema() == null || "rtsp".equals(event.getSchema())) { |
| | | sendNotify(HookType.on_media_arrival, event); |
| | | } |
| | | |
| | | Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet(); |
| | | if (entries.size() > 0) { |
| | | List<Map.Entry<IHookSubscribe, HookSubscribe.Event>> entriesToRemove = new ArrayList<>(); |
| | | for (Map.Entry<IHookSubscribe, HookSubscribe.Event> entry : entries) { |
| | | JSONObject content = entry.getKey().getContent(); |
| | | if (content == null || content.size() == 0) { |
| | | entriesToRemove.add(entry); |
| | | continue; |
| | | } |
| | | Boolean result = null; |
| | | for (String s : content.keySet()) { |
| | | if (result == null) { |
| | | result = content.getString(s).equals(hookSubscribe.getContent().getString(s)); |
| | | }else { |
| | | if (content.getString(s) == null) { |
| | | continue; |
| | | } |
| | | result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s)); |
| | | } |
| | | } |
| | | if (result){ |
| | | entriesToRemove.add(entry); |
| | | } |
| | | } |
| | | |
| | | if (!CollectionUtils.isEmpty(entriesToRemove)) { |
| | | for (Map.Entry<IHookSubscribe, HookSubscribe.Event> entry : entriesToRemove) { |
| | | eventMap.remove(entry.getKey()); |
| | | } |
| | | if (eventMap.size() == 0) { |
| | | allSubscribes.remove(hookSubscribe.getHookType()); |
| | | } |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 获取某个类型的所有的订阅 |
| | | * @param type |
| | | * @return |
| | | * 流结束事件 |
| | | */ |
| | | public List<HookSubscribe.Event> getSubscribes(HookType type) { |
| | | Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type); |
| | | if (eventMap == null) { |
| | | return null; |
| | | @Async("taskExecutor") |
| | | @EventListener |
| | | public void onApplicationEvent(MediaDepartureEvent event) { |
| | | if (event.getSchema() == null || "rtsp".equals(event.getSchema())) { |
| | | sendNotify(HookType.on_media_departure, event); |
| | | } |
| | | List<HookSubscribe.Event> result = new ArrayList<>(); |
| | | for (IHookSubscribe key : eventMap.keySet()) { |
| | | result.add(eventMap.get(key)); |
| | | } |
| | | return result; |
| | | |
| | | } |
| | | /** |
| | | * 推流鉴权事件 |
| | | */ |
| | | @Async("taskExecutor") |
| | | @EventListener |
| | | public void onApplicationEvent(MediaPublishEvent event) { |
| | | sendNotify(HookType.on_publish, event); |
| | | } |
| | | /** |
| | | * 生成录像文件事件 |
| | | */ |
| | | @Async("taskExecutor") |
| | | @EventListener |
| | | public void onApplicationEvent(MediaRecordMp4Event event) { |
| | | sendNotify(HookType.on_record_mp4, event); |
| | | } |
| | | |
| | | public List<IHookSubscribe> getAll(){ |
| | | ArrayList<IHookSubscribe> result = new ArrayList<>(); |
| | | Collection<Map<IHookSubscribe, Event>> values = allSubscribes.values(); |
| | | for (Map<IHookSubscribe, Event> value : values) { |
| | | result.addAll(value.keySet()); |
| | | private final Map<String, Event> allSubscribes = new ConcurrentHashMap<>(); |
| | | private final Map<String, Hook> allHook = new ConcurrentHashMap<>(); |
| | | |
| | | private void sendNotify(HookType hookType, MediaEvent event) { |
| | | Hook paramHook = Hook.getInstance(hookType, event.getApp(), event.getStream(), event.getMediaServer().getId()); |
| | | Event hookSubscribeEvent = allSubscribes.get(paramHook.toString()); |
| | | if (hookSubscribeEvent != null) { |
| | | HookData data = HookData.getInstance(event); |
| | | hookSubscribeEvent.response(data); |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | public void addSubscribe(Hook hook, HookSubscribe.Event event) { |
| | | if (hook.getExpireTime() == null) { |
| | | hook.setExpireTime(System.currentTimeMillis() + subscribeExpire); |
| | | } |
| | | allSubscribes.put(hook.toString(), event); |
| | | allHook.put(hook.toString(), hook); |
| | | } |
| | | |
| | | public void removeSubscribe(Hook hook) { |
| | | allSubscribes.remove(hook.toString()); |
| | | allHook.remove(hook.toString()); |
| | | } |
| | | |
| | | /** |
| | | * 对订阅数据进行过期清理 |
| | | */ |
| | | // @Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次 |
| | | @Scheduled(fixedRate = 2 * 1000) |
| | | @Scheduled(fixedRate=subscribeExpire) //每5分钟执行一次 |
| | | public void execute(){ |
| | | Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5)); |
| | | int total = 0; |
| | | for (HookType hookType : allSubscribes.keySet()) { |
| | | Map<IHookSubscribe, Event> hookSubscribeEventMap = allSubscribes.get(hookType); |
| | | if (hookSubscribeEventMap.size() > 0) { |
| | | for (IHookSubscribe hookSubscribe : hookSubscribeEventMap.keySet()) { |
| | | if (hookSubscribe.getExpires().isBefore(instant)) { |
| | | // 过期的 |
| | | hookSubscribeEventMap.remove(hookSubscribe); |
| | | total ++; |
| | | } |
| | | } |
| | | long expireTime = System.currentTimeMillis(); |
| | | for (Hook hook : allHook.values()) { |
| | | if (hook.getExpireTime() < expireTime) { |
| | | allSubscribes.remove(hook.toString()); |
| | | allHook.remove(hook.toString()); |
| | | } |
| | | } |
| | | } |