|  |  | 
 |  |  |     @Autowired | 
 |  |  |     private ZlmHttpHookSubscribe subscribe; | 
 |  |  |  | 
    // Guard flag for the queue-drain task: onMessage() sets it to true right before
    // handing a drain task to taskExecutor, and that task sets it back to false once
    // taskQueue has been emptied.
    // NOTE(review): this is a plain (non-volatile) boolean that is written both in
    // onMessage() and inside the task run by taskExecutor; if those run on different
    // threads the check-then-set is not atomic — confirm whether that matters here.
 |  |  |     private boolean taskQueueHandlerRun = false; | 
 |  |  |  | 
    // Buffer of received Redis messages: onMessage() offers each incoming message
    // here, and the drain task polls them off one at a time.
 |  |  |     private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); | 
 |  |  |  | 
 |  |  |     @Qualifier("taskExecutor") | 
 |  |  | 
 |  |  |  | 
 |  |  |     @Override | 
 |  |  |     public void onMessage(Message message, byte[] bytes) { | 
 |  |  |  | 
 |  |  |         boolean isEmpty = taskQueue.isEmpty(); | 
 |  |  |         taskQueue.offer(message); | 
 |  |  |         if (!taskQueueHandlerRun) { | 
 |  |  |             taskQueueHandlerRun = true; | 
 |  |  |         if (isEmpty) { | 
 |  |  |             taskExecutor.execute(() -> { | 
 |  |  |                 while (!taskQueue.isEmpty()) { | 
 |  |  |                     Message msg = taskQueue.poll(); | 
 |  |  |                     JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); | 
 |  |  |                     WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); | 
 |  |  |                     if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { | 
 |  |  |                         continue; | 
 |  |  |                     } | 
 |  |  |                     if (WvpRedisMsg.isRequest(wvpRedisMsg)) { | 
 |  |  |                         logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody())); | 
 |  |  |  | 
 |  |  |                         switch (wvpRedisMsg.getCmd()){ | 
 |  |  |                             case WvpRedisMsgCmd.GET_SEND_ITEM: | 
 |  |  |                                 RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); | 
 |  |  |                                 requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
 |  |  |                                 break; | 
 |  |  |                             case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
 |  |  |                                 RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);; | 
 |  |  |                                 requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
 |  |  |                                 break; | 
 |  |  |                             default: | 
 |  |  |                                 break; | 
 |  |  |                     try { | 
 |  |  |                         JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); | 
 |  |  |                         WvpRedisMsg wvpRedisMsg = JSON.to(WvpRedisMsg.class, msgJSON); | 
 |  |  |                         if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { | 
 |  |  |                             continue; | 
 |  |  |                         } | 
 |  |  |                         if (WvpRedisMsg.isRequest(wvpRedisMsg)) { | 
 |  |  |                             logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody())); | 
 |  |  |  | 
 |  |  |                     }else { | 
 |  |  |                         logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody())); | 
 |  |  |                         switch (wvpRedisMsg.getCmd()){ | 
 |  |  |                             case WvpRedisMsgCmd.GET_SEND_ITEM: | 
 |  |  |                             switch (wvpRedisMsg.getCmd()){ | 
 |  |  |                                 case WvpRedisMsgCmd.GET_SEND_ITEM: | 
 |  |  |                                     RequestSendItemMsg content = JSON.to(RequestSendItemMsg.class, wvpRedisMsg.getContent()); | 
 |  |  |                                     requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
 |  |  |                                     break; | 
 |  |  |                                 case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
 |  |  |                                     RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); | 
 |  |  |                                     requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
 |  |  |                                     break; | 
 |  |  |                                 default: | 
 |  |  |                                     break; | 
 |  |  |                             } | 
 |  |  |  | 
 |  |  |                                 WVPResult content  = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); | 
 |  |  |                         }else { | 
 |  |  |                             logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody())); | 
 |  |  |                             switch (wvpRedisMsg.getCmd()){ | 
 |  |  |                                 case WvpRedisMsgCmd.GET_SEND_ITEM: | 
 |  |  |  | 
 |  |  |                                 String key = wvpRedisMsg.getSerial(); | 
 |  |  |                                 switch (content.getCode()) { | 
 |  |  |                                     case 0: | 
 |  |  |                                         ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); | 
 |  |  |                                         PlayMsgCallback playMsgCallback = callbacks.get(key); | 
 |  |  |                                         if (playMsgCallback != null) { | 
 |  |  |                                             callbacksForError.remove(key); | 
 |  |  |                                             try { | 
 |  |  |                                                 playMsgCallback.handler(responseSendItemMsg); | 
 |  |  |                                             } catch (ParseException e) { | 
 |  |  |                                                 logger.error("[REDIS消息处理异常] ", e); | 
 |  |  |                                    WVPResult content  = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); | 
 |  |  |  | 
 |  |  |                                     String key = wvpRedisMsg.getSerial(); | 
 |  |  |                                     switch (content.getCode()) { | 
 |  |  |                                         case 0: | 
 |  |  |                                            ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData()); | 
 |  |  |                                             PlayMsgCallback playMsgCallback = callbacks.get(key); | 
 |  |  |                                             if (playMsgCallback != null) { | 
 |  |  |                                                 callbacksForError.remove(key); | 
 |  |  |                                                 try { | 
 |  |  |                                                     playMsgCallback.handler(responseSendItemMsg); | 
 |  |  |                                                 } catch (ParseException e) { | 
 |  |  |                                                     logger.error("[REDIS消息处理异常] ", e); | 
 |  |  |                                                 } | 
 |  |  |                                             } | 
 |  |  |                                         } | 
 |  |  |                                         break; | 
 |  |  |                                     case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
 |  |  |                                     case ERROR_CODE_OFFLINE: | 
 |  |  |                                     case ERROR_CODE_TIMEOUT: | 
 |  |  |                                         PlayMsgErrorCallback errorCallback = callbacksForError.get(key); | 
 |  |  |                                         if (errorCallback != null) { | 
 |  |  |                                             callbacks.remove(key); | 
 |  |  |                                             errorCallback.handler(content); | 
 |  |  |                                         } | 
 |  |  |                                         break; | 
 |  |  |                                     default: | 
 |  |  |                                         break; | 
 |  |  |                                 } | 
 |  |  |                                 break; | 
 |  |  |                             case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
 |  |  |                                 WVPResult wvpResult  = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); | 
 |  |  |                                 String serial = wvpRedisMsg.getSerial(); | 
 |  |  |                                 switch (wvpResult.getCode()) { | 
 |  |  |                                     case 0: | 
 |  |  |                                         JSONObject jsonObject = (JSONObject)wvpResult.getData(); | 
 |  |  |                                         PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); | 
 |  |  |                                         if (playMsgCallback != null) { | 
 |  |  |                                             callbacksForError.remove(serial); | 
 |  |  |                                             playMsgCallback.handler(jsonObject); | 
 |  |  |                                         } | 
 |  |  |                                         break; | 
 |  |  |                                     case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
 |  |  |                                     case ERROR_CODE_OFFLINE: | 
 |  |  |                                     case ERROR_CODE_TIMEOUT: | 
 |  |  |                                         PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); | 
 |  |  |                                         if (errorCallback != null) { | 
 |  |  |                                             callbacks.remove(serial); | 
 |  |  |                                             errorCallback.handler(wvpResult); | 
 |  |  |                                         } | 
 |  |  |                                         break; | 
 |  |  |                                     default: | 
 |  |  |                                         break; | 
 |  |  |                                 } | 
 |  |  |                                 break; | 
 |  |  |                             default: | 
 |  |  |                                 break; | 
 |  |  |                                             break; | 
 |  |  |                                         case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
 |  |  |                                         case ERROR_CODE_OFFLINE: | 
 |  |  |                                         case ERROR_CODE_TIMEOUT: | 
 |  |  |                                             PlayMsgErrorCallback errorCallback = callbacksForError.get(key); | 
 |  |  |                                             if (errorCallback != null) { | 
 |  |  |                                                 callbacks.remove(key); | 
 |  |  |                                                 errorCallback.handler(content); | 
 |  |  |                                             } | 
 |  |  |                                             break; | 
 |  |  |                                         default: | 
 |  |  |                                             break; | 
 |  |  |                                     } | 
 |  |  |                                     break; | 
 |  |  |                                 case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
 |  |  |                                     WVPResult wvpResult  = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); | 
 |  |  |                                     String serial = wvpRedisMsg.getSerial(); | 
 |  |  |                                     switch (wvpResult.getCode()) { | 
 |  |  |                                         case 0: | 
 |  |  |                                             JSONObject jsonObject = (JSONObject)wvpResult.getData(); | 
 |  |  |                                             PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); | 
 |  |  |                                             if (playMsgCallback != null) { | 
 |  |  |                                                 callbacksForError.remove(serial); | 
 |  |  |                                                 playMsgCallback.handler(jsonObject); | 
 |  |  |                                             } | 
 |  |  |                                             break; | 
 |  |  |                                         case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
 |  |  |                                         case ERROR_CODE_OFFLINE: | 
 |  |  |                                         case ERROR_CODE_TIMEOUT: | 
 |  |  |                                             PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); | 
 |  |  |                                             if (errorCallback != null) { | 
 |  |  |                                                 callbacks.remove(serial); | 
 |  |  |                                                 errorCallback.handler(wvpResult); | 
 |  |  |                                             } | 
 |  |  |                                             break; | 
 |  |  |                                         default: | 
 |  |  |                                             break; | 
 |  |  |                                     } | 
 |  |  |                                     break; | 
 |  |  |                                 default: | 
 |  |  |                                     break; | 
 |  |  |                             } | 
 |  |  |  | 
 |  |  |                         } | 
 |  |  |                     }catch (Exception e) { | 
 |  |  |                         logger.warn("[RedisGbPlayMsg] 发现未处理的异常, {}",e.getMessage()); | 
 |  |  |                     } | 
 |  |  |                 } | 
 |  |  |                 taskQueueHandlerRun = false; | 
 |  |  |             }); | 
 |  |  |         } | 
 |  |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     /** |