648540858
2023-06-27 5a7a7a12bde268c104610d3e81a08df06ccab5eb
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -11,6 +11,9 @@
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -105,7 +108,7 @@
     * @param tcpMode 0/null udp 模式,1 tcp 被动模式, 2 tcp 主动模式。
     * @return
     */
    public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc, Integer port, Boolean reUsePort, Integer tcpMode) {
    public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc, Integer port, Boolean onlyAuto, Boolean reUsePort, Integer tcpMode) {
        int result = -1;
        // 查询此rtp server 是否已经存在
        JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, streamId);
@@ -121,7 +124,7 @@
                    JSONObject jsonObject = zlmresTfulUtils.closeRtpServer(mediaServerItem, param);
                    if (jsonObject != null ) {
                        if (jsonObject.getInteger("code") == 0) {
                            return createRTPServer(mediaServerItem, streamId, ssrc, port, reUsePort, tcpMode);
                            return createRTPServer(mediaServerItem, streamId, ssrc, port,onlyAuto, reUsePort, tcpMode);
                        }else {
                            logger.warn("[开启rtpServer], 重启RtpServer错误");
                        }
@@ -150,6 +153,9 @@
            param.put("port", port);
        }
        param.put("ssrc", ssrc);
        if (onlyAuto != null) {
            param.put("only_audio", onlyAuto?"1":"0");
        }
        JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
        logger.info(JSONObject.toJSONString(openRtpServerResultJson));
        if (openRtpServerResultJson != null) {
@@ -273,7 +279,7 @@
        sendRtpItem.setPort(port);
        sendRtpItem.setSsrc(ssrc);
        sendRtpItem.setApp(app);
        sendRtpItem.setStreamId(stream);
        sendRtpItem.setStream(stream);
        sendRtpItem.setPlatformId(platformId);
        sendRtpItem.setChannelId(channelId);
        sendRtpItem.setTcp(tcp);
@@ -322,7 +328,7 @@
     * 释放保持的端口
     */
    public boolean releasePort(MediaServerItem serverItem, String ssrc) {
        logger.info("[上级点播] {}->释放监听端口", ssrc);
        logger.info("[保持端口] {}->释放监听端口", ssrc);
        boolean closeRTPServerResult = closeRtpServer(serverItem, ssrc);
        HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
        // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
@@ -335,6 +341,17 @@
     */
    public JSONObject startSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) {
        return zlmresTfulUtils.startSendRtp(mediaServerItem, param);
    }
    /**
     * 调用zlm RESTFUL API —— startSendRtpPassive
     */
    public JSONObject startSendRtpPassive(MediaServerItem mediaServerItem, Map<String, Object>param) {
        return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param);
    }
    public JSONObject startSendRtpPassive(MediaServerItem mediaServerItem, Map<String, Object>param, ZLMRESTfulUtils.RequestCallback callback) {
        return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param, callback);
    }
    /**
@@ -372,11 +389,11 @@
            return 0;
        }
        Integer code = mediaInfo.getInteger("code");
        if ( code < 0) {
        if (code < 0) {
            logger.warn("查询流({}/{})是否有其它观看者时得到: {}", app, streamId, mediaInfo.getString("msg"));
            return -1;
        }
        if ( code == 0 && mediaInfo.getBoolean("online") != null && !mediaInfo.getBoolean("online")) {
        if ( code == 0 && mediaInfo.getBoolean("online") != null && ! mediaInfo.getBoolean("online")) {
            logger.warn("查询流({}/{})是否有其它观看者时得到: {}", app, streamId, mediaInfo.getString("msg"));
            return -1;
        }
@@ -395,13 +412,58 @@
            result= true;
            logger.info("[停止RTP推流] 成功");
        } else {
            logger.error("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"), JSON.toJSON(param), jsonObject);
            logger.warn("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"), JSON.toJSON(param), jsonObject);
        }
        return result;
    }
    public void closeAllSendRtpStream() {
    public JSONObject startSendRtp(MediaServerItem mediaInfo, SendRtpItem sendRtpItem) {
        String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
        logger.info("rtp/{}开始推流, 目标={}:{},SSRC={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc());
        Map<String, Object> param = new HashMap<>(12);
        param.put("vhost","__defaultVhost__");
        param.put("app",sendRtpItem.getApp());
        param.put("stream",sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        param.put("src_port", sendRtpItem.getLocalPort());
        param.put("pt", sendRtpItem.getPt());
        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
        if (!sendRtpItem.isTcp()) {
            // udp模式下开启rtcp保活
            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
        }
        if (mediaInfo == null) {
            return null;
        }
        // 如果是非严格模式,需要关闭端口占用
        JSONObject startSendRtpStreamResult = null;
        if (sendRtpItem.getLocalPort() != 0) {
            HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId());
            hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
            if (releasePort(mediaInfo, sendRtpItem.getSsrc())) {
                if (sendRtpItem.isTcpActive()) {
                    startSendRtpStreamResult = startSendRtpPassive(mediaInfo, param);
                    System.out.println(JSON.toJSON(param));
                }else {
                    param.put("is_udp", is_Udp);
                    param.put("dst_url", sendRtpItem.getIp());
                    param.put("dst_port", sendRtpItem.getPort());
                    startSendRtpStreamResult = startSendRtpStream(mediaInfo, param);
                }
            }
        }else {
            if (sendRtpItem.isTcpActive()) {
                startSendRtpStreamResult = startSendRtpPassive(mediaInfo, param);
            }else {
                param.put("is_udp", is_Udp);
                param.put("dst_url", sendRtpItem.getIp());
                param.put("dst_port", sendRtpItem.getPort());
                startSendRtpStreamResult = startSendRtpStream(mediaInfo, param);
            }
        }
        return startSendRtpStreamResult;
    }
    public Boolean updateRtpServerSSRC(MediaServerItem mediaServerItem, String streamId, String ssrc) {