|  |  |  | 
|---|
|  |  |  | package com.genersoft.iot.vmp.service.redisMsg; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import com.alibaba.fastjson2.JSON; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; | 
|---|
|  |  |  | import org.slf4j.Logger; | 
|---|
|  |  |  | import org.slf4j.LoggerFactory; | 
|---|
|  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private boolean taskQueueHandlerRun = false; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Qualifier("taskExecutor") | 
|---|
|  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void onMessage(Message message, byte[] bytes) { | 
|---|
|  |  |  | logger.warn("[REDIS消息-请求推流结果]: {}", new String(message.getBody())); | 
|---|
|  |  |  | logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody())); | 
|---|
|  |  |  | boolean isEmpty = taskQueue.isEmpty(); | 
|---|
|  |  |  | taskQueue.offer(message); | 
|---|
|  |  |  | if (!taskQueueHandlerRun) { | 
|---|
|  |  |  | taskQueueHandlerRun = true; | 
|---|
|  |  |  | if (isEmpty) { | 
|---|
|  |  |  | taskExecutor.execute(() -> { | 
|---|
|  |  |  | while (!taskQueue.isEmpty()) { | 
|---|
|  |  |  | Message msg = taskQueue.poll(); | 
|---|
|  |  |  | MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); | 
|---|
|  |  |  | if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ | 
|---|
|  |  |  | logger.info("[REDIS消息-请求推流结果]:参数不全"); | 
|---|
|  |  |  | continue; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | // 查看正在等待的invite消息 | 
|---|
|  |  |  | if (responseEvents.get(response.getApp() + response.getStream()) != null) { | 
|---|
|  |  |  | responseEvents.get(response.getApp() + response.getStream()).run(response); | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); | 
|---|
|  |  |  | if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ | 
|---|
|  |  |  | logger.info("[REDIS消息-请求推流结果]:参数不全"); | 
|---|
|  |  |  | continue; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | // 查看正在等待的invite消息 | 
|---|
|  |  |  | if (responseEvents.get(response.getApp() + response.getStream()) != null) { | 
|---|
|  |  |  | responseEvents.get(response.getApp() + response.getStream()).run(response); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | }catch (Exception e) { | 
|---|
|  |  |  | logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | taskQueueHandlerRun = false; | 
|---|
|  |  |  | }); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|