648540858
2023-01-31 f3e4928995401ba3b7ff0981867293838a50036e
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -1,12 +1,11 @@
package com.genersoft.iot.vmp.storager.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -26,7 +25,7 @@
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.ObjectUtils;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -83,6 +82,9 @@
    private GbStreamMapper gbStreamMapper;
   @Autowired
    private UserSetting userSetting;
   @Autowired
    private PlatformCatalogMapper catalogMapper;
   @Autowired
@@ -110,11 +112,11 @@
      if (CollectionUtils.isEmpty(deviceChannelList)) {
         return false;
      }
      List<DeviceChannel> allChannelInPlay = deviceChannelMapper.getAllChannelInPlay();
      Map<String,DeviceChannel> allChannelMapInPlay = new ConcurrentHashMap<>();
      if (allChannelInPlay.size() > 0) {
         for (DeviceChannel deviceChannel : allChannelInPlay) {
            allChannelMapInPlay.put(deviceChannel.getChannelId(), deviceChannel);
      List<DeviceChannel> allChannels = deviceChannelMapper.queryAllChannels(deviceId);
      Map<String,DeviceChannel> allChannelMap = new ConcurrentHashMap<>();
      if (allChannels.size() > 0) {
         for (DeviceChannel deviceChannel : allChannels) {
            allChannelMap.put(deviceChannel.getChannelId(), deviceChannel);
         }
      }
      TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
@@ -122,17 +124,18 @@
      List<DeviceChannel> channels = new ArrayList<>();
      StringBuilder stringBuilder = new StringBuilder();
      Map<String, Integer> subContMap = new HashMap<>();
      if (deviceChannelList.size() > 1) {
      if (deviceChannelList.size() > 0) {
         // 数据去重
         Set<String> gbIdSet = new HashSet<>();
         for (DeviceChannel deviceChannel : deviceChannelList) {
            if (!gbIdSet.contains(deviceChannel.getChannelId())) {
               gbIdSet.add(deviceChannel.getChannelId());
               if (allChannelMapInPlay.containsKey(deviceChannel.getChannelId())) {
                  deviceChannel.setStreamId(allChannelMapInPlay.get(deviceChannel.getChannelId()).getStreamId());
               if (allChannelMap.containsKey(deviceChannel.getChannelId())) {
                  deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId());
                  deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio());
               }
               channels.add(deviceChannel);
               if (!StringUtils.isEmpty(deviceChannel.getParentId())) {
               if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) {
                  if (subContMap.get(deviceChannel.getParentId()) == null) {
                     subContMap.put(deviceChannel.getParentId(), 1);
                  }else {
@@ -152,8 +155,6 @@
            }
         }
      }else {
         channels = deviceChannelList;
      }
      if (stringBuilder.length() > 0) {
         logger.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
@@ -193,6 +194,119 @@
   }
   @Override
   public boolean updateChannels(String deviceId, List<DeviceChannel> deviceChannelList) {
      if (CollectionUtils.isEmpty(deviceChannelList)) {
         return false;
      }
      List<DeviceChannel> allChannels = deviceChannelMapper.queryAllChannels(deviceId);
      Map<String,DeviceChannel> allChannelMap = new ConcurrentHashMap<>();
      if (allChannels.size() > 0) {
         for (DeviceChannel deviceChannel : allChannels) {
            allChannelMap.put(deviceChannel.getChannelId(), deviceChannel);
         }
      }
      List<DeviceChannel> addChannels = new ArrayList<>();
      List<DeviceChannel> updateChannels = new ArrayList<>();
      TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
      // 数据去重
      StringBuilder stringBuilder = new StringBuilder();
      Map<String, Integer> subContMap = new HashMap<>();
      if (deviceChannelList.size() > 0) {
         // 数据去重
         Set<String> gbIdSet = new HashSet<>();
         for (DeviceChannel deviceChannel : deviceChannelList) {
            if (!gbIdSet.contains(deviceChannel.getChannelId())) {
               gbIdSet.add(deviceChannel.getChannelId());
               if (allChannelMap.containsKey(deviceChannel.getChannelId())) {
                  deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId());
                  deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio());
                  updateChannels.add(deviceChannel);
               }else {
                  addChannels.add(deviceChannel);
               }
               if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) {
                  if (subContMap.get(deviceChannel.getParentId()) == null) {
                     subContMap.put(deviceChannel.getParentId(), 1);
                  }else {
                     Integer count = subContMap.get(deviceChannel.getParentId());
                     subContMap.put(deviceChannel.getParentId(), count++);
                  }
               }
            }else {
               stringBuilder.append(deviceChannel.getChannelId()).append(",");
            }
         }
         if (addChannels.size() > 0) {
            for (DeviceChannel channel : addChannels) {
               if (subContMap.get(channel.getChannelId()) != null){
                  channel.setSubCount(subContMap.get(channel.getChannelId()));
               }
            }
         }
         if (updateChannels.size() > 0) {
            for (DeviceChannel channel : updateChannels) {
               if (subContMap.get(channel.getChannelId()) != null){
                  channel.setSubCount(subContMap.get(channel.getChannelId()));
               }
            }
         }
      }
      if (stringBuilder.length() > 0) {
         logger.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
      }
      if(CollectionUtils.isEmpty(updateChannels) && CollectionUtils.isEmpty(addChannels) ){
         logger.info("通道更新,数据为空={}" , deviceChannelList);
         return false;
      }
      try {
         int limitCount = 300;
         boolean result = false;
         if (addChannels.size() > 0) {
            if (addChannels.size() > limitCount) {
               for (int i = 0; i < addChannels.size(); i += limitCount) {
                  int toIndex = i + limitCount;
                  if (i + limitCount > addChannels.size()) {
                     toIndex = addChannels.size();
                  }
                  result = result || deviceChannelMapper.batchAdd(addChannels.subList(i, toIndex)) < 0;
               }
            }else {
               result = result || deviceChannelMapper.batchAdd(addChannels) < 0;
            }
         }
         if (updateChannels.size() > 0) {
            if (updateChannels.size() > limitCount) {
               for (int i = 0; i < updateChannels.size(); i += limitCount) {
                  int toIndex = i + limitCount;
                  if (i + limitCount > updateChannels.size()) {
                     toIndex = updateChannels.size();
                  }
                  result = result || deviceChannelMapper.batchUpdate(updateChannels.subList(i, toIndex)) < 0;
               }
            }else {
               result = result || deviceChannelMapper.batchUpdate(updateChannels) < 0;
            }
         }
         if (result) {
            //事务回滚
            dataSourceTransactionManager.rollback(transactionStatus);
         }else {
            //手动提交
            dataSourceTransactionManager.commit(transactionStatus);
         }
         return true;
      }catch (Exception e) {
         e.printStackTrace();
         dataSourceTransactionManager.rollback(transactionStatus);
         return false;
      }
   }
   @Override
   public void deviceChannelOnline(String deviceId, String channelId) {
      deviceChannelMapper.online(deviceId, channelId);
@@ -230,31 +344,31 @@
      PageHelper.startPage(page, count);
      List<DeviceChannel> all;
      if (catalogUnderDevice != null && catalogUnderDevice) {
         all = deviceChannelMapper.queryChannels(deviceId, deviceId, query, hasSubChannel, online);
         all = deviceChannelMapper.queryChannels(deviceId, deviceId, query, hasSubChannel, online,null);
         // 海康设备的parentId是SIP id
         List<DeviceChannel> deviceChannels = deviceChannelMapper.queryChannels(deviceId, sipConfig.getId(), query, hasSubChannel, online);
         List<DeviceChannel> deviceChannels = deviceChannelMapper.queryChannels(deviceId, sipConfig.getId(), query, hasSubChannel, online,null);
         all.addAll(deviceChannels);
      }else {
         all = deviceChannelMapper.queryChannels(deviceId, null, query, hasSubChannel, online);
         all = deviceChannelMapper.queryChannels(deviceId, null, query, hasSubChannel, online,null);
      }
      return new PageInfo<>(all);
   }
   // Returns the result of deviceChannelMapper.queryChannelsByDeviceIdWithStartAndLimit for one
   // device, forwarding the caller's filters and the start/limit window. The mapper's second
   // argument is always null (presumably "no parent channel filter" - confirm against the mapper).
   // NOTE(review): two versions of the signature and of the return statement appear back to back
   // (without and with the channelIds parameter). This looks like unresolved diff residue - only
   // one version should remain.
   @Override
   public List<DeviceChannel> queryChannelsByDeviceIdWithStartAndLimit(String deviceId, String query, Boolean hasSubChannel, Boolean online, int start, int limit) {
      return deviceChannelMapper.queryChannelsByDeviceIdWithStartAndLimit(deviceId, null, query, hasSubChannel, online, start, limit);
   public List<DeviceChannel> queryChannelsByDeviceIdWithStartAndLimit(String deviceId, String query, Boolean hasSubChannel, Boolean online, int start, int limit,List<String> channelIds) {
      return deviceChannelMapper.queryChannelsByDeviceIdWithStartAndLimit(deviceId, null, query, hasSubChannel, online, start, limit,channelIds);
   }
   // Returns the result of deviceChannelMapper.queryChannels for one device. Apart from the device
   // id (and, in the second version, online and channelIds) every mapper argument is null.
   // NOTE(review): two versions of the signature and of the return statement appear back to back
   // (one taking only deviceId, one also taking online and channelIds). This looks like unresolved
   // diff residue - only one version should remain.
   @Override
   public List<DeviceChannel> queryChannelsByDeviceId(String deviceId) {
      return deviceChannelMapper.queryChannels(deviceId, null,null, null, null);
   public List<DeviceChannel> queryChannelsByDeviceId(String deviceId,Boolean online,List<String> channelIds) {
      return deviceChannelMapper.queryChannels(deviceId, null,null, null, online,channelIds);
   }
   // Queries the channels of a device under the given parent channel and wraps the result in a
   // PageInfo. PageHelper.startPage(page, count) is called before the mapper query (presumably so
   // that the query is paged - confirm against the PageHelper configuration).
   // NOTE(review): the local variable "all" is declared twice (mapper call without and with a
   // trailing null argument). This looks like unresolved diff residue - only one should remain.
   @Override
   public PageInfo<DeviceChannel> querySubChannels(String deviceId, String parentChannelId, String query, Boolean hasSubChannel, Boolean online, int page, int count) {
      PageHelper.startPage(page, count);
      List<DeviceChannel> all = deviceChannelMapper.queryChannels(deviceId, parentChannelId, query, hasSubChannel, online);
      List<DeviceChannel> all = deviceChannelMapper.queryChannels(deviceId, parentChannelId, query, hasSubChannel, online,null);
      return new PageInfo<>(all);
   }
@@ -277,9 +391,9 @@
    * @return PageInfo<Device> 分页设备对象数组
    */
   // NOTE(review): two versions of the signature and of the "all" declaration appear back to back
   // (without and with the online parameter). This looks like unresolved diff residue - only one
   // version should remain.
   @Override
   public PageInfo<Device> queryVideoDeviceList(int page, int count) {
   public PageInfo<Device> queryVideoDeviceList(int page, int count,Boolean online) {
      // Called before the mapper query; presumably makes that query paged - confirm.
      PageHelper.startPage(page, count);
      List<Device> all = deviceMapper.getDevices();
      List<Device> all = deviceMapper.getDevices(online);
      // Wrap the queried devices in a PageInfo for the caller.
      return new PageInfo<>(all);
   }
@@ -289,83 +403,10 @@
    * @return List<Device> 设备对象数组
    */
   // NOTE(review): two versions of the signature and of the "deviceList" declaration appear back to
   // back (without and with the online parameter). This looks like unresolved diff residue - only
   // one version should remain.
   @Override
   public List<Device> queryVideoDeviceList() {
   public List<Device> queryVideoDeviceList(Boolean online) {
      // Unpaged variant: returns whatever deviceMapper.getDevices yields.
      List<Device> deviceList =  deviceMapper.getDevices();
      List<Device> deviceList =  deviceMapper.getDevices(online);
      return deviceList;
   }
   /**
    * 删除设备
    *
    * @param deviceId 设备ID
    * @return true:删除成功  false:删除失败
    */
   @Override
   public boolean delete(String deviceId) {
      TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
      boolean result = false;
      try {
         platformChannelMapper.delChannelForDeviceId(deviceId);
         deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
         if ( deviceMapper.del(deviceId) < 0 ) {
            //事务回滚
            dataSourceTransactionManager.rollback(transactionStatus);
         }
         result = true;
         dataSourceTransactionManager.commit(transactionStatus);     //手动提交
      }catch (Exception e) {
         dataSourceTransactionManager.rollback(transactionStatus);
      }
      return result;
   }
   /**
    * 更新设备在线
    *
    * @param deviceId 设备ID
    * @return true:更新成功  false:更新失败
    */
   @Override
   public synchronized boolean online(String deviceId) {
      Device device = deviceMapper.getDeviceByDeviceId(deviceId);
      if (device == null) {
         return false;
      }
      device.setOnline(1);
      logger.info("更新设备在线: " + deviceId);
      redisCatchStorage.updateDevice(device);
      return deviceMapper.update(device) > 0;
   }
   /**
    * 更新设备离线
    *
    * @param deviceId 设备ID
    * @return true:更新成功  false:更新失败
    */
   @Override
   public synchronized boolean outline(String deviceId) {
      logger.info("更新设备离线: " + deviceId);
      Device device = deviceMapper.getDeviceByDeviceId(deviceId);
      if (device == null) {
         return false;
      }
      device.setOnline(0);
      redisCatchStorage.updateDevice(device);
      return deviceMapper.update(device) > 0;
   }
   /**
    * 更新所有设备离线
    *
    * @return true:更新成功  false:更新失败
    */
   @Override
   public synchronized boolean outlineForAll() {
      logger.info("更新所有设备离线");
      int result = deviceMapper.outlineForAll();
      return result > 0;
   }
   /**
@@ -454,13 +495,6 @@
      // 删除关联的通道
      platformChannelMapper.cleanChannelForGB(parentPlatform.getServerGBId());
      return result > 0;
   }
   @Override
   public PageInfo<ParentPlatform> queryParentPlatformList(int page, int count) {
      PageHelper.startPage(page, count);
      List<ParentPlatform> all = platformMapper.getParentPlatformList();
      return new PageInfo<>(all);
   }
   @Override
@@ -621,7 +655,7 @@
    */
   // Delegates to gbStreamMapper.queryGbStreamListInPlatform for the given platform id.
   // NOTE(review): there are two consecutive return statements (mapper call without and with
   // userSetting.isUsePushingAsStatus()); the second one is unreachable. This looks like unresolved
   // diff residue - only one should remain.
   @Override
   public List<DeviceChannel> queryGbStreamListInPlatform(String platformId) {
      return gbStreamMapper.queryGbStreamListInPlatform(platformId);
      return gbStreamMapper.queryGbStreamListInPlatform(platformId, userSetting.isUsePushingAsStatus());
   }
   /**
@@ -636,44 +670,8 @@
   }
   @Override
   public void updateMediaList(List<StreamPushItem> streamPushItems) {
      if (streamPushItems == null || streamPushItems.size() == 0) {
         return;
      }
      logger.info("updateMediaList:  " + streamPushItems.size());
      streamPushMapper.addAll(streamPushItems);
      // TODO 待优化
      for (int i = 0; i < streamPushItems.size(); i++) {
         int onlineResult = mediaOnline(streamPushItems.get(i).getApp(), streamPushItems.get(i).getStream());
         if (onlineResult > 0) {
            // 发送上线通知
            eventPublisher.catalogEventPublishForStream(null, streamPushItems.get(i), CatalogEvent.ON);
         }
      }
   }
   /**
    * Replaces the stored record of a push-stream item.
    *
    * <p>The record with the item's app/stream is deleted, the item is inserted, and then
    * {@code mediaOffline} is called for the same app/stream.
    *
    * @param streamPushItem push-stream item to store
    */
   @Override
   public void updateMedia(StreamPushItem streamPushItem) {
      // Delete-then-add: drop the record with the same app/stream before inserting the item.
      streamPushMapper.del(streamPushItem.getApp(), streamPushItem.getStream());
      streamPushMapper.add(streamPushItem);
      mediaOffline(streamPushItem.getApp(), streamPushItem.getStream());
   }
   @Override
   public int removeMedia(String app, String stream) {
      return streamPushMapper.del(app, stream);
   }
   @Override
   public StreamPushItem getMedia(String app, String stream) {
      return streamPushMapper.selectOne(app, stream);
   }
   /**
    * Delegates to {@code streamPushMapper.clear()} (presumably removes all stored push-stream
    * records - confirm against the mapper).
    */
   @Override
   public void clearMediaList() {
      streamPushMapper.clear();
   }
   @Override
@@ -683,7 +681,7 @@
      if ("proxy".equals(gbStream.getStreamType())) {
         result = streamProxyMapper.updateStatus(app, stream, false);
      }else {
         result = streamPushMapper.updateStatus(app, stream, false);
         result = streamPushMapper.updatePushStatus(app, stream, false);
      }
      return result;
   }
@@ -695,7 +693,7 @@
      if ("proxy".equals(gbStream.getStreamType())) {
         result = streamProxyMapper.updateStatus(app, stream, true);
      }else {
         result = streamPushMapper.updateStatus(app, stream, true);
         result = streamPushMapper.updatePushStatus(app, stream, true);
      }
      return result;
   }
@@ -738,17 +736,22 @@
         return 0;
      }
      if (platform.getTreeType().equals(TreeType.BUSINESS_GROUP)) {
         if (platformCatalog.getPlatformId().equals(platformCatalog.getParentId())) {
         if (platform.getDeviceGBId().equals(platformCatalog.getParentId())) {
            // 第一层节点
            platformCatalog.setBusinessGroupId(platformCatalog.getId());
            platformCatalog.setParentId(platform.getDeviceGBId());
         }else {
            // 获取顶层的
            PlatformCatalog topCatalog = getTopCatalog(platformCatalog.getParentId(), platformCatalog.getPlatformId());
            PlatformCatalog topCatalog = getTopCatalog(platformCatalog.getParentId(), platform.getDeviceGBId());
            platformCatalog.setBusinessGroupId(topCatalog.getId());
         }
      }
      if (platform.getTreeType().equals(TreeType.CIVIL_CODE)) {
         platformCatalog.setCivilCode(platformCatalog.getId());
         if (platformCatalog.getPlatformId().equals(platformCatalog.getParentId())) {
            // 第一层节点
            platformCatalog.setParentId(platform.getDeviceGBId());
         }
      }
      int result = catalogMapper.add(platformCatalog);
@@ -868,18 +871,6 @@
      return gbStreamMapper.updateStreamGPS(gpsMsgInfos);
   }
   private List<DeviceChannel> getDeviceChannelListByChannelReduceList(List<ChannelReduce> channelReduces, String catalogId) {
      List<DeviceChannel> deviceChannelList = new ArrayList<>();
      if (channelReduces.size() > 0){
         for (ChannelReduce channelReduce : channelReduces) {
            DeviceChannel deviceChannel = queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId());
            deviceChannel.setParental(1);
            deviceChannel.setParentId(catalogId);
            deviceChannelList.add(deviceChannel);
         }
      }
      return deviceChannelList;
   }
   private DeviceChannel getDeviceChannelByCatalog(PlatformCatalog catalog) {
      ParentPlatform platform = platformMapper.getParentPlatByServerGBId(catalog.getPlatformId());