From b1b6fae22c5ab3013d73df1a43cd42f7fe0fa347 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期二, 09 四月 2024 10:35:11 +0800 Subject: [PATCH] Merge branch 'master' into dev/zlm --- src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java | 92 +++++++++++++++++++++++++++------------------- 1 files changed, 54 insertions(+), 38 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java old mode 100644 new mode 100755 index 3ea265f..f7807b9 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java @@ -5,21 +5,20 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.conf.security.JwtUtils; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam; -import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo; -import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.security.SecurityRequirement; import io.swagger.v3.oas.annotations.tags.Tag; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -49,7 +48,7 @@ private ZLMServerFactory zlmServerFactory; @Autowired - private ZlmHttpHookSubscribe hookSubscribe; + private HookSubscribe hookSubscribe; @Autowired private IMediaServerService mediaServerService; @@ -70,7 +69,7 @@ @GetMapping(value = "/receive/open") @ResponseBody - @Operation(summary = "寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅") + @Operation(summary = "寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "isSend", description = "鏄惁鍙戦�侊紝false鏃跺彧寮�鍚敹娴侊紝 true鍚屾椂杩斿洖鎺ㄦ祦淇℃伅", required = true) @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼负浜嗕笌鍚庣画鎺ュ彛鍏宠仈", required = true) @Parameter(name = "ssrc", description = "鏉ユ簮娴佺殑SSRC锛屼笉浼犲垯涓嶆牎楠屾潵婧恠src", required = false) @@ -82,7 +81,7 @@ logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}", isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP琚姩", callBack); - MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); + MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer(); if (mediaServerItem == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"娌℃湁鍙敤鐨凪ediaServer"); } @@ -92,27 +91,26 @@ if (isSend != null && isSend && callId == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"isSend涓簍rue鏃讹紝CallID涓嶈兘涓虹┖"); } - int ssrcInt = 0; + long ssrcInt = 0; if (ssrc != null) { try { - ssrcInt = Integer.parseInt(ssrc); + ssrcInt = Long.parseLong(ssrc); }catch (NumberFormatException e) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc鏍煎紡閿欒"); } } String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + callId + "_" + stream; - int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode); + int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, false, tcpMode); if (localPort == 0) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "鑾峰彇绔彛澶辫触"); } // 娉ㄥ唽鍥炶皟濡傛灉rtp鏀舵祦瓒呮椂鍒欓�氳繃鍥炶皟鍙戦�侀�氱煡 if (callBack != null) { - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId()); + Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServerItem.getId()); // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� - hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, - (mediaServerItemInUse, hookParam)->{ - OnRtpServerTimeoutHookParam serverTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam; - if (stream.equals(serverTimeoutHookParam.getStream_id())) { + hookSubscribe.addSubscribe(hook, + (hookData)->{ + if (stream.equals(hookData.getStream())) { logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋�", callId); // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 redisTemplate.delete(receiveKey); @@ -125,7 +123,7 @@ } catch (IOException e) { logger.error("[绗笁鏂筆S鏈嶅姟瀵规帴->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋冨け璐�", callId, e); } - hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); + hookSubscribe.removeSubscribe(hook); } }); } @@ -153,11 +151,11 @@ @GetMapping(value = "/receive/close") @ResponseBody - @Operation(summary = "鍏抽棴鏀舵祦") + @Operation(summary = "鍏抽棴鏀舵祦", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "stream", description = "娴佺殑ID", required = true) public void closeRtpServer(String stream) { logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍏抽棴鏀舵祦] stream->{}", stream); - MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); + MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer(); zlmServerFactory.closeRtpServer(mediaServerItem,stream); String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_*_" + stream; List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey); @@ -171,7 +169,7 @@ @GetMapping(value = "/send/start") @ResponseBody - @Operation(summary = "鍙戦�佹祦") + @Operation(summary = "鍙戦�佹祦", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "ssrc", description = "鍙戦�佹祦鐨凷SRC", required = true) @Parameter(name = "dstIp", description = "鐩爣鏀舵祦IP", required = true) @Parameter(name = "dstPort", description = "鐩爣鏀舵祦绔彛", required = true) @@ -200,11 +198,11 @@ app, stream, callId); - MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); + MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer(); String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId; - OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); + OtherPsSendInfo sendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(key); if (sendInfo == null) { - sendInfo = new OtherRtpSendInfo(); + sendInfo = new OtherPsSendInfo(); } sendInfo.setPushApp(app); sendInfo.setPushStream(stream); @@ -223,9 +221,7 @@ param.put("dst_port", dstPort); String is_Udp = isUdp ? "1" : "0"; param.put("is_udp", is_Udp); - param.put("src_port", sendInfo.getSendLocalPortForAudio()); - param.put("use_ps", "0"); - param.put("only_audio", "1"); + param.put("src_port", sendInfo.getSendLocalPort()); Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); @@ -242,18 +238,18 @@ }else { logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] 娴佷笉瀛樺湪锛岀瓑寰呮祦涓婄嚎锛宑allId->{}", callId); String uuid = UUID.randomUUID().toString(); - HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId()); + Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServerItem.getId()); dynamicTask.startDelay(uuid, ()->{ logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] 绛夊緟娴佷笂绾胯秴鏃� callId->{}", callId); redisTemplate.delete(key); - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + hookSubscribe.removeSubscribe(hook); }, 10000); // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� - OtherRtpSendInfo finalSendInfo = sendInfo; - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); - hookSubscribe.addSubscribe(hookSubscribeForStreamChange, - (mediaServerItemInUse, response)->{ + OtherPsSendInfo finalSendInfo = sendInfo; + hookSubscribe.removeSubscribe(hook); + hookSubscribe.addSubscribe(hook, + (hookData)->{ dynamicTask.stop(uuid); logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] 娴佷笂绾匡紝寮�濮嬪彂娴� callId->{}", callId); try { @@ -270,7 +266,7 @@ logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍙戦�佹祦] 瑙嗛娴佸彂娴佸け璐ワ紝callId->{}, {}", callId, jsonObject.getString("msg")); throw new ControllerException(ErrorCode.ERROR100.getCode(), "[瑙嗛娴佸彂娴佸け璐 " + jsonObject.getString("msg")); } - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + hookSubscribe.removeSubscribe(hook); }); } } @@ -282,7 +278,7 @@ public void closeSendRTP(String callId) { logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍏抽棴鍙戦�佹祦] callId->{}", callId); String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId; - OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); + OtherPsSendInfo sendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(key); if (sendInfo == null){ throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈紑鍚彂娴�"); } @@ -291,7 +287,7 @@ param.put("app",sendInfo.getPushApp()); param.put("stream",sendInfo.getPushStream()); param.put("ssrc",sendInfo.getPushSSRC()); - MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); + MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer(); Boolean result = zlmServerFactory.stopSendRtpStream(mediaServerItem, param); if (!result) { logger.info("[绗笁鏂筆S鏈嶅姟瀵规帴->鍏抽棴鍙戦�佹祦] 澶辫触 callId->{}", callId); @@ -302,4 +298,24 @@ redisTemplate.delete(key); } + + @GetMapping(value = "/getTestPort") + @ResponseBody + public int getTestPort() { + MediaServer defaultMediaServer = mediaServerService.getDefaultMediaServer(); + +// for (int i = 0; i <300; i++) { +// new Thread(() -> { +// int nextPort = sendRtpPortManager.getNextPort(defaultMediaServer); +// try { +// Thread.sleep((int)Math.random()*10); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// System.out.println(nextPort); +// }).start(); +// } + + return sendRtpPortManager.getNextPort(defaultMediaServer); + } } -- Gitblit v1.8.0