From d67f0a1ea403fbebfff70e7eaf4fe4870be2a1b8 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 08 八月 2023 16:06:02 +0800 Subject: [PATCH] 解决使用redis集群时获取发送端口失败的问题 --- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java | 190 ++++++++++++++++------------------------------- 1 files changed, 65 insertions(+), 125 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 9bf1a3a..0f0c139 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -1,17 +1,18 @@ package com.genersoft.iot.vmp.media.zlm; import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.*; +import java.util.HashMap; +import java.util.Map; @Component public class ZLMRTPServerFactory { @@ -27,70 +28,20 @@ @Autowired private ZlmHttpHookSubscribe hookSubscribe; - private int[] portRangeArray = new int[2]; + @Autowired + private SendRtpPortManager sendRtpPortManager; - public int getFreePort(MediaServerItem mediaServerItem, int startPort, int endPort, List<Integer> usedFreelist) { - if (endPort <= startPort) { - return -1; - } - if (usedFreelist == null) { - usedFreelist = new ArrayList<>(); - } - JSONObject listRtpServerJsonResult = zlmresTfulUtils.listRtpServer(mediaServerItem); - if (listRtpServerJsonResult != null) { - JSONArray data = listRtpServerJsonResult.getJSONArray("data"); - if (data != null) { - for (int i = 0; i < data.size(); i++) { - JSONObject dataItem = data.getJSONObject(i); - usedFreelist.add(dataItem.getInteger("port")); - } - } - } - - Map<String, Object> param = new HashMap<>(); - int result = -1; - // 璁剧疆鎺ㄦ祦绔彛 - if (startPort%2 == 1) { - startPort ++; - } - boolean checkPort = false; - for (int i = startPort; i < endPort + 1; i+=2) { - if (!usedFreelist.contains(i)){ - checkPort = true; - startPort = i; - break; - } - } - if (!checkPort) { - logger.warn("鏈壘鍒拌妭鐐箋}涓婅寖鍥碵{}-{}]鐨勭┖闂茬鍙�", mediaServerItem.getId(), startPort, endPort); - return -1; - } - param.put("port", startPort); - String stream = UUID.randomUUID().toString(); - param.put("enable_tcp", 1); - param.put("stream_id", stream); -// param.put("port", 0); - JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param); - - if (openRtpServerResultJson != null) { - if (openRtpServerResultJson.getInteger("code") == 0) { - result= openRtpServerResultJson.getInteger("port"); - Map<String, Object> closeRtpServerParam = new HashMap<>(); - closeRtpServerParam.put("stream_id", stream); - zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam); - }else { - usedFreelist.add(startPort); - startPort +=2; - result = getFreePort(mediaServerItem, startPort, endPort,usedFreelist); - } - }else { - // 妫�鏌LM鐘舵�� - logger.error("鍒涘缓RTP Server 澶辫触 {}: 璇锋鏌LM鏈嶅姟", param.get("port")); - } - return result; - } - - public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc, Integer port) { + /** + * 寮�鍚痳tpServer + * @param mediaServerItem zlm鏈嶅姟瀹炰緥 + * @param streamId 娴両d + * @param ssrc ssrc + * @param port 绔彛锛� 0/null涓轰娇鐢ㄩ殢鏈� + * @param reUsePort 鏄惁閲嶇敤绔彛 + * @param tcpMode 0/null udp 妯″紡锛�1 tcp 琚姩妯″紡, 2 tcp 涓诲姩妯″紡銆� + * @return + */ + public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc, Integer port, Boolean reUsePort, Integer tcpMode) { int result = -1; // 鏌ヨ姝tp server 鏄惁宸茬粡瀛樺湪 JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, streamId); @@ -106,7 +57,7 @@ JSONObject jsonObject = zlmresTfulUtils.closeRtpServer(mediaServerItem, param); if (jsonObject != null ) { if (jsonObject.getInteger("code") == 0) { - return createRTPServer(mediaServerItem, streamId, ssrc, port); + return createRTPServer(mediaServerItem, streamId, ssrc, port, reUsePort, tcpMode); }else { logger.warn("[寮�鍚痳tpServer], 閲嶅惎RtpServer閿欒"); } @@ -120,15 +71,24 @@ Map<String, Object> param = new HashMap<>(); - param.put("enable_tcp", 1); + if (tcpMode == null) { + tcpMode = 0; + } + param.put("tcp_mode", tcpMode); param.put("stream_id", streamId); + if (reUsePort != null) { + param.put("re_use_port", reUsePort?"1":"0"); + } // 鎺ㄦ祦绔彛璁剧疆0鍒欎娇鐢ㄩ殢鏈虹鍙� if (port == null) { param.put("port", 0); }else { param.put("port", port); } - param.put("ssrc", ssrc); + if (ssrc != 0) { + param.put("ssrc", ssrc); + } + JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param); logger.info(JSONObject.toJSONString(openRtpServerResultJson)); if (openRtpServerResultJson != null) { @@ -164,6 +124,31 @@ return result; } + public void closeRtpServer(MediaServerItem serverItem, String streamId, CommonCallback<Boolean> callback) { + if (serverItem == null) { + callback.run(false); + return; + } + Map<String, Object> param = new HashMap<>(); + param.put("stream_id", streamId); + zlmresTfulUtils.closeRtpServer(serverItem, param, jsonObject -> { + if (jsonObject != null ) { + if (jsonObject.getInteger("code") == 0) { + callback.run(jsonObject.getInteger("hit") == 1); + return; + }else { + logger.error("鍏抽棴RTP Server 澶辫触: " + jsonObject.getString("msg")); + } + }else { + // 妫�鏌LM鐘舵�� + logger.error("鍏抽棴RTP Server 澶辫触: 璇锋鏌LM鏈嶅姟"); + } + callback.run(false); + }); + + + } + /** * 鍒涘缓涓�涓浗鏍囨帹娴� @@ -175,17 +160,12 @@ * @param tcp 鏄惁涓簍cp * @return SendRtpItem */ - public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp, boolean rtcp){ + public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, + String deviceId, String channelId, boolean tcp, boolean rtcp){ - // 榛樿涓洪殢鏈虹鍙� - int localPort = 0; - if (userSetting.getGbSendStreamStrict()) { - if (userSetting.getGbSendStreamStrict()) { - localPort = keepPort(serverItem, ssrc); - if (localPort == 0) { - return null; - } - } + int localPort = sendRtpPortManager.getNextPort(serverItem); + if (localPort == 0) { + return null; } SendRtpItem sendRtpItem = new SendRtpItem(); sendRtpItem.setIp(ip); @@ -213,14 +193,12 @@ * @param tcp 鏄惁涓簍cp * @return SendRtpItem */ - public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp, boolean rtcp){ - // 榛樿涓洪殢鏈虹鍙� - int localPort = 0; - if (userSetting.getGbSendStreamStrict()) { - localPort = keepPort(serverItem, ssrc); - if (localPort == 0) { - return null; - } + public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, + String app, String stream, String channelId, boolean tcp, boolean rtcp){ + + int localPort = sendRtpPortManager.getNextPort(serverItem); + if (localPort == 0) { + return null; } SendRtpItem sendRtpItem = new SendRtpItem(); sendRtpItem.setIp(ip); @@ -236,44 +214,6 @@ sendRtpItem.setMediaServerId(serverItem.getId()); sendRtpItem.setRtcp(rtcp); return sendRtpItem; - } - - /** - * 淇濇寔绔彛锛岀洿鍒伴渶瑕侀渶瑕佸彂娴佹椂鍐嶉噴鏀� - */ - public int keepPort(MediaServerItem serverItem, String ssrc) { - int localPort = 0; - Map<String, Object> param = new HashMap<>(3); - param.put("port", 0); - param.put("enable_tcp", 1); - param.put("stream_id", ssrc); - JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param); - if (jsonObject.getInteger("code") == 0) { - localPort = jsonObject.getInteger("port"); - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); - // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� - hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, - (MediaServerItem mediaServerItem, JSONObject response)->{ - logger.info("[涓婄骇鐐规挱] {}->鐩戝惉绔彛鍒版湡缁х画淇濇寔鐩戝惉", ssrc); - keepPort(serverItem, ssrc); - }); - logger.info("[涓婄骇鐐规挱] {}->鐩戝惉绔彛: {}", ssrc, localPort); - }else { - logger.info("[涓婄骇鐐规挱] 鐩戝惉绔彛澶辫触: {}", ssrc); - } - return localPort; - } - - /** - * 閲婃斁淇濇寔鐨勭鍙� - */ - public boolean releasePort(MediaServerItem serverItem, String ssrc) { - logger.info("[涓婄骇鐐规挱] {}->閲婃斁鐩戝惉绔彛", ssrc); - boolean closeRTPServerResult = closeRtpServer(serverItem, ssrc); - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); - // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� - hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); - return closeRTPServerResult; } /** -- Gitblit v1.8.0