From 4548695a0b79cc6a3cc940d698cdf1d0b535d570 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期日, 31 三月 2024 00:28:45 +0800
Subject: [PATCH] hook优化

---
 src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java |  189 +++++++++++++++--------------------------------
 1 files changed, 61 insertions(+), 128 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java
index f5dac01..64b8ffe 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java
@@ -1,172 +1,105 @@
 package com.genersoft.iot.vmp.media.event.hook;
 
-import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
+import com.genersoft.iot.vmp.media.event.media.MediaEvent;
+import com.genersoft.iot.vmp.media.event.media.MediaPublishEvent;
+import org.mybatis.logging.Logger;
+import org.mybatis.logging.LoggerFactory;
+import org.springframework.context.event.EventListener;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
-import org.springframework.util.CollectionUtils;
 
 import java.time.Instant;
-import java.util.*;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 
 /**
- * ZLMediaServer鐨刪ook浜嬩欢璁㈤槄
+ * zlm hook浜嬩欢鐨勫弬鏁�
  * @author lin
  */
 @Component
 public class HookSubscribe {
 
-    private final static Logger logger = LoggerFactory.getLogger(HookSubscribe.class);
+    /**
+     * 璁㈤槄鏁版嵁杩囨湡鏃堕棿
+     */
+    private final long subscribeExpire = 5 * 1000;
 
     @FunctionalInterface
     public interface Event{
-        void response(MediaServer mediaServerItem, HookParam hookParam);
+        void response(HookData data);
     }
 
     /**
      * 娴佸埌鏉ョ殑澶勭悊
      */
     @Async("taskExecutor")
-    @org.springframework.context.event.EventListener
+    @EventListener
     public void onApplicationEvent(MediaArrivalEvent event) {
-        for (HookType hookType : allSubscribes.keySet()) {
-            if (hookType.equals(HookType.on_stream_changed)) {
-
-            }
-        }
-    }
-
-    private Map<HookType, Map<IHookSubscribe, HookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
-
-    public void addSubscribe(IHookSubscribe hookSubscribe, HookSubscribe.Event event) {
-        if (hookSubscribe.getExpires() == null) {
-            // 榛樿5鍒嗛挓杩囨湡
-            Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5));
-            hookSubscribe.setExpires(expiresInstant);
-        }
-        allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
-    }
-
-    public HookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
-        HookSubscribe.Event event= null;
-        Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
-        if (eventMap == null) {
-            return null;
-        }
-        for (IHookSubscribe key : eventMap.keySet()) {
-            Boolean result = null;
-
-            for (String s : key.getContent().keySet()) {
-                if (result == null) {
-                    result = key.getContent().getString(s).equals(hookResponse.getString(s));
-                }else {
-                    if (key.getContent().getString(s) == null) {
-                        continue;
-                    }
-                    result = result && key.getContent().getString(s).equals(hookResponse.getString(s));
-                }
-            }
-            if (null != result && result) {
-                event = eventMap.get(key);
-            }
-        }
-        return event;
-    }
-
-    public void removeSubscribe(IHookSubscribe hookSubscribe) {
-        Map<IHookSubscribe, Event> eventMap = allSubscribes.get(hookSubscribe.getHookType());
-        if (eventMap == null) {
-            return;
+        if ("rtsp".equals(event.getSchema())) {
+            System.out.println("娴佸埌鏉ョ殑澶勭悊: " + allSubscribes.size());
+            sendNotify(HookType.on_media_arrival, event);
         }
 
-        Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet();
-        if (entries.size() > 0) {
-            List<Map.Entry<IHookSubscribe, HookSubscribe.Event>> entriesToRemove = new ArrayList<>();
-            for (Map.Entry<IHookSubscribe, HookSubscribe.Event> entry : entries) {
-                JSONObject content = entry.getKey().getContent();
-                if (content == null || content.size() == 0) {
-                    entriesToRemove.add(entry);
-                    continue;
-                }
-                Boolean result = null;
-                for (String s : content.keySet()) {
-                    if (result == null) {
-                        result = content.getString(s).equals(hookSubscribe.getContent().getString(s));
-                    }else {
-                        if (content.getString(s) == null) {
-                            continue;
-                        }
-                        result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s));
-                    }
-                }
-                if (result){
-                    entriesToRemove.add(entry);
-                }
-            }
-
-            if (!CollectionUtils.isEmpty(entriesToRemove)) {
-                for (Map.Entry<IHookSubscribe, HookSubscribe.Event> entry : entriesToRemove) {
-                    eventMap.remove(entry.getKey());
-                }
-                if (eventMap.size() == 0) {
-                    allSubscribes.remove(hookSubscribe.getHookType());
-                }
-            }
-
-        }
     }
 
     /**
-     * 鑾峰彇鏌愪釜绫诲瀷鐨勬墍鏈夌殑璁㈤槄
-     * @param type
-     * @return
+     * 娴佺粨鏉熶簨浠�
      */
