648540858
2022-03-15 ea08969bfcff38b8195282c77495eb1f8bd8eb07
使用zlm原生的rtp随机端口配置
6个文件已修改
224 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java 186 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -372,12 +372,12 @@
                            }
                        }
                        if (playTransaction == null) {
                            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true);
                            String streamId = null;
                            if (mediaServerItem.isRtpEnable()) {
                                sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId));
                            }else {
                                sendRtpItem.setStreamId(ssrcInfo.getStream());
                                streamId = String.format("%s_%s", device.getDeviceId(), channelId);
                            }
                            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, true);
                            sendRtpItem.setStreamId(ssrcInfo.getStream());
                            // 写入redis, 超时时回复
                            redisCatchStorage.updateSendRTPSever(sendRtpItem);
                            playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg)->{
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -10,8 +10,7 @@
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
@Component
public class ZLMRTPServerFactory {
@@ -23,54 +22,80 @@
    private int[] portRangeArray = new int[2];
    public int createRTPServer(MediaServerItem mediaServerItem, String streamId) {
        Map<String, Integer> currentStreams = new HashMap<>();
    public int getFreePort(MediaServerItem mediaServerItem, int startPort, int endPort, List<Integer> usedFreelist) {
        if (endPort <= startPort) return -1;
        if (usedFreelist == null) {
            usedFreelist = new ArrayList<>();
        }
        JSONObject listRtpServerJsonResult = zlmresTfulUtils.listRtpServer(mediaServerItem);
        if (listRtpServerJsonResult != null) {
            JSONArray data = listRtpServerJsonResult.getJSONArray("data");
            if (data != null) {
                for (int i = 0; i < data.size(); i++) {
                    JSONObject dataItem = data.getJSONObject(i);
                    currentStreams.put(dataItem.getString("stream_id"), dataItem.getInteger("port"));
                    usedFreelist.add(dataItem.getInteger("port"));
                }
            }
        }
        // 已经在推流
        if (currentStreams.get(streamId) != null) {
            Map<String, Object> closeRtpServerParam = new HashMap<>();
            closeRtpServerParam.put("stream_id", streamId);
            zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam);
            currentStreams.remove(streamId);
        }
        Map<String, Object> param = new HashMap<>();
        int result = -1;
        // 不设置推流端口端则使用随机端口
        if (!StringUtils.isEmpty(mediaServerItem.getSendRtpPortRange())){
            int newPort = getPortFromportRange(mediaServerItem);
            param.put("port", newPort);
        // 设置推流端口
        if (startPort%2 == 1) {
            startPort ++;
        }
        boolean checkPort = false;
        for (int i = startPort; i < endPort  + 1; i+=2) {
            if (!usedFreelist.contains(i)){
                checkPort = true;
                startPort = i;
                break;
            }
        }
        if (!checkPort) {
            logger.warn("未找到节点{}上范围[{}-{}]的空闲端口", mediaServerItem.getId(), startPort, endPort);
            return -1;
        }
        param.put("port", startPort);
        String stream = UUID.randomUUID().toString();
        param.put("enable_tcp", 1);
        param.put("stream_id", streamId);
        param.put("stream_id", stream);
        param.put("port", 0);
        JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
        if (openRtpServerResultJson != null) {
            switch (openRtpServerResultJson.getInteger("code")){
                case 0:
                    result= openRtpServerResultJson.getInteger("port");
                    break;
                case -300: // id已经存在, 可能已经在其他端口推流
                    Map<String, Object> closeRtpServerParam = new HashMap<>();
                    closeRtpServerParam.put("stream_id", streamId);
                    zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam);
                    result = createRTPServer(mediaServerItem, streamId);;
                    break;
                case -400: // 端口占用
                    result= createRTPServer(mediaServerItem, streamId);
                    break;
                default:
                    logger.error("创建RTP Server 失败 {}: " + openRtpServerResultJson.getString("msg"),  param.get("port"));
                    break;
            if (openRtpServerResultJson.getInteger("code") == 0) {
                result= openRtpServerResultJson.getInteger("port");
                Map<String, Object> closeRtpServerParam = new HashMap<>();
                closeRtpServerParam.put("stream_id", stream);
                zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam);
            }else {
                usedFreelist.add(startPort);
                startPort +=2;
                result = getFreePort(mediaServerItem, startPort, endPort,usedFreelist);
            }
        }else {
            //  检查ZLM状态
            logger.error("创建RTP Server 失败 {}: 请检查ZLM服务", param.get("port"));
        }
        return result;
    }
    public int createRTPServer(MediaServerItem mediaServerItem, String streamId) {
        Map<String, Object> param = new HashMap<>();
        int result = -1;
        // 推流端口设置0则使用随机端口
        param.put("enable_tcp", 1);
        param.put("stream_id", streamId);
        param.put("port", 0);
        JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
        if (openRtpServerResultJson != null) {
            if (openRtpServerResultJson.getInteger("code") == 0) {
                result= openRtpServerResultJson.getInteger("port");
            }else {
                logger.error("创建RTP Server 失败 {}: " + openRtpServerResultJson.getString("msg"),  param.get("port"));
            }
        }else {
            //  检查ZLM状态
@@ -99,32 +124,32 @@
        return result;
    }
    private int getPortFromportRange(MediaServerItem mediaServerItem) {
        int currentPort = mediaServerItem.getCurrentPort();
        if (currentPort == 0) {
            String[] portRangeStrArray = mediaServerItem.getSendRtpPortRange().split(",");
            if (portRangeStrArray.length != 2) {
                portRangeArray[0] = 30000;
                portRangeArray[1] = 30500;
            }else {
                portRangeArray[0] = Integer.parseInt(portRangeStrArray[0]);
                portRangeArray[1] = Integer.parseInt(portRangeStrArray[1]);
            }
        }
        if (currentPort == 0 || currentPort++ > portRangeArray[1]) {
            currentPort = portRangeArray[0];
            mediaServerItem.setCurrentPort(currentPort);
            return portRangeArray[0];
        } else {
            if (currentPort % 2 == 1) {
                currentPort++;
            }
            currentPort++;
            mediaServerItem.setCurrentPort(currentPort);
            return currentPort;
        }
    }
//    private int getPortFromportRange(MediaServerItem mediaServerItem) {
//        int currentPort = mediaServerItem.getCurrentPort();
//        if (currentPort == 0) {
//            String[] portRangeStrArray = mediaServerItem.getSendRtpPortRange().split(",");
//            if (portRangeStrArray.length != 2) {
//                portRangeArray[0] = 30000;
//                portRangeArray[1] = 30500;
//            }else {
//                portRangeArray[0] = Integer.parseInt(portRangeStrArray[0]);
//                portRangeArray[1] = Integer.parseInt(portRangeStrArray[1]);
//            }
//        }
//
//        if (currentPort == 0 || currentPort++ > portRangeArray[1]) {
//            currentPort = portRangeArray[0];
//            mediaServerItem.setCurrentPort(currentPort);
//            return portRangeArray[0];
//        } else {
//            if (currentPort % 2 == 1) {
//                currentPort++;
//            }
//            currentPort++;
//            mediaServerItem.setCurrentPort(currentPort);
//            return currentPort;
//        }
//    }
    /**
     * 创建一个国标推流
@@ -139,13 +164,18 @@
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp){
        // 使用RTPServer 功能找一个可用的端口
        String playSsrc = serverItem.getSsrcConfig().getPlaySsrc();
        int localPort = createRTPServer(serverItem, playSsrc);
        if (localPort != -1) {
            // TODO 高并发时可能因为未放入缓存而ssrc冲突
            serverItem.getSsrcConfig().releaseSsrc(playSsrc);
            closeRTPServer(serverItem, playSsrc);
        String sendRtpPortRange = serverItem.getSendRtpPortRange();
        if (StringUtils.isEmpty(sendRtpPortRange)) {
            return null;
        }
        String[] portRangeStrArray = serverItem.getSendRtpPortRange().split(",");
        int localPort = -1;
        if (portRangeStrArray.length != 2) {
            localPort = getFreePort(serverItem, 30000, 30500, null);
        }else {
            localPort = getFreePort(serverItem, Integer.parseInt(portRangeStrArray[0]),  Integer.parseInt(portRangeStrArray[1]), null);
        }
        if (localPort == -1) {
            logger.error("没有可用的端口");
            return null;
        }
@@ -174,13 +204,19 @@
     * @return SendRtpItem
     */
    public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp){
        String playSsrc = serverItem.getSsrcConfig().getPlaySsrc();
        int localPort = createRTPServer(serverItem, playSsrc);
        if (localPort != -1) {
            // TODO 高并发时可能因为未放入缓存而ssrc冲突
            serverItem.getSsrcConfig().releaseSsrc(ssrc);
            closeRTPServer(serverItem, playSsrc);
        // 使用RTPServer 功能找一个可用的端口
        String sendRtpPortRange = serverItem.getSendRtpPortRange();
        if (StringUtils.isEmpty(sendRtpPortRange)) {
            return null;
        }
        String[] portRangeStrArray = serverItem.getSendRtpPortRange().split(",");
        int localPort = -1;
        if (portRangeStrArray.length != 2) {
            localPort = getFreePort(serverItem, 30000, 30500, null);
        }else {
            localPort = getFreePort(serverItem, Integer.parseInt(portRangeStrArray[0]),  Integer.parseInt(portRangeStrArray[1]), null);
        }
        if (localPort == -1) {
            logger.error("没有可用的端口");
            return null;
        }
@@ -199,7 +235,7 @@
    }
    /**
     * 调用zlm RESTful API —— startSendRtp
     * 调用zlm RESTFUL API —— startSendRtp
     */
    public JSONObject startSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) {
        Boolean result = false;
@@ -208,9 +244,9 @@
            logger.error("RTP推流失败: 请检查ZLM服务");
        } else if (jsonObject.getInteger("code") == 0) {
            result= true;
            logger.info("RTP推流[ {}/{} ]请求成功,本地推流端口:{}" ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"));
            logger.info("RTP推流成功[ {}/{} ],本地推流端口:{}" ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"));
        } else {
            logger.error("RTP推流失败: " + jsonObject.getString("msg"));
            logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param));
        }
        return jsonObject;
    }
@@ -265,7 +301,7 @@
            result= true;
            logger.info("停止RTP推流成功");
        } else {
            logger.error("停止RTP推流失败: " + jsonObject.getString("msg"));
            logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param));
        }
        return result;
    }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java
