648540858
2024-04-03 5743917439f3989a4aa6748d8498b129e0521643
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
old mode 100644 new mode 100755
@@ -2,6 +2,7 @@
import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
@@ -12,6 +13,7 @@
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
@@ -63,6 +65,9 @@
   @Autowired
   private CivilCodeFileConf civilCodeFileConf;
   @Autowired
   private SipConfig sipConfig;
   private final static String talkKey = "notify-request-for-catalog-task";
   public void process(RequestEvent evt) {
@@ -72,7 +77,7 @@
         String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
         Device device = redisCatchStorage.getDevice(deviceId);
         if (device == null || device.getOnline() == 0) {
         if (device == null || !device.isOnLine()) {
            logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" ));
            return;
         }
@@ -103,8 +108,14 @@
               }else {
                  event = eventElement.getText().toUpperCase();
               }
               DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event, civilCodeFileConf);
               DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
               if (channel == null) {
                  logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
                  continue;
               }
               if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
                  channel.setParentId(null);
               }
               channel.setDeviceId(device.getDeviceId());
               logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
               switch (event) {
@@ -175,6 +186,7 @@
                     // 判断此通道是否存在
                     DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
                     if (deviceChannel != null) {
                        logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                        channel.setId(deviceChannel.getId());
                        updateChannelMap.put(channel.getChannelId(), channel);
                        if (updateChannelMap.keySet().size() > 300) {
@@ -182,6 +194,11 @@
                        }
                     }else {
                        addChannelMap.put(channel.getChannelId(), channel);
                        if (userSetting.getDeviceStatusNotify()) {
                           // 发送redis消息
                           redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                        }
                        if (addChannelMap.keySet().size() > 300) {
                           executeSaveForAdd();
                        }
@@ -192,6 +209,10 @@
                     // 删除
                     logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
                     deleteChannelList.add(channel);
                     if (userSetting.getDeviceStatusNotify()) {
                        // 发送redis消息
                        redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
                     }
                     if (deleteChannelList.size() > 300) {
                        executeSaveForDelete();
                     }
@@ -203,6 +224,7 @@
                     DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
                     if (deviceChannelForUpdate != null) {
                        channel.setId(deviceChannelForUpdate.getId());
                        channel.setUpdateTime(DateUtil.getNow());
                        updateChannelMap.put(channel.getChannelId(), channel);
                        if (updateChannelMap.keySet().size() > 300) {
                           executeSaveForUpdate();
@@ -211,6 +233,10 @@
                        addChannelMap.put(channel.getChannelId(), channel);
                        if (addChannelMap.keySet().size() > 300) {
                           executeSaveForAdd();
                        }
                        if (userSetting.getDeviceStatusNotify()) {
                           // 发送redis消息
                           redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
                        }
                     }
                     break;
@@ -221,11 +247,11 @@
               // 转发变化信息
               eventPublisher.catalogEventPublish(null, channel, event);
               if (updateChannelMap.keySet().size() > 0
                     || addChannelMap.keySet().size() > 0
                     || updateChannelOnlineList.size() > 0
                     || updateChannelOfflineList.size() > 0
                     || deleteChannelList.size() > 0) {
               if (!updateChannelMap.keySet().isEmpty()
                     || !addChannelMap.keySet().isEmpty()
                     || !updateChannelOnlineList.isEmpty()
                     || !updateChannelOfflineList.isEmpty()
                     || !deleteChannelList.isEmpty()) {
                  if (!dynamicTask.contains(talkKey)) {
                     dynamicTask.startDelay(talkKey, this::executeSave, 1000);
@@ -239,16 +265,36 @@
   }
   private void executeSave(){
      System.out.println("定时存储数据");
      executeSaveForUpdate();
      executeSaveForDelete();
      executeSaveForOnline();
      executeSaveForOffline();
      try {
         executeSaveForAdd();
      } catch (Exception e) {
         logger.error("[存储收到的增加通道] 异常: ", e );
      }
      try {
         executeSaveForUpdate();
      } catch (Exception e) {
         logger.error("[存储收到的更新通道] 异常: ", e );
      }
      try {
         executeSaveForDelete();
      } catch (Exception e) {
         logger.error("[存储收到的删除通道] 异常: ", e );
      }
      try {
         executeSaveForOnline();
      } catch (Exception e) {
         logger.error("[存储收到的通道上线] 异常: ", e );
      }
      try {
         executeSaveForOffline();
      } catch (Exception e) {
         logger.error("[存储收到的通道离线] 异常: ", e );
      }
      dynamicTask.stop(talkKey);
   }
   private void executeSaveForUpdate(){
      if (updateChannelMap.values().size() > 0) {
      if (!updateChannelMap.values().isEmpty()) {
         ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
         updateChannelMap.clear();
         deviceChannelService.batchUpdateChannel(deviceChannels);
@@ -257,7 +303,7 @@
   }
   private void executeSaveForAdd(){
      if (addChannelMap.values().size() > 0) {
      if (!addChannelMap.values().isEmpty()) {
         ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values());
         addChannelMap.clear();
         deviceChannelService.batchAddChannel(deviceChannels);
@@ -265,21 +311,21 @@
   }
   private void executeSaveForDelete(){
      if (deleteChannelList.size() > 0) {
      if (!deleteChannelList.isEmpty()) {
         deviceChannelService.deleteChannels(deleteChannelList);
         deleteChannelList.clear();
      }
   }
   private void executeSaveForOnline(){
      if (updateChannelOnlineList.size() > 0) {
      if (!updateChannelOnlineList.isEmpty()) {
         deviceChannelService.channelsOnline(updateChannelOnlineList);
         updateChannelOnlineList.clear();
      }
   }
   private void executeSaveForOffline(){
      if (updateChannelOfflineList.size() > 0) {
      if (!updateChannelOfflineList.isEmpty()) {
         deviceChannelService.channelsOffline(updateChannelOfflineList);
         updateChannelOfflineList.clear();
      }