648540858
2022-05-06 5d901b5e3f033e8b04e53420d68626cbd87431c8
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -10,8 +10,7 @@
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
@Component
public class ZLMRTPServerFactory {
@@ -23,58 +22,88 @@
    private int[] portRangeArray = new int[2];
    public int createRTPServer(MediaServerItem mediaServerItem, String streamId) {
        Map<String, Integer> currentStreams = new HashMap<>();
    public int getFreePort(MediaServerItem mediaServerItem, int startPort, int endPort, List<Integer> usedFreelist) {
        if (endPort <= startPort) {
            return -1;
        }
        if (usedFreelist == null) {
            usedFreelist = new ArrayList<>();
        }
        JSONObject listRtpServerJsonResult = zlmresTfulUtils.listRtpServer(mediaServerItem);
        if (listRtpServerJsonResult != null) {
            JSONArray data = listRtpServerJsonResult.getJSONArray("data");
            if (data != null) {
                for (int i = 0; i < data.size(); i++) {
                    JSONObject dataItem = data.getJSONObject(i);
                    currentStreams.put(dataItem.getString("stream_id"), dataItem.getInteger("port"));
                    usedFreelist.add(dataItem.getInteger("port"));
                }
            }
        }
        // 已经在推流
        if (currentStreams.get(streamId) != null) {
            Map<String, Object> closeRtpServerParam = new HashMap<>();
            closeRtpServerParam.put("stream_id", streamId);
            zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam);
            currentStreams.remove(streamId);
        }
        Map<String, Object> param = new HashMap<>();
        int result = -1;
        /**
         * 不设置推流端口端则使用随机端口
         */
        if (StringUtils.isEmpty(mediaServerItem.getSendRtpPortRange())){
            param.put("port", 0);
        }else {
            int newPort = getPortFromportRange(mediaServerItem);
            param.put("port", newPort);
        // 设置推流端口
        if (startPort%2 == 1) {
            startPort ++;
        }
        boolean checkPort = false;
        for (int i = startPort; i < endPort  + 1; i+=2) {
            if (!usedFreelist.contains(i)){
                checkPort = true;
                startPort = i;
                break;
            }
        }
        if (!checkPort) {
            logger.warn("未找到节点{}上范围[{}-{}]的空闲端口", mediaServerItem.getId(), startPort, endPort);
            return -1;
        }
        param.put("port", startPort);
        String stream = UUID.randomUUID().toString();
        param.put("enable_tcp", 1);
        param.put("stream_id", streamId);
        param.put("stream_id", stream);
        param.put("port", 0);
        JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
        if (openRtpServerResultJson != null) {
            switch (openRtpServerResultJson.getInteger("code")){
                case 0:
                    result= openRtpServerResultJson.getInteger("port");
                    break;
                case -300: // id已经存在, 可能已经在其他端口推流
                    Map<String, Object> closeRtpServerParam = new HashMap<>();
                    closeRtpServerParam.put("stream_id", streamId);
                    zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam);
                    result = createRTPServer(mediaServerItem, streamId);;
                    break;
                case -400: // 端口占用
                    result= createRTPServer(mediaServerItem, streamId);
                    break;
                default:
                    logger.error("创建RTP Server 失败 {}: " + openRtpServerResultJson.getString("msg"),  param.get("port"));
                    break;
            if (openRtpServerResultJson.getInteger("code") == 0) {
                result= openRtpServerResultJson.getInteger("port");
                Map<String, Object> closeRtpServerParam = new HashMap<>();
                closeRtpServerParam.put("stream_id", stream);
                zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam);
            }else {
                usedFreelist.add(startPort);
                startPort +=2;
                result = getFreePort(mediaServerItem, startPort, endPort,usedFreelist);
            }
        }else {
            //  检查ZLM状态
            logger.error("创建RTP Server 失败 {}: 请检查ZLM服务", param.get("port"));
        }
        return result;
    }
    public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc) {
        int result = -1;
        // 查询此rtp server 是否已经存在
        JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, streamId);
        if (rtpInfo != null && rtpInfo.getInteger("code") == 0 && rtpInfo.getBoolean("exist")) {
            result = rtpInfo.getInteger("local_port");
            return result;
        }
        Map<String, Object> param = new HashMap<>();
        // 推流端口设置0则使用随机端口
        param.put("enable_tcp", 1);
        param.put("stream_id", streamId);
        param.put("port", 0);
        param.put("ssrc", ssrc);
        JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
        if (openRtpServerResultJson != null) {
            if (openRtpServerResultJson.getInteger("code") == 0) {
                result= openRtpServerResultJson.getInteger("port");
            }else {
                logger.error("创建RTP Server 失败 {}: ", openRtpServerResultJson.getString("msg"));
            }
        }else {
            //  检查ZLM状态
@@ -103,32 +132,32 @@
        return result;
    }
    private int getPortFromportRange(MediaServerItem mediaServerItem) {
        int currentPort = mediaServerItem.getCurrentPort();
        if (currentPort == 0) {
            String[] portRangeStrArray = mediaServerItem.getSendRtpPortRange().split(",");
            if (portRangeStrArray.length != 2) {
                portRangeArray[0] = 30000;
                portRangeArray[1] = 30500;
            }else {
                portRangeArray[0] = Integer.parseInt(portRangeStrArray[0]);
                portRangeArray[1] = Integer.parseInt(portRangeStrArray[1]);
            }
        }
        if (currentPort == 0 || currentPort++ > portRangeArray[1]) {
            currentPort = portRangeArray[0];
            mediaServerItem.setCurrentPort(currentPort);
            return portRangeArray[0];
        } else {
            if (currentPort % 2 == 1) {
                currentPort++;
            }
            currentPort++;
            mediaServerItem.setCurrentPort(currentPort);
            return currentPort;
        }
    }
