648540858
2023-03-21 82adc0cb23f3ee47322e78889cdaba57e9309000
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
@@ -1,6 +1,6 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@@ -18,6 +18,7 @@
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import javax.validation.constraints.NotNull;
import java.text.ParseException;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -37,8 +38,6 @@
    // Storage service; onMessage uses it to query parent platforms and to look up a device or platform by GB id.
    @Autowired
    private IVideoManagerStorage storage;
    // Flag that onMessage sets to true before submitting a drain task; the task sets it back to false when its loop ends.
    private boolean taskQueueHandlerRun = false;
    // Pending Redis messages: onMessage offers each incoming message here and the executor task polls them off.
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
@@ -48,69 +47,68 @@
    /**
     * Redis listener callback for alarm notifications: logs the message body, enqueues the
     * message on {@code taskQueue}, and submits a task to {@code taskExecutor} that drains the
     * queue, builds a {@code DeviceAlarm} per message and sends it to platforms or a device.
     *
     * NOTE(review): this span is not one coherent method body. It appears under a diff hunk
     * header and interleaves two revisions of the same method without +/- markers: there are
     * two scheduling guards, and the parse / build / dispatch steps each occur twice at
     * different indentation. Braces do not balance as written. Separate the two revisions
     * before compiling or changing any logic here.
     *
     * @param message Redis message whose body is parsed as an {@code AlarmChannelMessage}
     * @param bytes   not referenced anywhere in this method
     */
    @Override
    public void onMessage(@NotNull Message message, byte[] bytes) {
        logger.info("收到来自REDIS的ALARM通知: {}", new String(message.getBody()));
        // Emptiness is sampled BEFORE the offer; it feeds the `if (isEmpty)` guard below.
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        // Guard variant 1: flag-based; the flag is cleared again after the drain loop.
        if (!taskQueueHandlerRun) {
            taskQueueHandlerRun = true;
        // Guard variant 2: based on the pre-offer emptiness snapshot.
        if (isEmpty) {
            logger.info("[线程池信息]活动线程数:{}, 最大线程数: {}", taskExecutor.getActiveCount(), taskExecutor.getMaxPoolSize());
            taskExecutor.execute(() -> {
                // Drain loop: keeps polling until the queue reports empty.
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                        // Parse the JSON body; a null result is logged and the message skipped.
                        AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class);
                        if (alarmChannelMessage == null) {
                            logger.warn("[REDIS的ALARM通知]消息解析失败");
                            continue;
                        }
                        String gbId = alarmChannelMessage.getGbId();
                    // Same parse / null-check / gbId steps again, one indent level shallower
                    // (presumably the other revision of these lines).
                    AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class);
                    if (alarmChannelMessage == null) {
                        logger.warn("[REDIS的ALARM通知]消息解析失败");
                        continue;
                    }
                    String gbId = alarmChannelMessage.getGbId();
                        // DeviceAlarm construction, variant A: alarm type comes from the message;
                        // priority is fixed to "1" and longitude/latitude to 0.
                        DeviceAlarm deviceAlarm = new DeviceAlarm();
                        deviceAlarm.setCreateTime(DateUtil.getNow());
                        deviceAlarm.setChannelId(gbId);
                        deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription());
                        deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());
                        deviceAlarm.setAlarmType("" + alarmChannelMessage.getAlarmType());
                        deviceAlarm.setAlarmPriority("1");
                        deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601());
                        deviceAlarm.setLongitude(0);
                        deviceAlarm.setLatitude(0);
                    // DeviceAlarm construction, variant B: identical except the alarm type is the
                    // literal "1" rather than the value carried by the message.
                    DeviceAlarm deviceAlarm = new DeviceAlarm();
                    deviceAlarm.setCreateTime(DateUtil.getNow());
                    deviceAlarm.setChannelId(gbId);
                    deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription());
                    deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());
                    deviceAlarm.setAlarmPriority("1");
                    deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601());
                    deviceAlarm.setAlarmType("1");
                    deviceAlarm.setLongitude(0);
                    deviceAlarm.setLatitude(0);
                    // Dispatch, shallower-indent copy: an empty gbId means broadcast.
                    if (ObjectUtils.isEmpty(gbId)) {
                        // Send to all parent (upstream) platforms
                        List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true);
                        if (parentPlatforms.size() > 0) {
                            for (ParentPlatform parentPlatform : parentPlatforms) {
                                try {
                                    commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
                                } catch (SipException | InvalidArgumentException | ParseException e) {
                                    // A failed send is logged (message only) and the loop continues.
                                    logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
                        // Dispatch, deeper-indent copy of the same broadcast branch.
                        if (ObjectUtils.isEmpty(gbId)) {
                            // Send to all parent (upstream) platforms
                            List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true);
                            if (parentPlatforms.size() > 0) {
                                for (ParentPlatform parentPlatform : parentPlatforms) {
                                    try {
                                        commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
                                    } catch (SipException | InvalidArgumentException | ParseException e) {
                                        logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
                                    }
                                }
                            }
                        }
                    }else {
                        // Non-empty gbId: look it up both as a device and as a parent platform,
                        // and send only when exactly one of the two lookups matches.
                        Device device = storage.queryVideoDevice(gbId);
                        ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId);
                        if (device != null && platform == null) {
                            try {
                                commander.sendAlarmMessage(device, deviceAlarm);
                            } catch (InvalidArgumentException | SipException | ParseException e) {
                                logger.error("[命令发送失败] 发送报警: {}", e.getMessage());
                            }
                        }else if (device == null && platform != null){
                            try {
                                commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
                            } catch (InvalidArgumentException | SipException | ParseException e) {
                                logger.error("[命令发送失败] 发送报警: {}", e.getMessage());
                            }
                        }else {
                            // Reached when both lookups matched or neither did.
                            logger.warn("无法确定" + gbId + "是平台还是设备");
                            // Deeper-indent copy of the device/platform lookup and dispatch.
                            Device device = storage.queryVideoDevice(gbId);
                            ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId);
                            if (device != null && platform == null) {
                                try {
                                    commander.sendAlarmMessage(device, deviceAlarm);
                                } catch (InvalidArgumentException | SipException | ParseException e) {
                                    logger.error("[命令发送失败] 发送报警: {}", e.getMessage());
                                }
                            }else if (device == null && platform != null){
                                try {
                                    commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
                                } catch (InvalidArgumentException | SipException | ParseException e) {
                                    logger.error("[命令发送失败] 发送报警: {}", e.getMessage());
                                }
                            }else {
                                logger.warn("无法确定" + gbId + "是平台还是设备");
                            }
                        }
                    // Catch-all for the per-message try: logs only e.getMessage() (no stack trace).
                    }catch (Exception e) {
                        logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
                    }
                }
                // Clear the flag used by guard variant 1 once the drain loop has exited.
                taskQueueHandlerRun = false;
            });
        }
    }
}