648540858
2023-08-16 985082d33930868c3cc1723f28fd9aaae9013a8f
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
old mode 100644 new mode 100755
@@ -4,14 +4,13 @@
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
@@ -27,7 +26,7 @@
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +40,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
 * 视频代理业务
@@ -93,6 +93,9 @@
    private ZlmHttpHookSubscribe hookSubscribe;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    DataSourceTransactionManager dataSourceTransactionManager;
    @Autowired
@@ -119,7 +122,7 @@
            }
            JSONArray dataArray = jsonObject.getJSONArray("data");
            JSONObject mediaServerConfig = dataArray.getJSONObject(0);
            String ffmpegCmd = mediaServerConfig.getString(param.getFfmpeg_cmd_key());
            String ffmpegCmd = mediaServerConfig.getString(param.getFfmpegCmdKey());
            String schema = getSchemaFromFFmpegCmd(ffmpegCmd);
            if (schema == null) {
                throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法从ffmpeg cmd中获取到输出格式");
@@ -130,9 +133,6 @@
                port = mediaInfo.getRtspPort();
                schemaForUri = schema;
            }else if (schema.equalsIgnoreCase("flv")) {
                port = mediaInfo.getHttpPort();
                schemaForUri = "http";
            }else if (schema.equalsIgnoreCase("rtmp")) {
                port = mediaInfo.getRtmpPort();
                schemaForUri = schema;
            }else {
@@ -146,7 +146,7 @@
            dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
                    param.getStream());
        }
        param.setDst_url(dstUrl);
        param.setDstUrl(dstUrl);
        logger.info("[拉流代理] 输出地址为:{}", dstUrl);
        param.setMediaServerId(mediaInfo.getId());
        boolean saveResult;
@@ -161,36 +161,49 @@
            return;
        }
        HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
        hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
            StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
                    mediaInfo, param.getApp(), param.getStream(), null, null);
            callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
        });
        if (param.isEnable()) {
            String talkKey = UUID.randomUUID().toString();
            dynamicTask.startCron(talkKey, ()->{
                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
                if (streamInfo != null) {
                    callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
                }
            }, 1000);
            String delayTalkKey = UUID.randomUUID().toString();
            dynamicTask.startDelay(delayTalkKey, ()->{
                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
                if (streamInfo != null) {
                    callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
                }else {
                    dynamicTask.stop(talkKey);
                    callback.run(ErrorCode.ERROR100.getCode(), "超时", null);
                }
            }, 5000);
            JSONObject jsonObject = addStreamProxyToZlm(param);
            if (jsonObject != null && jsonObject.getInteger("code") == 0) {
                hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
                dynamicTask.stop(talkKey);
                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
                        mediaInfo, param.getApp(), param.getStream(), null, null);
                callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
            }else {
                param.setEnable(false);
                // 直接移除
                if (param.isEnable_remove_none_reader()) {
                if (param.isEnableRemoveNoneReader()) {
                    del(param.getApp(), param.getStream());
                }else {
                    updateStreamProxy(param);
                }
                if (jsonObject == null){
                    callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null);
                    return;
                }else {
                    callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null);
                    return;
                }
            }
        }
        else{
            StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
                    mediaInfo, param.getApp(), param.getStream(), null, null);
            callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
        }
    }
@@ -298,13 +311,12 @@
            return null;
        }
        if ("default".equals(param.getType())){
            result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl(),
                    param.isEnable_audio(), param.isEnable_mp4(), param.getRtp_type());
            result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
                    param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
        }else if ("ffmpeg".equals(param.getType())) {
            result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrc_url(), param.getDst_url(),
                    param.getTimeout_ms() + "", param.isEnable_audio(), param.isEnable_mp4(),
                    param.getFfmpeg_cmd_key());
            System.out.println(result);
            result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl().trim(), param.getDstUrl(),
                    param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(),
                    param.getFfmpegCmdKey());
        }
        return result;
    }
@@ -358,7 +370,7 @@
                updateStreamProxy(streamProxy);
            }else {
                logger.info("启用代理失败: {}/{}->{}({})", app, stream, jsonObject.getString("msg"),
                        streamProxy.getSrc_url() == null? streamProxy.getUrl():streamProxy.getSrc_url());
                        streamProxy.getSrcUrl() == null? streamProxy.getUrl():streamProxy.getSrcUrl());
            }
        }
        return result;
@@ -404,7 +416,7 @@
    @Override
    public void zlmServerOnline(String mediaServerId) {
        // 移除开启了无人观看自动移除的流
        List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selecAutoRemoveItemByMediaServerId(mediaServerId);
        List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
        if (streamProxyItemList.size() > 0) {
            gbStreamMapper.batchDel(streamProxyItemList);
        }
@@ -432,7 +444,7 @@
    @Override
    public void zlmServerOffline(String mediaServerId) {
        // 移除开启了无人观看自动移除的流
        List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selecAutoRemoveItemByMediaServerId(mediaServerId);
        List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
        if (streamProxyItemList.size() > 0) {
            gbStreamMapper.batchDel(streamProxyItemList);
        }
@@ -510,7 +522,11 @@
    }
    /**
     * Returns the overview record for stream proxies.
     *
     * NOTE(review): this span carries two interleaved forms of the same method.
     * The first signature/return pair uses the type name ResourceBaceInfo and
     * delegates to streamProxyMapper.getOverview(); the second uses
     * ResourceBaseInfo and assembles the result from two mapper calls.
     * Presumably the first is the pre-change form and the second the
     * post-change form of a diff whose +/- markers were lost - confirm against
     * the commit before treating either as current.
     *
     * @return in the second form, a ResourceBaseInfo constructed from the
     *         values of streamProxyMapper.getAllCount() and
     *         streamProxyMapper.getOnline(), passed in that order
     */
    @Override
    // First form: a single mapper call supplies the whole result.
    public ResourceBaceInfo getOverview() {
        return streamProxyMapper.getOverview();
    // Second form: the two counts are fetched separately and combined here.
    public ResourceBaseInfo getOverview() {
        int total = streamProxyMapper.getAllCount();
        int online = streamProxyMapper.getOnline();
        return new ResourceBaseInfo(total, online);
    }
}