//    private int getPortFromportRange(MediaServerItem mediaServerItem) {
//        int currentPort = mediaServerItem.getCurrentPort();
//        if (currentPort == 0) {
//            String[] portRangeStrArray = mediaServerItem.getSendRtpPortRange().split(",");
//            if (portRangeStrArray.length != 2) {
//                portRangeArray[0] = 30000;
//                portRangeArray[1] = 30500;
//            }else {
//                portRangeArray[0] = Integer.parseInt(portRangeStrArray[0]);
//                portRangeArray[1] = Integer.parseInt(portRangeStrArray[1]);
//            }
//        }
//
//        if (currentPort == 0 || currentPort++ > portRangeArray[1]) {
//            currentPort = portRangeArray[0];
//            mediaServerItem.setCurrentPort(currentPort);
//            return portRangeArray[0];
//        } else {
//            if (currentPort % 2 == 1) {
//                currentPort++;
//            }
//            currentPort++;
//            mediaServerItem.setCurrentPort(currentPort);
//            return currentPort;
//        }
//    }
    /**
     * 创建一个国标推流
@@ -143,13 +172,18 @@
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp){
        // 使用RTPServer 功能找一个可用的端口
        String playSsrc = serverItem.getSsrcConfig().getPlaySsrc();
        int localPort = createRTPServer(serverItem, playSsrc);
        if (localPort != -1) {
            // TODO 高并发时可能因为未放入缓存而ssrc冲突
            serverItem.getSsrcConfig().releaseSsrc(playSsrc);
            closeRTPServer(serverItem, playSsrc);
        String sendRtpPortRange = serverItem.getSendRtpPortRange();
        if (StringUtils.isEmpty(sendRtpPortRange)) {
            return null;
        }
        String[] portRangeStrArray = serverItem.getSendRtpPortRange().split(",");
        int localPort = -1;
        if (portRangeStrArray.length != 2) {
            localPort = getFreePort(serverItem, 30000, 30500, null);
        }else {
            localPort = getFreePort(serverItem, Integer.parseInt(portRangeStrArray[0]),  Integer.parseInt(portRangeStrArray[1]), null);
        }
        if (localPort == -1) {
            logger.error("没有可用的端口");
            return null;
        }
@@ -178,13 +212,19 @@
     * @return SendRtpItem
     */
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp){
        String playSsrc = serverItem.getSsrcConfig().getPlaySsrc();
        int localPort = createRTPServer(serverItem, playSsrc);
        if (localPort != -1) {
            // TODO 高并发时可能因为未放入缓存而ssrc冲突
            serverItem.getSsrcConfig().releaseSsrc(ssrc);
            closeRTPServer(serverItem, playSsrc);
        // 使用RTPServer 功能找一个可用的端口
        String sendRtpPortRange = serverItem.getSendRtpPortRange();
        if (StringUtils.isEmpty(sendRtpPortRange)) {
            return null;
        }
        String[] portRangeStrArray = serverItem.getSendRtpPortRange().split(",");
        int localPort = -1;
        if (portRangeStrArray.length != 2) {
            localPort = getFreePort(serverItem, 30000, 30500, null);
        }else {
            localPort = getFreePort(serverItem, Integer.parseInt(portRangeStrArray[0]),  Integer.parseInt(portRangeStrArray[1]), null);
        }
        if (localPort == -1) {
            logger.error("没有可用的端口");
            return null;
        }
@@ -203,20 +243,20 @@
    }
    /**
     * 调用zlm RESTful API —— startSendRtp
     * 调用zlm RESTFUL API —— startSendRtp
     */
    public Boolean startSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) {
    public JSONObject startSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) {
        Boolean result = false;
        JSONObject jsonObject = zlmresTfulUtils.startSendRtp(mediaServerItem, param);
        if (jsonObject == null) {
            logger.error("RTP推流失败: 请检查ZLM服务");
        } else if (jsonObject.getInteger("code") == 0) {
            result= true;
            logger.info("RTP推流[ {}/{} ]请求成功,本地推流端口:{}" ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"));
            logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
        } else {
            logger.error("RTP推流失败: " + jsonObject.getString("msg"));
            logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param));
        }
        return result;
        return jsonObject;
    }
    /**
@@ -242,8 +282,17 @@
     */
    public int totalReaderCount(MediaServerItem mediaServerItem, String app, String streamId) {
        JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtmp", streamId);
        Integer code = mediaInfo.getInteger("code");
        if (mediaInfo == null) {
            return 0;
        }
        if ( code < 0) {
            logger.warn("查询流({}/{})是否有其它观看者时得到: {}", app, streamId, mediaInfo.getString("msg"));
            return -1;
        }
        if ( code == 0 && ! mediaInfo.getBoolean("online")) {
            logger.warn("查询流({}/{})是否有其它观看者时得到: {}", app, streamId, mediaInfo.getString("msg"));
            return -1;
        }
        return mediaInfo.getInteger("totalReaderCount");
    }
@@ -260,7 +309,7 @@
            result= true;
            logger.info("停止RTP推流成功");
        } else {
            logger.error("停止RTP推流失败: " + jsonObject.getString("msg"));
            logger.error("停止RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param));
        }
        return result;
    }