|  |  | 
 |  |  |  | 
 |  |  | import com.alibaba.fastjson2.JSON; | 
 |  |  | import com.alibaba.fastjson2.JSONObject; | 
 |  |  | import com.genersoft.iot.vmp.common.VideoManagerConstants; | 
 |  |  | import com.genersoft.iot.vmp.conf.DynamicTask; | 
 |  |  | import com.genersoft.iot.vmp.conf.UserSetting; | 
 |  |  | import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; | 
 |  |  | import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; | 
 |  |  | import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; | 
 |  |  | import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; | 
 |  |  | import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; | 
 |  |  | import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; | 
 |  |  | import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; | 
 |  |  | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; | 
 |  |  | import com.genersoft.iot.vmp.service.IMediaServerService; | 
 |  |  | import com.genersoft.iot.vmp.service.bean.*; | 
 |  |  | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; | 
 |  |  | import com.genersoft.iot.vmp.utils.redis.RedisUtil; | 
 |  |  | import com.genersoft.iot.vmp.vmanager.bean.WVPResult; | 
 |  |  | import org.slf4j.Logger; | 
 |  |  | import org.slf4j.LoggerFactory; | 
 |  |  | 
 |  |  | import org.springframework.beans.factory.annotation.Qualifier; | 
 |  |  | import org.springframework.data.redis.connection.Message; | 
 |  |  | import org.springframework.data.redis.connection.MessageListener; | 
 |  |  | import org.springframework.data.redis.core.RedisTemplate; | 
 |  |  | import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; | 
 |  |  | import org.springframework.stereotype.Component; | 
 |  |  |  | 
 |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  |     @Autowired | 
 |  |  |     private ZLMMediaListManager zlmMediaListManager; | 
 |  |  |     private RedisTemplate<Object, Object> redisTemplate; | 
 |  |  |  | 
 |  |  |     @Autowired | 
 |  |  |     private ZLMRTPServerFactory zlmrtpServerFactory; | 
 |  |  | 
 |  |  |     @Autowired | 
 |  |  |     private IMediaServerService mediaServerService; | 
 |  |  |  | 
 |  |  |     @Autowired | 
 |  |  |     private IRedisCatchStorage redisCatchStorage; | 
 |  |  |  | 
 |  |  |     @Autowired | 
 |  |  |     private DynamicTask dynamicTask; | 
 |  |  |  | 
 |  |  |     @Autowired | 
 |  |  |     private ZLMMediaListManager mediaListManager; | 
 |  |  |  | 
 |  |  |     @Autowired | 
 |  |  |     private ZlmHttpHookSubscribe subscribe; | 
 |  |  |  | 
 |  |  |     private boolean taskQueueHandlerRun = false; | 
 |  |  |  | 
 |  |  |     private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); | 
 |  |  |  | 
 |  |  | 
 |  |  |  | 
 |  |  |     @Override | 
 |  |  |     public void onMessage(Message message, byte[] bytes) { | 
 |  |  |  | 
 |  |  |         boolean isEmpty = taskQueue.isEmpty(); | 
 |  |  |         taskQueue.offer(message); | 
 |  |  |         if (!taskQueueHandlerRun) { | 
 |  |  |             taskQueueHandlerRun = true; | 
 |  |  |         if (isEmpty) { | 
 |  |  |             taskExecutor.execute(() -> { | 
 |  |  |                 while (!taskQueue.isEmpty()) { | 
 |  |  |                     Message msg = taskQueue.poll(); | 
 |  |  |                     JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); | 
 |  |  |                     WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); | 
 |  |  |                     if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { | 
 |  |  |                         continue; | 
 |  |  |                     } | 
 |  |  |                     if (WvpRedisMsg.isRequest(wvpRedisMsg)) { | 
 |  |  |                         logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody())); | 
 |  |  |  | 
 |  |  |                         switch (wvpRedisMsg.getCmd()){ | 
 |  |  |                             case WvpRedisMsgCmd.GET_SEND_ITEM: | 
 |  |  |                                 RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); | 
 |  |  |                                 requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
 |  |  |                                 break; | 
 |  |  |                             case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
 |  |  |                                 RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);; | 
 |  |  |                                 requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
 |  |  |                                 break; | 
 |  |  |                             default: | 
 |  |  |                                 break; | 
 |  |  |                     try { | 
 |  |  |                         JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); | 
 |  |  |                         WvpRedisMsg wvpRedisMsg = JSON.to(WvpRedisMsg.class, msgJSON); | 
 |  |  |                         if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { | 
 |  |  |                             continue; | 
 |  |  |                         } | 
 |  |  |                         if (WvpRedisMsg.isRequest(wvpRedisMsg)) { | 
 |  |  |                             logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody())); | 
 |  |  |  | 
 |  |  |                     }else { | 
 |  |  |                         logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody())); | 
 |  |  |                         switch (wvpRedisMsg.getCmd()){ | 
 |  |  |                             case WvpRedisMsgCmd.GET_SEND_ITEM: | 
 |  |  |                             switch (wvpRedisMsg.getCmd()){ | 
 |  |  |                                 case WvpRedisMsgCmd.GET_SEND_ITEM: | 
 |  |  |                                     RequestSendItemMsg content = JSON.to(RequestSendItemMsg.class, wvpRedisMsg.getContent()); | 
 |  |  |                                     requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
 |  |  |                                     break; | 
 |  |  |                                 case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
 |  |  |                                     RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); | 
 |  |  |                                     requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
 |  |  |                                     break; | 
 |  |  |                                 default: | 
 |  |  |                                     break; | 
 |  |  |                             } | 
 |  |  |  | 
 |  |  |                                 WVPResult content  = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); | 
 |  |  |                         }else { | 
 |  |  |                             logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody())); | 
 |  |  |                             switch (wvpRedisMsg.getCmd()){ | 
 |  |  |                                 case WvpRedisMsgCmd.GET_SEND_ITEM: | 
 |  |  |  | 
 |  |  |                                 String key = wvpRedisMsg.getSerial(); | 
 |  |  |                                 switch (content.getCode()) { | 
 |  |  |                                     case 0: | 
 |  |  |                                         ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); | 
 |  |  |                                         PlayMsgCallback playMsgCallback = callbacks.get(key); | 
 |  |  |                                         if (playMsgCallback != null) { | 
 |  |  |                                             callbacksForError.remove(key); | 
 |  |  |                                             try { | 
 |  |  |                                                 playMsgCallback.handler(responseSendItemMsg); | 
 |  |  |                                             } catch (ParseException e) { | 
 |  |  |                                                 logger.error("[REDIS消息处理异常] ", e); | 
 |  |  |                                    WVPResult content  = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); | 
 |  |  |  | 
 |  |  |                                     String key = wvpRedisMsg.getSerial(); | 
 |  |  |                                     switch (content.getCode()) { | 
 |  |  |                                         case 0: | 
 |  |  |                                            ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData()); | 
 |  |  |                                             PlayMsgCallback playMsgCallback = callbacks.get(key); | 
 |  |  |                                             if (playMsgCallback != null) { | 
 |  |  |                                                 callbacksForError.remove(key); | 
 |  |  |                                                 try { | 
 |  |  |                                                     playMsgCallback.handler(responseSendItemMsg); | 
 |  |  |                                                 } catch (ParseException e) { | 
 |  |  |                                                     logger.error("[REDIS消息处理异常] ", e); | 
 |  |  |                                                 } | 
 |  |  |                                             } | 
 |  |  |                                         } | 
 |  |  |                                         break; | 
 |  |  |                                     case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
 |  |  |                                     case ERROR_CODE_OFFLINE: | 
 |  |  |                                     case ERROR_CODE_TIMEOUT: | 
 |  |  |                                         PlayMsgErrorCallback errorCallback = callbacksForError.get(key); | 
 |  |  |                                         if (errorCallback != null) { | 
 |  |  |                                             callbacks.remove(key); | 
 |  |  |                                             errorCallback.handler(content); | 
 |  |  |                                         } | 
 |  |  |                                         break; | 
 |  |  |                                     default: | 
 |  |  |                                         break; | 
 |  |  |                                 } | 
 |  |  |                                 break; | 
 |  |  |                             case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
 |  |  |                                 WVPResult wvpResult  = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); | 
 |  |  |                                 String serial = wvpRedisMsg.getSerial(); | 
 |  |  |                                 switch (wvpResult.getCode()) { | 
 |  |  |                                     case 0: | 
 |  |  |                                         JSONObject jsonObject = (JSONObject)wvpResult.getData(); | 
 |  |  |                                         PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); | 
 |  |  |                                         if (playMsgCallback != null) { | 
 |  |  |                                             callbacksForError.remove(serial); | 
 |  |  |                                             playMsgCallback.handler(jsonObject); | 
 |  |  |                                         } | 
 |  |  |                                         break; | 
 |  |  |                                     case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
 |  |  |                                     case ERROR_CODE_OFFLINE: | 
 |  |  |                                     case ERROR_CODE_TIMEOUT: | 
 |  |  |                                         PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); | 
 |  |  |                                         if (errorCallback != null) { | 
 |  |  |                                             callbacks.remove(serial); | 
 |  |  |                                             errorCallback.handler(wvpResult); | 
 |  |  |                                         } | 
 |  |  |                                         break; | 
 |  |  |                                     default: | 
 |  |  |                                         break; | 
 |  |  |                                 } | 
 |  |  |                                 break; | 
 |  |  |                             default: | 
 |  |  |                                 break; | 
 |  |  |                                             break; | 
 |  |  |                                         case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
 |  |  |                                         case ERROR_CODE_OFFLINE: | 
 |  |  |                                         case ERROR_CODE_TIMEOUT: | 
 |  |  |                                             PlayMsgErrorCallback errorCallback = callbacksForError.get(key); | 
 |  |  |                                             if (errorCallback != null) { | 
 |  |  |                                                 callbacks.remove(key); | 
 |  |  |                                                 errorCallback.handler(content); | 
 |  |  |                                             } | 
 |  |  |                                             break; | 
 |  |  |                                         default: | 
 |  |  |                                             break; | 
 |  |  |                                     } | 
 |  |  |                                     break; | 
 |  |  |                                 case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
 |  |  |                                     WVPResult wvpResult  = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); | 
 |  |  |                                     String serial = wvpRedisMsg.getSerial(); | 
 |  |  |                                     switch (wvpResult.getCode()) { | 
 |  |  |                                         case 0: | 
 |  |  |                                             JSONObject jsonObject = (JSONObject)wvpResult.getData(); | 
 |  |  |                                             PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); | 
 |  |  |                                             if (playMsgCallback != null) { | 
 |  |  |                                                 callbacksForError.remove(serial); | 
 |  |  |                                                 playMsgCallback.handler(jsonObject); | 
 |  |  |                                             } | 
 |  |  |                                             break; | 
 |  |  |                                         case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
 |  |  |                                         case ERROR_CODE_OFFLINE: | 
 |  |  |                                         case ERROR_CODE_TIMEOUT: | 
 |  |  |                                             PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); | 
 |  |  |                                             if (errorCallback != null) { | 
 |  |  |                                                 callbacks.remove(serial); | 
 |  |  |                                                 errorCallback.handler(wvpResult); | 
 |  |  |                                             } | 
 |  |  |                                             break; | 
 |  |  |                                         default: | 
 |  |  |                                             break; | 
 |  |  |                                     } | 
 |  |  |                                     break; | 
 |  |  |                                 default: | 
 |  |  |                                     break; | 
 |  |  |                             } | 
 |  |  |  | 
 |  |  |                         } | 
 |  |  |                     }catch (Exception e) { | 
 |  |  |                         logger.warn("[RedisGbPlayMsg] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); | 
 |  |  |                         logger.error("[RedisGbPlayMsg] 异常内容: ", e); | 
 |  |  |                     } | 
 |  |  |                 } | 
 |  |  |                 taskQueueHandlerRun = false; | 
 |  |  |             }); | 
 |  |  |         } | 
 |  |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     /** | 
 |  |  | 
 |  |  |         WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, | 
 |  |  |                 WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result); | 
 |  |  |         JSONObject jsonObject = (JSONObject)JSON.toJSON(response); | 
 |  |  |         RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); | 
 |  |  |         redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     /** | 
 |  |  | 
 |  |  |                     WvpRedisMsgCmd.GET_SEND_ITEM, serial, result); | 
 |  |  |  | 
 |  |  |             JSONObject jsonObject = (JSONObject)JSON.toJSON(response); | 
 |  |  |             RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); | 
 |  |  |             redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); | 
 |  |  |             return; | 
 |  |  |         } | 
 |  |  |         // 确定流是否在线 | 
 |  |  |         boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); | 
 |  |  |         if (streamReady) { | 
 |  |  |         Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); | 
 |  |  |         if (streamReady != null && streamReady) { | 
 |  |  |             logger.info("[回复推流信息]  {}/{}", content.getApp(), content.getStream()); | 
 |  |  |             responseSendItem(mediaServerItem, content, toId, serial); | 
 |  |  |         }else { | 
 |  |  | 
 |  |  |                         userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result | 
 |  |  |                 ); | 
 |  |  |                 JSONObject jsonObject = (JSONObject)JSON.toJSON(response); | 
 |  |  |                 RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); | 
 |  |  |                 redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); | 
 |  |  |             }, userSetting.getPlatformPlayTimeout()); | 
 |  |  |  | 
 |  |  |             // 添加订阅 | 
 |  |  |             HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtsp", mediaServerItem.getId()); | 
 |  |  |  | 
 |  |  |             subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ | 
 |  |  |             subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam)->{ | 
 |  |  |                         dynamicTask.stop(taskKey); | 
 |  |  |                         responseSendItem(mediaServerItem, content, toId, serial); | 
 |  |  |                     }); | 
 |  |  | 
 |  |  |             MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(), | 
 |  |  |                     content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(), | 
 |  |  |                     content.getMediaServerId()); | 
 |  |  |             redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); | 
 |  |  |  | 
 |  |  |             String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED; | 
 |  |  |             logger.info("[redis发送通知] 推流被请求 {}: {}/{}", key, messageForPushChannel.getApp(), messageForPushChannel.getStream()); | 
 |  |  |             redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel)); | 
 |  |  |         } | 
 |  |  |     } | 
 |  |  |  | 
 |  |  | 
 |  |  |                 userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result | 
 |  |  |         ); | 
 |  |  |         JSONObject jsonObject = (JSONObject)JSON.toJSON(response); | 
 |  |  |         RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); | 
 |  |  |         redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     /** | 
 |  |  | 
 |  |  |             wvpResult.setMsg("timeout"); | 
 |  |  |             errorCallback.handler(wvpResult); | 
 |  |  |         }, userSetting.getPlatformPlayTimeout()); | 
 |  |  |         RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); | 
 |  |  |         redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     /** | 
 |  |  | 
 |  |  |             callbacksForStartSendRtpStream.remove(key); | 
 |  |  |             callbacksForError.remove(key); | 
 |  |  |         }); | 
 |  |  |         RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); | 
 |  |  |         redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); | 
 |  |  |     } | 
 |  |  | } |