648540858
2023-12-26 33fba05a381db591a7f5874eb242f90065e3458d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package com.genersoft.iot.vmp.media.zlm;
 
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.support.atomic.RedisAtomicInteger;
import org.springframework.stereotype.Component;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
@Component
public class SendRtpPortManager {
 
    private final static Logger logger = LoggerFactory.getLogger(SendRtpPortManager.class);
 
    @Autowired
    private UserSetting userSetting;
 
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
 
    private final String KEY = "VM_MEDIA_SEND_RTP_PORT_";
 
    public synchronized int getNextPort(MediaServerItem mediaServer) {
        if (mediaServer == null) {
            logger.warn("[发送端口管理] 参数错误,mediaServer为NULL");
            return -1;
        }
        String sendIndexKey = KEY + userSetting.getServerId() + "_" +  mediaServer.getId();
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
                + userSetting.getServerId() + "_*";
        List<Object> queryResult = RedisUtil.scan(redisTemplate, key);
        Map<Integer, SendRtpItem> sendRtpItemMap = new HashMap<>();
 
        for (Object o : queryResult) {
            SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(o);
            if (sendRtpItem != null) {
                sendRtpItemMap.put(sendRtpItem.getLocalPort(), sendRtpItem);
            }
        }
        String sendRtpPortRange = mediaServer.getSendRtpPortRange();
        int startPort;
        int endPort;
        if (sendRtpPortRange != null) {
            String[] portArray = sendRtpPortRange.split(",");
            if (portArray.length != 2 || !NumberUtils.isParsable(portArray[0]) || !NumberUtils.isParsable(portArray[1])) {
                logger.warn("{}发送端口配置格式错误,自动使用50000-60000作为端口范围", mediaServer.getId());
                startPort = 50000;
                endPort = 60000;
            }else {
                if ( Integer.parseInt(portArray[1]) - Integer.parseInt(portArray[0]) < 1) {
                    logger.warn("{}发送端口配置错误,结束端口至少比开始端口大一,自动使用50000-60000作为端口范围", mediaServer.getId());
                    startPort = 50000;
                    endPort = 60000;
                }else {
                    startPort = Integer.parseInt(portArray[0]);
                    endPort = Integer.parseInt(portArray[1]);
                }
            }
        }else {
            logger.warn("{}未设置发送端口默认值,自动使用50000-60000作为端口范围", mediaServer.getId());
            startPort = 50000;
            endPort = 60000;
        }
        if (redisTemplate == null || redisTemplate.getConnectionFactory() == null) {
            logger.warn("{}获取redis连接信息失败", mediaServer.getId());
            return -1;
        }
//        RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory());
//        return redisAtomicInteger.getAndUpdate((current)->{
//            return getPort(current, startPort, endPort, checkPort-> !sendRtpItemMap.containsKey(checkPort));
//        });
        return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
    }
 
    private synchronized int getSendPort(int startPort, int endPort, String sendIndexKey, Map<Integer, SendRtpItem> sendRtpItemMap){
        RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory());
        if (redisAtomicInteger.get() < startPort) {
            redisAtomicInteger.set(startPort);
            return startPort;
        }else {
            int port = redisAtomicInteger.getAndIncrement();
            if (port > endPort) {
                redisAtomicInteger.set(startPort);
                if (sendRtpItemMap.containsKey(startPort)) {
                    return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
                }else {
                    return startPort;
                }
            }
            if (sendRtpItemMap.containsKey(port)) {
                return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
            }else {
                return port;
            }
        }
 
    }
 
    interface CheckPortCallback{
        boolean check(int port);
    }
 
    private int getPort(int current, int start, int end, CheckPortCallback checkPortCallback) {
        if (current <= 0) {
            if (start%2 == 0) {
                current = start;
            }else {
                current = start + 1;
            }
        }else {
            current += 2;
            if (current > end) {
                if (start%2 == 0) {
                    current = start;
                }else {
                    current = start + 1;
                }
            }
        }
        if (!checkPortCallback.check(current)) {
            return getPort(current + 2, start, end, checkPortCallback);
        }
        return current;
    }
}