648540858
2023-09-30 130dc5d82da0e89241eefd4ea91ba7d861de866d
优化拉流代理逻辑,修复ffmpeg拉流代理鉴权
9个文件已修改
117 ■■■■ 已修改文件
sql/2.6.9更新.sql 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sql/初始化.sql 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sql/2.6.9更新.sql
@@ -3,3 +3,6 @@
-- Each statement is terminated with ';' so the script can be executed as a whole.
alter table wvp_platform
    add auto_push_channel bool default false;

-- stream_key holds the key zlm returns when a stream proxy is created; it is
-- needed later to stop that proxy. The type matches the column definition used
-- in the initialization script (character varying(255)).
alter table wvp_stream_proxy
    add stream_key character varying(255);
sql/初始化.sql
@@ -244,6 +244,7 @@
                                  create_time character varying(50),
                                  name character varying(255),
                                  update_time character varying(50),
                                  stream_key character varying(255),
                                  enable_disable_none_reader bool default false,
                                  constraint uk_stream_proxy_app_stream unique (app, stream)
);
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -93,7 +93,10 @@
                    }
                    if (event.getGbStreams() != null && event.getGbStreams().size() > 0){
                        for (GbStream gbStream : event.getGbStreams()) {
                            if (gbStream != null && gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) {
                            if (gbStream != null
                                    && gbStream.getStreamType() != null
                                    && gbStream.getStreamType().equals("push")
                                    && !userSetting.isUsePushingAsStatus()) {
                                continue;
                            }
                            DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform);
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -199,6 +199,13 @@
        }
        // 推流鉴权的处理
        if (!"rtp".equals(param.getApp())) {
            StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
            if (stream != null) {
                HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
                result.setEnable_audio(stream.isEnableAudio());
                result.setEnable_mp4(stream.isEnableMp4());
                return result;
            }
            if (userSetting.getPushAuthority()) {
                // 推流鉴权
                if (param.getParams() == null) {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
@@ -272,6 +272,12 @@
        return sendPost(mediaServerItem, "delFFmpegSource",param, null);
    }
    public JSONObject delStreamProxy(MediaServerItem mediaServerItem, String key){
        Map<String, Object> param = new HashMap<>();
        param.put("key", key);
        return sendPost(mediaServerItem, "delStreamProxy",param, null);
    }
    public JSONObject getMediaServerConfig(MediaServerItem mediaServerItem){
        return sendPost(mediaServerItem, "getServerConfig",null, null);
    }
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java
@@ -41,6 +41,9 @@
    @Schema(description = "是否 æ— äººè§‚看时自动停用")
    private boolean enableDisableNoneReader;
    @Schema(description = "拉流代理时zlm返回的key,用于停止拉流代理")
    private String streamKey;
    public String getType() {
        return type;
    }
@@ -167,5 +170,11 @@
        this.enableAudio = enable_audio;
    }
    public String getStreamKey() {
        return streamKey;
    }
    /**
     * Stores the given value in the {@code streamKey} field.
     *
     * @param streamKey new value for the field
     */
    public void setStreamKey(String streamKey) {
        this.streamKey = streamKey;
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -10,6 +10,7 @@
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
@@ -60,6 +61,9 @@
    @Autowired
    private ZLMRESTfulUtils zlmresTfulUtils;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private StreamProxyMapper streamProxyMapper;
@@ -145,7 +149,7 @@
            dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, param.getApp(),
                    param.getStream());
        }else {
            dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
            dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtspPort(), param.getApp(),
                    param.getStream());
        }
        param.setDstUrl(dstUrl);
