package com.genersoft.iot.vmp.conf.redis;
|
|
import com.alibaba.fastjson2.JSON;
|
import com.genersoft.iot.vmp.common.CommonCallback;
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
|
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
|
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
|
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
|
import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.data.redis.connection.Message;
|
import org.springframework.data.redis.connection.MessageListener;
|
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
import org.springframework.stereotype.Component;
|
|
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.Method;
|
import java.util.Map;
|
import java.util.Random;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.TimeUnit;
|
|
@Component
|
public class RedisRpcConfig implements MessageListener {
|
|
private final static Logger logger = LoggerFactory.getLogger(RedisRpcConfig.class);
|
|
public final static String REDIS_REQUEST_CHANNEL_KEY = "WVP_REDIS_REQUEST_CHANNEL_KEY";
|
|
private final Random random = new Random();
|
|
@Autowired
|
private UserSetting userSetting;
|
|
@Autowired
|
private RedisRpcController redisRpcController;
|
|
@Autowired
|
private RedisTemplate<Object, Object> redisTemplate;
|
|
@Autowired
|
private ZlmHttpHookSubscribe hookSubscribe;
|
|
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
|
|
@Qualifier("taskExecutor")
|
@Autowired
|
private ThreadPoolTaskExecutor taskExecutor;
|
|
@Override
|
public void onMessage(Message message, byte[] pattern) {
|
boolean isEmpty = taskQueue.isEmpty();
|
taskQueue.offer(message);
|
if (isEmpty) {
|
taskExecutor.execute(() -> {
|
while (!taskQueue.isEmpty()) {
|
Message msg = taskQueue.poll();
|
try {
|
RedisRpcMessage redisRpcMessage = JSON.parseObject(new String(msg.getBody()), RedisRpcMessage.class);
|
if (redisRpcMessage.getRequest() != null) {
|
handlerRequest(redisRpcMessage.getRequest());
|
} else if (redisRpcMessage.getResponse() != null){
|
handlerResponse(redisRpcMessage.getResponse());
|
} else {
|
logger.error("[redis rpc 解析失败] {}", JSON.toJSONString(redisRpcMessage));
|
}
|
} catch (Exception e) {
|
logger.error("[redis rpc 解析异常] ", e);
|
}
|
}
|
});
|
}
|
}
|
|
private void handlerResponse(RedisRpcResponse response) {
|
if (userSetting.getServerId().equals(response.getToId())) {
|
return;
|
}
|
logger.info("[redis-rpc] << {}", response);
|
response(response);
|
}
|
|
private void handlerRequest(RedisRpcRequest request) {
|
try {
|
if (userSetting.getServerId().equals(request.getFromId())) {
|
return;
|
}
|
logger.info("[redis-rpc] << {}", request);
|
Method method = getMethod(request.getUri());
|
// 没有携带目标ID的可以理解为哪个wvp有结果就哪个回复,携带目标ID,但是如果是不存在的uri则直接回复404
|
if (userSetting.getServerId().equals(request.getToId())) {
|
if (method == null) {
|
// 回复404结果
|
RedisRpcResponse response = request.getResponse();
|
response.setStatusCode(404);
|
sendResponse(response);
|
return;
|
}
|
RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
|
if(response != null) {
|
sendResponse(response);
|
}
|
}else {
|
if (method == null) {
|
return;
|
}
|
RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
|
if (response != null) {
|
sendResponse(response);
|
}
|
}
|
}catch (InvocationTargetException | IllegalAccessException e) {
|
logger.error("[redis rpc ] 处理请求失败 ", e);
|
}
|
|
}
|
|
private Method getMethod(String name) {
|
// 启动后扫描所有的路径注解
|
Method[] methods = redisRpcController.getClass().getMethods();
|
for (Method method : methods) {
|
if (method.getName().equals(name)) {
|
return method;
|
}
|
}
|
return null;
|
}
|
|
private void sendResponse(RedisRpcResponse response){
|
logger.info("[redis-rpc] >> {}", response);
|
response.setToId(userSetting.getServerId());
|
RedisRpcMessage message = new RedisRpcMessage();
|
message.setResponse(response);
|
redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
|
}
|
|
private void sendRequest(RedisRpcRequest request){
|
logger.info("[redis-rpc] >> {}", request);
|
RedisRpcMessage message = new RedisRpcMessage();
|
message.setRequest(request);
|
redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
|
}
|
|
|
private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>();
|
private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>();
|
|
public RedisRpcResponse request(RedisRpcRequest request, int timeOut) {
|
request.setSn((long) random.nextInt(1000) + 1);
|
SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn());
|
|
try {
|
sendRequest(request);
|
return subscribe.poll(timeOut, TimeUnit.SECONDS);
|
} catch (InterruptedException e) {
|
logger.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e);
|
} finally {
|
this.unsubscribe(request.getSn());
|
}
|
return null;
|
}
|
|
public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) {
|
request.setSn((long) random.nextInt(1000) + 1);
|
setCallback(request.getSn(), callback);
|
sendRequest(request);
|
}
|
|
public Boolean response(RedisRpcResponse response) {
|
SynchronousQueue<RedisRpcResponse> queue = topicSubscribers.get(response.getSn());
|
CommonCallback<RedisRpcResponse> callback = callbacks.get(response.getSn());
|
if (queue != null) {
|
try {
|
return queue.offer(response, 2, TimeUnit.SECONDS);
|
} catch (InterruptedException e) {
|
logger.error("{}", e.getMessage(), e);
|
}
|
}else if (callback != null) {
|
callback.run(response);
|
callbacks.remove(response.getSn());
|
}
|
return false;
|
}
|
|
private void unsubscribe(long key) {
|
topicSubscribers.remove(key);
|
}
|
|
|
private SynchronousQueue<RedisRpcResponse> subscribe(long key) {
|
SynchronousQueue<RedisRpcResponse> queue = null;
|
if (!topicSubscribers.containsKey(key))
|
topicSubscribers.put(key, queue = new SynchronousQueue<>());
|
return queue;
|
}
|
|
private void setCallback(long key, CommonCallback<RedisRpcResponse> callback) {
|
// TODO 如果多个上级点播同一个通道会有问题
|
callbacks.put(key, callback);
|
}
|
|
public void removeCallback(long key) {
|
callbacks.remove(key);
|
}
|
|
|
public int getCallbackCount(){
|
return callbacks.size();
|
}
|
|
// @Scheduled(fixedRate = 1000) //每1秒执行一次
|
// public void execute(){
|
// logger.info("callbacks的长度: " + callbacks.size());
|
// logger.info("队列的长度: " + topicSubscribers.size());
|
// logger.info("HOOK监听的长度: " + hookSubscribe.size());
|
// logger.info("");
|
// }
|
}
|