648540858
2022-01-14 ac1a4a027a7bd88efb32e9da666bdba4b5fa166f
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -5,24 +5,28 @@
import com.alibaba.fastjson.TypeReference;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.OriginType;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.*;
@@ -36,7 +40,16 @@
    private StreamPushMapper streamPushMapper;
    // Mapper used in this class to look up the parent platforms that share all live streams.
    @Autowired
    private ParentPlatformMapper parentPlatformMapper;
    // Mapper for the stream <-> parent-platform association records; this class uses it to
    // query, add and delete those associations by app / stream / platform id.
    @Autowired
    private PlatformGbStreamMapper platformGbStreamMapper;
    // Service used in this class to send catalog messages (e.g. CatalogEvent.DEL) for a stream.
    @Autowired
    private IGbStreamService gbStreamService;
    // Publisher used in this class to emit catalog events for a stream towards a given platform.
    @Autowired
    private EventPublisher eventPublisher;
    // Helper used in this class to request the media list from a media server.
    @Autowired
    private ZLMRESTfulUtils zlmresTfulUtils;
@@ -70,7 +83,6 @@
                    result.put(key, streamPushItem);
                }
            }
        }
        return new ArrayList<>(result.values());
@@ -113,12 +125,38 @@
        stream.setStreamType("push");
        stream.setStatus(true);
        int add = gbStreamMapper.add(stream);
        // 查找开启了全部直播流共享的上级平台
        List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
        if (parentPlatforms.size() > 0) {
            for (ParentPlatform parentPlatform : parentPlatforms) {
                stream.setCatalogId(parentPlatform.getCatalogId());
                stream.setPlatformId(parentPlatform.getServerGBId());
                String streamId = stream.getStream();
                StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId());
                if (streamProxyItem == null) {
                    platformGbStreamMapper.add(stream);
                    eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
                }else {
                    if (!streamProxyItem.getGbId().equals(stream.getGbId())) {
                        // 此流使用另一个国标Id已经与该平台关联,移除此记录
                        platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId());
                        platformGbStreamMapper.add(stream);
                        eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
                    }
                }
            }
        }
        return add > 0;
    }
    @Override
    public boolean removeFromGB(GbStream stream) {
        // 判断是否需要发送事件
        gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
        int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
        platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
        MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
        JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
        if (mediaList == null) {
@@ -137,6 +175,8 @@
    @Override
    public boolean stop(String app, String streamId) {
        StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
        gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
        int delStream = streamPushMapper.del(app, streamId);
        gbStreamMapper.del(app, streamId);
        platformGbStreamMapper.delByAppAndStream(app, streamId);
@@ -154,47 +194,74 @@
        if (mediaServerItem == null) {
            return;
        }
        // 数据库记录
        List<StreamPushItem> pushList = getPushList(mediaServerId);
        Map<String, StreamPushItem> pushItemMap = new HashMap<>();
        // redis记录
        List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH");
        Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>();
        if (pushList.size() > 0) {
            Map<String, StreamPushItem> pushItemMap = new HashMap<>();
            for (StreamPushItem streamPushItem : pushList) {
                pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
            }
            zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
                if (mediaList == null) return;
                String dataStr = mediaList.getString("data");
                Integer code = mediaList.getInteger("code");
                List<StreamPushItem> streamPushItems = null;
                if (code == 0 ) {
                    if (dataStr != null) {
                        streamPushItems = handleJSON(dataStr, mediaServerItem);
                    }
                }
                if (streamPushItems != null) {
                    for (StreamPushItem streamPushItem : streamPushItems) {
                        pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
                    }
                }
                Collection<StreamPushItem> offlinePushItems = pushItemMap.values();
                if (offlinePushItems.size() > 0) {
                    String type = "PUSH";
                    streamPushMapper.delAll(new ArrayList<>(offlinePushItems));
                    for (StreamPushItem offlinePushItem : offlinePushItems) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("serverId", userSetup.getServerId());
                        jsonObject.put("app", offlinePushItem.getApp());
                        jsonObject.put("stream", offlinePushItem.getStream());
                        jsonObject.put("register", false);
                        jsonObject.put("mediaServerId", mediaServerId);
                        redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
                        // 移除redis内流的信息
                        redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlinePushItem.getApp(), offlinePushItem.getStream());
                    }
                }
            }));
        }
        if (mediaItems.size() > 0) {
            for (MediaItem mediaItem : mediaItems) {
                streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem);
            }
        }
        zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
            if (mediaList == null) return;
            String dataStr = mediaList.getString("data");
            Integer code = mediaList.getInteger("code");
            List<StreamPushItem> streamPushItems = null;
            if (code == 0 ) {
                if (dataStr != null) {
                    streamPushItems = handleJSON(dataStr, mediaServerItem);
                }
            }
            if (streamPushItems != null) {
                for (StreamPushItem streamPushItem : streamPushItems) {
                    pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
                    streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
                }
            }
            List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
            if (offlinePushItems.size() > 0) {
                String type = "PUSH";
                int runLimit = 300;
                if (offlinePushItems.size() > runLimit) {
                    for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
                        int toIndex = i + runLimit;
                        if (i + runLimit > offlinePushItems.size()) {
                            toIndex = offlinePushItems.size();
                        }
                        List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
                        streamPushMapper.delAll(streamPushItemsSub);
                    }
                }else {
                    streamPushMapper.delAll(offlinePushItems);
                }
            }
            Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values();
            if (offlineMediaItemList.size() > 0) {
                String type = "PUSH";
                for (MediaItem offlineMediaItem : offlineMediaItemList) {
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("serverId", userSetup.getServerId());
                    jsonObject.put("app", offlineMediaItem.getApp());
                    jsonObject.put("stream", offlineMediaItem.getStream());
                    jsonObject.put("register", false);
                    jsonObject.put("mediaServerId", mediaServerId);
                    redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
                    // 移除redis内流的信息
                    redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream());
                }
            }
        }));
    }
    @Override
@@ -208,18 +275,18 @@
        // 发送流停止消息
        String type = "PUSH";
        // 发送redis消息
        List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
        List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
        if (streamInfoList.size() > 0) {
            for (StreamInfo streamInfo : streamInfoList) {
            for (MediaItem mediaItem : streamInfoList) {
                // 移除redis内流的信息
                redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream());
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("serverId", userSetup.getServerId());
                jsonObject.put("app", streamInfo.getApp());
                jsonObject.put("stream", streamInfo.getStreamId());
                jsonObject.put("app", mediaItem.getApp());
                jsonObject.put("stream", mediaItem.getStream());
                jsonObject.put("register", false);
                jsonObject.put("mediaServerId", mediaServerId);
                redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
                // 移除redis内流的信息
                redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
            }
        }
    }
@@ -228,4 +295,30 @@
    public void clean() {
        // No-op: this implementation has an empty body and performs no work.
    }
    @Override
    public boolean saveToRandomGB() {
        List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
        long gbId = 100001;
        for (StreamPushItem streamPushItem : streamPushItems) {
            streamPushItem.setStreamType("push");
            streamPushItem.setStatus(true);
            streamPushItem.setGbId("34020000004111" + gbId);
            gbId ++;
        }
        int  limitCount = 30;
        if (streamPushItems.size() > limitCount) {
            for (int i = 0; i < streamPushItems.size(); i += limitCount) {
                int toIndex = i + limitCount;
                if (i + limitCount > streamPushItems.size()) {
                    toIndex = streamPushItems.size();
                }
                gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
            }
        }else {
            gbStreamMapper.batchAdd(streamPushItems);
        }
        return true;
    }
}