|  |  | 
 |  |  | package com.genersoft.iot.vmp.service.redisMsg; | 
 |  |  |  | 
 |  |  | import com.alibaba.fastjson.JSON; | 
 |  |  | import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; | 
 |  |  | import com.alibaba.fastjson2.JSON; | 
 |  |  | import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; | 
 |  |  | import org.slf4j.Logger; | 
 |  |  | import org.slf4j.LoggerFactory; | 
 |  |  | 
 |  |  |  | 
 |  |  |     private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class); | 
 |  |  |  | 
 |  |  |     private boolean taskQueueHandlerRun = false; | 
 |  |  |  | 
 |  |  |     private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); | 
 |  |  |     private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); | 
 |  |  |  | 
 |  |  |     @Qualifier("taskExecutor") | 
 |  |  |     @Autowired | 
 |  |  | 
 |  |  |  | 
 |  |  |     @Override | 
 |  |  |     public void onMessage(Message message, byte[] bytes) { | 
 |  |  |         logger.warn("[REDIS消息-请求推流结果]: {}", new String(message.getBody())); | 
 |  |  |         logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody())); | 
 |  |  |         boolean isEmpty = taskQueue.isEmpty(); | 
 |  |  |         taskQueue.offer(message); | 
 |  |  |         if (!taskQueueHandlerRun) { | 
 |  |  |             taskQueueHandlerRun = true; | 
 |  |  |         if (isEmpty) { | 
 |  |  |             taskExecutor.execute(() -> { | 
 |  |  |                 while (!taskQueue.isEmpty()) { | 
 |  |  |                     Message msg = taskQueue.poll(); | 
 |  |  |                     MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); | 
 |  |  |                     if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ | 
 |  |  |                         logger.info("[REDIS消息-请求推流结果]:参数不全"); | 
 |  |  |                         return; | 
 |  |  |                     } | 
 |  |  |                     // 查看正在等待的invite消息 | 
 |  |  |                     if (responseEvents.get(response.getApp() + response.getStream()) != null) { | 
 |  |  |                         responseEvents.get(response.getApp() + response.getStream()).run(response); | 
 |  |  |                     try { | 
 |  |  |                         MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); | 
 |  |  |                         if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ | 
 |  |  |                             logger.info("[REDIS消息-请求推流结果]:参数不全"); | 
 |  |  |                             continue; | 
 |  |  |                         } | 
 |  |  |                         // 查看正在等待的invite消息 | 
 |  |  |                         if (responseEvents.get(response.getApp() + response.getStream()) != null) { | 
 |  |  |                             responseEvents.get(response.getApp() + response.getStream()).run(response); | 
 |  |  |                         } | 
 |  |  |                     }catch (Exception e) { | 
 |  |  |                         logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); | 
 |  |  |                         logger.error("[REDIS消息-请求推流结果] 异常内容: ", e); | 
 |  |  |                     } | 
 |  |  |                 } | 
 |  |  |                 taskQueueHandlerRun = false; | 
 |  |  |             }); | 
 |  |  |         } | 
 |  |  |     } |