648540858
2023-04-04 b4048fbe80dba8e7756ae557a15ab60b4f80a44b
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@@ -44,13 +45,17 @@
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    @Autowired
    private UserSetting userSetting;
    @Override
    public void onMessage(@NotNull Message message, byte[] bytes) {
        // 消息示例:  PUBLISH alarm_receive '{ "gbId": "", "alarmSn": 1, "alarmType": "111", "alarmDescription": "222", }'
        logger.info("收到来自REDIS的ALARM通知: {}", new String(message.getBody()));
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (isEmpty) {
            logger.info("[线程池信息]活动线程数:{}, 最大线程数: {}", taskExecutor.getActiveCount(), taskExecutor.getMaxPoolSize());
//            logger.info("[线程池信息]活动线程数:{}, 最大线程数: {}", taskExecutor.getActiveCount(), taskExecutor.getMaxPoolSize());
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
@@ -69,22 +74,52 @@
                        deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());
                        deviceAlarm.setAlarmType("" + alarmChannelMessage.getAlarmType());
                        deviceAlarm.setAlarmPriority("1");
                        deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601());
                        deviceAlarm.setAlarmTime(DateUtil.getNow());
                        deviceAlarm.setLongitude(0);
                        deviceAlarm.setLatitude(0);
                        if (ObjectUtils.isEmpty(gbId)) {
                            // 发送给所有的上级
                            List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true);
                            if (parentPlatforms.size() > 0) {
                                for (ParentPlatform parentPlatform : parentPlatforms) {
                            if (userSetting.getSendToPlatformsWhenIdLost()) {
                                // 发送给所有的上级
                                List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true);
                                if (parentPlatforms.size() > 0) {
                                    for (ParentPlatform parentPlatform : parentPlatforms) {
                                        try {
                                            deviceAlarm.setChannelId(parentPlatform.getDeviceGBId());
                                            commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
                                        } catch (SipException | InvalidArgumentException | ParseException e) {
                                            logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
                                        }
                                    }
                                }
                            }else {
                                // 获取开启了消息推送的设备和平台
                                List<ParentPlatform> parentPlatforms = storage.queryEnablePlatformListWithAsMessageChannel();
                                if (parentPlatforms.size() > 0) {
                                    for (ParentPlatform parentPlatform : parentPlatforms) {
                                        try {
                                            deviceAlarm.setChannelId(parentPlatform.getDeviceGBId());
                                            commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
                                        } catch (SipException | InvalidArgumentException | ParseException e) {
                                            logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
                                        }
                                    }
                                }
                            }
                            // 获取开启了消息推送的设备和平台
                            List<Device> devices = storage.queryDeviceWithAsMessageChannel();
                            if (devices.size() > 0) {
                                for (Device device : devices) {
                                    try {
                                        commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
                                    } catch (SipException | InvalidArgumentException | ParseException e) {
                                        logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
                                        deviceAlarm.setChannelId(device.getDeviceId());
                                        commander.sendAlarmMessage(device, deviceAlarm);
                                    } catch (InvalidArgumentException | SipException | ParseException e) {
                                        logger.error("[命令发送失败] 发送报警: {}", e.getMessage());
                                    }
                                }
                            }
                        }else {
                            Device device = storage.queryVideoDevice(gbId);
                            ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId);
@@ -105,6 +140,7 @@
                            }
                        }
                    }catch (Exception e) {
                        logger.error("未处理的异常 ", e);
                        logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
                    }
                }