|  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | private ZLMMediaListManager zlmMediaListManager; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private boolean taskQueueHandlerRun = false; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Qualifier("taskExecutor") | 
|---|
|  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void onMessage(Message message, byte[] bytes) { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | boolean isEmpty = taskQueue.isEmpty(); | 
|---|
|  |  |  | taskQueue.offer(message); | 
|---|
|  |  |  | if (!taskQueueHandlerRun) { | 
|---|
|  |  |  | taskQueueHandlerRun = true; | 
|---|
|  |  |  | if (isEmpty) { | 
|---|
|  |  |  | taskExecutor.execute(() -> { | 
|---|
|  |  |  | while (!taskQueue.isEmpty()) { | 
|---|
|  |  |  | Message msg = taskQueue.poll(); | 
|---|
|  |  |  | JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); | 
|---|
|  |  |  | if (steamMsgJson == null) { | 
|---|
|  |  |  | logger.warn("[收到redis 流变化]消息解析失败"); | 
|---|
|  |  |  | continue; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | String serverId = steamMsgJson.getString("serverId"); | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); | 
|---|
|  |  |  | if (steamMsgJson == null) { | 
|---|
|  |  |  | logger.warn("[收到redis 流变化]消息解析失败"); | 
|---|
|  |  |  | continue; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | String serverId = steamMsgJson.getString("serverId"); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | if (userSetting.getServerId().equals(serverId)) { | 
|---|
|  |  |  | // 自己发送的消息忽略即可 | 
|---|
|  |  |  | continue; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | logger.info("[收到redis 流变化]: {}", new String(message.getBody())); | 
|---|
|  |  |  | String app = steamMsgJson.getString("app"); | 
|---|
|  |  |  | String stream = steamMsgJson.getString("stream"); | 
|---|
|  |  |  | boolean register = steamMsgJson.getBoolean("register"); | 
|---|
|  |  |  | String mediaServerId = steamMsgJson.getString("mediaServerId"); | 
|---|
|  |  |  | OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam(); | 
|---|
|  |  |  | onStreamChangedHookParam.setSeverId(serverId); | 
|---|
|  |  |  | onStreamChangedHookParam.setApp(app); | 
|---|
|  |  |  | onStreamChangedHookParam.setStream(stream); | 
|---|
|  |  |  | onStreamChangedHookParam.setRegist(register); | 
|---|
|  |  |  | onStreamChangedHookParam.setMediaServerId(mediaServerId); | 
|---|
|  |  |  | onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000); | 
|---|
|  |  |  | onStreamChangedHookParam.setAliveSecond(0L); | 
|---|
|  |  |  | onStreamChangedHookParam.setTotalReaderCount("0"); | 
|---|
|  |  |  | onStreamChangedHookParam.setOriginType(0); | 
|---|
|  |  |  | onStreamChangedHookParam.setOriginTypeStr("0"); | 
|---|
|  |  |  | onStreamChangedHookParam.setOriginTypeStr("unknown"); | 
|---|
|  |  |  | if (register) { | 
|---|
|  |  |  | zlmMediaListManager.addPush(onStreamChangedHookParam); | 
|---|
|  |  |  | }else { | 
|---|
|  |  |  | zlmMediaListManager.removeMedia(app, stream); | 
|---|
|  |  |  | if (userSetting.getServerId().equals(serverId)) { | 
|---|
|  |  |  | // 自己发送的消息忽略即可 | 
|---|
|  |  |  | continue; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | logger.info("[收到redis 流变化]: {}", new String(message.getBody())); | 
|---|
|  |  |  | String app = steamMsgJson.getString("app"); | 
|---|
|  |  |  | String stream = steamMsgJson.getString("stream"); | 
|---|
|  |  |  | boolean register = steamMsgJson.getBoolean("register"); | 
|---|
|  |  |  | String mediaServerId = steamMsgJson.getString("mediaServerId"); | 
|---|
|  |  |  | OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam(); | 
|---|
|  |  |  | onStreamChangedHookParam.setSeverId(serverId); | 
|---|
|  |  |  | onStreamChangedHookParam.setApp(app); | 
|---|
|  |  |  | onStreamChangedHookParam.setStream(stream); | 
|---|
|  |  |  | onStreamChangedHookParam.setRegist(register); | 
|---|
|  |  |  | onStreamChangedHookParam.setMediaServerId(mediaServerId); | 
|---|
|  |  |  | onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000); | 
|---|
|  |  |  | onStreamChangedHookParam.setAliveSecond(0L); | 
|---|
|  |  |  | onStreamChangedHookParam.setTotalReaderCount("0"); | 
|---|
|  |  |  | onStreamChangedHookParam.setOriginType(0); | 
|---|
|  |  |  | onStreamChangedHookParam.setOriginTypeStr("0"); | 
|---|
|  |  |  | onStreamChangedHookParam.setOriginTypeStr("unknown"); | 
|---|
|  |  |  | if (register) { | 
|---|
|  |  |  | zlmMediaListManager.addPush(onStreamChangedHookParam); | 
|---|
|  |  |  | }else { | 
|---|
|  |  |  | zlmMediaListManager.removeMedia(app, stream); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | }catch (Exception e) { | 
|---|
|  |  |  | logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | taskQueueHandlerRun = false; | 
|---|
|  |  |  | }); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|