648540858
2022-10-18 1af77ab5f7c11a4b3d59c1989b51b9fca29679ce
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -4,6 +4,7 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
@@ -13,6 +14,7 @@
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.*;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -21,7 +23,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.*;
@@ -67,6 +73,16 @@
    // Injected media server service (not referenced in the visible part of this file).
    @Autowired
    private IMediaServerService mediaServerService;
    // Transaction manager used by add(StreamPushItem) to begin, commit and roll back
    // its explicitly managed transaction.
    @Autowired
    DataSourceTransactionManager dataSourceTransactionManager;
    // Transaction definition handed to dataSourceTransactionManager.getTransaction(...)
    // when add(StreamPushItem) opens its transaction.
    @Autowired
    TransactionDefinition transactionDefinition;
    // Media configuration; its getId() value is stored on streams as the media server id.
    @Autowired
    private MediaConfig mediaConfig;
    @Override
    public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
@@ -132,30 +148,9 @@
        stream.setStreamType("push");
        stream.setStatus(true);
        stream.setCreateTime(DateUtil.getNow());
        stream.setStreamType("push");
        stream.setMediaServerId(mediaConfig.getId());
        int add = gbStreamMapper.add(stream);
        // 查找开启了全部直播流共享的上级平台
        List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
        if (parentPlatforms.size() > 0) {
            for (ParentPlatform parentPlatform : parentPlatforms) {
                stream.setCatalogId(parentPlatform.getCatalogId());
                stream.setPlatformId(parentPlatform.getServerGBId());
                String streamId = stream.getStream();
                StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId());
                if (streamProxyItem == null) {
                    platformGbStreamMapper.add(stream);
                    eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
                }else {
                    if (!streamProxyItem.getGbId().equals(stream.getGbId())) {
                        // 此流使用另一个国标Id已经与该平台关联,移除此记录
                        platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId());
                        platformGbStreamMapper.add(stream);
                        eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
                    }
                }
            }
        }
        return add > 0;
    }
@@ -181,7 +176,6 @@
    @Override
    public StreamPushItem getPush(String app, String streamId) {
        return streamPushMapper.selectOne(app, streamId);
    }
@@ -215,7 +209,7 @@
        Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>();
        if (pushList.size() > 0) {
            for (StreamPushItem streamPushItem : pushList) {
                if (StringUtils.isEmpty(streamPushItem.getGbId())) {
                if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
                    pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
                }
            }
@@ -345,32 +339,8 @@
    public void batchAdd(List<StreamPushItem> streamPushItems) {
        streamPushMapper.addAll(streamPushItems);
        gbStreamMapper.batchAdd(streamPushItems);
        // 查找开启了全部直播流共享的上级平台
        List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
        if (parentPlatforms.size() > 0) {
            for (StreamPushItem stream : streamPushItems) {
                for (ParentPlatform parentPlatform : parentPlatforms) {
                    stream.setCatalogId(parentPlatform.getCatalogId());
                    stream.setPlatformId(parentPlatform.getServerGBId());
                    String streamId = stream.getStream();
                    StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId());
                    if (streamProxyItem == null) {
                        platformGbStreamMapper.add(stream);
                        eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
                    }else {
                        if (!streamProxyItem.getGbId().equals(stream.getGbId())) {
                            // 此流使用另一个国标Id已经与该平台关联,移除此记录
                            platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId());
                            platformGbStreamMapper.add(stream);
                            eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
                            stream.setGbId(streamProxyItem.getGbId());
                            eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.DEL);
                        }
                    }
                }
            }
        }
    }
    @Override
    public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
@@ -458,7 +428,6 @@
                            platformId, platformForEvent.get(platformId), CatalogEvent.ADD);
                }
            }
        }
    }
@@ -481,4 +450,63 @@
        }
        return true;
    }
    @Override
    public void allStreamOffline() {
        List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb();
        if (onlinePushers.size() == 0) {
            return;
        }
        streamPushMapper.setAllStreamOffline();
        // 发送通知
        eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
    }
    @Override
    public void offline(List<StreamPushItemFromRedis> offlineStreams) {
        // 更新部分设备离线
        List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams);
        streamPushMapper.offline(offlineStreams);
        // 发送通知
        eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
    }
    @Override
    public void online(List<StreamPushItemFromRedis> onlineStreams) {
        // 更新部分设备上线streamPushService
        List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams);
        streamPushMapper.online(onlineStreams);
        // 发送通知
        eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON);
    }
    @Override
    public boolean add(StreamPushItem stream) {
        stream.setUpdateTime(DateUtil.getNow());
        stream.setCreateTime(DateUtil.getNow());
        stream.setServerId(userSetting.getServerId());
        // 放在事务内执行
        boolean result = false;
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        try {
            int addStreamResult = streamPushMapper.add(stream);
            if (!ObjectUtils.isEmpty(stream.getGbId())) {
                stream.setStreamType("push");
                gbStreamMapper.add(stream);
            }
            dataSourceTransactionManager.commit(transactionStatus);
            result = true;
        }catch (Exception e) {
            logger.error("批量移除流与平台的关系时错误", e);
            dataSourceTransactionManager.rollback(transactionStatus);
        }
        return result;
    }
    @Override
    public List<String> getAllAppAndStream() {
        return streamPushMapper.getAllAppAndStream();
    }
}