package com.genersoft.iot.vmp.service.impl; import com.alibaba.excel.context.AnalysisContext; import com.alibaba.excel.event.AnalysisEventListener; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import org.springframework.util.ObjectUtils; import java.util.*; public class StreamPushUploadFileHandler extends AnalysisEventListener { /** * 错误数据的回调,用于将错误数据发送给页面 */ private ErrorDataHandler errorDataHandler; /** * 推流的业务类用于存储数据 */ private IStreamPushService pushService; /** * 默认流媒体节点ID */ private String defaultMediaServerId; /** * 用于存储不加过滤的所有数据 */ private final List streamPushItems = new ArrayList<>(); /** * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 */ private final Map streamPushItemForSave = new HashMap<>(); /** * 用于存储按照APP+Stream为KEY, 平台ID+目录Id 为value的数据,用于存储到gb_stream表后获取app+Stream对应的平台与目录信息,然后存入关联表 */ private final Map> streamPushItemsForPlatform = new HashMap<>(); /** * 用于判断文件是否存在重复的app+Stream+平台ID */ private final Set streamPushStreamSet = new HashSet<>(); /** * 用于存储APP+Stream->国标ID 的数据结构, 数据一一对应,全局判断APP+Stream->国标ID是否存在不对应 */ private final BiMap gBMap = HashBiMap.create(); /** * 用于存储APP+Stream-> 在数据库中的数据 */ private final BiMap pushMapInDb = HashBiMap.create(); /** * 记录错误的APP+Stream */ private final List errorStreamList = new ArrayList<>(); /** * 记录错误的国标ID */ private final List errorInfoList = new ArrayList<>(); /** * 读取数量计数器 */ private int loadedSize = 0; public StreamPushUploadFileHandler(IStreamPushService pushService, String defaultMediaServerId, ErrorDataHandler errorDataHandler) { this.pushService = pushService; this.defaultMediaServerId = defaultMediaServerId; this.errorDataHandler = errorDataHandler; // 获取数据库已有的数据,已经存在的则忽略 List allAppAndStreams = pushService.getAllAppAndStream(); if (allAppAndStreams.size() > 0) { for (String allAppAndStream : allAppAndStreams) { pushMapInDb.put(allAppAndStream, allAppAndStream); } } } public interface ErrorDataHandler{ void handle(List streams, List gbId); } @Override public void invoke(StreamPushExcelDto streamPushExcelDto, AnalysisContext analysisContext) { if (ObjectUtils.isEmpty(streamPushExcelDto.getApp()) || ObjectUtils.isEmpty(streamPushExcelDto.getStream()) || ObjectUtils.isEmpty(streamPushExcelDto.getGbId())) { return; } Integer rowIndex = analysisContext.readRowHolder().getRowIndex(); if (gBMap.get(streamPushExcelDto.getApp() + streamPushExcelDto.getStream()) == null) { try { gBMap.put(streamPushExcelDto.getApp() + streamPushExcelDto.getStream(), streamPushExcelDto.getGbId()); }catch (IllegalArgumentException e) { errorInfoList.add("行:" + rowIndex + ", " + streamPushExcelDto.getGbId() + " 国标ID重复使用"); return; } }else { if (!gBMap.get(streamPushExcelDto.getApp() + streamPushExcelDto.getStream()).equals(streamPushExcelDto.getGbId())) { errorInfoList.add("行:" + rowIndex + ", " + streamPushExcelDto.getGbId() + " 同样的应用名和流ID使用了不同的国标ID"); return; } } if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId())) { errorStreamList.add("行:" + rowIndex + ", " + streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream()+ " 平台信息重复"); return; }else { if (pushMapInDb.get(streamPushExcelDto.getApp()+streamPushExcelDto.getStream()) != null) { errorStreamList.add("行:" + rowIndex + ", " + streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream()+ " 数据已存在"); return; } streamPushStreamSet.add(streamPushExcelDto.getApp()+streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId()); } StreamPushItem streamPushItem = new StreamPushItem(); streamPushItem.setApp(streamPushExcelDto.getApp()); streamPushItem.setStream(streamPushExcelDto.getStream()); streamPushItem.setGbId(streamPushExcelDto.getGbId()); streamPushItem.setStatus(streamPushExcelDto.getStatus()); streamPushItem.setStreamType("push"); streamPushItem.setCreateTime(DateUtil.getNow()); streamPushItem.setMediaServerId(defaultMediaServerId); streamPushItem.setName(streamPushExcelDto.getName()); streamPushItem.setOriginType(2); streamPushItem.setOriginTypeStr("rtsp_push"); streamPushItem.setTotalReaderCount("0"); streamPushItem.setPlatformId(streamPushExcelDto.getPlatformId()); streamPushItem.setCatalogId(streamPushExcelDto.getCatalogId()); // 存入所有的通道信息 streamPushItems.add(streamPushItem); streamPushItemForSave.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); if (!ObjectUtils.isEmpty(streamPushExcelDto.getPlatformId())) { List platformList = streamPushItemsForPlatform.get(streamPushItem.getApp() + streamPushItem.getStream()); if (platformList == null) { platformList = new ArrayList<>(); streamPushItemsForPlatform.put(streamPushItem.getApp() + streamPushItem.getStream(), platformList); } String platformId = streamPushExcelDto.getPlatformId(); String catalogId = streamPushExcelDto.getCatalogId(); if (ObjectUtils.isEmpty(streamPushExcelDto.getCatalogId())) { catalogId = null; } String[] platFormInfoArray = new String[]{platformId, catalogId}; platformList.add(platFormInfoArray); } loadedSize ++; if (loadedSize > 1000) { saveData(); streamPushItems.clear(); streamPushItemForSave.clear(); streamPushItemsForPlatform.clear(); loadedSize = 0; } } @Override public void doAfterAllAnalysed(AnalysisContext analysisContext) { // 这里也要保存数据,确保最后遗留的数据也存储到数据库 saveData(); streamPushItems.clear(); streamPushItemForSave.clear(); gBMap.clear(); streamPushStreamSet.clear(); streamPushItemsForPlatform.clear(); errorDataHandler.handle(errorStreamList, errorInfoList); } private void saveData(){ if (streamPushItemForSave.size() > 0) { // 向数据库查询是否存在重复的app pushService.batchAddForUpload(new ArrayList<>(streamPushItemForSave.values()), streamPushItemsForPlatform); } } }