zouyaoji
2022-08-06 3b5a37c270e5b6649b9a168ee753c2b3353a257a
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -25,11 +25,13 @@
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
/**
 * 视频设备数据存储-jdbc实现
 * swwheihei
 * 2020年5月6日 下午2:31:42
@@ -46,11 +48,12 @@
   @Autowired
   SipConfig sipConfig;
   @Autowired
   DataSourceTransactionManager dataSourceTransactionManager;
   @Autowired
   TransactionDefinition transactionDefinition;
   @Autowired
   DataSourceTransactionManager dataSourceTransactionManager;
   @Autowired
    private DeviceMapper deviceMapper;
@@ -102,149 +105,32 @@
      return deviceMapper.getDeviceByDeviceId(deviceId) != null;
   }
   /**
    * 视频设备创建
    *
    * @param device 设备对象
    * @return true:创建成功  false:创建失败
    */
   @Override
   public synchronized boolean create(Device device) {
      redisCatchStorage.updateDevice(device);
      return deviceMapper.add(device) > 0;
   }
   /**
    * 视频设备更新
    *
    * @param device 设备对象
    * @return true:更新成功  false:更新失败
    */
   @Override
   public synchronized boolean updateDevice(Device device) {
      String now = DateUtil.getNow();
      device.setUpdateTime(now);
      Device deviceByDeviceId = deviceMapper.getDeviceByDeviceId(device.getDeviceId());
      device.setCharset(device.getCharset().toUpperCase());
      if (deviceByDeviceId == null) {
         device.setCreateTime(now);
         redisCatchStorage.updateDevice(device);
         return deviceMapper.add(device) > 0;
      }else {
         redisCatchStorage.updateDevice(device);
         return deviceMapper.update(device) > 0;
      }
   }
   @Override
   public synchronized void updateChannel(String deviceId, DeviceChannel channel) {
      String channelId = channel.getChannelId();
      channel.setDeviceId(deviceId);
      StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
      if (streamInfo != null) {
         channel.setStreamId(streamInfo.getStream());
      }
      String now = DateUtil.getNow();
      channel.setUpdateTime(now);
      DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(deviceId, channelId);
      if (deviceChannel == null) {
         channel.setCreateTime(now);
         deviceChannelMapper.add(channel);
      }else {
         deviceChannelMapper.update(channel);
      }
      deviceChannelMapper.updateChannelSubCount(deviceId,channel.getParentId());
   }
   @Override
   public int updateChannels(String deviceId, List<DeviceChannel> channels) {
      List<DeviceChannel> addChannels = new ArrayList<>();
      List<DeviceChannel> updateChannels = new ArrayList<>();
      HashMap<String, DeviceChannel> channelsInStore = new HashMap<>();
      if (channels != null && channels.size() > 0) {
         List<DeviceChannel> channelList = deviceChannelMapper.queryChannels(deviceId, null, null, null, null);
         if (channelList.size() == 0) {
            for (DeviceChannel channel : channels) {
               channel.setDeviceId(deviceId);
               StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId());
               if (streamInfo != null) {
                  channel.setStreamId(streamInfo.getStream());
               }
               String now = DateUtil.getNow();
               channel.setUpdateTime(now);
               channel.setCreateTime(now);
               addChannels.add(channel);
            }
         }else {
            for (DeviceChannel deviceChannel : channelList) {
               channelsInStore.put(deviceChannel.getChannelId(), deviceChannel);
            }
            for (DeviceChannel channel : channels) {
               channel.setDeviceId(deviceId);
               StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId());
               if (streamInfo != null) {
                  channel.setStreamId(streamInfo.getStream());
               }
               String now = DateUtil.getNow();
               channel.setUpdateTime(now);
               if (channelsInStore.get(channel.getChannelId()) != null) {
                  updateChannels.add(channel);
               }else {
                  addChannels.add(channel);
                  channel.setCreateTime(now);
               }
            }
         }
         int limitCount = 300;
         if (addChannels.size() > 0) {
            if (addChannels.size() > limitCount) {
               for (int i = 0; i < addChannels.size(); i += limitCount) {
                  int toIndex = i + limitCount;
                  if (i + limitCount > addChannels.size()) {
                     toIndex = addChannels.size();
                  }
                  deviceChannelMapper.batchAdd(addChannels.subList(i, toIndex));
               }
            }else {
               deviceChannelMapper.batchAdd(addChannels);
            }
         }
         if (updateChannels.size() > 0) {
            if (updateChannels.size() > limitCount) {
               for (int i = 0; i < updateChannels.size(); i += limitCount) {
                  int toIndex = i + limitCount;
                  if (i + limitCount > updateChannels.size()) {
                     toIndex = updateChannels.size();
                  }
                  deviceChannelMapper.batchUpdate(updateChannels.subList(i, toIndex));
               }
            }else {
               deviceChannelMapper.batchUpdate(updateChannels);
            }
         }
      }
      return addChannels.size() + updateChannels.size();
   }
   @Override
   public boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList) {
      if (deviceChannelList == null) {
      if (CollectionUtils.isEmpty(deviceChannelList)) {
         return false;
      }
      List<DeviceChannel> allChannelInPlay = deviceChannelMapper.getAllChannelInPlay();
      Map<String,DeviceChannel> allChannelMapInPlay = new ConcurrentHashMap<>();
      if (allChannelInPlay.size() > 0) {
         for (DeviceChannel deviceChannel : allChannelInPlay) {
            allChannelMapInPlay.put(deviceChannel.getChannelId(), deviceChannel);
         }
      }
      TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
      // 数据去重
      List<DeviceChannel> channels = new ArrayList<>();
      StringBuilder stringBuilder = new StringBuilder();
      Map<String, Integer> subContMap = new HashMap<>();
      if (deviceChannelList != null && deviceChannelList.size() > 1) {
      if (deviceChannelList.size() > 1) {
         // 数据去重
         Set<String> gbIdSet = new HashSet<>();
         for (DeviceChannel deviceChannel : deviceChannelList) {
            if (!gbIdSet.contains(deviceChannel.getChannelId())) {
               gbIdSet.add(deviceChannel.getChannelId());
               if (allChannelMapInPlay.containsKey(deviceChannel.getChannelId())) {
                  deviceChannel.setStreamId(allChannelMapInPlay.get(deviceChannel.getChannelId()).getStreamId());
               }
               channels.add(deviceChannel);
               if (!StringUtils.isEmpty(deviceChannel.getParentId())) {
                  if (subContMap.get(deviceChannel.getParentId()) == null) {
@@ -271,6 +157,10 @@
      }
      if (stringBuilder.length() > 0) {
         logger.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
      }
      if(CollectionUtils.isEmpty(channels)){
         logger.info("通道重设,数据为空={}" , deviceChannelList);
         return false;
      }
      try {
         int cleanChannelsResult = deviceChannelMapper.cleanChannelsNotInList(deviceId, channels);
@@ -341,6 +231,9 @@
      List<DeviceChannel> all;
      if (catalogUnderDevice != null && catalogUnderDevice) {
         all = deviceChannelMapper.queryChannels(deviceId, deviceId, query, hasSubChannel, online);
         // 海康设备的parentId是SIP id
         List<DeviceChannel> deviceChannels = deviceChannelMapper.queryChannels(deviceId, sipConfig.getId(), query, hasSubChannel, online);
         all.addAll(deviceChannels);
      }else {
         all = deviceChannelMapper.queryChannels(deviceId, null, query, hasSubChannel, online);
      }
@@ -490,6 +383,9 @@
    */
   @Override
   public synchronized boolean insertMobilePosition(MobilePosition mobilePosition) {
      if (mobilePosition.getDeviceId().equals(mobilePosition.getChannelId())) {
         mobilePosition.setChannelId(null);
      }
      return deviceMobilePositionMapper.insertNewPosition(mobilePosition) > 0;
   }
@@ -547,20 +443,6 @@
      // 更新缓存
      parentPlatformCatch.setParentPlatform(parentPlatform);
      redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
      if (parentPlatform.isEnable()) {
         // 共享所有视频流,需要将现有视频流添加到此平台
         List<GbStream> gbStreams = gbStreamMapper.queryStreamNotInPlatform();
         if (gbStreams.size() > 0) {
            for (GbStream gbStream : gbStreams) {
               gbStream.setCatalogId(parentPlatform.getCatalogId());
            }
            if (parentPlatform.isShareAllLiveStream()) {
               gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId());
            }else {
               gbStreamService.delPlatformInfo(parentPlatform.getServerGBId(), gbStreams);
            }
         }
      }
      return result > 0;
   }
@@ -609,36 +491,6 @@
   public List<DeviceChannelInPlatform> queryChannelListInParentPlatform(String platformId) {
      return deviceChannelMapper.queryChannelByPlatformId(platformId);
   }
   @Override
   public int updateChannelForGB(String platformId, List<ChannelReduce> channelReduces, String catalogId) {
      Map<Integer, ChannelReduce> deviceAndChannels = new HashMap<>();
      for (ChannelReduce channelReduce : channelReduces) {
         channelReduce.setCatalogId(catalogId);
         deviceAndChannels.put(channelReduce.getId(), channelReduce);
      }
      List<Integer> deviceAndChannelList = new ArrayList<>(deviceAndChannels.keySet());
      // 查询当前已经存在的
      List<Integer> channelIds = platformChannelMapper.findChannelRelatedPlatform(platformId, channelReduces);
      if (deviceAndChannelList != null) {
         deviceAndChannelList.removeAll(channelIds);
      }
      for (Integer channelId : channelIds) {
         deviceAndChannels.remove(channelId);
      }
      List<ChannelReduce> channelReducesToAdd = new ArrayList<>(deviceAndChannels.values());
      // 对剩下的数据进行存储
      int result = 0;
      if (channelReducesToAdd.size() > 0) {
         result = platformChannelMapper.addChannels(platformId, channelReducesToAdd);
         // TODO 后续给平台增加控制开关以控制是否响应目录订阅
         List<DeviceChannel> deviceChannelList = getDeviceChannelListByChannelReduceList(channelReducesToAdd, catalogId);
         eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.ADD);
      }
      return result;
   }
@@ -716,78 +568,6 @@
      return deviceMobilePositionMapper.clearMobilePositionsByDeviceId(deviceId);
   }
   /**
    * 新增代理流
    * @param streamProxyItem
    * @return
    */
   @Override
   public boolean addStreamProxy(StreamProxyItem streamProxyItem) {
      TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
      boolean result = false;
      streamProxyItem.setStreamType("proxy");
      streamProxyItem.setStatus(true);
      String now = DateUtil.getNow();
      streamProxyItem.setCreateTime(now);
      streamProxyItem.setCreateStamp(System.currentTimeMillis());
      try {
         if (streamProxyMapper.add(streamProxyItem) > 0) {
            if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
               if (gbStreamMapper.add(streamProxyItem) < 0) {
                  //事务回滚
                  dataSourceTransactionManager.rollback(transactionStatus);
                  return false;
               }
            }
         }else {
            //事务回滚
            dataSourceTransactionManager.rollback(transactionStatus);
            return false;
         }
         result = true;
         dataSourceTransactionManager.commit(transactionStatus);     //手动提交
      }catch (Exception e) {
         logger.error("向数据库添加流代理失败:", e);
         dataSourceTransactionManager.rollback(transactionStatus);
      }
      return result;
   }
   /**
    * 更新代理流
    * @param streamProxyItem
    * @return
    */
   @Override
   public boolean updateStreamProxy(StreamProxyItem streamProxyItem) {
      TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
      boolean result = false;
      streamProxyItem.setStreamType("proxy");
      try {
         if (streamProxyMapper.update(streamProxyItem) > 0) {
            if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
               if (gbStreamMapper.updateByAppAndStream(streamProxyItem) == 0) {
                  //事务回滚
                  dataSourceTransactionManager.rollback(transactionStatus);
                  return false;
               }
            }
         } else {
            //事务回滚
            dataSourceTransactionManager.rollback(transactionStatus);
            return false;
         }
         dataSourceTransactionManager.commit(transactionStatus);     //手动提交
         result = true;
      }catch (Exception e) {
         e.printStackTrace();
         dataSourceTransactionManager.rollback(transactionStatus);
      }
      return result;
   }
   /**
    * 移除代理流
@@ -840,7 +620,7 @@
    * @return
    */
   @Override
   public List<GbStream> queryGbStreamListInPlatform(String platformId) {
   public List<DeviceChannel> queryGbStreamListInPlatform(String platformId) {
      return gbStreamMapper.queryGbStreamListInPlatform(platformId);
   }
@@ -856,60 +636,32 @@
   }
   @Override
   public void updateMediaList(List<StreamPushItem> streamPushItems) {
      if (streamPushItems == null || streamPushItems.size() == 0) {
         return;
      }
      logger.info("updateMediaList:  " + streamPushItems.size());
      streamPushMapper.addAll(streamPushItems);
      // TODO 待优化
      for (int i = 0; i < streamPushItems.size(); i++) {
         int onlineResult = gbStreamMapper.setStatus(streamPushItems.get(i).getApp(), streamPushItems.get(i).getStream(), true);
         if (onlineResult > 0) {
            // 发送上线通知
            eventPublisher.catalogEventPublishForStream(null, streamPushItems.get(i), CatalogEvent.ON);
         }
      }
   }
   @Override
   public void updateMedia(StreamPushItem streamPushItem) {
      streamPushMapper.del(streamPushItem.getApp(), streamPushItem.getStream());
      streamPushMapper.add(streamPushItem);
      gbStreamMapper.setStatus(streamPushItem.getApp(), streamPushItem.getStream(), true);
      if(!StringUtils.isEmpty(streamPushItem.getGbId() )){
         // 查找开启了全部直播流共享的上级平台
         List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
         if (parentPlatforms.size() > 0) {
            for (ParentPlatform parentPlatform : parentPlatforms) {
               StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(streamPushItem.getApp(), streamPushItem.getStream(),
                     parentPlatform.getServerGBId());
               if (streamProxyItem == null) {
                  streamPushItem.setCatalogId(parentPlatform.getCatalogId());
                  streamPushItem.setPlatformId(parentPlatform.getServerGBId());
                  platformGbStreamMapper.add(streamPushItem);
                  eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), streamPushItem, CatalogEvent.ADD);
               }
            }
         }
      }
   }
   @Override
   public int removeMedia(String app, String stream) {
      return streamPushMapper.del(app, stream);
   }
   @Override
   public void clearMediaList() {
      streamPushMapper.clear();
   public int mediaOffline(String app, String stream) {
      GbStream gbStream = gbStreamMapper.selectOne(app, stream);
      int result;
      if ("proxy".equals(gbStream.getStreamType())) {
         result = streamProxyMapper.updateStatus(app, stream, false);
      }else {
         result = streamPushMapper.updatePushStatus(app, stream, false);
      }
      return result;
   }
   @Override
   public int mediaOutline(String app, String streamId) {
      return gbStreamMapper.setStatus(app, streamId, false);
   public int mediaOnline(String app, String stream) {
      GbStream gbStream = gbStreamMapper.selectOne(app, stream);
      int result;
      if ("proxy".equals(gbStream.getStreamType())) {
         result = streamProxyMapper.updateStatus(app, stream, true);
      }else {
         result = streamPushMapper.updatePushStatus(app, stream, true);
      }
      return result;
   }
   @Override
@@ -945,12 +697,44 @@
   @Override
   public int addCatalog(PlatformCatalog platformCatalog) {
      ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformCatalog.getPlatformId());
      if (platform == null) {
         return 0;
      }
      if (platform.getTreeType().equals(TreeType.BUSINESS_GROUP)) {
         if (platformCatalog.getPlatformId().equals(platformCatalog.getParentId())) {
            // 第一层节点
            platformCatalog.setBusinessGroupId(platformCatalog.getId());
            platformCatalog.setParentId(platform.getDeviceGBId());
         }else {
            // 获取顶层的
            PlatformCatalog topCatalog = getTopCatalog(platformCatalog.getParentId(), platform.getDeviceGBId());
            platformCatalog.setBusinessGroupId(topCatalog.getId());
         }
      }
      if (platform.getTreeType().equals(TreeType.CIVIL_CODE)) {
         platformCatalog.setCivilCode(platformCatalog.getId());
         if (platformCatalog.getPlatformId().equals(platformCatalog.getParentId())) {
            // 第一层节点
            platformCatalog.setParentId(platform.getDeviceGBId());
         }
      }
      int result = catalogMapper.add(platformCatalog);
      if (result > 0) {
         DeviceChannel deviceChannel = getDeviceChannelByCatalog(platformCatalog);
         eventPublisher.catalogEventPublish(platformCatalog.getPlatformId(), deviceChannel, CatalogEvent.ADD);
      }
      return result;
   }
   private PlatformCatalog getTopCatalog(String id, String platformId) {
      PlatformCatalog catalog = catalogMapper.selectParentCatalog(id);
      if (catalog.getParentId().equals(platformId)) {
         return catalog;
      }else {
         return getTopCatalog(catalog.getParentId(), platformId);
      }
   }
   @Override
@@ -1019,12 +803,12 @@
   @Override
   public int setDefaultCatalog(String platformId, String catalogId) {
      return platformMapper.setDefaultCatalog(platformId, catalogId);
      return platformMapper.setDefaultCatalog(platformId, catalogId, DateUtil.getNow());
   }
   @Override
   public List<PlatformCatalog> queryCatalogInPlatform(String platformId) {
      return catalogMapper.selectByPlatForm(platformId);
   public List<DeviceChannel> queryCatalogInPlatform(String platformId) {
      return catalogMapper.queryCatalogInPlatform(platformId);
   }
   @Override
@@ -1067,20 +851,24 @@
   }
   private DeviceChannel getDeviceChannelByCatalog(PlatformCatalog catalog) {
      ParentPlatform parentPlatByServerGBId = platformMapper.getParentPlatByServerGBId(catalog.getPlatformId());
      ParentPlatform platform = platformMapper.getParentPlatByServerGBId(catalog.getPlatformId());
      DeviceChannel deviceChannel = new DeviceChannel();
      deviceChannel.setChannelId(catalog.getId());
      deviceChannel.setName(catalog.getName());
      deviceChannel.setLongitude(0.0);
      deviceChannel.setLatitude(0.0);
      deviceChannel.setDeviceId(parentPlatByServerGBId.getDeviceGBId());
      deviceChannel.setDeviceId(platform.getDeviceGBId());
      deviceChannel.setManufacture("wvp-pro");
      deviceChannel.setStatus(1);
      deviceChannel.setParental(1);
      deviceChannel.setParentId(catalog.getParentId());
      deviceChannel.setRegisterWay(1);
      // 行政区划应该是Domain的前八位
      deviceChannel.setCivilCode(parentPlatByServerGBId.getAdministrativeDivision());
      if (platform.getTreeType().equals(TreeType.BUSINESS_GROUP)) {
         deviceChannel.setParentId(catalog.getParentId());
         deviceChannel.setBusinessGroupId(catalog.getBusinessGroupId());
      }
      deviceChannel.setModel("live");
      deviceChannel.setOwner("wvp-pro");
      deviceChannel.setSecrecy("0");
@@ -1132,7 +920,37 @@
   }
   @Override
   public void updateChannelPotion(String deviceId, String channelId, double longitude, double latitude) {
      deviceChannelMapper.updatePotion(deviceId, channelId, longitude, latitude);
   public void updateChannelPosition(DeviceChannel deviceChannel) {
      if (deviceChannel.getChannelId().equals(deviceChannel.getDeviceId())) {
         deviceChannel.setChannelId(null);
      }
      if (deviceChannel.getGpsTime() == null) {
         deviceChannel.setGpsTime(DateUtil.getNow());
      }
      deviceChannelMapper.updatePosition(deviceChannel);
   }
   /**
    * Deletes everything stored for a platform through three mappers, in this order:
    * catalogs, platform/channel relations, platform/GB-stream relations.
    *
    * @param serverGBId id of the platform whose content is removed
    */
   @Override
   public void cleanContentForPlatform(String serverGBId) {
      // Disabled variant kept below: it loaded the catalogs first and published a
      // CatalogEvent.DEL for them after deleting. No event is published by the active code.
//      List<PlatformCatalog> catalogList = catalogMapper.selectByPlatForm(serverGBId);
//      if (catalogList.size() > 0) {
//         int result = catalogMapper.delByPlatformId(serverGBId);
//         if (result > 0) {
//            List<DeviceChannel> deviceChannels = new ArrayList<>();
//            for (PlatformCatalog catalog : catalogList) {
//               deviceChannels.add(getDeviceChannelByCatalog(catalog));
//            }
//            eventPublisher.catalogEventPublish(serverGBId, deviceChannels, CatalogEvent.DEL);
//         }
//      }
      catalogMapper.delByPlatformId(serverGBId);
      platformChannelMapper.delByPlatformId(serverGBId);
      platformGbStreamMapper.delByPlatformId(serverGBId);
   }
   @Override
   public List<DeviceChannel> queryChannelWithCatalog(String serverGBId) {
      return deviceChannelMapper.queryChannelWithCatalog(serverGBId);
   }
}