-    public List<HookSubscribe.Event> getSubscribes(HookType type) {
-        Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
-        if (eventMap == null) {
-            return null;
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaDepartureEvent event) {
+        if ("rtsp".equals(event.getSchema())) {
+            sendNotify(HookType.on_media_departure, event);
         }
-        List<HookSubscribe.Event> result = new ArrayList<>();
-        for (IHookSubscribe key : eventMap.keySet()) {
-            result.add(eventMap.get(key));
-        }
-        return result;
+
+    }
+    /**
+     * 鎺ㄦ祦閴存潈浜嬩欢
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaPublishEvent event) {
+        sendNotify(HookType.on_publish, event);
     }
 
-    public List<IHookSubscribe> getAll(){
-        ArrayList<IHookSubscribe> result = new ArrayList<>();
-        Collection<Map<IHookSubscribe, Event>> values = allSubscribes.values();
-        for (Map<IHookSubscribe, Event> value : values) {
-            result.addAll(value.keySet());
+    private final Map<String, Event> allSubscribes = new ConcurrentHashMap<>();
+    private final Map<String, Hook> allHook = new ConcurrentHashMap<>();
+
+    private void sendNotify(HookType hookType, MediaEvent event) {
+        Hook paramHook = Hook.getInstance(hookType, event.getApp(), event.getStream(), event.getMediaServer().getId());
+        Event hookSubscribeEvent = allSubscribes.get(paramHook.toString());
+        if (hookSubscribeEvent != null) {
+            HookData data = HookData.getInstance(event);
+            hookSubscribeEvent.response(data);
         }
-        return result;
+    }
+
+    public void addSubscribe(Hook hook, HookSubscribe.Event event) {
+        if (hook.getCreateTime() == null) {
+            hook.setCreateTime(System.currentTimeMillis());
+        }
+        allSubscribes.put(hook.toString(), event);
+        allHook.put(hook.toString(), hook);
+    }
+
+    public void removeSubscribe(Hook hook) {
+        allSubscribes.remove(hook.toString());
+        allHook.remove(hook.toString());
     }
 
     /**
      * 瀵硅闃呮暟鎹繘琛岃繃鏈熸竻鐞�
      */
-//    @Scheduled(cron="0 0/5 * * * ?")   //姣�5鍒嗛挓鎵ц涓�娆�
-    @Scheduled(fixedRate = 2 * 1000)
+    @Scheduled(fixedRate=subscribeExpire)   //姣�5鍒嗛挓鎵ц涓�娆�
     public void execute(){
-        Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
-        int total = 0;
-        for (HookType hookType : allSubscribes.keySet()) {
-            Map<IHookSubscribe, Event> hookSubscribeEventMap = allSubscribes.get(hookType);
-            if (hookSubscribeEventMap.size() > 0) {
-                for (IHookSubscribe hookSubscribe : hookSubscribeEventMap.keySet()) {
-                    if (hookSubscribe.getExpires().isBefore(instant)) {
-                        // 杩囨湡鐨�
-                        hookSubscribeEventMap.remove(hookSubscribe);
-                        total ++;
-                    }
-                }
+        long expireTime = System.currentTimeMillis() - subscribeExpire;
+        for (Hook hook : allHook.values()) {
+            if (hook.getCreateTime() < expireTime) {
+                allSubscribes.remove(hook.toString());
+                allHook.remove(hook.toString());
             }
         }
     }

--
Gitblit v1.8.0