648540858
2023-03-21 82adc0cb23f3ee47322e78889cdaba57e9309000
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
@@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson.JSON;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,8 +25,6 @@
    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
    private boolean taskQueueHandlerRun = false;
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
@@ -43,24 +40,27 @@
    @Override
    public void onMessage(Message message, byte[] bytes) {
        logger.warn("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
        logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
                    if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
                        logger.info("[REDIS消息-请求推流结果]:参数不全");
                        continue;
                    }
                    // 查看正在等待的invite消息
                    if (responseEvents.get(response.getApp() + response.getStream()) != null) {
                        responseEvents.get(response.getApp() + response.getStream()).run(response);
                    try {
                        MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
                        if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
                            logger.info("[REDIS消息-请求推流结果]:参数不全");
                            continue;
                        }
                        // 查看正在等待的invite消息
                        if (responseEvents.get(response.getApp() + response.getStream()) != null) {
                            responseEvents.get(response.getApp() + response.getStream()).run(response);
                        }
                    }catch (Exception e) {
                        logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
                    }
                }
                taskQueueHandlerRun = false;
            });
        }
    }