648540858
2023-06-27 5a7a7a12bde268c104610d3e81a08df06ccab5eb
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -3,8 +3,14 @@
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@@ -92,7 +98,17 @@
        return result;
    }
    public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc, Integer port, Boolean onlyAuto) {
    /**
     * 开启rtpServer
     * @param mediaServerItem zlm服务实例
     * @param streamId 流Id
     * @param ssrc ssrc
     * @param port 端口, 0/null为使用随机
     * @param reUsePort 是否重用端口
     * @param tcpMode 0/null udp 模式,1 tcp 被动模式, 2 tcp 主动模式。
     * @return
     */
    public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc, Integer port, Boolean onlyAuto, Boolean reUsePort, Integer tcpMode) {
        int result = -1;
        // 查询此rtp server 是否已经存在
        JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, streamId);
@@ -108,7 +124,7 @@
                    JSONObject jsonObject = zlmresTfulUtils.closeRtpServer(mediaServerItem, param);
                    if (jsonObject != null ) {
                        if (jsonObject.getInteger("code") == 0) {
                            return createRTPServer(mediaServerItem, streamId, ssrc, port, onlyAuto);
                            return createRTPServer(mediaServerItem, streamId, ssrc, port,onlyAuto, reUsePort, tcpMode);
                        }else {
                            logger.warn("[开启rtpServer], 重启RtpServer错误");
                        }
@@ -122,8 +138,14 @@
        Map<String, Object> param = new HashMap<>();
        param.put("enable_tcp", 1);
        if (tcpMode == null) {
            tcpMode = 0;
        }
        param.put("tcp_mode", tcpMode);
        param.put("stream_id", streamId);
        if (reUsePort != null) {
            param.put("re_use_port", reUsePort?"1":"0");
        }
        // 推流端口设置0则使用随机端口
        if (port == null) {
            param.put("port", 0);
@@ -169,6 +191,31 @@
        return result;
    }
    public void closeRtpServer(MediaServerItem serverItem, String streamId, CommonCallback<Boolean> callback) {
        if (serverItem == null) {
            callback.run(false);
            return;
        }
        Map<String, Object> param = new HashMap<>();
        param.put("stream_id", streamId);
        zlmresTfulUtils.closeRtpServer(serverItem, param, jsonObject -> {
            if (jsonObject != null ) {
                if (jsonObject.getInteger("code") == 0) {
                    callback.run(jsonObject.getInteger("hit") == 1);
                    return;
                }else {
                    logger.error("关闭RTP Server 失败: " + jsonObject.getString("msg"));
                }
            }else {
                //  检查ZLM状态
                logger.error("关闭RTP Server 失败: 请检查ZLM服务");
            }
            callback.run(false);
        });
    }
    /**
     * 创建一个国标推流
@@ -186,7 +233,7 @@
        int localPort = 0;
        if (userSetting.getGbSendStreamStrict()) {
            if (userSetting.getGbSendStreamStrict()) {
                localPort = keepPort(serverItem, ssrc);
                localPort = keepPort(serverItem, ssrc, localPort);
                if (localPort == 0) {
                    return null;
                }
@@ -222,7 +269,7 @@
        // 默认为随机端口
        int localPort = 0;
        if (userSetting.getGbSendStreamStrict()) {
            localPort = keepPort(serverItem, ssrc);
            localPort = keepPort(serverItem, ssrc, localPort);
            if (localPort == 0) {
                return null;
            }
@@ -246,28 +293,35 @@
    /**
     * 保持端口,直到需要需要发流时再释放
     */
    public int keepPort(MediaServerItem serverItem, String ssrc) {
        int localPort = 0;
    public int keepPort(MediaServerItem serverItem, String ssrc, Integer localPort) {
        Map<String, Object> param = new HashMap<>(3);
        param.put("port", 0);
        param.put("port", localPort);
        param.put("enable_tcp", 1);
        param.put("stream_id", ssrc);
        JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param);
        if (jsonObject.getInteger("code") == 0) {
            localPort = jsonObject.getInteger("port");
            HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
            // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
            Integer finalLocalPort = localPort;
            hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
                    (MediaServerItem mediaServerItem, JSONObject response)->{
                        logger.info("[保持端口] {}->监听端口到期继续保持监听", ssrc);
                        keepPort(serverItem, ssrc);
                    (MediaServerItem mediaServerItem, HookParam hookParam)->{
                        logger.info("[上级点播] {}->监听端口到期继续保持监听: {}", ssrc, finalLocalPort);
                        OnRtpServerTimeoutHookParam rtpServerTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam;
                        if (!ssrc.equals(rtpServerTimeoutHookParam.getSsrc())) {
                            return;
                        }
                        int port = keepPort(serverItem, ssrc, finalLocalPort);
                        if (port == 0) {
                            logger.info("[上级点播] {}->监听端口失败,移除监听", ssrc);
                            hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
                        }
                    });
        logger.info("[保持端口] {}->监听端口: {}", ssrc, localPort);
            logger.info("[保持端口] {}->监听端口: {}", ssrc, localPort);
            logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);
            return localPort;
        }else {
            logger.info("[保持端口] 监听端口失败: {}", ssrc);
            logger.info("[上级点播] 监听端口失败: {}->{}", ssrc, localPort);
            return 0;
        }
        return localPort;
    }
    /**
@@ -305,6 +359,9 @@
     */
    public Boolean isRtpReady(MediaServerItem mediaServerItem, String streamId) {
        JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem,"rtp", "rtsp", streamId);
        if (mediaInfo.getInteger("code") == -2) {
            return null;
        }
        return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"));
    }
@@ -313,8 +370,10 @@
     */
    public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) {
        JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId);
        return mediaInfo != null && (mediaInfo.getInteger("code") == 0
        if (mediaInfo == null || (mediaInfo.getInteger("code") == -2)) {
            return null;
        }
        return  (mediaInfo.getInteger("code") == 0
                && mediaInfo.getJSONArray("data") != null
                && mediaInfo.getJSONArray("data").size() > 0);
    }
@@ -406,4 +465,19 @@
        }
        return startSendRtpStreamResult;
    }
    public Boolean updateRtpServerSSRC(MediaServerItem mediaServerItem, String streamId, String ssrc) {
        boolean result = false;
        JSONObject jsonObject = zlmresTfulUtils.updateRtpServerSSRC(mediaServerItem, streamId, ssrc);
        if (jsonObject == null) {
            logger.error("[更新RTPServer] 失败: 请检查ZLM服务");
        } else if (jsonObject.getInteger("code") == 0) {
            result= true;
            logger.info("[更新RTPServer] 成功");
        } else {
            logger.error("[更新RTPServer] 失败: {}, streamId:{},ssrc:{}->\r\n{}",jsonObject.getString("msg"),
                    streamId, ssrc, jsonObject);
        }
        return result;
    }
}