@@ -194,6 +194,9 @@
    @JSONField(name = "rtp_proxy.port")
    private int rtpProxyPort;
    @JSONField(name = "rtp_proxy.port_range")
    private String portRange;
    @JSONField(name = "rtp_proxy.timeoutSec")
    private String rtpProxyTimeoutSec;
@@ -802,4 +805,12 @@
    public void setHookAliveInterval(int hookAliveInterval) {
        this.hookAliveInterval = hookAliveInterval;
    }
    public String getPortRange() {
        return portRange;
    }
    public void setPortRange(String portRange) {
        this.portRange = portRange;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java
@@ -91,7 +91,7 @@
        streamNoneReaderDelayMS = zlmServerConfig.getGeneralStreamNoneReaderDelayMS();
        hookAliveInterval = zlmServerConfig.getHookAliveInterval();
        rtpEnable = false; // 默认使用单端口;直到用户自己设置开启多端口
        rtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号
        rtpPortRange = zlmServerConfig.getPortRange().replace("_",","); // 默认使用30000,30500作为级联时发送流的端口号
        sendRtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号
        recordAssistPort = 0; // 默认关闭
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -521,6 +521,9 @@
        // 最多等待未初始化的Track时间,单位毫秒,超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流,
        // 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项
        param.put("general.wait_track_ready_ms", "3000" );
        if (mediaServerItem.isRtpEnable() && !StringUtils.isEmpty(mediaServerItem.getRtpPortRange())) {
            param.put("rtp_proxy.port_range", mediaServerItem.getRtpPortRange().replace(",", "-"));
        }
        JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param);
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -109,7 +109,6 @@
        // 录像查询以channelId作为deviceId查询
        String key = DeferredResultHolder.CALLBACK_CMD_STOP + deviceId + channelId;
        resultHolder.put(key, uuid, result);
        Device device = storager.queryVideoDevice(deviceId);
        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
        if (streamInfo == null) {
            RequestMessage msg = new RequestMessage();
@@ -120,15 +119,14 @@
            storager.stopPlay(deviceId, channelId);
            return result;
        }
        cmder.streamByeCmd(deviceId, channelId, streamInfo.getStream(), null, (event) -> {
        cmder.streamByeCmd(deviceId, channelId, streamInfo.getStream(), null, eventResult -> {
            redisCatchStorage.stopPlay(streamInfo);
            storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
            RequestMessage msg = new RequestMessage();
            msg.setId(uuid);
            msg.setKey(key);
            //Response response = event.getResponse();
            msg.setData(String.format("success"));
            resultHolder.invokeAllResult(msg);
            RequestMessage msgForSuccess = new RequestMessage();
            msgForSuccess.setId(uuid);
            msgForSuccess.setKey(key);
            msgForSuccess.setData(String.format("success"));
            resultHolder.invokeAllResult(msgForSuccess);
        });
        if (deviceId != null || channelId != null) {