648540858
2023-12-26 33fba05a381db591a7f5874eb242f90065e3458d
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
old mode 100644 new mode 100755
@@ -1,12 +1,11 @@
package com.genersoft.iot.vmp.storager.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -15,6 +14,7 @@
import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
@@ -26,7 +26,7 @@
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.ObjectUtils;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -48,11 +48,12 @@
   @Autowired
   SipConfig sipConfig;
   @Autowired
   DataSourceTransactionManager dataSourceTransactionManager;
   @Autowired
   TransactionDefinition transactionDefinition;
   @Autowired
   DataSourceTransactionManager dataSourceTransactionManager;
   @Autowired
    private DeviceMapper deviceMapper;
@@ -73,6 +74,9 @@
    private PlatformChannelMapper platformChannelMapper;
   @Autowired
   private PlatformCatalogMapper platformCatalogMapper;
   @Autowired
    private StreamProxyMapper streamProxyMapper;
   @Autowired
@@ -80,6 +84,9 @@
   @Autowired
    private GbStreamMapper gbStreamMapper;
   @Autowired
    private UserSetting userSetting;
   @Autowired
    private PlatformCatalogMapper catalogMapper;
@@ -105,123 +112,189 @@
   }
   @Override
   public synchronized void updateChannel(String deviceId, DeviceChannel channel) {
      String channelId = channel.getChannelId();
      channel.setDeviceId(deviceId);
      StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
      if (streamInfo != null) {
         channel.setStreamId(streamInfo.getStream());
   public boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList) {
      if (CollectionUtils.isEmpty(deviceChannelList)) {
         return false;
      }
      String now = DateUtil.getNow();
      channel.setUpdateTime(now);
      DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(deviceId, channelId);
      if (deviceChannel == null) {
         channel.setCreateTime(now);
         deviceChannelMapper.add(channel);
      }else {
         deviceChannelMapper.update(channel);
      List<DeviceChannel> allChannels = deviceChannelMapper.queryAllChannels(deviceId);
      Map<String,DeviceChannel> allChannelMap = new ConcurrentHashMap<>();
      if (allChannels.size() > 0) {
         for (DeviceChannel deviceChannel : allChannels) {
            allChannelMap.put(deviceChannel.getChannelId(), deviceChannel);
         }
      }
      deviceChannelMapper.updateChannelSubCount(deviceId,channel.getParentId());
   }
      TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
      // 数据去重
      List<DeviceChannel> channels = new ArrayList<>();
   @Override
   public int updateChannels(String deviceId, List<DeviceChannel> channels) {
      List<DeviceChannel> addChannels = new ArrayList<>();
      List<DeviceChannel> updateChannels = new ArrayList<>();
      HashMap<String, DeviceChannel> channelsInStore = new HashMap<>();
      if (channels != null && channels.size() > 0) {
         List<DeviceChannel> channelList = deviceChannelMapper.queryChannels(deviceId, null, null, null, null);
         if (channelList.size() == 0) {
            for (DeviceChannel channel : channels) {
               channel.setDeviceId(deviceId);
               StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId());
               if (streamInfo != null) {
                  channel.setStreamId(streamInfo.getStream());
      List<DeviceChannel> addChannels = new ArrayList<>();
      List<DeviceChannel> deleteChannels = new ArrayList<>();
      StringBuilder stringBuilder = new StringBuilder();
      Map<String, Integer> subContMap = new HashMap<>();
      // 数据去重
      Set<String> gbIdSet = new HashSet<>();
      for (DeviceChannel deviceChannel : deviceChannelList) {
         if (gbIdSet.contains(deviceChannel.getChannelId())) {
            stringBuilder.append(deviceChannel.getChannelId()).append(",");
            continue;
         }
         gbIdSet.add(deviceChannel.getChannelId());
         if (allChannelMap.containsKey(deviceChannel.getChannelId())) {
            deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId());
            deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio());
            if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){
               List<String> strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId());
               if (!CollectionUtils.isEmpty(strings)){
                  strings.forEach(platformId->{
                     eventPublisher.catalogEventPublish(platformId, deviceChannel, deviceChannel.isStatus()?CatalogEvent.ON:CatalogEvent.OFF);
                  });
               }
               String now = DateUtil.getNow();
               channel.setUpdateTime(now);
               channel.setCreateTime(now);
               addChannels.add(channel);
            }
            deviceChannel.setUpdateTime(DateUtil.getNow());
            updateChannels.add(deviceChannel);
         }else {
            for (DeviceChannel deviceChannel : channelList) {
               channelsInStore.put(deviceChannel.getChannelId(), deviceChannel);
            deviceChannel.setCreateTime(DateUtil.getNow());
            deviceChannel.setUpdateTime(DateUtil.getNow());
            addChannels.add(deviceChannel);
         }
         allChannelMap.remove(deviceChannel.getChannelId());
         channels.add(deviceChannel);
         if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) {
            if (subContMap.get(deviceChannel.getParentId()) == null) {
               subContMap.put(deviceChannel.getParentId(), 1);
            }else {
               Integer count = subContMap.get(deviceChannel.getParentId());
               subContMap.put(deviceChannel.getParentId(), count++);
            }
            for (DeviceChannel channel : channels) {
               channel.setDeviceId(deviceId);
               StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId());
               if (streamInfo != null) {
                  channel.setStreamId(streamInfo.getStream());
               }
               String now = DateUtil.getNow();
               channel.setUpdateTime(now);
               if (channelsInStore.get(channel.getChannelId()) != null) {
                  updateChannels.add(channel);
               }else {
                  addChannels.add(channel);
                  channel.setCreateTime(now);
         }
      }
      deleteChannels.addAll(allChannelMap.values());
      if (!channels.isEmpty()) {
         for (DeviceChannel channel : channels) {
            if (subContMap.get(channel.getChannelId()) != null){
               Integer count = subContMap.get(channel.getChannelId());
               if (count > 0) {
                  channel.setSubCount(count);
                  channel.setParental(1);
               }
            }
         }
         int limitCount = 300;
         if (addChannels.size() > 0) {
      }
      if (stringBuilder.length() > 0) {
         logger.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
      }
      if(CollectionUtils.isEmpty(channels)){
         logger.info("通道重设,数据为空={}" , deviceChannelList);
         return false;
      }
      try {
         int limitCount = 50;
         boolean result = false;
         if (!result && !addChannels.isEmpty()) {
            if (addChannels.size() > limitCount) {
               for (int i = 0; i < addChannels.size(); i += limitCount) {
                  int toIndex = i + limitCount;
                  if (i + limitCount > addChannels.size()) {
                     toIndex = addChannels.size();
                  }
                  deviceChannelMapper.batchAdd(addChannels.subList(i, toIndex));
                  result = result || deviceChannelMapper.batchAdd(addChannels.subList(i, toIndex)) < 0;
               }
            }else {
               deviceChannelMapper.batchAdd(addChannels);
               result = result || deviceChannelMapper.batchAdd(addChannels) < 0;
            }
         }
         if (updateChannels.size() > 0) {
         if (!result && !updateChannels.isEmpty()) {
            if (updateChannels.size() > limitCount) {
               for (int i = 0; i < updateChannels.size(); i += limitCount) {
                  int toIndex = i + limitCount;
                  if (i + limitCount > updateChannels.size()) {
                     toIndex = updateChannels.size();
                  }
                  deviceChannelMapper.batchUpdate(updateChannels.subList(i, toIndex));
                  result = result || deviceChannelMapper.batchUpdate(updateChannels.subList(i, toIndex)) < 0;
               }
            }else {
               deviceChannelMapper.batchUpdate(updateChannels);
               result = result || deviceChannelMapper.batchUpdate(updateChannels) < 0;
            }
         }
         if (!result && !deleteChannels.isEmpty()) {
            System.out.println("删除: " + deleteChannels.size());
            if (deleteChannels.size() > limitCount) {
               for (int i = 0; i < deleteChannels.size(); i += limitCount) {
                  int toIndex = i + limitCount;
                  if (i + limitCount > deleteChannels.size()) {
                     toIndex = deleteChannels.size();
                  }
                  result = result || deviceChannelMapper.batchDel(deleteChannels.subList(i, toIndex)) < 0;
               }
            }else {
               result = result || deviceChannelMapper.batchDel(deleteChannels) < 0;
            }
         }
         if (result) {
            //事务回滚
            dataSourceTransactionManager.rollback(transactionStatus);
         }
         dataSourceTransactionManager.commit(transactionStatus);     //手动提交
         return true;
      }catch (Exception e) {
         logger.error("未处理的异常 ", e);
         dataSourceTransactionManager.rollback(transactionStatus);
         return false;
      }
      return addChannels.size() + updateChannels.size();
   }
   @Override
   public boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList) {
   public boolean updateChannels(String deviceId, List<DeviceChannel> deviceChannelList) {
      if (CollectionUtils.isEmpty(deviceChannelList)) {
         return false;
      }
      List<DeviceChannel> allChannelInPlay = deviceChannelMapper.getAllChannelInPlay();
      Map<String,DeviceChannel> allChannelMapInPlay = new ConcurrentHashMap<>();
      if (allChannelInPlay.size() > 0) {
         for (DeviceChannel deviceChannel : allChannelInPlay) {
            allChannelMapInPlay.put(deviceChannel.getChannelId(), deviceChannel);
      List<DeviceChannel> allChannels = deviceChannelMapper.queryAllChannels(deviceId);
      Map<String,DeviceChannel> allChannelMap = new ConcurrentHashMap<>();
      if (allChannels.size() > 0) {
         for (DeviceChannel deviceChannel : allChannels) {
            allChannelMap.put(deviceChannel.getChannelId(), deviceChannel);
         }
      }
      TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
      // 数据去重
      List<DeviceChannel> channels = new ArrayList<>();
      List<DeviceChannel> updateChannels = new ArrayList<>();
      List<DeviceChannel> addChannels = new ArrayList<>();
      StringBuilder stringBuilder = new StringBuilder();
      Map<String, Integer> subContMap = new HashMap<>();
      if (deviceChannelList.size() > 1) {
      if (deviceChannelList.size() > 0) {
         // 数据去重
         Set<String> gbIdSet = new HashSet<>();
         for (DeviceChannel deviceChannel : deviceChannelList) {
            if (!gbIdSet.contains(deviceChannel.getChannelId())) {
               gbIdSet.add(deviceChannel.getChannelId());
               if (allChannelMapInPlay.containsKey(deviceChannel.getChannelId())) {
                  deviceChannel.setStreamId(allChannelMapInPlay.get(deviceChannel.getChannelId()).getStreamId());
               deviceChannel.setUpdateTime(DateUtil.getNow());
               if (allChannelMap.containsKey(deviceChannel.getChannelId())) {
                  deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId());
                  deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio());
                  if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){
                     List<String> strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId());
                     if (!CollectionUtils.isEmpty(strings)){
                        strings.forEach(platformId->{
                           eventPublisher.catalogEventPublish(platformId, deviceChannel, deviceChannel.isStatus()?CatalogEvent.ON:CatalogEvent.OFF);
                        });
                     }
                  }
                  updateChannels.add(deviceChannel);
               }else {
                  deviceChannel.setCreateTime(DateUtil.getNow());
                  addChannels.add(deviceChannel);
               }
               channels.add(deviceChannel);
               if (!StringUtils.isEmpty(deviceChannel.getParentId())) {
               if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) {
                  if (subContMap.get(deviceChannel.getParentId()) == null) {
                     subContMap.put(deviceChannel.getParentId(), 1);
                  }else {
@@ -241,8 +314,6 @@
            }
         }
      }else {
         channels = deviceChannelList;
      }
      if (stringBuilder.length() > 0) {
         logger.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
@@ -252,22 +323,35 @@
         return false;
      }
      try {
         int cleanChannelsResult = deviceChannelMapper.cleanChannelsNotInList(deviceId, channels);
         int limitCount = 300;
         boolean result = cleanChannelsResult < 0;
         if (!result && channels.size() > 0) {
            if (channels.size() > limitCount) {
               for (int i = 0; i < channels.size(); i += limitCount) {
         int limitCount = 50;
         boolean result = false;
         if (addChannels.size() > 0) {
            if (addChannels.size() > limitCount) {
               for (int i = 0; i < addChannels.size(); i += limitCount) {
                  int toIndex = i + limitCount;
                  if (i + limitCount > channels.size()) {
                     toIndex = channels.size();
                  if (i + limitCount > addChannels.size()) {
                     toIndex = addChannels.size();
                  }
                  result = result || deviceChannelMapper.batchAdd(channels.subList(i, toIndex)) < 0;
                  result = result || deviceChannelMapper.batchAdd(addChannels.subList(i, toIndex)) < 0;
               }
            }else {
               result = result || deviceChannelMapper.batchAdd(channels) < 0;
               result = result || deviceChannelMapper.batchAdd(addChannels) < 0;
            }
         }
         if (updateChannels.size() > 0) {
            if (updateChannels.size() > limitCount) {
               for (int i = 0; i < updateChannels.size(); i += limitCount) {
                  int toIndex = i + limitCount;
                  if (i + limitCount > updateChannels.size()) {
                     toIndex = updateChannels.size();
                  }
                  result = result || deviceChannelMapper.batchUpdate(updateChannels.subList(i, toIndex)) < 0;
               }
            }else {
               result = result || deviceChannelMapper.batchUpdate(updateChannels) < 0;
            }
         }
         if (result) {
            //事务回滚
            dataSourceTransactionManager.rollback(transactionStatus);
@@ -275,11 +359,10 @@
         dataSourceTransactionManager.commit(transactionStatus);     //手动提交
         return true;
      }catch (Exception e) {
         e.printStackTrace();
         logger.error("未处理的异常 ", e);
         dataSourceTransactionManager.rollback(transactionStatus);
         return false;
      }
   }
   @Override
@@ -319,31 +402,36 @@
      PageHelper.startPage(page, count);
      List<DeviceChannel> all;
      if (catalogUnderDevice != null && catalogUnderDevice) {
         all = deviceChannelMapper.queryChannels(deviceId, deviceId, query, hasSubChannel, online);
         all = deviceChannelMapper.queryChannels(deviceId, deviceId, query, hasSubChannel, online,null);
         // 海康设备的parentId是SIP id
         List<DeviceChannel> deviceChannels = deviceChannelMapper.queryChannels(deviceId, sipConfig.getId(), query, hasSubChannel, online);
         List<DeviceChannel> deviceChannels = deviceChannelMapper.queryChannels(deviceId, sipConfig.getId(), query, hasSubChannel, online,null);
         all.addAll(deviceChannels);
      }else {
         all = deviceChannelMapper.queryChannels(deviceId, null, query, hasSubChannel, online);
         all = deviceChannelMapper.queryChannels(deviceId, null, query, hasSubChannel, online,null);
      }
      return new PageInfo<>(all);
   }
   @Override
   public List<DeviceChannel> queryChannelsByDeviceIdWithStartAndLimit(String deviceId, String query, Boolean hasSubChannel, Boolean online, int start, int limit) {
      return deviceChannelMapper.queryChannelsByDeviceIdWithStartAndLimit(deviceId, null, query, hasSubChannel, online, start, limit);
   public List<DeviceChannelExtend> queryChannelsByDeviceIdWithStartAndLimit(String deviceId, List<String> channelIds, String query, Boolean hasSubChannel, Boolean online, int start, int limit) {
      return deviceChannelMapper.queryChannelsByDeviceIdWithStartAndLimit(deviceId, channelIds, null, query, hasSubChannel, online, start, limit);
   }
   @Override
   public List<DeviceChannel> queryChannelsByDeviceId(String deviceId) {
      return deviceChannelMapper.queryChannels(deviceId, null,null, null, null);
   public List<DeviceChannel> queryChannelsByDeviceId(String deviceId,Boolean online,List<String> channelIds) {
      return deviceChannelMapper.queryChannels(deviceId, null,null, null, online,channelIds);
   }
   @Override
   public List<DeviceChannelExtend> queryChannelsByDeviceId(String deviceId, List<String> channelIds, Boolean online) {
      return deviceChannelMapper.queryChannelsWithDeviceInfo(deviceId, null,null, null, online,channelIds);
   }
   @Override
   public PageInfo<DeviceChannel> querySubChannels(String deviceId, String parentChannelId, String query, Boolean hasSubChannel, Boolean online, int page, int count) {
      PageHelper.startPage(page, count);
      List<DeviceChannel> all = deviceChannelMapper.queryChannels(deviceId, parentChannelId, query, hasSubChannel, online);
      List<DeviceChannel> all = deviceChannelMapper.queryChannels(deviceId, parentChannelId, query, hasSubChannel, online,null);
      return new PageInfo<>(all);
   }
@@ -366,9 +454,9 @@
    * @return PageInfo<Device> 分页设备对象数组
    */
   @Override
   public PageInfo<Device> queryVideoDeviceList(int page, int count) {
   public PageInfo<Device> queryVideoDeviceList(int page, int count,Boolean online) {
      PageHelper.startPage(page, count);
      List<Device> all = deviceMapper.getDevices();
      List<Device> all = deviceMapper.getDevices(online);
      return new PageInfo<>(all);
   }
@@ -378,83 +466,10 @@
    * @return List<Device> 设备对象数组
    */
   @Override
   public List<Device> queryVideoDeviceList() {
   public List<Device> queryVideoDeviceList(Boolean online) {
      List<Device> deviceList =  deviceMapper.getDevices();
      List<Device> deviceList =  deviceMapper.getDevices(online);
      return deviceList;
   }
   /**
    * 删除设备
    *
    * @param deviceId 设备ID
    * @return true:删除成功  false:删除失败
    */
   @Override
   public boolean delete(String deviceId) {
      TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
      boolean result = false;
      try {
         platformChannelMapper.delChannelForDeviceId(deviceId);
         deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
         if ( deviceMapper.del(deviceId) < 0 ) {
            //事务回滚
            dataSourceTransactionManager.rollback(transactionStatus);
         }
         result = true;
         dataSourceTransactionManager.commit(transactionStatus);     //手动提交
      }catch (Exception e) {
         dataSourceTransactionManager.rollback(transactionStatus);
      }
      return result;
   }
   /**
    * 更新设备在线
    *
    * @param deviceId 设备ID
    * @return true:更新成功  false:更新失败
    */
   @Override
   public synchronized boolean online(String deviceId) {
      Device device = deviceMapper.getDeviceByDeviceId(deviceId);
      if (device == null) {
         return false;
      }
      device.setOnline(1);
      logger.info("更新设备在线: " + deviceId);
      redisCatchStorage.updateDevice(device);
      return deviceMapper.update(device) > 0;
   }
   /**
    * 更新设备离线
    *
    * @param deviceId 设备ID
    * @return true:更新成功  false:更新失败
    */
   @Override
   public synchronized boolean outline(String deviceId) {
      logger.info("更新设备离线: " + deviceId);
      Device device = deviceMapper.getDeviceByDeviceId(deviceId);
      if (device == null) {
         return false;
      }
      device.setOnline(0);
      redisCatchStorage.updateDevice(device);
      return deviceMapper.update(device) > 0;
   }
   /**
    * 更新所有设备离线
    *
    * @return true:更新成功  false:更新失败
    */
   @Override
   public synchronized boolean outlineForAll() {
      logger.info("更新所有设备离线");
      int result = deviceMapper.outlineForAll();
      return result > 0;
   }
   /**
@@ -529,20 +544,6 @@
      // 更新缓存
      parentPlatformCatch.setParentPlatform(parentPlatform);
      redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
      if (parentPlatform.isEnable()) {
         // 共享所有视频流,需要将现有视频流添加到此平台
         List<GbStream> gbStreams = gbStreamMapper.queryStreamNotInPlatform();
         if (gbStreams.size() > 0) {
            for (GbStream gbStream : gbStreams) {
               gbStream.setCatalogId(parentPlatform.getCatalogId());
            }
            if (parentPlatform.isShareAllLiveStream()) {
               gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId());
            }else {
               gbStreamService.delPlatformInfo(parentPlatform.getServerGBId(), gbStreams);
            }
         }
      }
      return result > 0;
   }
@@ -557,13 +558,6 @@
   }
   @Override
   public PageInfo<ParentPlatform> queryParentPlatformList(int page, int count) {
      PageHelper.startPage(page, count);
      List<ParentPlatform> all = platformMapper.getParentPlatformList();
      return new PageInfo<>(all);
   }
   @Override
   public ParentPlatform queryParentPlatByServerGBId(String platformGbId) {
      return platformMapper.getParentPlatByServerGBId(platformGbId);
   }
@@ -571,6 +565,16 @@
   @Override
   public List<ParentPlatform> queryEnableParentPlatformList(boolean enable) {
      return platformMapper.getEnableParentPlatformList(enable);
   }
   @Override
   public List<ParentPlatform> queryEnablePlatformListWithAsMessageChannel() {
      return platformMapper.queryEnablePlatformListWithAsMessageChannel();
   }
   @Override
   public List<Device> queryDeviceWithAsMessageChannel() {
      return deviceMapper.queryDeviceWithAsMessageChannel();
   }
   @Override
@@ -591,36 +595,6 @@
   public List<DeviceChannelInPlatform> queryChannelListInParentPlatform(String platformId) {
      return deviceChannelMapper.queryChannelByPlatformId(platformId);
   }
   @Override
   public int updateChannelForGB(String platformId, List<ChannelReduce> channelReduces, String catalogId) {
      Map<Integer, ChannelReduce> deviceAndChannels = new HashMap<>();
      for (ChannelReduce channelReduce : channelReduces) {
         channelReduce.setCatalogId(catalogId);
         deviceAndChannels.put(channelReduce.getId(), channelReduce);
      }
      List<Integer> deviceAndChannelList = new ArrayList<>(deviceAndChannels.keySet());
      // 查询当前已经存在的
      List<Integer> channelIds = platformChannelMapper.findChannelRelatedPlatform(platformId, channelReduces);
      if (deviceAndChannelList != null) {
         deviceAndChannelList.removeAll(channelIds);
      }
      for (Integer channelId : channelIds) {
         deviceAndChannels.remove(channelId);
      }
      List<ChannelReduce> channelReducesToAdd = new ArrayList<>(deviceAndChannels.values());
      // 对剩下的数据进行存储
      int result = 0;
      if (channelReducesToAdd.size() > 0) {
         result = platformChannelMapper.addChannels(platformId, channelReducesToAdd);
         // TODO 后续给平台增加控制开关以控制是否响应目录订阅
         List<DeviceChannel> deviceChannelList = getDeviceChannelListByChannelReduceList(channelReducesToAdd, catalogId);
         eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.ADD);
      }
      return result;
   }
@@ -680,6 +654,20 @@
   }
   @Override
   public Device queryDeviceInfoByPlatformIdAndChannelId(String platformId, String channelId) {
      List<Device> devices = platformChannelMapper.queryDeviceInfoByPlatformIdAndChannelId(platformId, channelId);
      if (devices.size() > 1) {
         // 出现长度大于0的时候肯定是国标通道的ID重复了
         logger.warn("国标ID存在重复:{}", channelId);
      }
      if (devices.size() == 0) {
         return null;
      }else {
         return devices.get(0);
      }
   }
   /**
    * 查询最新移动位置
    * @param deviceId
@@ -698,78 +686,6 @@
      return deviceMobilePositionMapper.clearMobilePositionsByDeviceId(deviceId);
   }
   /**
    * 新增代理流
    * @param streamProxyItem
    * @return
    */
   @Override
   public boolean addStreamProxy(StreamProxyItem streamProxyItem) {
      TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
      boolean result = false;
      streamProxyItem.setStreamType("proxy");
      streamProxyItem.setStatus(true);
      String now = DateUtil.getNow();
      streamProxyItem.setCreateTime(now);
      streamProxyItem.setCreateStamp(System.currentTimeMillis());
      try {
         if (streamProxyMapper.add(streamProxyItem) > 0) {
            if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
               if (gbStreamMapper.add(streamProxyItem) < 0) {
                  //事务回滚
                  dataSourceTransactionManager.rollback(transactionStatus);
                  return false;
               }
            }
         }else {
            //事务回滚
            dataSourceTransactionManager.rollback(transactionStatus);
            return false;
         }
         result = true;
         dataSourceTransactionManager.commit(transactionStatus);     //手动提交
      }catch (Exception e) {
         logger.error("向数据库添加流代理失败:", e);
         dataSourceTransactionManager.rollback(transactionStatus);
      }
      return result;
   }
   /**
    * 更新代理流
    * @param streamProxyItem
    * @return
    */
   @Override
   public boolean updateStreamProxy(StreamProxyItem streamProxyItem) {
      TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
      boolean result = false;
      streamProxyItem.setStreamType("proxy");
      try {
         if (streamProxyMapper.update(streamProxyItem) > 0) {
            if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
               if (gbStreamMapper.updateByAppAndStream(streamProxyItem) == 0) {
                  //事务回滚
                  dataSourceTransactionManager.rollback(transactionStatus);
                  return false;
               }
            }
         } else {
            //事务回滚
            dataSourceTransactionManager.rollback(transactionStatus);
            return false;
         }
         dataSourceTransactionManager.commit(transactionStatus);     //手动提交
         result = true;
      }catch (Exception e) {
         e.printStackTrace();
         dataSourceTransactionManager.rollback(transactionStatus);
      }
      return result;
   }
   /**
    * 移除代理流
@@ -822,8 +738,8 @@
    * @return
    */
   @Override
   public List<GbStream> queryGbStreamListInPlatform(String platformId) {
      return gbStreamMapper.queryGbStreamListInPlatform(platformId);
   public List<DeviceChannel> queryGbStreamListInPlatform(String platformId) {
      return gbStreamMapper.queryGbStreamListInPlatform(platformId, userSetting.isUsePushingAsStatus());
   }
   /**
@@ -838,60 +754,32 @@
   }
   @Override
   public void updateMediaList(List<StreamPushItem> streamPushItems) {
      if (streamPushItems == null || streamPushItems.size() == 0) {
         return;
      }
      logger.info("updateMediaList:  " + streamPushItems.size());
      streamPushMapper.addAll(streamPushItems);
      // TODO 待优化
      for (int i = 0; i < streamPushItems.size(); i++) {
         int onlineResult = gbStreamMapper.setStatus(streamPushItems.get(i).getApp(), streamPushItems.get(i).getStream(), true);
         if (onlineResult > 0) {
            // 发送上线通知
            eventPublisher.catalogEventPublishForStream(null, streamPushItems.get(i), CatalogEvent.ON);
         }
      }
   }
   @Override
   public void updateMedia(StreamPushItem streamPushItem) {
      streamPushMapper.del(streamPushItem.getApp(), streamPushItem.getStream());
      streamPushMapper.add(streamPushItem);
      gbStreamMapper.setStatus(streamPushItem.getApp(), streamPushItem.getStream(), true);
      if(!StringUtils.isEmpty(streamPushItem.getGbId() )){
         // 查找开启了全部直播流共享的上级平台
         List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
         if (parentPlatforms.size() > 0) {
            for (ParentPlatform parentPlatform : parentPlatforms) {
               StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(streamPushItem.getApp(), streamPushItem.getStream(),
                     parentPlatform.getServerGBId());
               if (streamProxyItem == null) {
                  streamPushItem.setCatalogId(parentPlatform.getCatalogId());
                  streamPushItem.setPlatformId(parentPlatform.getServerGBId());
                  platformGbStreamMapper.add(streamPushItem);
                  eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), streamPushItem, CatalogEvent.ADD);
               }
            }
         }
      }
   }
   @Override
   public int removeMedia(String app, String stream) {
      return streamPushMapper.del(app, stream);
   }
   @Override
   public void clearMediaList() {
      streamPushMapper.clear();
   public int mediaOffline(String app, String stream) {
      GbStream gbStream = gbStreamMapper.selectOne(app, stream);
      int result;
      if ("proxy".equals(gbStream.getStreamType())) {
         result = streamProxyMapper.updateStatus(app, stream, false);
      }else {
         result = streamPushMapper.updatePushStatus(app, stream, false);
      }
      return result;
   }
   @Override
   public int mediaOutline(String app, String streamId) {
      return gbStreamMapper.setStatus(app, streamId, false);
   public int mediaOnline(String app, String stream) {
      GbStream gbStream = gbStreamMapper.selectOne(app, stream);
      int result;
      if ("proxy".equals(gbStream.getStreamType())) {
         result = streamProxyMapper.updateStatus(app, stream, true);
      }else {
         result = streamPushMapper.updatePushStatus(app, stream, true);
      }
      return result;
   }
   @Override
@@ -927,6 +815,53 @@
   @Override
   public int addCatalog(PlatformCatalog platformCatalog) {
      ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformCatalog.getPlatformId());
      if (platform == null) {
         return 0;
      }
      if (platformCatalog.getId().length() <= 8) {
         platformCatalog.setCivilCode(platformCatalog.getParentId());
      }else {
         if (platformCatalog.getId().length() != 20) {
            return 0;
         }
         if (platformCatalog.getParentId() != null) {
            switch (Integer.parseInt(platformCatalog.getId().substring(10, 13))){
               case 200:
               case 215:
                  if (platformCatalog.getParentId().length() <= 8) {
                     platformCatalog.setCivilCode(platformCatalog.getParentId());
                  }else {
                     PlatformCatalog catalog = catalogMapper.selectByPlatFormAndCatalogId(platformCatalog.getPlatformId(), platformCatalog.getParentId());
                     if (catalog != null) {
                        platformCatalog.setCivilCode(catalog.getCivilCode());
                     }
                  }
                  break;
               case 216:
                  if (platformCatalog.getParentId().length() <= 8) {
                     platformCatalog.setCivilCode(platformCatalog.getParentId());
                  }else {
                     PlatformCatalog catalog = catalogMapper.selectByPlatFormAndCatalogId(platformCatalog.getPlatformId(),platformCatalog.getParentId());
                     if (catalog == null) {
                        logger.warn("[添加目录] 无法获取目录{}的CivilCode和BusinessGroupId", platformCatalog.getPlatformId());
                        break;
                     }
                     platformCatalog.setCivilCode(catalog.getCivilCode());
                     if (Integer.parseInt(platformCatalog.getParentId().substring(10, 13)) == 215) {
                        platformCatalog.setBusinessGroupId(platformCatalog.getParentId());
                     }else {
                        if (Integer.parseInt(platformCatalog.getParentId().substring(10, 13)) == 216) {
                           platformCatalog.setBusinessGroupId(catalog.getBusinessGroupId());
                        }
                     }
                  }
                  break;
               default:
                  break;
            }
         }
      }
      int result = catalogMapper.add(platformCatalog);
      if (result > 0) {
         DeviceChannel deviceChannel = getDeviceChannelByCatalog(platformCatalog);
@@ -935,28 +870,26 @@
      return result;
   }
   @Override
   public PlatformCatalog getCatalog(String id) {
      return catalogMapper.select(id);
   private PlatformCatalog getTopCatalog(String id, String platformId) {
      PlatformCatalog catalog = catalogMapper.selectByPlatFormAndCatalogId(platformId, id);
      if (catalog.getParentId().equals(platformId)) {
         return catalog;
      }else {
         return getTopCatalog(catalog.getParentId(), platformId);
      }
   }
   @Override
   public int delCatalog(String id) {
      PlatformCatalog platformCatalog = catalogMapper.select(id);
      if (platformCatalog.getChildrenCount() > 0) {
         List<PlatformCatalog> platformCatalogList = catalogMapper.selectByParentId(platformCatalog.getPlatformId(), platformCatalog.getId());
         for (PlatformCatalog catalog : platformCatalogList) {
            if (catalog.getChildrenCount() == 0) {
               delCatalogExecute(catalog.getId(), catalog.getPlatformId());
            }else {
               delCatalog(catalog.getId());
            }
         }
      }
      return delCatalogExecute(id, platformCatalog.getPlatformId());
   public PlatformCatalog getCatalog(String platformId, String id) {
      return catalogMapper.selectByPlatFormAndCatalogId(platformId, id);
   }
   @Override
   public int delCatalog(String platformId, String id) {
      return delCatalogExecute(id, platformId);
   }
   private int delCatalogExecute(String id, String platformId) {
      int delresult =  catalogMapper.del(id);
      int delresult =  catalogMapper.del(platformId, id);
      DeviceChannel deviceChannelForCatalog = new DeviceChannel();
      if (delresult > 0){
         deviceChannelForCatalog.setChannelId(id);
@@ -973,7 +906,7 @@
         }
         eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.DEL);
      }
      int delStreamresult = platformGbStreamMapper.delByCatalogId(id);
      int delStreamresult = platformGbStreamMapper.delByPlatformAndCatalogId(platformId,id);
      List<PlatformCatalog> platformCatalogs = platformChannelMapper.queryChannelInParentPlatformAndCatalog(platformId, id);
      if (platformCatalogs.size() > 0){
         List<DeviceChannel> deviceChannelList = new ArrayList<>();
@@ -984,8 +917,42 @@
         }
         eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.DEL);
      }
      int delChannelresult = platformChannelMapper.delByCatalogId(id);
      int delChannelresult = platformChannelMapper.delByCatalogId(platformId, id);
      // 查看是否存在子目录,如果存在一并删除
      List<String> allChildCatalog = getAllChildCatalog(id, platformId);
      if (!allChildCatalog.isEmpty()) {
         int limitCount = 50;
         if (allChildCatalog.size() > limitCount) {
            for (int i = 0; i < allChildCatalog.size(); i += limitCount) {
               int toIndex = i + limitCount;
               if (i + limitCount > allChildCatalog.size()) {
                  toIndex = allChildCatalog.size();
               }
               delChannelresult += platformCatalogMapper.deleteAll(platformId, allChildCatalog.subList(i, toIndex));
            }
         }else {
            delChannelresult += platformCatalogMapper.deleteAll(platformId, allChildCatalog);
         }
      }
      return delresult + delChannelresult + delStreamresult;
   }
   private List<String> getAllChildCatalog(String id, String platformId) {
      List<String> catalogList = platformCatalogMapper.queryCatalogFromParent(id, platformId);
      List<String> catalogListChild = new ArrayList<>();
      if (catalogList != null && !catalogList.isEmpty()) {
         for (String childId : catalogList) {
            List<String> allChildCatalog = getAllChildCatalog(childId, platformId);
            if (allChildCatalog != null && !allChildCatalog.isEmpty()) {
               catalogListChild.addAll(allChildCatalog);
            }
         }
      }
      if (!catalogListChild.isEmpty()) {
         catalogList.addAll(catalogListChild);
      }
      return catalogList;
   }
@@ -1001,12 +968,12 @@
   @Override
   public int setDefaultCatalog(String platformId, String catalogId) {
      return platformMapper.setDefaultCatalog(platformId, catalogId);
      return platformMapper.setDefaultCatalog(platformId, catalogId, DateUtil.getNow());
   }
   @Override
   public List<PlatformCatalog> queryCatalogInPlatform(String platformId) {
      return catalogMapper.selectByPlatForm(platformId);
   public List<DeviceChannel> queryCatalogInPlatform(String platformId) {
      return catalogMapper.queryCatalogInPlatform(platformId);
   }
   @Override
@@ -1035,34 +1002,21 @@
      return gbStreamMapper.updateStreamGPS(gpsMsgInfos);
   }
   private List<DeviceChannel> getDeviceChannelListByChannelReduceList(List<ChannelReduce> channelReduces, String catalogId) {
      List<DeviceChannel> deviceChannelList = new ArrayList<>();
      if (channelReduces.size() > 0){
         for (ChannelReduce channelReduce : channelReduces) {
            DeviceChannel deviceChannel = queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId());
            deviceChannel.setParental(1);
            deviceChannel.setParentId(catalogId);
            deviceChannelList.add(deviceChannel);
         }
      }
      return deviceChannelList;
   }
   private DeviceChannel getDeviceChannelByCatalog(PlatformCatalog catalog) {
      ParentPlatform parentPlatByServerGBId = platformMapper.getParentPlatByServerGBId(catalog.getPlatformId());
      ParentPlatform platform = platformMapper.getParentPlatByServerGBId(catalog.getPlatformId());
      DeviceChannel deviceChannel = new DeviceChannel();
      deviceChannel.setChannelId(catalog.getId());
      deviceChannel.setName(catalog.getName());
      deviceChannel.setLongitude(0.0);
      deviceChannel.setLatitude(0.0);
      deviceChannel.setDeviceId(parentPlatByServerGBId.getDeviceGBId());
      deviceChannel.setDeviceId(platform.getDeviceGBId());
      deviceChannel.setManufacture("wvp-pro");
      deviceChannel.setStatus(1);
      deviceChannel.setStatus(true);
      deviceChannel.setParental(1);
      deviceChannel.setParentId(catalog.getParentId());
      deviceChannel.setRegisterWay(1);
      // 行政区划应该是Domain的前八位
      deviceChannel.setCivilCode(parentPlatByServerGBId.getAdministrativeDivision());
      deviceChannel.setParentId(catalog.getParentId());
      deviceChannel.setBusinessGroupId(catalog.getBusinessGroupId());
      deviceChannel.setModel("live");
      deviceChannel.setOwner("wvp-pro");
      deviceChannel.setSecrecy("0");
@@ -1114,7 +1068,37 @@
   }
   @Override
   public void updateChannelPotion(String deviceId, String channelId, double longitude, double latitude) {
      deviceChannelMapper.updatePotion(deviceId, channelId, longitude, latitude);
   public void updateChannelPosition(DeviceChannel deviceChannel) {
      if (deviceChannel.getChannelId().equals(deviceChannel.getDeviceId())) {
         deviceChannel.setChannelId(null);
      }
      if (deviceChannel.getGpsTime() == null) {
         deviceChannel.setGpsTime(DateUtil.getNow());
      }
      deviceChannelMapper.updatePosition(deviceChannel);
   }
   @Override
   public void cleanContentForPlatform(String serverGBId) {
//      List<PlatformCatalog> catalogList = catalogMapper.selectByPlatForm(serverGBId);
//      if (catalogList.size() > 0) {
//         int result = catalogMapper.delByPlatformId(serverGBId);
//         if (result > 0) {
//            List<DeviceChannel> deviceChannels = new ArrayList<>();
//            for (PlatformCatalog catalog : catalogList) {
//               deviceChannels.add(getDeviceChannelByCatalog(catalog));
//            }
//            eventPublisher.catalogEventPublish(serverGBId, deviceChannels, CatalogEvent.DEL);
//         }
//      }
      catalogMapper.delByPlatformId(serverGBId);
      platformChannelMapper.delByPlatformId(serverGBId);
      platformGbStreamMapper.delByPlatformId(serverGBId);
   }
   @Override
   public List<DeviceChannel> queryChannelWithCatalog(String serverGBId) {
      return deviceChannelMapper.queryChannelWithCatalog(serverGBId);
   }
}