648540858
2023-08-08 6106bda1510518b083a970b7c642342c55933273
Merge branch '2.6.8' into wvp-28181-2.0
2个文件已修改
73 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java 42 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java
@@ -30,7 +30,7 @@
    private final String KEY = "VM_MEDIA_SEND_RTP_PORT_";
    public int getNextPort(MediaServerItem mediaServer) {
    public synchronized int getNextPort(MediaServerItem mediaServer) {
        if (mediaServer == null) {
            logger.warn("[发送端口管理] 参数错误,mediaServer为NULL");
            return -1;
@@ -50,17 +50,15 @@
        String sendRtpPortRange = mediaServer.getSendRtpPortRange();
        int startPort;
        int endPort;
        if (sendRtpPortRange == null) {
            logger.warn("{}未设置发送端口默认值,自动使用40000-50000作为端口范围", mediaServer.getId());
        if (sendRtpPortRange != null) {
            String[] portArray = sendRtpPortRange.split(",");
            if (portArray.length != 2 || !NumberUtils.isParsable(portArray[0]) || !NumberUtils.isParsable(portArray[1])) {
                logger.warn("{}发送端口配置格式错误,自动使用40000-50000作为端口范围", mediaServer.getId());
                logger.warn("{}发送端口配置格式错误,自动使用50000-60000作为端口范围", mediaServer.getId());
                startPort = 50000;
                endPort = 60000;
            }else {
                if ( Integer.parseInt(portArray[1]) - Integer.parseInt(portArray[0]) < 1) {
                    logger.warn("{}发送端口配置错误,结束端口至少比开始端口大一,自动使用40000-50000作为端口范围", mediaServer.getId());
                    logger.warn("{}发送端口配置错误,结束端口至少比开始端口大一,自动使用50000-60000作为端口范围", mediaServer.getId());
                    startPort = 50000;
                    endPort = 60000;
                }else {
@@ -69,6 +67,7 @@
                }
            }
        }else {
            logger.warn("{}未设置发送端口默认值,自动使用50000-60000作为端口范围", mediaServer.getId());
            startPort = 50000;
            endPort = 60000;
        }
@@ -76,10 +75,35 @@
            logger.warn("{}获取redis连接信息失败", mediaServer.getId());
            return -1;
        }
//        RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory());
//        return redisAtomicInteger.getAndUpdate((current)->{
//            return getPort(current, startPort, endPort, checkPort-> !sendRtpItemMap.containsKey(checkPort));
//        });
        return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
    }
    private synchronized int getSendPort(int startPort, int endPort, String sendIndexKey, Map<Integer, SendRtpItem> sendRtpItemMap){
        RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory());
        return redisAtomicInteger.getAndUpdate((current)->{
            return getPort(current, startPort, endPort, checkPort-> !sendRtpItemMap.containsKey(checkPort));
        });
        if (redisAtomicInteger.get() < startPort) {
            redisAtomicInteger.set(startPort);
            return startPort;
        }else {
            int port = redisAtomicInteger.getAndIncrement();
            if (port > endPort) {
                redisAtomicInteger.set(startPort);
                if (sendRtpItemMap.containsKey(startPort)) {
                    return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
                }else {
                    return startPort;
                }
            }
            if (sendRtpItemMap.containsKey(port)) {
                return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
            }else {
                return port;
            }
        }
    }
    interface CheckPortCallback{
src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java
@@ -17,7 +17,6 @@
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo;
import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
@@ -202,9 +201,9 @@
                        callId);
        MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
        String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_"  + callId;
        OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
        OtherPsSendInfo sendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(key);
        if (sendInfo == null) {
            sendInfo = new OtherRtpSendInfo();
            sendInfo = new OtherPsSendInfo();
        }
        sendInfo.setPushApp(app);
        sendInfo.setPushStream(stream);
@@ -223,7 +222,7 @@
        param.put("dst_port", dstPort);
        String is_Udp = isUdp ? "1" : "0";
        param.put("is_udp", is_Udp);
        param.put("src_port", sendInfo.getSendLocalPortForAudio());
        param.put("src_port", sendInfo.getSendLocalPort());
        param.put("use_ps", "0");
        param.put("only_audio", "1");
@@ -250,7 +249,7 @@
            }, 10000);
            // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
            OtherRtpSendInfo finalSendInfo = sendInfo;
            OtherPsSendInfo finalSendInfo = sendInfo;
            hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
            hookSubscribe.addSubscribe(hookSubscribeForStreamChange,
                    (mediaServerItemInUse, response)->{
@@ -282,7 +281,7 @@
    public void closeSendRTP(String callId) {
        logger.info("[第三方PS服务对接->关闭发送流] callId->{}", callId);
        String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_"  + callId;
        OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
        OtherPsSendInfo sendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(key);
        if (sendInfo == null){
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未开启发流");
        }
@@ -302,4 +301,24 @@
        redisTemplate.delete(key);
    }
    @GetMapping(value = "/getTestPort")
    @ResponseBody
    public int getTestPort() {
        MediaServerItem defaultMediaServer = mediaServerService.getDefaultMediaServer();
//        for (int i = 0; i <300; i++) {
//            new Thread(() -> {
//                int nextPort = sendRtpPortManager.getNextPort(defaultMediaServer);
//                try {
//                    Thread.sleep((int)Math.random()*10);
//                } catch (InterruptedException e) {
//                    throw new RuntimeException(e);
//                }
//                System.out.println(nextPort);
//            }).start();
//        }
        return sendRtpPortManager.getNextPort(defaultMediaServer);
    }
}