winfed
2023-06-09 a2d93fce811acc83ad5ff0b4a93403db22795a10
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -3,9 +3,12 @@
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -90,7 +93,17 @@
        return result;
    }
    public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc, Integer port) {
    /**
     * 开启rtpServer
     * @param mediaServerItem zlm服务实例
     * @param streamId 流Id
     * @param ssrc ssrc
     * @param port 端口, 0/null为使用随机
     * @param reUsePort 是否重用端口
     * @param tcpMode 0/null udp 模式,1 tcp 被动模式, 2 tcp 主动模式。
     * @return
     */
    public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc, Integer port, Boolean reUsePort, Integer tcpMode) {
        int result = -1;
        // 查询此rtp server 是否已经存在
        JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, streamId);
@@ -106,7 +119,7 @@
                    JSONObject jsonObject = zlmresTfulUtils.closeRtpServer(mediaServerItem, param);
                    if (jsonObject != null ) {
                        if (jsonObject.getInteger("code") == 0) {
                            return createRTPServer(mediaServerItem, streamId, ssrc, port);
                            return createRTPServer(mediaServerItem, streamId, ssrc, port, reUsePort, tcpMode);
                        }else {
                            logger.warn("[开启rtpServer], 重启RtpServer错误");
                        }
@@ -120,8 +133,14 @@
        Map<String, Object> param = new HashMap<>();
        param.put("enable_tcp", 1);
        if (tcpMode == null) {
            tcpMode = 0;
        }
        param.put("tcp_mode", tcpMode);
        param.put("stream_id", streamId);
        if (reUsePort != null) {
            param.put("re_use_port", reUsePort?"1":"0");
        }
        // 推流端口设置0则使用随机端口
        if (port == null) {
            param.put("port", 0);
@@ -162,6 +181,31 @@
            }
        }
        return result;
    }
    public void closeRtpServer(MediaServerItem serverItem, String streamId, CommonCallback<Boolean> callback) {
        if (serverItem == null) {
            callback.run(false);
            return;
        }
        Map<String, Object> param = new HashMap<>();
        param.put("stream_id", streamId);
        zlmresTfulUtils.closeRtpServer(serverItem, param, jsonObject -> {
            if (jsonObject != null ) {
                if (jsonObject.getInteger("code") == 0) {
                    callback.run(jsonObject.getInteger("hit") == 1);
                    return;
                }else {
                    logger.error("关闭RTP Server 失败: " + jsonObject.getString("msg"));
                }
            }else {
                //  检查ZLM状态
                logger.error("关闭RTP Server 失败: 请检查ZLM服务");
            }
            callback.run(false);
        });
    }
@@ -251,11 +295,14 @@
        if (jsonObject.getInteger("code") == 0) {
            localPort = jsonObject.getInteger("port");
            HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
            // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
            hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
                    (MediaServerItem mediaServerItem, JSONObject response)->{
                        logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
                        keepPort(serverItem, ssrc);
                        int port = keepPort(serverItem, ssrc);
                        if (port == 0) {
                            logger.info("[上级点播] {}->监听端口失败,移除监听", ssrc);
                            hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
                        }
                    });
            logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);
        }else {
@@ -288,6 +335,9 @@
     */
    public Boolean isRtpReady(MediaServerItem mediaServerItem, String streamId) {
        JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem,"rtp", "rtsp", streamId);
        if (mediaInfo.getInteger("code") == -2) {
            return null;
        }
        return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"));
    }
@@ -296,8 +346,10 @@
     */
    public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) {
        JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId);
        return mediaInfo != null && (mediaInfo.getInteger("code") == 0
        if (mediaInfo == null || (mediaInfo.getInteger("code") == -2)) {
            return null;
        }
        return  (mediaInfo.getInteger("code") == 0
                && mediaInfo.getJSONArray("data") != null
                && mediaInfo.getJSONArray("data").size() > 0);
    }
@@ -344,4 +396,19 @@
    public void closeAllSendRtpStream() {
    }
    public Boolean updateRtpServerSSRC(MediaServerItem mediaServerItem, String streamId, String ssrc) {
        boolean result = false;
        JSONObject jsonObject = zlmresTfulUtils.updateRtpServerSSRC(mediaServerItem, streamId, ssrc);
        if (jsonObject == null) {
            logger.error("[更新RTPServer] 失败: 请检查ZLM服务");
        } else if (jsonObject.getInteger("code") == 0) {
            result= true;
            logger.info("[更新RTPServer] 成功");
        } else {
            logger.error("[更新RTPServer] 失败: {}, streamId:{},ssrc:{}->\r\n{}",jsonObject.getString("msg"),
                    streamId, ssrc, jsonObject);
        }
        return result;
    }
}