648540858
2022-11-29 ef6693aabbbf12e83d09ad8749f6e60faacc012d
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
@@ -6,7 +6,6 @@
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,7 +17,8 @@
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@@ -38,7 +38,6 @@
    @Resource
    private IGbStreamService gbStreamService;
    private boolean taskQueueHandlerRun = false;
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@@ -49,13 +48,13 @@
    @Override
    public void onMessage(Message message, byte[] bytes) {
        logger.info("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody()));
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                    List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
                    //查询全部的app+stream 用于判断是添加还是修改
                    List<String> allAppAndStream = streamPushService.getAllAppAndStream();
@@ -95,8 +94,10 @@
                        logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
                        gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
                    }
                    }catch (Exception e) {
                        logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
                }
                taskQueueHandlerRun = false;
                }
            });
        }
    }