648540858
2022-03-29 1553b39b4547418774ab2bd6da72f75bfd14b972
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.storager.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
@@ -13,6 +14,9 @@
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.storager.dao.*;
import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo;
import com.genersoft.iot.vmp.utils.node.ForestNodeMerger;
import com.genersoft.iot.vmp.vmanager.bean.DeviceChannelTree;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
@@ -38,7 +42,7 @@
@Component
public class VideoManagerStoragerImpl implements IVideoManagerStorager {
   private Logger logger = LoggerFactory.getLogger(VideoManagerStoragerImpl.class);
   private final Logger logger = LoggerFactory.getLogger(VideoManagerStoragerImpl.class);
   @Autowired
   EventPublisher eventPublisher;
@@ -137,6 +141,7 @@
      String now = this.format.format(System.currentTimeMillis());
      device.setUpdateTime(now);
      Device deviceByDeviceId = deviceMapper.getDeviceByDeviceId(device.getDeviceId());
      device.setCharset(device.getCharset().toUpperCase());
      if (deviceByDeviceId == null) {
         device.setCreateTime(now);
         redisCatchStorage.updateDevice(device);
@@ -154,7 +159,10 @@
   public synchronized void updateChannel(String deviceId, DeviceChannel channel) {
      String channelId = channel.getChannelId();
      channel.setDeviceId(deviceId);
      channel.setStreamId(streamSession.getStreamId(deviceId, channel.getChannelId()));
      StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
      if (streamInfo != null) {
         channel.setStreamId(streamInfo.getStream());
      }
      String now = this.format.format(System.currentTimeMillis());
      channel.setUpdateTime(now);
      DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(deviceId, channelId);
@@ -164,6 +172,7 @@
      }else {
         deviceChannelMapper.update(channel);
      }
      deviceChannelMapper.updateChannelSubCount(deviceId,channel.getParentId());
   }
   @Override
@@ -172,11 +181,14 @@
      List<DeviceChannel> updateChannels = new ArrayList<>();
      HashMap<String, DeviceChannel> channelsInStore = new HashMap<>();
      if (channels != null && channels.size() > 0) {
         List<DeviceChannel> channelList = deviceChannelMapper.queryChannelsByDeviceId(deviceId);
         List<DeviceChannel> channelList = deviceChannelMapper.queryChannels(deviceId, null, null, null, null);
         if (channelList.size() == 0) {
            for (DeviceChannel channel : channels) {
               channel.setDeviceId(deviceId);
               channel.setStreamId(streamSession.getStreamId(deviceId, channel.getChannelId()));
               StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId());
               if (streamInfo != null) {
                  channel.setStreamId(streamInfo.getStream());
               }
               String now = this.format.format(System.currentTimeMillis());
               channel.setUpdateTime(now);
               channel.setCreateTime(now);
@@ -187,9 +199,11 @@
               channelsInStore.put(deviceChannel.getChannelId(), deviceChannel);
            }
            for (DeviceChannel channel : channels) {
               String channelId = channel.getChannelId();
               channel.setDeviceId(deviceId);
               channel.setStreamId(streamSession.getStreamId(deviceId, channel.getChannelId()));
               StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId());
               if (streamInfo != null) {
                  channel.setStreamId(streamInfo.getStream());
               }
               String now = this.format.format(System.currentTimeMillis());
               channel.setUpdateTime(now);
               if (channelsInStore.get(channel.getChannelId()) != null) {
@@ -237,6 +251,7 @@
      // 数据去重
      List<DeviceChannel> channels = new ArrayList<>();
      StringBuilder stringBuilder = new StringBuilder();
      Map<String, Integer> subContMap = new HashMap<>();
      if (deviceChannelList.size() > 1) {
         // 数据去重
         Set<String> gbIdSet = new HashSet<>();
@@ -244,10 +259,26 @@
            if (!gbIdSet.contains(deviceChannel.getChannelId())) {
               gbIdSet.add(deviceChannel.getChannelId());
               channels.add(deviceChannel);
               if (!StringUtils.isEmpty(deviceChannel.getParentId())) {
                  if (subContMap.get(deviceChannel.getParentId()) == null) {
                     subContMap.put(deviceChannel.getParentId(), 1);
                  }else {
                     Integer count = subContMap.get(deviceChannel.getParentId());
                     subContMap.put(deviceChannel.getParentId(), count++);
                  }
               }
            }else {
               stringBuilder.append(deviceChannel.getChannelId() + ",");
            }
         }
         if (channels.size() > 0) {
            for (DeviceChannel channel : channels) {
               if (subContMap.get(channel.getChannelId()) != null){
                  channel.setSubCount(subContMap.get(channel.getChannelId()));
               }
            }
         }
      }else {
         channels = deviceChannelList;
      }
@@ -255,7 +286,8 @@
         logger.debug("[目录查询]收到的数据存在重复: {}" , stringBuilder);
      }
      try {
         int cleanChannelsResult = deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
//         int cleanChannelsResult = deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
         int cleanChannelsResult = deviceChannelMapper.cleanChannelsNotInList(deviceId, channels);
         int limitCount = 300;
         boolean result = cleanChannelsResult < 0;
         if (!result && channels.size() > 0) {
@@ -326,6 +358,11 @@
   @Override
   public List<DeviceChannel> queryChannelsByDeviceIdWithStartAndLimit(String deviceId, String query, Boolean hasSubChannel, Boolean online, int start, int limit) {
      return deviceChannelMapper.queryChannelsByDeviceIdWithStartAndLimit(deviceId, null, query, hasSubChannel, online, start, limit);
   }
   /**
    * Returns the channel tree of a device: the rows produced by the channel mapper's
    * {@code tree(deviceId)} query are handed to {@link ForestNodeMerger#merge} and the merged
    * result is returned.
    *
    * @param deviceId device whose channels are queried
    * @return the list produced by {@code ForestNodeMerger.merge}
    */
   @Override
   public List<DeviceChannelTree> tree(String deviceId) {
      return ForestNodeMerger.merge(deviceChannelMapper.tree(deviceId));
   }
   @Override
@@ -507,7 +544,7 @@
         if (parentPlatformCatch == null) { // serverGBId 已变化
            ParentPlatform parentPlatById = platformMapper.getParentPlatById(parentPlatform.getId());
            // 使用旧的查出缓存ID
            parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatById.getServerGBId());
            parentPlatformCatch = new ParentPlatformCatch();
            parentPlatformCatch.setId(parentPlatform.getServerGBId());
            redisCatchStorage.delPlatformCatchInfo(parentPlatById.getServerGBId());
         }
@@ -568,34 +605,34 @@
   @Override
   public PageInfo<ChannelReduce> queryAllChannelList(int page, int count, String query, Boolean online,
                                          Boolean channelType, String platformId, Boolean inPlatform) {
                                          Boolean channelType, String platformId, String catalogId) {
      PageHelper.startPage(page, count);
      List<ChannelReduce> all = deviceChannelMapper.queryChannelListInAll(query, online, channelType, platformId, inPlatform);
      List<ChannelReduce> all = deviceChannelMapper.queryChannelListInAll(query, online, channelType, platformId, catalogId);
      return new PageInfo<>(all);
   }
   @Override
   public List<ChannelReduce> queryChannelListInParentPlatform(String platformId) {
   public List<DeviceChannelInPlatform> queryChannelListInParentPlatform(String platformId) {
      return deviceChannelMapper.queryChannelListInAll(null, null, null, platformId, true);
      return deviceChannelMapper.queryChannelByPlatformId(platformId);
   }
   @Override
   public int updateChannelForGB(String platformId, List<ChannelReduce> channelReduces, String catalogId) {
      Map<String, ChannelReduce> deviceAndChannels = new HashMap<>();
      Map<Integer, ChannelReduce> deviceAndChannels = new HashMap<>();
      for (ChannelReduce channelReduce : channelReduces) {
         channelReduce.setCatalogId(catalogId);
         deviceAndChannels.put(channelReduce.getDeviceId() + "_" + channelReduce.getChannelId(), channelReduce);
         deviceAndChannels.put(channelReduce.getId(), channelReduce);
      }
      List<String> deviceAndChannelList = new ArrayList<>(deviceAndChannels.keySet());
      List<Integer> deviceAndChannelList = new ArrayList<>(deviceAndChannels.keySet());
      // 查询当前已经存在的
      List<String> relatedPlatformchannels = platformChannelMapper.findChannelRelatedPlatform(platformId, deviceAndChannelList);
      if (relatedPlatformchannels != null) {
         deviceAndChannelList.removeAll(relatedPlatformchannels);
      List<Integer> channelIds = platformChannelMapper.findChannelRelatedPlatform(platformId, channelReduces);
      if (deviceAndChannelList != null) {
         deviceAndChannelList.removeAll(channelIds);
      }
      for (String relatedPlatformchannel : relatedPlatformchannels) {
         deviceAndChannels.remove(relatedPlatformchannel);
      for (Integer channelId : channelIds) {
         deviceAndChannels.remove(channelId);
      }
      List<ChannelReduce> channelReducesToAdd = new ArrayList<>(deviceAndChannels.values());
      // 对剩下的数据进行存储
@@ -627,8 +664,16 @@
   @Override
   public DeviceChannel queryChannelInParentPlatform(String platformId, String channelId) {
      // NOTE(review): the single-result lookup directly below and the List-based lookup after it
      // look like the old and new bodies of one change shown together — confirm which is current.
      DeviceChannel channel = platformChannelMapper.queryChannelInParentPlatform(platformId, channelId);
      return channel;
      List<DeviceChannel> channels = platformChannelMapper.queryChannelInParentPlatform(platformId, channelId);
      if (channels.size() > 1) {
         // More than one match means the GB (national standard) channel ID is duplicated
         logger.warn("国标ID存在重复:{}", channelId);
      }
      // No match yields null; otherwise the first match is returned.
      if (channels.size() == 0) {
         return null;
      }else {
         return channels.get(0);
      }
   }
   @Override
@@ -645,8 +690,18 @@
   @Override
   public Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId) {
      // NOTE(review): the single-result lookup directly below and the List-based lookup after it
      // look like the old and new bodies of one change shown together — confirm which is current.
      Device device = platformChannelMapper.queryVideoDeviceByPlatformIdAndChannelId(platformId, channelId);
      return device;
      List<Device> devices = platformChannelMapper.queryVideoDeviceByPlatformIdAndChannelId(platformId, channelId);
      if (devices.size() > 1) {
         // More than one match means the GB (national standard) channel ID is duplicated
         logger.warn("国标ID存在重复:{}", channelId);
      }
      // No match yields null; otherwise the first match is returned.
      if (devices.size() == 0) {
         return null;
      }else {
         return devices.get(0);
      }
   }
   /**
@@ -681,9 +736,18 @@
      streamProxyItem.setCreateTime(now);
      streamProxyItem.setCreateStamp(System.currentTimeMillis());
      try {
         if (gbStreamMapper.add(streamProxyItem)<0 || streamProxyMapper.add(streamProxyItem) < 0) {
         if (streamProxyMapper.add(streamProxyItem) > 0) {
            if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
               if (gbStreamMapper.add(streamProxyItem) < 0) {
                  //事务回滚
                  dataSourceTransactionManager.rollback(transactionStatus);
                  return false;
               }
            }
         }else {
            //事务回滚
            dataSourceTransactionManager.rollback(transactionStatus);
            return false;
         }
         result = true;
         dataSourceTransactionManager.commit(transactionStatus);     //手动提交
@@ -707,10 +771,20 @@
      boolean result = false;
      streamProxyItem.setStreamType("proxy");
      try {
         if (gbStreamMapper.update(streamProxyItem)<0 || streamProxyMapper.update(streamProxyItem) < 0) {
         if (streamProxyMapper.update(streamProxyItem) > 0) {
            if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
               if (gbStreamMapper.updateByAppAndStream(streamProxyItem) == 0) {
                  //事务回滚
                  dataSourceTransactionManager.rollback(transactionStatus);
                  return false;
               }
            }
         } else {
            //事务回滚
            dataSourceTransactionManager.rollback(transactionStatus);
            return false;
         }
         dataSourceTransactionManager.commit(transactionStatus);     //手动提交
         result = true;
      }catch (Exception e) {
@@ -812,12 +886,11 @@
         List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
         if (parentPlatforms.size() > 0) {
            for (ParentPlatform parentPlatform : parentPlatforms) {
               streamPushItem.setCatalogId(parentPlatform.getCatalogId());
               streamPushItem.setPlatformId(parentPlatform.getServerGBId());
               String stream = streamPushItem.getStream();
               StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(streamPushItem.getApp(), stream,
               StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(streamPushItem.getApp(), streamPushItem.getStream(),
                     parentPlatform.getServerGBId());
               if (streamProxyItems == null) {
               if (streamProxyItem == null) {
                  streamPushItem.setCatalogId(parentPlatform.getCatalogId());
                  streamPushItem.setPlatformId(parentPlatform.getServerGBId());
                  platformGbStreamMapper.add(streamPushItem);
                  eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), streamPushItem, CatalogEvent.ADD);
               }
@@ -845,18 +918,6 @@
   /**
    * Forwards the platform's GB ID and online flag to
    * {@code platformMapper.updateParentPlatformStatus}; nothing else is done here.
    *
    * @param platformGbID GB ID identifying the platform
    * @param online       status flag passed to the mapper
    */
   @Override
   public void updateParentPlatformStatus(String platformGbID, boolean online) {
      platformMapper.updateParentPlatformStatus(platformGbID, online);
   }
   @Override
   public void updateMediaServer(MediaServerItem mediaServerItem) {
      String now = this.format.format(System.currentTimeMillis());
      mediaServerItem.setUpdateTime(now);
      if (mediaServerMapper.queryOne(mediaServerItem.getId()) != null) {
         mediaServerMapper.update(mediaServerItem);
      }else {
         mediaServerItem.setCreateTime(now);
         mediaServerMapper.add(mediaServerItem);
      }
   }
   @Override
@@ -1022,7 +1083,7 @@
      deviceChannel.setParentId(catalog.getParentId());
      deviceChannel.setRegisterWay(1);
      // 行政区划应该是Domain的前八位
      deviceChannel.setCivilCode(sipConfig.getDomain().substring(0, sipConfig.getDomain().length() - 2));
      deviceChannel.setCivilCode(parentPlatByServerGBId.getDeviceGBId().substring(0,6));
      deviceChannel.setModel("live");
      deviceChannel.setOwner("wvp-pro");
      deviceChannel.setSecrecy("0");
@@ -1041,6 +1102,9 @@
   @Override
   public List<ParentPlatform> queryPlatFormListForStreamWithGBId(String app, String stream, List<String> platforms) {
      if (platforms == null || platforms.size() == 0) {
         return new ArrayList<>();
      }
      return platformGbStreamMapper.queryPlatFormListForGBWithGBId(app, stream, platforms);
   }
@@ -1064,4 +1128,9 @@
   public PlatformCatalog queryDefaultCatalogInPlatform(String platformId) {
      return catalogMapper.selectDefaultByPlatFormId(platformId);
   }
   @Override
   public List<ChannelSourceInfo> getChannelSource(String platformId, String gbId) {
      return platformMapper.getChannelSource(platformId, gbId);
   }
}