| | |
| | | import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; |
| | | import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; |
| | | import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; |
| | | import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; |
| | | import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | |
| | | |
| | | @Autowired |
| | | private RedisTemplate<Object, Object> redisTemplate; |
| | | |
| | | @Autowired |
| | | private ZlmHttpHookSubscribe hookSubscribe; |
| | | |
| | | private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); |
| | | |
| | |
| | | if (userSetting.getServerId().equals(request.getFromId())) { |
| | | return; |
| | | } |
| | | logger.info("[redis-rpc] >> {}", request); |
| | | logger.info("[redis-rpc] << {}", request); |
| | | Method method = getMethod(request.getUri()); |
| | | // 没有携带目标ID的可以理解为哪个wvp有结果就哪个回复,携带目标ID,但是如果是不存在的uri则直接回复404 |
| | | if (userSetting.getServerId().equals(request.getToId())) { |
| | |
| | | } |
| | | |
| | | private void sendResponse(RedisRpcResponse response){ |
| | | logger.info("[redis-rpc] >> {}", response); |
| | | response.setToId(userSetting.getServerId()); |
| | | RedisRpcMessage message = new RedisRpcMessage(); |
| | | message.setResponse(response); |
| | |
| | | } |
| | | |
| | | private void sendRequest(RedisRpcRequest request){ |
| | | logger.info("[redis-rpc] >> {}", request); |
| | | RedisRpcMessage message = new RedisRpcMessage(); |
| | | message.setRequest(request); |
| | | redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message); |
| | |
| | | public RedisRpcResponse request(RedisRpcRequest request, int timeOut) { |
| | | request.setSn((long) random.nextInt(1000) + 1); |
| | | SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn()); |
| | | |
| | | try { |
| | | sendRequest(request); |
| | | return subscribe.poll(timeOut, TimeUnit.SECONDS); |
| | |
| | | public void removeCallback(long key) { |
| | | callbacks.remove(key); |
| | | } |
| | | |
| | | |
| | | public int getCallbackCount(){ |
| | | return callbacks.size(); |
| | | } |
| | | |
| | | // @Scheduled(fixedRate = 1000) //每1秒执行一次 |
| | | // public void execute(){ |
| | | // logger.info("callbacks的长度: " + callbacks.size()); |
| | | // logger.info("队列的长度: " + topicSubscribers.size()); |
| | | // logger.info("HOOK监听的长度: " + hookSubscribe.size()); |
| | | // logger.info(""); |
| | | // } |
| | | } |