jiang
2022-07-13 fc89b7b517e191fb6c9b66fa86dd05ebab0f54c5
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.SipConfig;
@@ -193,7 +194,9 @@
    @Override
    public JSONObject removeStreamProxyFromZlm(StreamProxyItem param) {
        if (param ==null) return null;
        if (param ==null) {
            return null;
        }
        MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
        JSONObject result = zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
        return result;
@@ -227,9 +230,12 @@
    public boolean start(String app, String stream) {
        boolean result = false;
        StreamProxyItem streamProxy = videoManagerStorager.queryStreamProxy(app, stream);
        if (!streamProxy.isEnable() &&  streamProxy != null) {
        if (!streamProxy.isEnable() ) {
            JSONObject jsonObject = addStreamProxyToZlm(streamProxy);
            if (jsonObject == null) return false;
            if (jsonObject == null) {
                return false;
            }
            System.out.println(jsonObject);
            if (jsonObject.getInteger("code") == 0) {
                result = true;
                streamProxy.setEnable(true);
@@ -245,7 +251,7 @@
        StreamProxyItem streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream);
        if (streamProxyDto != null && streamProxyDto.isEnable()) {
            JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto);
            if (jsonObject.getInteger("code") == 0) {
            if (jsonObject != null && jsonObject.getInteger("code") == 0) {
                streamProxyDto.setEnable(false);
                result = videoManagerStorager.updateStreamProxy(streamProxyDto);
            }
@@ -285,9 +291,12 @@
        }
        streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
        // 移除拉流代理生成的流信息
//        syncPullStream(mediaServerId);
        // 恢复流代理, 只查找这个这个流媒体
        List<StreamProxyItem> streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer(
                mediaServerId, true, false);
                mediaServerId, true);
        for (StreamProxyItem streamProxyDto : streamProxyListForEnable) {
            logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
            JSONObject jsonObject = addStreamProxyToZlm(streamProxyDto);
@@ -310,7 +319,7 @@
        }
        streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
        // 其他的流设置离线
        streamProxyMapper.updateStatusByMediaServerId(false, mediaServerId);
        streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
        String type = "PULL";
        // 发送redis消息
@@ -337,6 +346,47 @@
    @Override
    public int updateStatus(boolean status, String app, String stream) {
        return streamProxyMapper.updateStatus(status, app, stream);
        return streamProxyMapper.updateStatus(app, stream, status);
    }
    private void syncPullStream(String mediaServerId){
        MediaServerItem mediaServer = mediaServerService.getOne(mediaServerId);
        if (mediaServer != null) {
            List<MediaItem> allPullStream = redisCatchStorage.getStreams(mediaServerId, "PULL");
            if (allPullStream.size() > 0) {
                zlmresTfulUtils.getMediaList(mediaServer, jsonObject->{
                    Map<String, StreamInfo> stringStreamInfoMap = new HashMap<>();
                    if (jsonObject.getInteger("code") == 0) {
                        JSONArray data = jsonObject.getJSONArray("data");
                        if(data != null && data.size() > 0) {
                            for (int i = 0; i < data.size(); i++) {
                                JSONObject streamJSONObj = data.getJSONObject(i);
                                if ("rtmp".equals(streamJSONObj.getString("schema"))) {
                                    StreamInfo streamInfo = new StreamInfo();
                                    String app = streamJSONObj.getString("app");
                                    String stream = streamJSONObj.getString("stream");
                                    streamInfo.setApp(app);
                                    streamInfo.setStream(stream);
                                    stringStreamInfoMap.put(app+stream, streamInfo);
                                }
                            }
                        }
                    }
                    if (stringStreamInfoMap.size() == 0) {
                        redisCatchStorage.removeStream(mediaServerId, "PULL");
                    }else {
                        for (String key : stringStreamInfoMap.keySet()) {
                            StreamInfo streamInfo = stringStreamInfoMap.get(key);
                            if (stringStreamInfoMap.get(streamInfo.getApp() + streamInfo.getStream()) == null) {
                                redisCatchStorage.removeStream(mediaServerId, "PULL", streamInfo.getApp(),
                                        streamInfo.getStream());
                            }
                        }
                    }
                });
            }
        }
    }
}