From d41d6b34af2485198ed01e1888db1571e4da1a6a Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期二, 23 四月 2024 20:59:20 +0800
Subject: [PATCH] Merge branch 'refs/heads/2.7.0'

---
 src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java |  225 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 225 insertions(+), 0 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java
new file mode 100644
index 0000000..661e370
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java
@@ -0,0 +1,225 @@
+package com.genersoft.iot.vmp.conf.redis;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.common.CommonCallback;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class RedisRpcConfig implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisRpcConfig.class);
+
+    public final static String REDIS_REQUEST_CHANNEL_KEY = "WVP_REDIS_REQUEST_CHANNEL_KEY";
+
+    private final Random random = new Random();
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private RedisRpcController redisRpcController;
+
+    @Autowired
+    private RedisTemplate<Object, Object> redisTemplate;
+
+    @Autowired
+    private ZlmHttpHookSubscribe hookSubscribe;
+
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        boolean isEmpty = taskQueue.isEmpty();
+        taskQueue.offer(message);
+        if (isEmpty) {
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    try {
+                        RedisRpcMessage redisRpcMessage = JSON.parseObject(new String(msg.getBody()), RedisRpcMessage.class);
+                        if (redisRpcMessage.getRequest() != null) {
+                            handlerRequest(redisRpcMessage.getRequest());
+                        } else if (redisRpcMessage.getResponse() != null){
+                            handlerResponse(redisRpcMessage.getResponse());
+                        } else {
+                            logger.error("[redis rpc 瑙f瀽澶辫触] {}", JSON.toJSONString(redisRpcMessage));
+                        }
+                    } catch (Exception e) {
+                        logger.error("[redis rpc 瑙f瀽寮傚父] ", e);
+                    }
+                }
+            });
+        }
+    }
+
+    private void handlerResponse(RedisRpcResponse response) {
+        if (userSetting.getServerId().equals(response.getToId())) {
+            return;
+        }
+        logger.info("[redis-rpc] << {}", response);
+        response(response);
+    }
+
+    private void handlerRequest(RedisRpcRequest request) {
+        try {
+            if (userSetting.getServerId().equals(request.getFromId())) {
+                return;
+            }
+            logger.info("[redis-rpc] << {}", request);
+            Method method = getMethod(request.getUri());
+            // 娌℃湁鎼哄甫鐩爣ID鐨勫彲浠ョ悊瑙d负鍝釜wvp鏈夌粨鏋滃氨鍝釜鍥炲锛屾惡甯︾洰鏍嘔D锛屼絾鏄鏋滄槸涓嶅瓨鍦ㄧ殑uri鍒欑洿鎺ュ洖澶�404
+            if (userSetting.getServerId().equals(request.getToId())) {
+                if (method == null) {
+                    // 鍥炲404缁撴灉
+                    RedisRpcResponse response = request.getResponse();
+                    response.setStatusCode(404);
+                    sendResponse(response);
+                    return;
+                }
+                RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
+                if(response != null) {
+                    sendResponse(response);
+                }
+            }else {
+                if (method == null) {
+                    return;
+                }
+                RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
+                if (response != null) {
+                    sendResponse(response);
+                }
+            }
+        }catch (InvocationTargetException | IllegalAccessException e) {
+            logger.error("[redis rpc ] 澶勭悊璇锋眰澶辫触 ", e);
+        }
+
+    }
+
+    private Method getMethod(String name) {
+        // 鍚姩鍚庢壂鎻忔墍鏈夌殑璺緞娉ㄨВ
+        Method[] methods = redisRpcController.getClass().getMethods();
+        for (Method method : methods) {
+            if (method.getName().equals(name)) {
+                return method;
+            }
+        }
+        return null;
+    }
+
+    private void sendResponse(RedisRpcResponse response){
+        logger.info("[redis-rpc] >> {}", response);
+        response.setToId(userSetting.getServerId());
+        RedisRpcMessage message = new RedisRpcMessage();
+        message.setResponse(response);
+        redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
+    }
+
+    private void sendRequest(RedisRpcRequest request){
+        logger.info("[redis-rpc] >> {}", request);
+        RedisRpcMessage message = new RedisRpcMessage();
+        message.setRequest(request);
+        redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
+    }
+
+
+    private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>();
+    private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>();
+
+    public RedisRpcResponse request(RedisRpcRequest request, int timeOut) {
+        request.setSn((long) random.nextInt(1000) + 1);
+        SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn());
+
+        try {
+            sendRequest(request);
+            return subscribe.poll(timeOut, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            logger.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e);
+        } finally {
+            this.unsubscribe(request.getSn());
+        }
+        return null;
+    }
+
+    public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) {
+        request.setSn((long) random.nextInt(1000) + 1);
+        setCallback(request.getSn(), callback);
+        sendRequest(request);
+    }
+
+    public Boolean response(RedisRpcResponse response) {
+        SynchronousQueue<RedisRpcResponse> queue = topicSubscribers.get(response.getSn());
+        CommonCallback<RedisRpcResponse> callback = callbacks.get(response.getSn());
+        if (queue != null) {
+            try {
+                return queue.offer(response, 2, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                logger.error("{}", e.getMessage(), e);
+            }
+        }else if (callback != null) {
+            callback.run(response);
+            callbacks.remove(response.getSn());
+        }
+        return false;
+    }
+
+    private void unsubscribe(long key) {
+        topicSubscribers.remove(key);
+    }
+
+
+    private SynchronousQueue<RedisRpcResponse> subscribe(long key) {
+        SynchronousQueue<RedisRpcResponse> queue = null;
+        if (!topicSubscribers.containsKey(key))
+            topicSubscribers.put(key, queue = new SynchronousQueue<>());
+        return queue;
+    }
+
+    private void setCallback(long key, CommonCallback<RedisRpcResponse> callback)  {
+        // TODO 濡傛灉澶氫釜涓婄骇鐐规挱鍚屼竴涓�氶亾浼氭湁闂
+        callbacks.put(key, callback);
+    }
+
+    public void removeCallback(long key)  {
+        callbacks.remove(key);
+    }
+
+
+    public int getCallbackCount(){
+        return callbacks.size();
+    }
+
+//    @Scheduled(fixedRate = 1000)   //姣�1绉掓墽琛屼竴娆�
+//    public void execute(){
+//        logger.info("callbacks鐨勯暱搴�: " + callbacks.size());
+//        logger.info("闃熷垪鐨勯暱搴�: " + topicSubscribers.size());
+//        logger.info("HOOK鐩戝惉鐨勯暱搴�: " + hookSubscribe.size());
+//        logger.info("");
+//    }
+}

--
Gitblit v1.8.0