@@ -170,12 +174,6 @@
        });
        if (param.isEnable()) {
            String talkKey = UUID.randomUUID().toString();
//            dynamicTask.startCron(talkKey, ()->{
//                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
//                if (streamInfo != null) {
//                    callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
//                }
//            }, 3000);
            String delayTalkKey = UUID.randomUUID().toString();
            dynamicTask.startDelay(delayTalkKey, ()->{
                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
@@ -318,13 +316,32 @@
        if (mediaServerItem == null) {
            return null;
        }
        if ("default".equals(param.getType())){
            result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
                    param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
        }else if ("ffmpeg".equals(param.getType())) {
        if (zlmServerFactory.isStreamReady(mediaServerItem, param.getApp(), param.getStream())) {
            zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
        }
        if ("ffmpeg".equalsIgnoreCase(param.getType())){
            result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl().trim(), param.getDstUrl(),
                    param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(),
                    param.getFfmpegCmdKey());
        }else {
            result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
                    param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
        }
        System.out.println("addStreamProxyToZlm====");
        System.out.println(result);
        if (result != null && result.getInteger("code") == 0) {
            JSONObject data = result.getJSONObject("data");
            if (data == null) {
                logger.warn("[获取拉流代理的结果数据Data] å¤±è´¥ï¼š {}", result );
                return result;
            }
            String key = data.getString("key");
            if (key == null) {
                logger.warn("[获取拉流代理的结果数据Data中的KEY] å¤±è´¥ï¼š {}", result );
                return result;
            }
            param.setStreamKey(key);
            streamProxyMapper.update(param);
        }
        return result;
    }
@@ -335,7 +352,12 @@
            return null;
        }
        MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
        JSONObject result = zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
        JSONObject result = null;
        if ("ffmpeg".equalsIgnoreCase(param.getType())){
            result = zlmresTfulUtils.delFFmpegSource(mediaServerItem, param.getStreamKey());
        }else {
            result = zlmresTfulUtils.delStreamProxy(mediaServerItem, param.getStreamKey());
        }
        return result;
    }
@@ -350,19 +372,18 @@
        if (streamProxyItem != null) {
            gbStreamService.sendCatalogMsg(streamProxyItem, CatalogEvent.DEL);
            JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
            if (jsonObject != null && jsonObject.getInteger("code") == 0) {
                // 如果关联了国标那么移除关联
                int i = platformGbStreamMapper.delByAppAndStream(app, stream);
                gbStreamMapper.del(app, stream);
                System.out.println();
                // TODO 如果关联的推流, 那么状态设置为离线
            }
            // 如果关联了国标那么移除关联
            platformGbStreamMapper.delByAppAndStream(app, stream);
            gbStreamMapper.del(app, stream);
            videoManagerStorager.deleteStreamProxy(app, stream);
            redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
            JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
            if (jsonObject != null && jsonObject.getInteger("code") == 0) {
                logger.info("[移除代理]: ä»£ç†ï¼š {}/{}, ä»Žzlm移除成功", app, stream);
            }else {
                logger.info("[移除代理]: ä»£ç†ï¼š {}/{}, ä»Žzlm移除失败", app, stream);
            }
        }
    }
    @Override
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
@@ -12,9 +12,9 @@
public interface StreamProxyMapper {
    @Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, src_url, dst_url, " +
            "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" +
            "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, stream_key, enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" +
            "(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " +
            "#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, " +
            "#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " +
            "#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} )")
    int add(StreamProxyItem streamProxyDto);
@@ -33,6 +33,7 @@
            "enable_audio=#{enableAudio}, " +
            "enable=#{enable}, " +
            "status=#{status}, " +
            "stream_key=#{streamKey}, " +
            "enable_remove_none_reader=#{enableRemoveNoneReader}, " +
            "enable_disable_none_reader=#{enableDisableNoneReader}, " +
            "enable_mp4=#{enableMp4} " +
@@ -45,7 +46,7 @@
    @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream order by st.create_time desc")
    List<StreamProxyItem> selectAll();
    @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=#{enable} order by st.create_time desc")
    @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude, 'proxy' as streamType FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=#{enable} order by st.create_time desc")
    List<StreamProxyItem> selectForEnable(boolean enable);
    @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.app=#{app} AND st.stream=#{stream} order by st.create_time desc")
src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java
@@ -67,6 +67,16 @@
        return streamProxyService.getAll(page, count);
    }
    @Operation(summary = "查询流代理")
    @Parameter(name = "app", description = "应用名")
    @Parameter(name = "stream", description = "流Id")
    @GetMapping(value = "/one")
    @ResponseBody
    public StreamProxyItem one(String app, String stream){
        return streamProxyService.getStreamProxyByAppAndStream(app, stream);
    }
    @Operation(summary = "保存代理", parameters = {
            @Parameter(name = "param", description = "代理参数", required = true),
    })
@@ -86,6 +96,10 @@
        if (ObjectUtils.isEmpty(param.getGbId())) {
            param.setGbId(null);
        }
        StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
        if (streamProxyItem  != null) {
            streamProxyService.del(param.getApp(), param.getStream());
        }
        RequestMessage requestMessage = new RequestMessage();
        String key = DeferredResultHolder.CALLBACK_CMD_PROXY + param.getApp() + param.getStream();