| | |
| | | import org.springframework.util.StringUtils; |
| | | |
| | | import java.util.*; |
| | | import java.util.stream.Collectors; |
| | | |
| | | @Service |
| | | public class StreamPushServiceImpl implements IStreamPushService { |
| | |
| | | } |
| | | |
| | | @Override |
| | | public void batchAddForUpload(String platformId, String catalogId, List<StreamPushItem> streamPushItems) { |
| | | public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) { |
| | | // Store the data in the stream_push table. NOTE(review): two batchAddForUpload signatures appear directly above, so this listing looks like old and new revisions interleaved - confirm against the real file |
| | | streamPushMapper.addAll(streamPushItems); |
| | | gbStreamMapper.batchAdd(streamPushItems); |
| | | if (platformId != null) { |
| | | ParentPlatform platform = parentPlatformMapper.getParentPlatByServerGBId(platformId); |
| | | if (platform != null) { |
| | | if (catalogId == null) { |
| | | catalogId = platform.getCatalogId(); |
| | | }else { |
| | | PlatformCatalog catalog = platformCatalogMapper.select(catalogId); |
| | | if (catalog == null) { |
| | | return; |
| | | List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream() |
| | | .filter(streamPushItem-> streamPushItem.getId() != null) |
| | | .collect(Collectors.toList()); |
| | | // Store the data in the gb_stream table; the id is returned into streamPushItemForGbStream |
| | | if (streamPushItemForGbStream.size() > 0) { |
| | | gbStreamMapper.batchAdd(streamPushItemForGbStream); |
| | | } |
| | | // Drop entries that have no ID, i.e. entries that were not stored in the database |
| | | List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream() |
| | | .filter(streamPushItem-> streamPushItem.getGbStreamId() != null) |
| | | .collect(Collectors.toList()); |
| | | |
| | | if (streamPushItemsForPlatform.size() > 0) { |
| | | List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>(); |
| | | Map<String, List<StreamPushItem>> platformForEvent = new HashMap<>(); |
| | | // Iterate over the stored results, look up the app+stream -> platformId+catalogId mapping, then perform the batch write |
| | | for (StreamPushItem streamPushItem : streamPushItemsForPlatform) { |
| | | List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream()); |
| | | if (platFormInfoList != null) { |
| | | if (platFormInfoList.size() > 0) { |
| | | for (String[] platFormInfoArray : platFormInfoList) { |
| | | StreamPushItem streamPushItemForPlatform = new StreamPushItem(); |
| | | streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId()); |
| | | if (platFormInfoArray.length > 0) { |
| | | // platFormInfoArray: index 0 is the platform ID, index 1 is the catalog ID. NOTE(review): the lookup below keys on streamPushItem.getPlatformId() while the put keys on platFormInfoArray[0] - confirm these always match |
| | | streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]); |
| | | |
| | | List<StreamPushItem> streamPushItemsInPlatform = platformForEvent.get(streamPushItem.getPlatformId()); |
| | | if (streamPushItemsInPlatform == null) { |
| | | streamPushItemsInPlatform = new ArrayList<>(); |
| | | platformForEvent.put(platFormInfoArray[0], streamPushItemsInPlatform); |
| | | } |
| | | // Collect the data needed for sending notifications |
| | | streamPushItemForPlatform.setApp(streamPushItem.getApp()); |
| | | streamPushItemForPlatform.setStream(streamPushItem.getStream()); |
| | | streamPushItemForPlatform.setGbId(streamPushItem.getGbId()); |
| | | streamPushItemsInPlatform.add(streamPushItemForPlatform); |
| | | } |
| | | if (platFormInfoArray.length > 1) { |
| | | streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]); |
| | | } |
| | | streamPushItemListFroPlatform.add(streamPushItemForPlatform); |
| | | |
| | | |
| | | } |
| | | } |
| | | |
| | | } |
| | | List<GbStream> gbStreamList = gbStreamMapper.selectAllForAppAndStream(streamPushItems); |
| | | platformGbStreamMapper.batchAdd(platformId, catalogId, gbStreamList); |
| | | eventPublisher.catalogEventPublishForStream(platformId, streamPushItems.toArray(new GbStream[0]), CatalogEvent.ADD); |
| | | } |
| | | platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform); |
| | | // Send notifications |
| | | for (String platformId : platformForEvent.keySet()) { |
| | | eventPublisher.catalogEventPublishForStream( |
| | | platformId, platformForEvent.get(platformId).toArray(new GbStream[0]), CatalogEvent.ADD); |
| | | } |
| | | } |
| | | } |