From 764d04b497356ba6bcbb75fd42b51eca750f7223 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 29 五月 2024 15:02:51 +0800 Subject: [PATCH] 调整上级观看消息的发送 --- src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java | 310 +++++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 272 insertions(+), 38 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java old mode 100644 new mode 100755 index 304ccf9..7832e37 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -1,21 +1,38 @@ package com.genersoft.iot.vmp.vmanager.rtp; -import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.conf.VersionInfo; import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.*; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.conf.security.JwtUtils; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import com.genersoft.iot.vmp.media.event.hook.HookType; +import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.security.SecurityRequirement; import io.swagger.v3.oas.annotations.tags.Tag; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; +import java.io.IOException; import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; @SuppressWarnings("rawtypes") @Tag(name = "绗笁鏂规湇鍔″鎺�") @@ -25,52 +42,269 @@ public class RtpController { @Autowired - private ZlmHttpHookSubscribe zlmHttpHookSubscribe; + private SendRtpPortManager sendRtpPortManager; + + private final static Logger logger = LoggerFactory.getLogger(RtpController.class); + + @Autowired + private HookSubscribe hookSubscribe; @Autowired private IMediaServerService mediaServerService; @Autowired - private VersionInfo versionInfo; - - @Autowired - private SipConfig sipConfig; - - @Autowired private UserSetting userSetting; @Autowired - private IDeviceService deviceService; + private DynamicTask dynamicTask; @Autowired - private IDeviceChannelService channelService; - - @Autowired - private IStreamPushService pushService; + private RedisTemplate<Object, Object> redisTemplate; - @Autowired - private IStreamProxyService proxyService; - - - @Value("${server.port}") - private int serverPort; - - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - - @GetMapping(value = "/send/ready") + @GetMapping(value = "/receive/open") @ResponseBody - @Operation(summary = "涓哄彂閫佽棰戞祦鑾峰彇淇℃伅") - public List<MediaServerItem> getMediaServerList(Boolean onlySender, ) { - MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); - if (mediaServerItem == null) { + @Operation(summary = "寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Parameter(name = "isSend", description = "鏄惁鍙戦�侊紝false鏃跺彧寮�鍚敹娴侊紝 true鍚屾椂杩斿洖鎺ㄦ祦淇℃伅", required = true) + @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼负浜嗕笌鍚庣画鎺ュ彛鍏宠仈", required = true) + @Parameter(name = "ssrc", description = "鏉ユ簮娴佺殑SSRC锛屼笉浼犲垯涓嶆牎楠屾潵婧恠src", required = false) + @Parameter(name = "stream", description = "褰㈡垚鐨勬祦鐨処D", required = true) + @Parameter(name = "tcpMode", description = "鏀舵祦妯″紡锛� 0涓篣DP锛� 1涓篢CP琚姩", required = true) + @Parameter(name = "callBack", description = "鍥炶皟鍦板潃锛屽鏋滄敹娴佽秴鏃朵細閫氶亾鍥炶皟閫氱煡锛屽洖璋冧负get璇锋眰锛屽弬鏁颁负callId", required = true) + public OtherRtpSendInfo openRtpServer(Boolean isSend, @RequestParam(required = false)String ssrc, String callId, String stream, Integer tcpMode, String callBack) { + + logger.info("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}", + isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP琚姩", callBack); + + MediaServer mediaServer = mediaServerService.getDefaultMediaServer(); + if (mediaServer == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"娌℃湁鍙敤鐨凪ediaServer"); } - mediaServerService.openRTPServer() - return mediaServerService.getAll(); + if (stream == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(),"stream鍙傛暟涓嶅彲涓虹┖"); + } + if (isSend != null && isSend && callId == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(),"isSend涓簍rue鏃讹紝CallID涓嶈兘涓虹┖"); + } + long ssrcInt = 0; + if (ssrc != null) { + try { + ssrcInt = Long.parseLong(ssrc); + }catch (NumberFormatException e) { + throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc鏍煎紡閿欒"); + } + } + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + callId + "_" + stream; + SSRCInfo ssrcInfoForVideo = mediaServerService.openRTPServer(mediaServer, stream, ssrcInt + "",false,false, null, false, false, false, tcpMode); + SSRCInfo ssrcInfoForAudio = mediaServerService.openRTPServer(mediaServer, stream + "_a", ssrcInt + "", false, false, null, false,false,false, tcpMode); + if (ssrcInfoForVideo.getPort() == 0 || ssrcInfoForAudio.getPort() == 0) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "鑾峰彇绔彛澶辫触"); + } + // 娉ㄥ唽鍥炶皟濡傛灉rtp鏀舵祦瓒呮椂鍒欓�氳繃鍥炶皟鍙戦�侀�氱煡 + if (callBack != null) { + Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServer.getId()); + // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� + hookSubscribe.addSubscribe(hook, + (hookData)->{ + if (stream.equals(hookData.getStream())) { + logger.info("[寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋�", callId); + OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); + OkHttpClient client = httpClientBuilder.build(); + String url = callBack + "?callId=" + callId; + Request request = new Request.Builder().get().url(url).build(); + try { + client.newCall(request).execute(); + } catch (IOException e) { + logger.error("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋冨け璐�", callId, e); + } + hookSubscribe.removeSubscribe(hook); + } + }); + } + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; + OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo(); + otherRtpSendInfo.setReceiveIp(mediaServer.getSdpIp()); + otherRtpSendInfo.setReceivePortForVideo(ssrcInfoForVideo.getPort()); + otherRtpSendInfo.setReceivePortForAudio(ssrcInfoForAudio.getPort()); + otherRtpSendInfo.setCallId(callId); + otherRtpSendInfo.setStream(stream); + + // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 + redisTemplate.opsForValue().set(receiveKey, otherRtpSendInfo); + if (isSend != null && isSend) { + // 棰勫垱寤哄彂娴佷俊鎭� + int portForVideo = sendRtpPortManager.getNextPort(mediaServer); + int portForAudio = sendRtpPortManager.getNextPort(mediaServer); + + otherRtpSendInfo.setSendLocalIp(mediaServer.getSdpIp()); + otherRtpSendInfo.setSendLocalPortForVideo(portForVideo); + otherRtpSendInfo.setSendLocalPortForAudio(portForAudio); + // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 + redisTemplate.opsForValue().set(key, otherRtpSendInfo, 300, TimeUnit.SECONDS); + logger.info("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 缁撴灉锛宑allId->{}锛� {}", callId, otherRtpSendInfo); + } + // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 + redisTemplate.opsForValue().set(key, otherRtpSendInfo, 300, TimeUnit.SECONDS); + return otherRtpSendInfo; + } + + @GetMapping(value = "/receive/close") + @ResponseBody + @Operation(summary = "鍏抽棴鏀舵祦", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Parameter(name = "stream", description = "娴佺殑ID", required = true) + public void closeRtpServer(String stream) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鏀舵祦] stream->{}", stream); + MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer(); + mediaServerService.closeRTPServer(mediaServerItem, stream); + mediaServerService.closeRTPServer(mediaServerItem, stream+ "_a"); + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_*_" + stream; + List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey); + if (scan.size() > 0) { + for (Object key : scan) { + // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 + redisTemplate.delete(key); + } + } + } + + @GetMapping(value = "/send/start") + @ResponseBody + @Operation(summary = "鍙戦�佹祦", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Parameter(name = "ssrc", description = "鍙戦�佹祦鐨凷SRC", required = true) + @Parameter(name = "dstIpForAudio", description = "鐩爣闊抽鏀舵祦IP", required = false) + @Parameter(name = "dstIpForVideo", description = "鐩爣瑙嗛鏀舵祦IP", required = false) + @Parameter(name = "dstPortForAudio", description = "鐩爣闊抽鏀舵祦绔彛", required = false) + @Parameter(name = "dstPortForVideo", description = "鐩爣瑙嗛鏀舵祦绔彛", required = false) + @Parameter(name = "app", description = "寰呭彂閫佸簲鐢ㄥ悕", required = true) + @Parameter(name = "stream", description = "寰呭彂閫佹祦Id", required = true) + @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼笉浼犲垯浣跨敤闅忔満绔彛鍙戞祦", required = true) + @Parameter(name = "isUdp", description = "鏄惁涓篣DP", required = true) + @Parameter(name = "ptForAudio", description = "rtp鐨勯煶棰憄t", required = false) + @Parameter(name = "ptForVideo", description = "rtp鐨勮棰憄t", required = false) + public void sendRTP(String ssrc, + @RequestParam(required = false)String dstIpForAudio, + @RequestParam(required = false)String dstIpForVideo, + @RequestParam(required = false)Integer dstPortForAudio, + @RequestParam(required = false)Integer dstPortForVideo, + String app, + String stream, + String callId, + Boolean isUdp, + @RequestParam(required = false)Integer ptForAudio, + @RequestParam(required = false)Integer ptForVideo + ) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] " + + "ssrc->{}, \r\n" + + "dstIpForAudio->{}, \n" + + "dstIpForAudio->{}, \n" + + "dstPortForAudio->{}, \n" + + "dstPortForVideo->{}, \n" + + "app->{}, \n" + + "stream->{}, \n" + + "callId->{}, \n" + + "ptForAudio->{}, \n" + + "ptForVideo->{}", + ssrc, + dstIpForAudio, + dstIpForVideo, + dstPortForAudio, + dstPortForVideo, + app, + stream, + callId, + ptForAudio, + ptForVideo); + if (!((dstPortForAudio > 0 && !ObjectUtils.isEmpty(dstPortForAudio) || (dstPortForVideo > 0 && !ObjectUtils.isEmpty(dstIpForVideo))))) { + throw new ControllerException(ErrorCode.ERROR400.getCode(), "鑷冲皯搴旇瀛樺湪涓�缁勯煶棰戞垨瑙嗛鍙戦�佸弬鏁�"); + } + MediaServer mediaServer = mediaServerService.getDefaultMediaServer(); + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; + OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); + if (sendInfo == null) { + sendInfo = new OtherRtpSendInfo(); + } + sendInfo.setPushApp(app); + sendInfo.setPushStream(stream); + sendInfo.setPushSSRC(ssrc); + + + SendRtpItem sendRtpItemForVideo; + SendRtpItem sendRtpItemForAudio; + if (!ObjectUtils.isEmpty(dstIpForAudio) && dstPortForAudio > 0) { + sendRtpItemForAudio = SendRtpItem.getInstance(app, stream, ssrc, dstIpForAudio, dstPortForAudio, !isUdp, sendInfo.getSendLocalPortForAudio(), ptForAudio); + } else { + sendRtpItemForAudio = null; + } + if (!ObjectUtils.isEmpty(dstIpForVideo) && dstPortForVideo > 0) { + sendRtpItemForVideo = SendRtpItem.getInstance(app, stream, ssrc, dstIpForAudio, dstPortForAudio, !isUdp, sendInfo.getSendLocalPortForVideo(), ptForVideo); + } else { + sendRtpItemForVideo = null; + } + + Boolean streamReady = mediaServerService.isStreamReady(mediaServer, app, stream); + if (streamReady) { + if (sendRtpItemForVideo != null) { + mediaServerService.startSendRtp(mediaServer, sendRtpItemForVideo); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 瑙嗛娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, sendRtpItemForVideo); + redisTemplate.opsForValue().set(key, sendInfo); + } + if(sendRtpItemForAudio != null) { + mediaServerService.startSendRtp(mediaServer, sendRtpItemForAudio); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 闊抽娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, sendRtpItemForAudio); + redisTemplate.opsForValue().set(key, sendInfo); + } + }else { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 娴佷笉瀛樺湪锛岀瓑寰呮祦涓婄嚎锛宑allId->{}", callId); + String uuid = UUID.randomUUID().toString(); + Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServer.getId()); + dynamicTask.startDelay(uuid, ()->{ + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 绛夊緟娴佷笂绾胯秴鏃� callId->{}", callId); + redisTemplate.delete(key); + hookSubscribe.removeSubscribe(hook); + }, 10000); + + // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� + hookSubscribe.removeSubscribe(hook); + OtherRtpSendInfo finalSendInfo = sendInfo; + hookSubscribe.addSubscribe(hook, + (hookData)->{ + dynamicTask.stop(uuid); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 娴佷笂绾匡紝寮�濮嬪彂娴� callId->{}", callId); + try { + Thread.sleep(400); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (sendRtpItemForVideo != null) { + mediaServerService.startSendRtp(mediaServer, sendRtpItemForVideo); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 瑙嗛娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, sendRtpItemForVideo); + redisTemplate.opsForValue().set(key, finalSendInfo); + } + if(sendRtpItemForAudio != null) { + mediaServerService.startSendRtp(mediaServer, sendRtpItemForAudio); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 闊抽娴佸彂娴佹垚鍔燂紝callId->{}锛宲aram->{}", callId, sendRtpItemForAudio); + redisTemplate.opsForValue().set(key, finalSendInfo); + } + hookSubscribe.removeSubscribe(hook); + }); + } + } + + @GetMapping(value = "/send/stop") + @ResponseBody + @Operation(summary = "鍏抽棴鍙戦�佹祦", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼笉浼犲垯浣跨敤闅忔満绔彛鍙戞祦", required = true) + public void closeSendRTP(String callId) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鍙戦�佹祦] callId->{}", callId); + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; + OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); + if (sendInfo == null){ + throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈紑鍚彂娴�"); + } + MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer(); + mediaServerService.stopSendRtp(mediaServerItem, sendInfo.getPushApp(), sendInfo.getPushStream(), sendInfo.getPushSSRC()); + logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鍙戦�佹祦] 鎴愬姛 callId->{}", callId); + redisTemplate.delete(key); } } -- Gitblit v1.8.0