From f525b5572988326c4b73da9f68e7ee7e292a2e46 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 12 七月 2023 15:13:53 +0800 Subject: [PATCH] 修复发流复盖的为问题 --- src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java | 145 +++++++++++++++++++++++++++--------------------- 1 files changed, 82 insertions(+), 63 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index 5c74fcc..b7a7e15 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -3,19 +3,17 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.conf.VersionInfo; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IDeviceChannelService; -import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import io.swagger.v3.oas.annotations.Operation; @@ -26,16 +24,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.ResponseBody; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.util.ObjectUtils; +import org.springframework.web.bind.annotation.*; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; @SuppressWarnings("rawtypes") @Tag(name = "绗笁鏂规湇鍔″鎺�") @@ -56,19 +54,10 @@ private IMediaServerService mediaServerService; @Autowired - private VersionInfo versionInfo; - - @Autowired - private SipConfig sipConfig; + private SendRtpPortManager sendRtpPortManager; @Autowired private UserSetting userSetting; - - @Autowired - private IDeviceService deviceService; - - @Autowired - private IDeviceChannelService channelService; @Autowired private DynamicTask dynamicTask; @@ -76,14 +65,6 @@ @Autowired private RedisTemplate<Object, Object> redisTemplate; - - - @Value("${server.port}") - private int serverPort; - - - @Autowired - private IRedisCatchStorage redisCatchStorage; @GetMapping(value = "/receive/open") @@ -95,7 +76,7 @@ @Parameter(name = "stream", description = "褰㈡垚鐨勬祦鐨処D", required = true) @Parameter(name = "tcpMode", description = "鏀舵祦妯″紡锛� 0涓篣DP锛� 1涓篢CP琚姩", required = true) @Parameter(name = "callBack", description = "鍥炶皟鍦板潃锛屽鏋滄敹娴佽秴鏃朵細閫氶亾鍥炶皟閫氱煡锛屽洖璋冧负get璇锋眰锛屽弬鏁颁负callId", required = true) - public OtherRtpSendInfo openRtpServer(Boolean isSend, String ssrc, String callId, String stream, Integer tcpMode, String callBack) { + public OtherRtpSendInfo openRtpServer(Boolean isSend, @RequestParam(required = false)String ssrc, String callId, String stream, Integer tcpMode, String callBack) { logger.info("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}", isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP琚姩", callBack); @@ -119,15 +100,18 @@ } } + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + callId + "_" + stream; int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode); // 娉ㄥ唽鍥炶皟濡傛灉rtp鏀舵祦瓒呮椂鍒欓�氳繃鍥炶皟鍙戦�侀�氱煡 if (callBack != null) { - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, mediaServerItem.getId()); + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId()); // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, (mediaServerItemInUse, response)->{ if (stream.equals(response.getString("stream_id"))) { - logger.info("[寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋�", callId); + logger.info("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋�", callId); + // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 + redisTemplate.delete(receiveKey); OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); OkHttpClient client = httpClientBuilder.build(); String url = callBack + "?callId=" + callId; @@ -135,7 +119,7 @@ try { client.newCall(request).execute(); } catch (IOException e) { - logger.error("[寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋冨け璐�", callId, e); + logger.error("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋冨け璐�", callId, e); } } }); @@ -145,24 +129,18 @@ otherRtpSendInfo.setReceivePort(localPort); otherRtpSendInfo.setCallId(callId); otherRtpSendInfo.setStream(stream); - if (isSend != null && isSend) { - String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; - // 棰勫垱寤哄彂娴佷俊鎭� - int port = zlmServerFactory.keepPort(mediaServerItem, callId, 0, ssrc1 -> { - return redisTemplate.opsForValue().get(key) != null; - }); + // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 + redisTemplate.opsForValue().set(receiveKey, otherRtpSendInfo); + if (isSend != null && isSend) { + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; + // 棰勫垱寤哄彂娴佷俊鎭� + int port = sendRtpPortManager.getNextPort(mediaServerItem.getId()); // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 - redisTemplate.opsForValue().set(key, otherRtpSendInfo); - // 璁剧疆瓒呮椂浠诲姟锛岃秴鏃舵湭浣跨敤锛屽垯鑷姩绉婚櫎锛屽苟鍏抽棴绔彛淇濇寔, 榛樿浜斿垎閽� - dynamicTask.startDelay(key, ()->{ - logger.info("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绔彛淇濇寔瓒呮椂 callId->{}", callId); - redisTemplate.delete(key); - zlmServerFactory.releasePort(mediaServerItem, callId); - }, 300000); + redisTemplate.opsForValue().set(key, otherRtpSendInfo, 300, TimeUnit.SECONDS); otherRtpSendInfo.setIp(mediaServerItem.getSdpIp()); otherRtpSendInfo.setPort(port); - logger.info("[寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 缁撴灉锛宑allId->{}锛� {}", callId, otherRtpSendInfo); + logger.info("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 缁撴灉锛宑allId->{}锛� {}", callId, otherRtpSendInfo); } return otherRtpSendInfo; } @@ -175,6 +153,14 @@ logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鏀舵祦] stream->{}", stream); MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); zlmServerFactory.closeRtpServer(mediaServerItem,stream); + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_*_" + stream; + List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey); + if (scan.size() > 0) { + for (Object key : scan) { + // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 + redisTemplate.delete(key); + } + } } @GetMapping(value = "/send/start") @@ -189,15 +175,17 @@ @Parameter(name = "onlyAudio", description = "鏄惁鍙湁闊抽", required = true) @Parameter(name = "isUdp", description = "鏄惁涓篣DP", required = true) @Parameter(name = "streamType", description = "娴佺被鍨嬶紝1涓篹s娴侊紝2涓簆s娴侊紝 榛樿es娴�", required = false) - public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, Integer streamType) { - logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}", - ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS"); + @Parameter(name = "pt", description = "rtp鐨刾t", required = true) + public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType, Integer pt) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}, pt->{}", + ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS", pt); + if (ObjectUtils.isEmpty(streamType)) { + streamType = 1; + } MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); - String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); - if (sendInfo != null) { - zlmServerFactory.releasePort(mediaServerItem, sendInfo.getCallId()); - }else { + if (sendInfo == null) { sendInfo = new OtherRtpSendInfo(); } sendInfo.setPushApp(app); @@ -217,19 +205,50 @@ param.put("src_port", sendInfo.getPort()); param.put("use_ps", streamType==2 ? "1" : "0"); param.put("only_audio", onlyAudio ? "1" : "0"); + param.put("pt", pt); - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); - if (jsonObject.getInteger("code") == 0) { - logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦鎴愬姛锛宑allId->{}", callId); - redisTemplate.opsForValue().set(key, sendInfo); + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); + if (streamReady) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 娴佸瓨鍦紝寮�濮嬪彂娴侊紝callId->{}", callId); + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); + if (jsonObject.getInteger("code") == 0) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦鎴愬姛锛宑allId->{}", callId); + redisTemplate.opsForValue().set(key, sendInfo); + }else { + redisTemplate.delete(key); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦澶辫触锛宑allId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[鍙戞祦澶辫触] " + jsonObject.getString("msg")); + } }else { - redisTemplate.delete(key); - logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦澶辫触锛宑allId->{}, {}", callId, jsonObject.getString("msg")); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "[鍙戞祦澶辫触] " + jsonObject.getString("msg")); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 娴佷笉瀛樺湪锛岀瓑寰呮祦涓婄嚎锛宑allId->{}", callId); + String uuid = UUID.randomUUID().toString(); + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId()); + dynamicTask.startDelay(uuid, ()->{ + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 绛夊緟娴佷笂绾胯秴鏃� callId->{}", callId); + redisTemplate.delete(key); + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + }, 10000); + + // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� + OtherRtpSendInfo finalSendInfo = sendInfo; + hookSubscribe.addSubscribe(hookSubscribeForStreamChange, + (mediaServerItemInUse, response)->{ + dynamicTask.stop(uuid); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 娴佷笂绾匡紝寮�濮嬪彂娴� callId->{}锛宲aram->{}", callId, JSONObject.toJSONString(param)); + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); + System.out.println("========鍙戞祦缁撴灉=========="); + System.out.println(jsonObject); + if (jsonObject.getInteger("code") == 0) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦鎴愬姛锛宑allId->{}", callId); + redisTemplate.opsForValue().set(key, finalSendInfo); + }else { + redisTemplate.delete(key); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦澶辫触锛宑allId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[鍙戞祦澶辫触] " + jsonObject.getString("msg")); + } + }); } } - - @GetMapping(value = "/send/stop") @ResponseBody @@ -237,7 +256,7 @@ @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼笉浼犲垯浣跨敤闅忔満绔彛鍙戞祦", required = true) public void closeSendRTP(String callId) { logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鍙戦�佹祦] callId->{}", callId); - String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); if (sendInfo == null){ throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈紑鍚彂娴�"); -- Gitblit v1.8.0