648540858
2022-02-24 a42dda2bd3cc1cf8c20cc61e7ad9211eadecbaf3
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -28,6 +28,7 @@
import org.springframework.util.StringUtils;
import java.util.*;
import java.util.stream.Collectors;
@Service
public class StreamPushServiceImpl implements IStreamPushService {
@@ -359,23 +360,63 @@
    }
    @Override
    public void batchAddForUpload(String platformId, String catalogId, List<StreamPushItem> streamPushItems) {
    public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
        // 存储数据到stream_push表
        streamPushMapper.addAll(streamPushItems);
        gbStreamMapper.batchAdd(streamPushItems);
        if (platformId != null) {
            ParentPlatform platform = parentPlatformMapper.getParentPlatByServerGBId(platformId);
            if (platform != null) {
                if (catalogId == null) {
                    catalogId = platform.getCatalogId();
                }else {
                    PlatformCatalog catalog = platformCatalogMapper.select(catalogId);
                    if (catalog == null) {
                        return;
        List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream()
                .filter(streamPushItem-> streamPushItem.getId() != null)
                .collect(Collectors.toList());
        // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里
        if (streamPushItemForGbStream.size() > 0) {
            gbStreamMapper.batchAdd(streamPushItemForGbStream);
        }
        // 去除没有ID也就是没有存储到数据库的数据
        List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
                .filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
                .collect(Collectors.toList());
        if (streamPushItemsForPlatform.size() > 0) {
            List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
            Map<String, List<StreamPushItem>> platformForEvent = new HashMap<>();
            // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入
            for (StreamPushItem streamPushItem : streamPushItemsForPlatform) {
                List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
                if (platFormInfoList != null) {
                    if (platFormInfoList.size() > 0) {
                        for (String[] platFormInfoArray : platFormInfoList) {
                            StreamPushItem streamPushItemForPlatform = new StreamPushItem();
                            streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
                            if (platFormInfoArray.length > 0) {
                                // 数组 platFormInfoArray 0 为平台ID。 1为目录ID
                                streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
                                List<StreamPushItem> streamPushItemsInPlatform = platformForEvent.get(streamPushItem.getPlatformId());
                                if (streamPushItemsInPlatform == null) {
                                    streamPushItemsInPlatform = new ArrayList<>();
                                    platformForEvent.put(platFormInfoArray[0], streamPushItemsInPlatform);
                                }
                                // 为发送通知整理数据
                                streamPushItemForPlatform.setApp(streamPushItem.getApp());
                                streamPushItemForPlatform.setStream(streamPushItem.getStream());
                                streamPushItemForPlatform.setGbId(streamPushItem.getGbId());
                                streamPushItemsInPlatform.add(streamPushItemForPlatform);
                            }
                            if (platFormInfoArray.length > 1) {
                                streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
                            }
                            streamPushItemListFroPlatform.add(streamPushItemForPlatform);
                        }
                    }
                }
                List<GbStream> gbStreamList = gbStreamMapper.selectAllForAppAndStream(streamPushItems);
                platformGbStreamMapper.batchAdd(platformId, catalogId, gbStreamList);
                eventPublisher.catalogEventPublishForStream(platformId, streamPushItems.toArray(new GbStream[0]), CatalogEvent.ADD);
            }
            platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
            // 发送通知
            for (String platformId : platformForEvent.keySet()) {
                eventPublisher.catalogEventPublishForStream(
                        platformId, platformForEvent.get(platformId).toArray(new GbStream[0]), CatalogEvent.ADD);
            }
        }
    }