From 613399cc6d14cfe5b8a245d462629ecee5deb2db Mon Sep 17 00:00:00 2001 From: xiaoQQya <xiaoQQya@126.com> Date: 星期二, 17 十月 2023 17:49:31 +0800 Subject: [PATCH] fix(play): 修复单端口推流下级自定义 ssrc 时, 流注册后接口仍然超时的问题 --- src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java | 265 ++++++++++++++++++++++++++++++++++++++-------------- 1 files changed, 194 insertions(+), 71 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java old mode 100644 new mode 100755 index 2b061b4..a579c48 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -3,21 +3,18 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.conf.VersionInfo; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam; -import com.genersoft.iot.vmp.service.IDeviceChannelService; -import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import io.swagger.v3.oas.annotations.Operation; @@ -28,14 +25,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; @SuppressWarnings("rawtypes") @@ -60,34 +58,13 @@ private IMediaServerService mediaServerService; @Autowired - private VersionInfo versionInfo; - - @Autowired - private SipConfig sipConfig; - - @Autowired private UserSetting userSetting; - - @Autowired - private IDeviceService deviceService; - - @Autowired - private IDeviceChannelService channelService; @Autowired private DynamicTask dynamicTask; - @Autowired private RedisTemplate<Object, Object> redisTemplate; - - - @Value("${server.port}") - private int serverPort; - - - @Autowired - private IRedisCatchStorage redisCatchStorage; @GetMapping(value = "/receive/open") @@ -114,19 +91,23 @@ if (isSend != null && isSend && callId == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"isSend涓簍rue鏃讹紝CallID涓嶈兘涓虹┖"); } - int ssrcInt = 0; + long ssrcInt = 0; if (ssrc != null) { try { - ssrcInt = Integer.parseInt(ssrc); + ssrcInt = Long.parseLong(ssrc); }catch (NumberFormatException e) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc鏍煎紡閿欒"); } - } - int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode); + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + callId + "_" + stream; + int localPortForVideo = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode); + int localPortForAudio = zlmServerFactory.createRTPServer(mediaServerItem, stream + "_a" , ssrcInt, null, false, tcpMode); + if (localPortForVideo == 0 || localPortForAudio == 0) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "鑾峰彇绔彛澶辫触"); + } // 娉ㄥ唽鍥炶皟濡傛灉rtp鏀舵祦瓒呮椂鍒欓�氳繃鍥炶皟鍙戦�侀�氱煡 if (callBack != null) { - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, mediaServerItem.getId()); + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId()); // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, (mediaServerItemInUse, hookParam)->{ @@ -140,22 +121,33 @@ try { client.newCall(request).execute(); } catch (IOException e) { - logger.error("[寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋冨け璐�", callId, e); + logger.error("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋冨け璐�", callId, e); } + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); } }); } - String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo(); otherRtpSendInfo.setReceiveIp(mediaServerItem.getSdpIp()); - otherRtpSendInfo.setReceivePort(localPort); + otherRtpSendInfo.setReceivePortForVideo(localPortForVideo); + otherRtpSendInfo.setReceivePortForAudio(localPortForAudio); otherRtpSendInfo.setCallId(callId); otherRtpSendInfo.setStream(stream); + + // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 + redisTemplate.opsForValue().set(receiveKey, otherRtpSendInfo); if (isSend != null && isSend) { - int port = sendRtpPortManager.getNextPort(mediaServerItem.getId()); - otherRtpSendInfo.setIp(mediaServerItem.getSdpIp()); - otherRtpSendInfo.setPort(port); - logger.info("[寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 缁撴灉锛宑allId->{}锛� {}", callId, otherRtpSendInfo); + // 棰勫垱寤哄彂娴佷俊鎭� + int portForVideo = sendRtpPortManager.getNextPort(mediaServerItem); + int portForAudio = sendRtpPortManager.getNextPort(mediaServerItem); + + otherRtpSendInfo.setSendLocalIp(mediaServerItem.getSdpIp()); + otherRtpSendInfo.setSendLocalPortForVideo(portForVideo); + otherRtpSendInfo.setSendLocalPortForAudio(portForAudio); + // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 + redisTemplate.opsForValue().set(key, otherRtpSendInfo, 300, TimeUnit.SECONDS); + logger.info("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 缁撴灉锛宑allId->{}锛� {}", callId, otherRtpSendInfo); } // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 redisTemplate.opsForValue().set(key, otherRtpSendInfo, 300, TimeUnit.SECONDS); @@ -170,28 +162,69 @@ logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鏀舵祦] stream->{}", stream); MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); zlmServerFactory.closeRtpServer(mediaServerItem,stream); + zlmServerFactory.closeRtpServer(mediaServerItem,stream + "_a"); + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_*_" + stream; + List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey); + if (scan.size() > 0) { + for (Object key : scan) { + // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 + redisTemplate.delete(key); + } + } } @GetMapping(value = "/send/start") @ResponseBody @Operation(summary = "鍙戦�佹祦") @Parameter(name = "ssrc", description = "鍙戦�佹祦鐨凷SRC", required = true) - @Parameter(name = "ip", description = "鐩爣IP", required = true) - @Parameter(name = "port", description = "鐩爣绔彛", required = true) + @Parameter(name = "dstIpForAudio", description = "鐩爣闊抽鏀舵祦IP", required = false) + @Parameter(name = "dstIpForVideo", description = "鐩爣瑙嗛鏀舵祦IP", required = false) + @Parameter(name = "dstPortForAudio", description = "鐩爣闊抽鏀舵祦绔彛", required = false) + @Parameter(name = "dstPortForVideo", description = "鐩爣瑙嗛鏀舵祦绔彛", required = false) @Parameter(name = "app", description = "寰呭彂閫佸簲鐢ㄥ悕", required = true) @Parameter(name = "stream", description = "寰呭彂閫佹祦Id", required = true) @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼笉浼犲垯浣跨敤闅忔満绔彛鍙戞祦", required = true) - @Parameter(name = "onlyAudio", description = "鏄惁鍙湁闊抽", required = true) @Parameter(name = "isUdp", description = "鏄惁涓篣DP", required = true) - @Parameter(name = "streamType", description = "娴佺被鍨嬶紝1涓篹s娴侊紝2涓簆s娴侊紝 榛樿es娴�", required = false) - public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType) { - logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}", - ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS"); - if (ObjectUtils.isEmpty(streamType)) { - streamType = 1; + @Parameter(name = "ptForAudio", description = "rtp鐨勯煶棰憄t", required = false) + @Parameter(name = "ptForVideo", description = "rtp鐨勮棰憄t", required = false) + public void sendRTP(String ssrc, + @RequestParam(required = false)String dstIpForAudio, + @RequestParam(required = false)String dstIpForVideo, + @RequestParam(required = false)Integer dstPortForAudio, + @RequestParam(required = false)Integer dstPortForVideo, + String app, + String stream, + String callId, + Boolean isUdp, + @RequestParam(required = false)Integer ptForAudio, + @RequestParam(required = false)Integer ptForVideo + ) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] " + + "ssrc->{}, \r\n" + + "dstIpForAudio->{}, \n" + + "dstIpForAudio->{}, \n" + + "dstPortForAudio->{}, \n" + + "dstPortForVideo->{}, \n" + + "app->{}, \n" + + "stream->{}, \n" + + "callId->{}, \n" + + "ptForAudio->{}, \n" + + "ptForVideo->{}", + ssrc, + dstIpForAudio, + dstIpForVideo, + dstPortForAudio, + dstPortForVideo, + app, + stream, + callId, + ptForAudio, + ptForVideo); + if (!((dstPortForAudio > 0 && !ObjectUtils.isEmpty(dstPortForAudio) || (dstPortForVideo > 0 && !ObjectUtils.isEmpty(dstIpForVideo))))) { + throw new ControllerException(ErrorCode.ERROR400.getCode(), "鑷冲皯搴旇瀛樺湪涓�缁勯煶棰戞垨瑙嗛鍙戦�佸弬鏁�"); } MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); - String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); if (sendInfo == null) { sendInfo = new OtherRtpSendInfo(); @@ -200,32 +233,121 @@ sendInfo.setPushStream(stream); sendInfo.setPushSSRC(ssrc); - Map<String, Object> param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",app); - param.put("stream",stream); - param.put("ssrc", ssrc); + Map<String, Object> paramForAudio; + Map<String, Object> paramForVideo; + if (!ObjectUtils.isEmpty(dstIpForAudio) && dstPortForAudio > 0) { + paramForAudio = new HashMap<>(); + paramForAudio.put("vhost","__defaultVhost__"); + paramForAudio.put("app",app); + paramForAudio.put("stream",stream); + paramForAudio.put("ssrc", ssrc); - param.put("dst_url",ip); - param.put("dst_port", port); - String is_Udp = isUdp ? "1" : "0"; - param.put("is_udp", is_Udp); - param.put("src_port", sendInfo.getPort()); - param.put("use_ps", streamType==2 ? "1" : "0"); - param.put("only_audio", onlyAudio ? "1" : "0"); + paramForAudio.put("dst_url", dstIpForAudio); + paramForAudio.put("dst_port", dstPortForAudio); + String is_Udp = isUdp ? "1" : "0"; + paramForAudio.put("is_udp", is_Udp); + paramForAudio.put("src_port", sendInfo.getSendLocalPortForAudio()); + paramForAudio.put("only_audio", "1"); + if (ptForAudio != null) { + paramForAudio.put("pt", ptForAudio); + } - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); - if (jsonObject.getInteger("code") == 0) { - logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦鎴愬姛锛宑allId->{}", callId); - redisTemplate.opsForValue().set(key, sendInfo); + } else { + paramForAudio = null; + } + if (!ObjectUtils.isEmpty(dstIpForVideo) && dstPortForVideo > 0) { + paramForVideo = new HashMap<>(); + paramForVideo.put("vhost","__defaultVhost__"); + paramForVideo.put("app",app); + paramForVideo.put("stream",stream); + paramForVideo.put("ssrc", ssrc); + + paramForVideo.put("dst_url", dstIpForVideo); + paramForVideo.put("dst_port", dstPortForVideo); + String is_Udp = isUdp ? "1" : "0"; + paramForVideo.put("is_udp", is_Udp); + paramForVideo.put("src_port", sendInfo.getSendLocalPortForVideo()); + paramForVideo.put("only_audio", "0"); + if (ptForVideo != null) { + paramForVideo.put("pt", ptForVideo); + } + + } else { + paramForVideo = null; + } + + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); + if (streamReady) { + if (paramForVideo != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForVideo); + if (jsonObject.getInteger("code") == 0) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 瑙嗛娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, paramForVideo); + redisTemplate.opsForValue().set(key, sendInfo); + }else { + redisTemplate.delete(key); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 瑙嗛娴佸彂娴佸け璐ワ紝callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[瑙嗛娴佸彂娴佸け璐 " + jsonObject.getString("msg")); + } + } + if(paramForAudio != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForAudio); + if (jsonObject.getInteger("code") == 0) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 闊抽娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, paramForAudio); + redisTemplate.opsForValue().set(key, sendInfo); + }else { + redisTemplate.delete(key); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 闊抽娴佸彂娴佸け璐ワ紝callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[闊抽娴佸彂娴佸け璐 " + jsonObject.getString("msg")); + } + } }else { - redisTemplate.delete(key); - logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦澶辫触锛宑allId->{}, {}", callId, jsonObject.getString("msg")); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "[鍙戞祦澶辫触] " + jsonObject.getString("msg")); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 娴佷笉瀛樺湪锛岀瓑寰呮祦涓婄嚎锛宑allId->{}", callId); + String uuid = UUID.randomUUID().toString(); + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId()); + dynamicTask.startDelay(uuid, ()->{ + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 绛夊緟娴佷笂绾胯秴鏃� callId->{}", callId); + redisTemplate.delete(key); + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + }, 10000); + + // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� + OtherRtpSendInfo finalSendInfo = sendInfo; + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + hookSubscribe.addSubscribe(hookSubscribeForStreamChange, + (mediaServerItemInUse, response)->{ + dynamicTask.stop(uuid); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 娴佷笂绾匡紝寮�濮嬪彂娴� callId->{}", callId); + try { + Thread.sleep(400); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (paramForVideo != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForVideo); + if (jsonObject.getInteger("code") == 0) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 瑙嗛娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, paramForVideo); + redisTemplate.opsForValue().set(key, finalSendInfo); + }else { + redisTemplate.delete(key); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 瑙嗛娴佸彂娴佸け璐ワ紝callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[瑙嗛娴佸彂娴佸け璐 " + jsonObject.getString("msg")); + } + } + if(paramForAudio != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForAudio); + if (jsonObject.getInteger("code") == 0) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 闊抽娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, paramForAudio); + redisTemplate.opsForValue().set(key, finalSendInfo); + }else { + redisTemplate.delete(key); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 闊抽娴佸彂娴佸け璐ワ紝callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[闊抽娴佸彂娴佸け璐 " + jsonObject.getString("msg")); + } + } + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + }); } } - - @GetMapping(value = "/send/stop") @ResponseBody @@ -233,7 +355,7 @@ @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼笉浼犲垯浣跨敤闅忔満绔彛鍙戞祦", required = true) public void closeSendRTP(String callId) { logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鍙戦�佹祦] callId->{}", callId); - String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); if (sendInfo == null){ throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈紑鍚彂娴�"); @@ -251,6 +373,7 @@ }else { logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鍙戦�佹祦] 鎴愬姛 callId->{}", callId); } + redisTemplate.delete(key); } } -- Gitblit v1.8.0