648540858
2024-06-13 4ad711f61a1a760a0f8f7f8475b75ec93e31d8ae
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
@@ -1,11 +1,11 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@@ -19,7 +19,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
@@ -28,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@@ -46,6 +49,7 @@
   private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
   private final List<DeviceChannel> deleteChannelList = new CopyOnWriteArrayList<>();
   private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
   @Autowired
   private UserSetting userSetting;
@@ -63,222 +67,193 @@
   private DynamicTask dynamicTask;
   @Autowired
   private CivilCodeFileConf civilCodeFileConf;
   @Autowired
   private SipConfig sipConfig;
   private final static String talkKey = "notify-request-for-catalog-task";
   @Transactional
   public void process(RequestEvent evt) {
      try {
         long start = System.currentTimeMillis();
         FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
         String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
      if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
         logger.error("[notify-目录订阅] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
         return;
      }
      taskQueue.offer(new HandlerCatchData(evt, null, null));
   }
         Device device = redisCatchStorage.getDevice(deviceId);
         if (device == null || !device.isOnLine()) {
            logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" ));
            return;
   @Scheduled(fixedRate = 400)   //每400毫秒执行一次
   public void executeTaskQueue(){
      if (taskQueue.isEmpty()) {
         return;
      }
      for (HandlerCatchData take : taskQueue) {
         if (take == null) {
            continue;
         }
         Element rootElement = getRootElement(evt, device.getCharset());
         if (rootElement == null) {
            logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());
            return;
         }
         Element deviceListElement = rootElement.element("DeviceList");
         if (deviceListElement == null) {
            return;
         }
         Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
         if (deviceListIterator != null) {
         RequestEvent evt = take.getEvt();
         try {
            long start = System.currentTimeMillis();
            FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
            String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
            // 遍历DeviceList
            while (deviceListIterator.hasNext()) {
               Element itemDevice = deviceListIterator.next();
               Element channelDeviceElement = itemDevice.element("DeviceID");
               if (channelDeviceElement == null) {
                  continue;
               }
               Element eventElement = itemDevice.element("Event");
               String event;
               if (eventElement == null) {
                  logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" ));
                  event = CatalogEvent.ADD;
               }else {
                  event = eventElement.getText().toUpperCase();
               }
               DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
               if (channel == null) {
                  logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
                  continue;
               }
               if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
                  channel.setParentId(null);
               }
               channel.setDeviceId(device.getDeviceId());
               logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
               switch (event) {
                  case CatalogEvent.ON:
                     // 上线
                     logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     updateChannelOnlineList.add(channel);
                     if (updateChannelOnlineList.size() > 300) {
                        executeSaveForOnline();
                     }
                     if (userSetting.getDeviceStatusNotify()) {
                        // 发送redis消息
                        redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
                     }
            Device device = redisCatchStorage.getDevice(deviceId);
            if (device == null || !device.isOnLine()) {
               logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId() : ""));
               return;
            }
            Element rootElement = getRootElement(evt, device.getCharset());
            if (rootElement == null) {
               logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());
               return;
            }
            Element deviceListElement = rootElement.element("DeviceList");
            if (deviceListElement == null) {
               return;
            }
            Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
            if (deviceListIterator != null) {
                     break;
                  case CatalogEvent.OFF :
                     // 离线
                     logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                        logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     }else {
                        updateChannelOfflineList.add(channel);
                        if (updateChannelOfflineList.size() > 300) {
                           executeSaveForOffline();
                        }
                        if (userSetting.getDeviceStatusNotify()) {
                           // 发送redis消息
                           redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                        }
                     }
                     break;
                  case CatalogEvent.VLOST:
                     // 视频丢失
                     logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                        logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     }else {
                        updateChannelOfflineList.add(channel);
                        if (updateChannelOfflineList.size() > 300) {
                           executeSaveForOffline();
                        }
                        if (userSetting.getDeviceStatusNotify()) {
                           // 发送redis消息
                           redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                        }
                     }
                     break;
                  case CatalogEvent.DEFECT:
                     // 故障
                     logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                        logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     }else {
                        updateChannelOfflineList.add(channel);
                        if (updateChannelOfflineList.size() > 300) {
                           executeSaveForOffline();
                        }
                        if (userSetting.getDeviceStatusNotify()) {
                           // 发送redis消息
                           redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                        }
                     }
                     break;
                  case CatalogEvent.ADD:
                     // 增加
                     logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     // 判断此通道是否存在
                     DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
                     if (deviceChannel != null) {
                        logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                        channel.setId(deviceChannel.getId());
                        updateChannelMap.put(channel.getChannelId(), channel);
                        if (updateChannelMap.keySet().size() > 300) {
                           executeSaveForUpdate();
                        }
                     }else {
                        addChannelMap.put(channel.getChannelId(), channel);
                        if (userSetting.getDeviceStatusNotify()) {
                           // 发送redis消息
                           redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                        }
                        if (addChannelMap.keySet().size() > 300) {
                           executeSaveForAdd();
                        }
                     }
                     break;
                  case CatalogEvent.DEL:
                     // 删除
                     logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     deleteChannelList.add(channel);
                     if (userSetting.getDeviceStatusNotify()) {
                        // 发送redis消息
                        redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
                     }
                     if (deleteChannelList.size() > 300) {
                        executeSaveForDelete();
                     }
                     break;
                  case CatalogEvent.UPDATE:
                     // 更新
                     logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     // 判断此通道是否存在
                     DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
                     if (deviceChannelForUpdate != null) {
                        channel.setId(deviceChannelForUpdate.getId());
                        channel.setUpdateTime(DateUtil.getNow());
                        updateChannelMap.put(channel.getChannelId(), channel);
                        if (updateChannelMap.keySet().size() > 300) {
                           executeSaveForUpdate();
                        }
                     }else {
                        addChannelMap.put(channel.getChannelId(), channel);
                        if (addChannelMap.keySet().size() > 300) {
                           executeSaveForAdd();
                        }
                        if (userSetting.getDeviceStatusNotify()) {
                           // 发送redis消息
                           redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                        }
                     }
                     break;
                  default:
                     logger.warn("[ NotifyCatalog ] event not found : {}", event );
               }
               // 转发变化信息
               eventPublisher.catalogEventPublish(null, channel, event);
               if (!updateChannelMap.keySet().isEmpty()
                     || !addChannelMap.keySet().isEmpty()
                     || !updateChannelOnlineList.isEmpty()
                     || !updateChannelOfflineList.isEmpty()
                     || !deleteChannelList.isEmpty()) {
                  if (!dynamicTask.contains(talkKey)) {
                     dynamicTask.startDelay(talkKey, this::executeSave, 1000);
               // 遍历DeviceList
               while (deviceListIterator.hasNext()) {
                  Element itemDevice = deviceListIterator.next();
                  Element eventElement = itemDevice.element("Event");
                  String event;
                  if (eventElement == null) {
                     logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId() : ""));
                     event = CatalogEvent.ADD;
                  } else {
                     event = eventElement.getText().toUpperCase();
                  }
                  DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
                  if (channel == null) {
                     logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
                     continue;
                  }
                  if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
                     channel.setParentId(null);
                  }
                  channel.setDeviceId(device.getDeviceId());
                  logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
                  switch (event) {
                     case CatalogEvent.ON:
                        // 上线
                        logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                        updateChannelOnlineList.add(channel);
                        if (userSetting.getDeviceStatusNotify()) {
                           // 发送redis消息
                           redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
                        }
                        break;
                     case CatalogEvent.OFF:
                        // 离线
                        logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                        if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                           logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                        } else {
                           updateChannelOfflineList.add(channel);
                           if (userSetting.getDeviceStatusNotify()) {
                              // 发送redis消息
                              redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                           }
                        }
                        break;
                     case CatalogEvent.VLOST:
                        // 视频丢失
                        logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                        if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                           logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                        } else {
                           updateChannelOfflineList.add(channel);
                           if (userSetting.getDeviceStatusNotify()) {
                              // 发送redis消息
                              redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                           }
                        }
                        break;
                     case CatalogEvent.DEFECT:
                        // 故障
                        logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                        if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
                           logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                        } else {
                           updateChannelOfflineList.add(channel);
                           if (userSetting.getDeviceStatusNotify()) {
                              // 发送redis消息
                              redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
                           }
                        }
                        break;
                     case CatalogEvent.ADD:
                        // 增加
                        logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                        // 判断此通道是否存在
                        DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
                        if (deviceChannel != null) {
                           logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                           channel.setId(deviceChannel.getId());
                           channel.setHasAudio(null);
                           updateChannelMap.put(channel.getChannelId(), channel);
                        } else {
                           addChannelMap.put(channel.getChannelId(), channel);
                           if (userSetting.getDeviceStatusNotify()) {
                              // 发送redis消息
                              redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                           }
                        }
                        break;
                     case CatalogEvent.DEL:
                        // 删除
                        logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                        deleteChannelList.add(channel);
                        if (userSetting.getDeviceStatusNotify()) {
                           // 发送redis消息
                           redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
                        }
                        break;
                     case CatalogEvent.UPDATE:
                        // 更新
                        logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                        // 判断此通道是否存在
                        DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
                        if (deviceChannelForUpdate != null) {
                           channel.setId(deviceChannelForUpdate.getId());
                           channel.setUpdateTime(DateUtil.getNow());
                           channel.setHasAudio(null);
                           updateChannelMap.put(channel.getChannelId(), channel);
                        } else {
                           addChannelMap.put(channel.getChannelId(), channel);
                           if (userSetting.getDeviceStatusNotify()) {
                              // 发送redis消息
                              redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                           }
                        }
                        break;
                     default:
                        logger.warn("[ NotifyCatalog ] event not found : {}", event);
                  }
                  // 转发变化信息
                  eventPublisher.catalogEventPublish(null, channel, event);
               }
            }
         } catch (DocumentException e) {
            logger.error("未处理的异常 ", e);
         }
      } catch (DocumentException e) {
         logger.error("未处理的异常 ", e);
      }
      taskQueue.clear();
      if (!updateChannelMap.keySet().isEmpty()
            || !addChannelMap.keySet().isEmpty()
            || !updateChannelOnlineList.isEmpty()
            || !updateChannelOfflineList.isEmpty()
            || !deleteChannelList.isEmpty()) {
         executeSave();
      }
   }
   private void executeSave(){
   public void executeSave(){
      try {
         executeSaveForAdd();
      } catch (Exception e) {
         logger.error("[存储收到的增加通道] 异常: ", e );
      }
      try {
         executeSaveForUpdate();
      } catch (Exception e) {
         logger.error("[存储收到的更新通道] 异常: ", e );
      }
      try {
         executeSaveForDelete();
      } catch (Exception e) {
         logger.error("[存储收到的删除通道] 异常: ", e );
      }
      try {
         executeSaveForOnline();
@@ -290,16 +265,25 @@
      } catch (Exception e) {
         logger.error("[存储收到的通道离线] 异常: ", e );
      }
      dynamicTask.stop(talkKey);
      try {
         executeSaveForUpdate();
      } catch (Exception e) {
         logger.error("[存储收到的更新通道] 异常: ", e );
      }
      try {
         executeSaveForDelete();
      } catch (Exception e) {
         logger.error("[存储收到的删除通道] 异常: ", e );
      }
   }
   private void executeSaveForUpdate(){
      if (!updateChannelMap.values().isEmpty()) {
         logger.info("[存储收到的更新通道], 数量: {}", updateChannelMap.size());
         ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
         updateChannelMap.clear();
         deviceChannelService.batchUpdateChannel(deviceChannels);
         updateChannelMap.clear();
      }
   }
   private void executeSaveForAdd(){
@@ -331,4 +315,8 @@
      }
   }
//   @Scheduled(fixedRate = 10000)   //每1秒执行一次
//   public void execute(){
//      logger.info("[待处理Notify-目录订阅消息数量]: {}", taskQueue.size());
//   }
}