From d41d6b34af2485198ed01e1888db1571e4da1a6a Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 23 四月 2024 20:59:20 +0800 Subject: [PATCH] Merge branch 'refs/heads/2.7.0' --- src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java | 225 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 225 insertions(+), 0 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java new file mode 100644 index 0000000..661e370 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java @@ -0,0 +1,225 @@ +package com.genersoft.iot.vmp.conf.redis; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +@Component +public class RedisRpcConfig implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisRpcConfig.class); + + public final static String REDIS_REQUEST_CHANNEL_KEY = "WVP_REDIS_REQUEST_CHANNEL_KEY"; + + private final Random random = new Random(); + + @Autowired + private UserSetting userSetting; + + @Autowired + private RedisRpcController redisRpcController; + + @Autowired + private RedisTemplate<Object, Object> redisTemplate; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + + private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Override + public void onMessage(Message message, byte[] pattern) { + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + try { + RedisRpcMessage redisRpcMessage = JSON.parseObject(new String(msg.getBody()), RedisRpcMessage.class); + if (redisRpcMessage.getRequest() != null) { + handlerRequest(redisRpcMessage.getRequest()); + } else if (redisRpcMessage.getResponse() != null){ + handlerResponse(redisRpcMessage.getResponse()); + } else { + logger.error("[redis rpc 瑙f瀽澶辫触] {}", JSON.toJSONString(redisRpcMessage)); + } + } catch (Exception e) { + logger.error("[redis rpc 瑙f瀽寮傚父] ", e); + } + } + }); + } + } + + private void handlerResponse(RedisRpcResponse response) { + if (userSetting.getServerId().equals(response.getToId())) { + return; + } + logger.info("[redis-rpc] << {}", response); + response(response); + } + + private void handlerRequest(RedisRpcRequest request) { + try { + if (userSetting.getServerId().equals(request.getFromId())) { + return; + } + logger.info("[redis-rpc] << {}", request); + Method method = getMethod(request.getUri()); + // 娌℃湁鎼哄甫鐩爣ID鐨勫彲浠ョ悊瑙d负鍝釜wvp鏈夌粨鏋滃氨鍝釜鍥炲锛屾惡甯︾洰鏍嘔D锛屼絾鏄鏋滄槸涓嶅瓨鍦ㄧ殑uri鍒欑洿鎺ュ洖澶�404 + if (userSetting.getServerId().equals(request.getToId())) { + if (method == null) { + // 鍥炲404缁撴灉 + RedisRpcResponse response = request.getResponse(); + response.setStatusCode(404); + sendResponse(response); + return; + } + RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request); + if(response != null) { + sendResponse(response); + } + }else { + if (method == null) { + return; + } + RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request); + if (response != null) { + sendResponse(response); + } + } + }catch (InvocationTargetException | IllegalAccessException e) { + logger.error("[redis rpc ] 澶勭悊璇锋眰澶辫触 ", e); + } + + } + + private Method getMethod(String name) { + // 鍚姩鍚庢壂鎻忔墍鏈夌殑璺緞娉ㄨВ + Method[] methods = redisRpcController.getClass().getMethods(); + for (Method method : methods) { + if (method.getName().equals(name)) { + return method; + } + } + return null; + } + + private void sendResponse(RedisRpcResponse response){ + logger.info("[redis-rpc] >> {}", response); + response.setToId(userSetting.getServerId()); + RedisRpcMessage message = new RedisRpcMessage(); + message.setResponse(response); + redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message); + } + + private void sendRequest(RedisRpcRequest request){ + logger.info("[redis-rpc] >> {}", request); + RedisRpcMessage message = new RedisRpcMessage(); + message.setRequest(request); + redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message); + } + + + private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>(); + private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>(); + + public RedisRpcResponse request(RedisRpcRequest request, int timeOut) { + request.setSn((long) random.nextInt(1000) + 1); + SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn()); + + try { + sendRequest(request); + return subscribe.poll(timeOut, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e); + } finally { + this.unsubscribe(request.getSn()); + } + return null; + } + + public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) { + request.setSn((long) random.nextInt(1000) + 1); + setCallback(request.getSn(), callback); + sendRequest(request); + } + + public Boolean response(RedisRpcResponse response) { + SynchronousQueue<RedisRpcResponse> queue = topicSubscribers.get(response.getSn()); + CommonCallback<RedisRpcResponse> callback = callbacks.get(response.getSn()); + if (queue != null) { + try { + return queue.offer(response, 2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error("{}", e.getMessage(), e); + } + }else if (callback != null) { + callback.run(response); + callbacks.remove(response.getSn()); + } + return false; + } + + private void unsubscribe(long key) { + topicSubscribers.remove(key); + } + + + private SynchronousQueue<RedisRpcResponse> subscribe(long key) { + SynchronousQueue<RedisRpcResponse> queue = null; + if (!topicSubscribers.containsKey(key)) + topicSubscribers.put(key, queue = new SynchronousQueue<>()); + return queue; + } + + private void setCallback(long key, CommonCallback<RedisRpcResponse> callback) { + // TODO 濡傛灉澶氫釜涓婄骇鐐规挱鍚屼竴涓�氶亾浼氭湁闂 + callbacks.put(key, callback); + } + + public void removeCallback(long key) { + callbacks.remove(key); + } + + + public int getCallbackCount(){ + return callbacks.size(); + } + +// @Scheduled(fixedRate = 1000) //姣�1绉掓墽琛屼竴娆� +// public void execute(){ +// logger.info("callbacks鐨勯暱搴�: " + callbacks.size()); +// logger.info("闃熷垪鐨勯暱搴�: " + topicSubscribers.size()); +// logger.info("HOOK鐩戝惉鐨勯暱搴�: " + hookSubscribe.size()); +// logger.info(""); +// } +} -- Gitblit v1.8.0