648540858
2024-04-24 44aa37ad6eb9f979ff58a0cabc7cd73c2d4ac133
优化notify消息中目录的处理
1个文件已修改
154 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java 154 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
@@ -22,6 +23,7 @@
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -37,11 +39,12 @@
    private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class);
    private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>();
    private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>();
    private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
    private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
    private final List<DeviceChannel> deleteChannelList = new CopyOnWriteArrayList<>();
    @Autowired
    private UserSetting userSetting;
@@ -54,6 +57,9 @@
    @Autowired
    private IDeviceChannelService deviceChannelService;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private SipConfig sipConfig;
@@ -106,48 +112,100 @@
                            channel.setParentId(null);
                        }
                        channel.setDeviceId(device.getDeviceId());
                        logger.info("[收到目录订阅]:{}, {}/{}",event, device.getDeviceId(), channel.getChannelId());
                        logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
                        switch (event) {
                            case CatalogEvent.ON:
                                // 上线
                                deviceChannelService.online(channel);
                                logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                updateChannelOnlineList.add(channel);
                                if (userSetting.getDeviceStatusNotify()) {
                                    // 发送redis消息
                                    redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
                                }
                                break;
                            case CatalogEvent.OFF :
                            case CatalogEvent.VLOST:
                            case CatalogEvent.DEFECT:
                                // 离线
                                logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                    logger.info("[目录订阅] 离线 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                    logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                }else {
                                    deviceChannelService.offline(channel);
                                    updateChannelOfflineList.add(channel);
                                    if (userSetting.getDeviceStatusNotify()) {
                                        // 发送redis消息
                                        redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                    }
                                }
                                break;
                            case CatalogEvent.VLOST:
                                // 视频丢失
                                logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                    logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                } else {
                                    updateChannelOfflineList.add(channel);
                                    if (userSetting.getDeviceStatusNotify()) {
                                        // 发送redis消息
                                        redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                    }
                                }
                                break;
                            case CatalogEvent.DEFECT:
                                // 故障
                                logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                                    logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                } else {
                                    updateChannelOfflineList.add(channel);
                                    if (userSetting.getDeviceStatusNotify()) {
                                        // 发送redis消息
                                        redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                                    }
                                }
                                break;
                            case CatalogEvent.ADD:
                                // 增加
                                logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                // 判断此通道是否存在
                                DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
                                if (deviceChannel != null) {
                                    logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                    channel.setId(deviceChannel.getId());
                                    channel.setHasAudio(null);
                                    updateChannelMap.put(channel.getChannelId(), channel);
                                } else {
                                    addChannelMap.put(channel.getChannelId(), channel);
                                    if (userSetting.getDeviceStatusNotify()) {
                                        // 发送redis消息
                                        redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                                    }
                                }
                                break;
                            case CatalogEvent.DEL:
                                // 删除
                                deviceChannelService.delete(channel);
                                logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                deleteChannelList.add(channel);
                                if (userSetting.getDeviceStatusNotify()) {
                                    // 发送redis消息
                                    redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
                                }
                                break;
                            case CatalogEvent.ADD:
                            case CatalogEvent.UPDATE:
                                // 更新
                                logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                                // 判断此通道是否存在
                                DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
                                if (deviceChannelForUpdate != null) {
                                    channel.setId(deviceChannelForUpdate.getId());
                                channel.setUpdateTime(DateUtil.getNow());
                                channel.setHasAudio(null);
                                deviceChannelService.updateChannel(deviceId,channel);
                                    updateChannelMap.put(channel.getChannelId(), channel);
                                } else {
                                    addChannelMap.put(channel.getChannelId(), channel);
                                if (userSetting.getDeviceStatusNotify()) {
                                    // 发送redis消息
                                    redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                                    }
                                }
                                break;
                            default:
@@ -158,9 +216,83 @@
                        eventPublisher.catalogEventPublish(null, channel, event);
                    }
                }
            } catch (DocumentException e) {
                logger.error("未处理的异常 ", e);
            }
        }
        if (!updateChannelMap.keySet().isEmpty()
                || !addChannelMap.keySet().isEmpty()
                || !updateChannelOnlineList.isEmpty()
                || !updateChannelOfflineList.isEmpty()
                || !deleteChannelList.isEmpty()) {
            executeSave();
        }
    }
    public void executeSave(){
        try {
            executeSaveForAdd();
        } catch (Exception e) {
            logger.error("[存储收到的增加通道] 异常: ", e );
        }
        try {
            executeSaveForOnline();
        } catch (Exception e) {
            logger.error("[存储收到的通道上线] 异常: ", e );
        }
        try {
            executeSaveForOffline();
        } catch (Exception e) {
            logger.error("[存储收到的通道离线] 异常: ", e );
        }
        try {
            executeSaveForUpdate();
        } catch (Exception e) {
            logger.error("[存储收到的更新通道] 异常: ", e );
        }
        try {
            executeSaveForDelete();
        } catch (Exception e) {
            logger.error("[存储收到的删除通道] 异常: ", e );
        }
    }
    private void executeSaveForUpdate(){
        if (!updateChannelMap.values().isEmpty()) {
            logger.info("[存储收到的更新通道], 数量: {}", updateChannelMap.size());
            ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
            deviceChannelService.batchUpdateChannel(deviceChannels);
            updateChannelMap.clear();
        }
    }
    private void executeSaveForAdd(){
        if (!addChannelMap.values().isEmpty()) {
            ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values());
            addChannelMap.clear();
            deviceChannelService.batchAddChannel(deviceChannels);
        }
    }
    private void executeSaveForDelete(){
        if (!deleteChannelList.isEmpty()) {
            deviceChannelService.deleteChannels(deleteChannelList);
            deleteChannelList.clear();
        }
    }
    private void executeSaveForOnline(){
        if (!updateChannelOnlineList.isEmpty()) {
            deviceChannelService.channelsOnline(updateChannelOnlineList);
            updateChannelOnlineList.clear();
        }
    }
    private void executeSaveForOffline(){
        if (!updateChannelOfflineList.isEmpty()) {
            deviceChannelService.channelsOffline(updateChannelOfflineList);
            updateChannelOfflineList.clear();
        }
    }
}