From 9fc3db1f5e381378fd54818e0ba017df6be96fa9 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 13 七月 2023 14:30:41 +0800 Subject: [PATCH] 增加发流音视频分开发送 --- src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java | 198 +++++++++++++++++++++++++++++++++++++------------ 1 files changed, 149 insertions(+), 49 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index b7a7e15..a9a66f5 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -98,10 +98,13 @@ }catch (NumberFormatException e) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc鏍煎紡閿欒"); } - } String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + callId + "_" + stream; - int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode); + int localPortForVideo = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode); + int localPortForAudio = zlmServerFactory.createRTPServer(mediaServerItem, stream + "_a" , ssrcInt, null, false, tcpMode); + if (localPortForVideo == 0 || localPortForAudio == 0) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "鑾峰彇绔彛澶辫触"); + } // 娉ㄥ唽鍥炶皟濡傛灉rtp鏀舵祦瓒呮椂鍒欓�氳繃鍥炶皟鍙戦�侀�氱煡 if (callBack != null) { HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId()); @@ -121,12 +124,14 @@ } catch (IOException e) { logger.error("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋冨け璐�", callId, e); } + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); } }); } OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo(); otherRtpSendInfo.setReceiveIp(mediaServerItem.getSdpIp()); - otherRtpSendInfo.setReceivePort(localPort); + otherRtpSendInfo.setReceivePortForVideo(localPortForVideo); + otherRtpSendInfo.setReceivePortForAudio(localPortForAudio); otherRtpSendInfo.setCallId(callId); otherRtpSendInfo.setStream(stream); @@ -135,11 +140,13 @@ if (isSend != null && isSend) { String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; // 棰勫垱寤哄彂娴佷俊鎭� - int port = sendRtpPortManager.getNextPort(mediaServerItem.getId()); + int portForVideo = sendRtpPortManager.getNextPort(mediaServerItem.getId()); + int portForAudio = sendRtpPortManager.getNextPort(mediaServerItem.getId()); // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 redisTemplate.opsForValue().set(key, otherRtpSendInfo, 300, TimeUnit.SECONDS); - otherRtpSendInfo.setIp(mediaServerItem.getSdpIp()); - otherRtpSendInfo.setPort(port); + otherRtpSendInfo.setSendLocalIp(mediaServerItem.getSdpIp()); + otherRtpSendInfo.setSendLocalPortForVideo(portForVideo); + otherRtpSendInfo.setSendLocalPortForAudio(portForAudio); logger.info("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 缁撴灉锛宑allId->{}锛� {}", callId, otherRtpSendInfo); } return otherRtpSendInfo; @@ -153,6 +160,7 @@ logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鏀舵祦] stream->{}", stream); MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); zlmServerFactory.closeRtpServer(mediaServerItem,stream); + zlmServerFactory.closeRtpServer(mediaServerItem,stream + "_a"); String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_*_" + stream; List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey); if (scan.size() > 0) { @@ -167,20 +175,51 @@ @ResponseBody @Operation(summary = "鍙戦�佹祦") @Parameter(name = "ssrc", description = "鍙戦�佹祦鐨凷SRC", required = true) - @Parameter(name = "ip", description = "鐩爣IP", required = true) - @Parameter(name = "port", description = "鐩爣绔彛", required = true) + @Parameter(name = "dstIpForAudio", description = "鐩爣闊抽鏀舵祦IP", required = false) + @Parameter(name = "dstIpForVideo", description = "鐩爣瑙嗛鏀舵祦IP", required = false) + @Parameter(name = "dstPortForAudio", description = "鐩爣闊抽鏀舵祦绔彛", required = false) + @Parameter(name = "dstPortForVideo", description = "鐩爣瑙嗛鏀舵祦绔彛", required = false) @Parameter(name = "app", description = "寰呭彂閫佸簲鐢ㄥ悕", required = true) @Parameter(name = "stream", description = "寰呭彂閫佹祦Id", required = true) @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼笉浼犲垯浣跨敤闅忔満绔彛鍙戞祦", required = true) - @Parameter(name = "onlyAudio", description = "鏄惁鍙湁闊抽", required = true) @Parameter(name = "isUdp", description = "鏄惁涓篣DP", required = true) - @Parameter(name = "streamType", description = "娴佺被鍨嬶紝1涓篹s娴侊紝2涓簆s娴侊紝 榛樿es娴�", required = false) - @Parameter(name = "pt", description = "rtp鐨刾t", required = true) - public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType, Integer pt) { - logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}, pt->{}", - ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS", pt); - if (ObjectUtils.isEmpty(streamType)) { - streamType = 1; + @Parameter(name = "ptForAudio", description = "rtp鐨勯煶棰憄t", required = false) + @Parameter(name = "ptForVideo", description = "rtp鐨勮棰憄t", required = false) + public void sendRTP(String ssrc, + @RequestParam(required = false)String dstIpForAudio, + @RequestParam(required = false)String dstIpForVideo, + @RequestParam(required = false)Integer dstPortForAudio, + @RequestParam(required = false)Integer dstPortForVideo, + String app, + String stream, + String callId, + Boolean isUdp, + @RequestParam(required = false)Integer ptForAudio, + @RequestParam(required = false)Integer ptForVideo + ) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] " + + "ssrc->{}, \r\n" + + "dstIpForAudio->{}, \n" + + "dstIpForAudio->{}, \n" + + "dstPortForAudio->{}, \n" + + "dstPortForVideo->{}, \n" + + "app->{}, \n" + + "stream->{}, \n" + + "callId->{}, \n" + + "ptForAudio->{}, \n" + + "ptForVideo->{}", + ssrc, + dstIpForAudio, + dstIpForVideo, + dstPortForAudio, + dstPortForVideo, + app, + stream, + callId, + ptForAudio, + ptForVideo); + if (!((dstPortForAudio > 0 && !ObjectUtils.isEmpty(dstPortForAudio) || (dstPortForVideo > 0 && !ObjectUtils.isEmpty(dstIpForVideo))))) { + throw new ControllerException(ErrorCode.ERROR400.getCode(), "鑷冲皯搴旇瀛樺湪涓�缁勯煶棰戞垨瑙嗛鍙戦�佸弬鏁�"); } MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; @@ -192,32 +231,74 @@ sendInfo.setPushStream(stream); sendInfo.setPushSSRC(ssrc); - Map<String, Object> param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",app); - param.put("stream",stream); - param.put("ssrc", ssrc); + Map<String, Object> paramForAudio; + Map<String, Object> paramForVideo; + if (!ObjectUtils.isEmpty(dstIpForAudio) && dstPortForAudio > 0) { + paramForAudio = new HashMap<>(); + paramForAudio.put("vhost","__defaultVhost__"); + paramForAudio.put("app",app); + paramForAudio.put("stream",stream); + paramForAudio.put("ssrc", ssrc); - param.put("dst_url",ip); - param.put("dst_port", port); - String is_Udp = isUdp ? "1" : "0"; - param.put("is_udp", is_Udp); - param.put("src_port", sendInfo.getPort()); - param.put("use_ps", streamType==2 ? "1" : "0"); - param.put("only_audio", onlyAudio ? "1" : "0"); - param.put("pt", pt); + paramForAudio.put("dst_url", dstIpForAudio); + paramForAudio.put("dst_port", dstPortForAudio); + String is_Udp = isUdp ? "1" : "0"; + paramForAudio.put("is_udp", is_Udp); + paramForAudio.put("src_port", sendInfo.getSendLocalPortForAudio()); + paramForAudio.put("use_ps", "0"); + paramForAudio.put("only_audio", "1"); + if (ptForAudio != null) { + paramForAudio.put("pt", ptForAudio); + } + + } else { + paramForAudio = null; + } + if (!ObjectUtils.isEmpty(dstIpForVideo) && dstPortForVideo > 0) { + paramForVideo = new HashMap<>(); + paramForVideo.put("vhost","__defaultVhost__"); + paramForVideo.put("app",app); + paramForVideo.put("stream",stream); + paramForVideo.put("ssrc", ssrc); + + paramForVideo.put("dst_url", dstIpForVideo); + paramForVideo.put("dst_port", dstPortForVideo); + String is_Udp = isUdp ? "1" : "0"; + paramForVideo.put("is_udp", is_Udp); + paramForVideo.put("src_port", sendInfo.getSendLocalPortForVideo()); + paramForVideo.put("use_ps", "0"); + paramForVideo.put("only_audio", "0"); + if (ptForVideo != null) { + paramForVideo.put("pt", ptForVideo); + } + + } else { + paramForVideo = null; + } Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); if (streamReady) { - logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 娴佸瓨鍦紝寮�濮嬪彂娴侊紝callId->{}", callId); - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); - if (jsonObject.getInteger("code") == 0) { - logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦鎴愬姛锛宑allId->{}", callId); - redisTemplate.opsForValue().set(key, sendInfo); - }else { - redisTemplate.delete(key); - logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦澶辫触锛宑allId->{}, {}", callId, jsonObject.getString("msg")); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "[鍙戞祦澶辫触] " + jsonObject.getString("msg")); + if (paramForVideo != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForVideo); + if (jsonObject.getInteger("code") == 0) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 瑙嗛娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, paramForVideo); + redisTemplate.opsForValue().set(key, sendInfo); + }else { + redisTemplate.delete(key); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 瑙嗛娴佸彂娴佸け璐ワ紝callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[瑙嗛娴佸彂娴佸け璐 " + jsonObject.getString("msg")); + } + } + if(paramForAudio != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForAudio); + if (jsonObject.getInteger("code") == 0) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 闊抽娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, paramForAudio); + redisTemplate.opsForValue().set(key, sendInfo); + }else { + redisTemplate.delete(key); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 闊抽娴佸彂娴佸け璐ワ紝callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[闊抽娴佸彂娴佸け璐 " + jsonObject.getString("msg")); + } } }else { logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 娴佷笉瀛樺湪锛岀瓑寰呮祦涓婄嚎锛宑allId->{}", callId); @@ -231,21 +312,39 @@ // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� OtherRtpSendInfo finalSendInfo = sendInfo; + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItemInUse, response)->{ dynamicTask.stop(uuid); - logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 娴佷笂绾匡紝寮�濮嬪彂娴� callId->{}锛宲aram->{}", callId, JSONObject.toJSONString(param)); - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); - System.out.println("========鍙戞祦缁撴灉=========="); - System.out.println(jsonObject); - if (jsonObject.getInteger("code") == 0) { - logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦鎴愬姛锛宑allId->{}", callId); - redisTemplate.opsForValue().set(key, finalSendInfo); - }else { - redisTemplate.delete(key); - logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦澶辫触锛宑allId->{}, {}", callId, jsonObject.getString("msg")); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "[鍙戞祦澶辫触] " + jsonObject.getString("msg")); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 娴佷笂绾匡紝寮�濮嬪彂娴� callId->{}", callId); + try { + Thread.sleep(400); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + if (paramForVideo != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForVideo); + if (jsonObject.getInteger("code") == 0) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 瑙嗛娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, paramForVideo); + redisTemplate.opsForValue().set(key, finalSendInfo); + }else { + redisTemplate.delete(key); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 瑙嗛娴佸彂娴佸け璐ワ紝callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[瑙嗛娴佸彂娴佸け璐 " + jsonObject.getString("msg")); + } + } + if(paramForAudio != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForAudio); + if (jsonObject.getInteger("code") == 0) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 闊抽娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, paramForAudio); + redisTemplate.opsForValue().set(key, finalSendInfo); + }else { + redisTemplate.delete(key); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 闊抽娴佸彂娴佸け璐ワ紝callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[闊抽娴佸彂娴佸け璐 " + jsonObject.getString("msg")); + } + } + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); }); } } @@ -274,6 +373,7 @@ }else { logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鍙戦�佹祦] 鎴愬姛 callId->{}", callId); } + redisTemplate.delete(key); } } -- Gitblit v1.8.0