| | |
| | | @Override |
| | | public void onMessage(Message message, byte[] bytes) { |
| | | // TODO 增加队列 |
| | | logger.warn("[REDIS 消息-推流设备状态变化]: {}", new String(message.getBody())); |
| | | logger.warn("[REDIS消息-推流设备状态变化]: {}", new String(message.getBody())); |
| | | taskQueue.offer(message); |
| | | |
| | | if (!taskQueueHandlerRun) { |
| | |
| | | Message msg = taskQueue.poll(); |
| | | PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class); |
| | | if (statusChangeFromPushStream == null) { |
| | | logger.warn("[REDIS 消息]推流设备状态变化消息解析失败"); |
| | | logger.warn("[REDIS消息]推流设备状态变化消息解析失败"); |
| | | return; |
| | | } |
| | | // 取消定时任务 |
| | |
| | | // 启动时设置所有推流通道离线,发起查询请求 |
| | | redisCatchStorage.sendStreamPushRequestedMsgForStatus(); |
| | | dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, ()->{ |
| | | logger.info("[REDIS 消息]未收到redis回复推流设备状态,执行推流设备离线"); |
| | | logger.info("[REDIS消息]未收到redis回复推流设备状态,执行推流设备离线"); |
| | | // 五秒收不到请求就设置通道离线,然后通知上级离线 |
| | | streamPushService.allStreamOffline(); |
| | | }, 5000); |