|  |  | 
 |  |  | package com.genersoft.iot.vmp.service.redisMsg; | 
 |  |  |  | 
 |  |  | import com.alibaba.fastjson.JSON; | 
 |  |  | import com.alibaba.fastjson.JSONObject; | 
 |  |  | import com.alibaba.fastjson2.JSON; | 
 |  |  | import com.alibaba.fastjson2.JSONObject; | 
 |  |  | import com.genersoft.iot.vmp.conf.DynamicTask; | 
 |  |  | import com.genersoft.iot.vmp.conf.UserSetting; | 
 |  |  | import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; | 
 |  |  | 
 |  |  |     @Autowired | 
 |  |  |     private ZlmHttpHookSubscribe subscribe; | 
 |  |  |  | 
 |  |  |     private boolean taskQueueHandlerRun = false; | 
 |  |  |  | 
 |  |  |     private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); | 
 |  |  |  | 
 |  |  |     @Qualifier("taskExecutor") | 
 |  |  | 
 |  |  |  | 
 |  |  |     @Override | 
 |  |  |     public void onMessage(Message message, byte[] bytes) { | 
 |  |  |  | 
 |  |  |         boolean isEmpty = taskQueue.isEmpty(); | 
 |  |  |         taskQueue.offer(message); | 
 |  |  |         if (!taskQueueHandlerRun) { | 
 |  |  |             taskQueueHandlerRun = true; | 
 |  |  |         if (isEmpty) { | 
 |  |  |             taskExecutor.execute(() -> { | 
 |  |  |                 while (!taskQueue.isEmpty()) { | 
 |  |  |                     Message msg = taskQueue.poll(); | 
 |  |  |                     JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); | 
 |  |  |                     WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); | 
 |  |  |                     if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { | 
 |  |  |                         continue; | 
 |  |  |                     } | 
 |  |  |                     if (WvpRedisMsg.isRequest(wvpRedisMsg)) { | 
 |  |  |                         logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody())); | 
 |  |  |  | 
 |  |  |                         switch (wvpRedisMsg.getCmd()){ | 
 |  |  |                             case WvpRedisMsgCmd.GET_SEND_ITEM: | 
 |  |  |                                 RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); | 
 |  |  |                                 requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
 |  |  |                                 break; | 
 |  |  |                             case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
 |  |  |                                 RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);; | 
 |  |  |                                 requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
 |  |  |                                 break; | 
 |  |  |                             default: | 
 |  |  |                                 break; | 
 |  |  |                     try { | 
 |  |  |                         JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); | 
 |  |  |                         WvpRedisMsg wvpRedisMsg = JSON.to(WvpRedisMsg.class, msgJSON); | 
 |  |  |                         if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { | 
 |  |  |                             continue; | 
 |  |  |                         } | 
 |  |  |                         if (WvpRedisMsg.isRequest(wvpRedisMsg)) { | 
 |  |  |                             logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody())); | 
 |  |  |  | 
 |  |  |                     }else { | 
 |  |  |                         logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody())); | 
 |  |  |                         switch (wvpRedisMsg.getCmd()){ | 
 |  |  |                             case WvpRedisMsgCmd.GET_SEND_ITEM: | 
 |  |  |                             switch (wvpRedisMsg.getCmd()){ | 
 |  |  |                                 case WvpRedisMsgCmd.GET_SEND_ITEM: | 
 |  |  |                                     RequestSendItemMsg content = JSON.to(RequestSendItemMsg.class, wvpRedisMsg.getContent()); | 
 |  |  |                                     requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
 |  |  |                                     break; | 
 |  |  |                                 case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
 |  |  |                                     RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); | 
 |  |  |                                     requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
 |  |  |                                     break; | 
 |  |  |                                 default: | 
 |  |  |                                     break; | 
 |  |  |                             } | 
 |  |  |  | 
 |  |  |                                 WVPResult content  = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); | 
 |  |  |                         }else { | 
 |  |  |                             logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody())); | 
 |  |  |                             switch (wvpRedisMsg.getCmd()){ | 
 |  |  |                                 case WvpRedisMsgCmd.GET_SEND_ITEM: | 
 |  |  |  | 
 |  |  |                                 String key = wvpRedisMsg.getSerial(); | 
 |  |  |                                 switch (content.getCode()) { | 
 |  |  |                                     case 0: | 
 |  |  |                                         ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); | 
 |  |  |                                         PlayMsgCallback playMsgCallback = callbacks.get(key); | 
 |  |  |                                         if (playMsgCallback != null) { | 
 |  |  |                                             callbacksForError.remove(key); | 
 |  |  |                                             try { | 
 |  |  |                                                 playMsgCallback.handler(responseSendItemMsg); | 
 |  |  |                                             } catch (ParseException e) { | 
 |  |  |                                                 throw new RuntimeException(e); | 
 |  |  |                                    WVPResult content  = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); | 
 |  |  |  | 
 |  |  |                                     String key = wvpRedisMsg.getSerial(); | 
 |  |  |                                     switch (content.getCode()) { | 
 |  |  |                                         case 0: | 
 |  |  |                                            ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData()); | 
 |  |  |                                             PlayMsgCallback playMsgCallback = callbacks.get(key); | 
 |  |  |                                             if (playMsgCallback != null) { | 
 |  |  |                                                 callbacksForError.remove(key); | 
 |  |  |                                                 try { | 
 |  |  |                                                     playMsgCallback.handler(responseSendItemMsg); | 
 |  |  |                                                 } catch (ParseException e) { | 
 |  |  |                                                     logger.error("[REDIS消息处理异常] ", e); | 
 |  |  |                                                 } | 
 |  |  |                                             } | 
 |  |  |                                         } | 
 |  |  |                                         break; | 
 |  |  |                                     case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
 |  |  |                                     case ERROR_CODE_OFFLINE: | 
 |  |  |                                     case ERROR_CODE_TIMEOUT: | 
 |  |  |                                         PlayMsgErrorCallback errorCallback = callbacksForError.get(key); | 
 |  |  |                                         if (errorCallback != null) { | 
 |  |  |                                             callbacks.remove(key); | 
 |  |  |                                             errorCallback.handler(content); | 
 |  |  |                                         } | 
 |  |  |                                         break; | 
 |  |  |                                     default: | 
 |  |  |                                         break; | 
 |  |  |                                 } | 
 |  |  |                                 break; | 
 |  |  |                             case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
 |  |  |                                 WVPResult wvpResult  = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); | 
 |  |  |                                 String serial = wvpRedisMsg.getSerial(); | 
 |  |  |                                 switch (wvpResult.getCode()) { | 
 |  |  |                                     case 0: | 
 |  |  |                                         JSONObject jsonObject = (JSONObject)wvpResult.getData(); | 
 |  |  |                                         PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); | 
 |  |  |                                         if (playMsgCallback != null) { | 
 |  |  |                                             callbacksForError.remove(serial); | 
 |  |  |                                             playMsgCallback.handler(jsonObject); | 
 |  |  |                                         } | 
 |  |  |                                         break; | 
 |  |  |                                     case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
 |  |  |                                     case ERROR_CODE_OFFLINE: | 
 |  |  |                                     case ERROR_CODE_TIMEOUT: | 
 |  |  |                                         PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); | 
 |  |  |                                         if (errorCallback != null) { | 
 |  |  |                                             callbacks.remove(serial); | 
 |  |  |                                             errorCallback.handler(wvpResult); | 
 |  |  |                                         } | 
 |  |  |                                         break; | 
 |  |  |                                     default: | 
 |  |  |                                         break; | 
 |  |  |                                 } | 
 |  |  |                                 break; | 
 |  |  |                             default: | 
 |  |  |                                 break; | 
 |  |  |                                             break; | 
 |  |  |                                         case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
 |  |  |                                         case ERROR_CODE_OFFLINE: | 
 |  |  |                                         case ERROR_CODE_TIMEOUT: | 
 |  |  |                                             PlayMsgErrorCallback errorCallback = callbacksForError.get(key); | 
 |  |  |                                             if (errorCallback != null) { | 
 |  |  |                                                 callbacks.remove(key); | 
 |  |  |                                                 errorCallback.handler(content); | 
 |  |  |                                             } | 
 |  |  |                                             break; | 
 |  |  |                                         default: | 
 |  |  |                                             break; | 
 |  |  |                                     } | 
 |  |  |                                     break; | 
 |  |  |                                 case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
 |  |  |                                     WVPResult wvpResult  = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); | 
 |  |  |                                     String serial = wvpRedisMsg.getSerial(); | 
 |  |  |                                     switch (wvpResult.getCode()) { | 
 |  |  |                                         case 0: | 
 |  |  |                                             JSONObject jsonObject = (JSONObject)wvpResult.getData(); | 
 |  |  |                                             PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); | 
 |  |  |                                             if (playMsgCallback != null) { | 
 |  |  |                                                 callbacksForError.remove(serial); | 
 |  |  |                                                 playMsgCallback.handler(jsonObject); | 
 |  |  |                                             } | 
 |  |  |                                             break; | 
 |  |  |                                         case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
 |  |  |                                         case ERROR_CODE_OFFLINE: | 
 |  |  |                                         case ERROR_CODE_TIMEOUT: | 
 |  |  |                                             PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); | 
 |  |  |                                             if (errorCallback != null) { | 
 |  |  |                                                 callbacks.remove(serial); | 
 |  |  |                                                 errorCallback.handler(wvpResult); | 
 |  |  |                                             } | 
 |  |  |                                             break; | 
 |  |  |                                         default: | 
 |  |  |                                             break; | 
 |  |  |                                     } | 
 |  |  |                                     break; | 
 |  |  |                                 default: | 
 |  |  |                                     break; | 
 |  |  |                             } | 
 |  |  |  | 
 |  |  |                         } | 
 |  |  |                     }catch (Exception e) { | 
 |  |  |                         logger.warn("[RedisGbPlayMsg] 发现未处理的异常, {}",e.getMessage()); | 
 |  |  |                     } | 
 |  |  |                 } | 
 |  |  |                 taskQueueHandlerRun = false; | 
 |  |  |             }); | 
 |  |  |         } | 
 |  |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     /** | 
 |  |  | 
 |  |  |         SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), | 
 |  |  |                 content.getPort(), content.getSsrc(), content.getPlatformId(), | 
 |  |  |                 content.getApp(), content.getStream(), content.getChannelId(), | 
 |  |  |                 content.getTcp()); | 
 |  |  |                 content.getTcp(), content.getRtcp()); | 
 |  |  |  | 
 |  |  |         WVPResult<ResponseSendItemMsg> result = new WVPResult<>(); | 
 |  |  |         result.setCode(0); | 
 |  |  | 
 |  |  |      * @param callback 得到信息的回调 | 
 |  |  |      */ | 
 |  |  |     public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc, | 
 |  |  |                         String platformId, String channelId, boolean isTcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) { | 
 |  |  |                         String platformId, String channelId, boolean isTcp, boolean rtcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) { | 
 |  |  |         RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance( | 
 |  |  |                 serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, platformName); | 
 |  |  |                 serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, rtcp, platformName); | 
 |  |  |         requestSendItemMsg.setServerId(serverId); | 
 |  |  |         String key = UUID.randomUUID().toString(); | 
 |  |  |         WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM, |