Fang
2022-03-07 bea63f67e75ea6c38d946c2ee463260fcf815f87
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.storager.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
@@ -13,6 +14,7 @@
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.storager.dao.*;
import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo;
import com.genersoft.iot.vmp.utils.node.ForestNodeMerger;
import com.genersoft.iot.vmp.vmanager.bean.DeviceChannelTree;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
@@ -156,7 +158,10 @@
   public synchronized void updateChannel(String deviceId, DeviceChannel channel) {
      String channelId = channel.getChannelId();
      channel.setDeviceId(deviceId);
      channel.setStreamId(streamSession.getStreamId(deviceId, channel.getChannelId()));
      StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
      if (streamInfo != null) {
         channel.setStreamId(streamInfo.getStream());
      }
      String now = this.format.format(System.currentTimeMillis());
      channel.setUpdateTime(now);
      DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(deviceId, channelId);
@@ -178,7 +183,10 @@
         if (channelList.size() == 0) {
            for (DeviceChannel channel : channels) {
               channel.setDeviceId(deviceId);
               channel.setStreamId(streamSession.getStreamId(deviceId, channel.getChannelId()));
               StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId());
               if (streamInfo != null) {
                  channel.setStreamId(streamInfo.getStream());
               }
               String now = this.format.format(System.currentTimeMillis());
               channel.setUpdateTime(now);
               channel.setCreateTime(now);
@@ -189,9 +197,11 @@
               channelsInStore.put(deviceChannel.getChannelId(), deviceChannel);
            }
            for (DeviceChannel channel : channels) {
               String channelId = channel.getChannelId();
               channel.setDeviceId(deviceId);
               channel.setStreamId(streamSession.getStreamId(deviceId, channel.getChannelId()));
               StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId());
               if (streamInfo != null) {
                  channel.setStreamId(streamInfo.getStream());
               }
               String now = this.format.format(System.currentTimeMillis());
               channel.setUpdateTime(now);
               if (channelsInStore.get(channel.getChannelId()) != null) {
@@ -274,7 +284,8 @@
         logger.debug("[目录查询]收到的数据存在重复: {}" , stringBuilder);
      }
      try {
         int cleanChannelsResult = deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
//         int cleanChannelsResult = deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
         int cleanChannelsResult = deviceChannelMapper.cleanChannelsNotInList(deviceId, channels);
         int limitCount = 300;
         boolean result = cleanChannelsResult < 0;
         if (!result && channels.size() > 0) {
@@ -607,19 +618,19 @@
   @Override
   public int updateChannelForGB(String platformId, List<ChannelReduce> channelReduces, String catalogId) {
      Map<String, ChannelReduce> deviceAndChannels = new HashMap<>();
      Map<Integer, ChannelReduce> deviceAndChannels = new HashMap<>();
      for (ChannelReduce channelReduce : channelReduces) {
         channelReduce.setCatalogId(catalogId);
         deviceAndChannels.put(channelReduce.getDeviceId() + "_" + channelReduce.getChannelId(), channelReduce);
         deviceAndChannels.put(channelReduce.getId(), channelReduce);
      }
      List<String> deviceAndChannelList = new ArrayList<>(deviceAndChannels.keySet());
      List<Integer> deviceAndChannelList = new ArrayList<>(deviceAndChannels.keySet());
      // 查询当前已经存在的
      List<String> relatedPlatformchannels = platformChannelMapper.findChannelRelatedPlatform(platformId, deviceAndChannelList);
      if (relatedPlatformchannels != null) {
         deviceAndChannelList.removeAll(relatedPlatformchannels);
      List<Integer> channelIds = platformChannelMapper.findChannelRelatedPlatform(platformId, channelReduces);
      if (deviceAndChannelList != null) {
         deviceAndChannelList.removeAll(channelIds);
      }
      for (String relatedPlatformchannel : relatedPlatformchannels) {
         deviceAndChannels.remove(relatedPlatformchannel);
      for (Integer channelId : channelIds) {
         deviceAndChannels.remove(channelId);
      }
      List<ChannelReduce> channelReducesToAdd = new ArrayList<>(deviceAndChannels.values());
      // 对剩下的数据进行存储
@@ -705,9 +716,18 @@
      streamProxyItem.setCreateTime(now);
      streamProxyItem.setCreateStamp(System.currentTimeMillis());
      try {
         if (gbStreamMapper.add(streamProxyItem)<0 || streamProxyMapper.add(streamProxyItem) < 0) {
         if (streamProxyMapper.add(streamProxyItem) > 0) {
            if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
               if (gbStreamMapper.add(streamProxyItem) < 0) {
                  //事务回滚
                  dataSourceTransactionManager.rollback(transactionStatus);
                  return false;
               }
            }
         }else {
            //事务回滚
            dataSourceTransactionManager.rollback(transactionStatus);
            return false;
         }
         result = true;
         dataSourceTransactionManager.commit(transactionStatus);     //手动提交
@@ -731,10 +751,20 @@
      boolean result = false;
      streamProxyItem.setStreamType("proxy");
      try {
         if (gbStreamMapper.update(streamProxyItem)<0 || streamProxyMapper.update(streamProxyItem) < 0) {
         if (streamProxyMapper.update(streamProxyItem) > 0) {
            if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
               if (gbStreamMapper.update(streamProxyItem) > 0) {
                  //事务回滚
                  dataSourceTransactionManager.rollback(transactionStatus);
                  return false;
               }
            }
         } else {
            //事务回滚
            dataSourceTransactionManager.rollback(transactionStatus);
            return false;
         }
         dataSourceTransactionManager.commit(transactionStatus);     //手动提交
         result = true;
      }catch (Exception e) {
@@ -1076,4 +1106,9 @@
   public PlatformCatalog queryDefaultCatalogInPlatform(String platformId) {
      return catalogMapper.selectDefaultByPlatFormId(platformId);
   }
   @Override
   public List<ChannelSourceInfo> getChannelSource(String platformId, String gbId) {
      return platformMapper.getChannelSource(platformId, gbId);
   }
}