From ba8fffd907b5c3a7acfd3348c80e557b816cec98 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 03 七月 2023 16:53:03 +0800 Subject: [PATCH] 增加与第三方对接的接口 --- src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java | 165 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 files changed, 154 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index c8c1625..5c74fcc 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -1,24 +1,41 @@ package com.genersoft.iot.vmp.vmanager.rtp; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.VersionInfo; import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.*; +import com.genersoft.iot.vmp.service.IDeviceChannelService; +import com.genersoft.iot.vmp.service.IDeviceService; +import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.tags.Tag; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; @SuppressWarnings("rawtypes") @Tag(name = "绗笁鏂规湇鍔″鎺�") @@ -27,8 +44,13 @@ @RequestMapping("/api/rtp") public class RtpController { + private final static Logger logger = LoggerFactory.getLogger(RtpController.class); + @Autowired - private ZlmHttpHookSubscribe zlmHttpHookSubscribe; + private ZLMRTPServerFactory zlmServerFactory; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; @Autowired private IMediaServerService mediaServerService; @@ -49,11 +71,11 @@ private IDeviceChannelService channelService; @Autowired - private IStreamPushService pushService; + private DynamicTask dynamicTask; @Autowired - private IStreamProxyService proxyService; + private RedisTemplate<Object, Object> redisTemplate; @Value("${server.port}") @@ -73,12 +95,76 @@ @Parameter(name = "stream", description = "褰㈡垚鐨勬祦鐨処D", required = true) @Parameter(name = "tcpMode", description = "鏀舵祦妯″紡锛� 0涓篣DP锛� 1涓篢CP琚姩", required = true) @Parameter(name = "callBack", description = "鍥炶皟鍦板潃锛屽鏋滄敹娴佽秴鏃朵細閫氶亾鍥炶皟閫氱煡锛屽洖璋冧负get璇锋眰锛屽弬鏁颁负callId", required = true) - public SendRtpItem openRtpServer(Boolean isSend, String ssrc, String callId, String stream, Integer tcpMode, String callBack) { - MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); + public OtherRtpSendInfo openRtpServer(Boolean isSend, String ssrc, String callId, String stream, Integer tcpMode, String callBack) { + + logger.info("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}", + isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP琚姩", callBack); + + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); if (mediaServerItem == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"娌℃湁鍙敤鐨凪ediaServer"); } - return null; + if (stream == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(),"stream鍙傛暟涓嶅彲涓虹┖"); + } + if (isSend != null && isSend && callId == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(),"isSend涓簍rue鏃讹紝CallID涓嶈兘涓虹┖"); + } + int ssrcInt = 0; + if (ssrc != null) { + try { + ssrcInt = Integer.parseInt(ssrc); + }catch (NumberFormatException e) { + throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc鏍煎紡閿欒"); + } + + } + int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode); + // 娉ㄥ唽鍥炶皟濡傛灉rtp鏀舵祦瓒呮椂鍒欓�氳繃鍥炶皟鍙戦�侀�氱煡 + if (callBack != null) { + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, mediaServerItem.getId()); + // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁� + hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, + (mediaServerItemInUse, response)->{ + if (stream.equals(response.getString("stream_id"))) { + logger.info("[寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋�", callId); + OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); + OkHttpClient client = httpClientBuilder.build(); + String url = callBack + "?callId=" + callId; + Request request = new Request.Builder().get().url(url).build(); + try { + client.newCall(request).execute(); + } catch (IOException e) { + logger.error("[寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋冨け璐�", callId, e); + } + } + }); + } + OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo(); + otherRtpSendInfo.setReceiveIp(mediaServerItem.getSdpIp()); + otherRtpSendInfo.setReceivePort(localPort); + otherRtpSendInfo.setCallId(callId); + otherRtpSendInfo.setStream(stream); + if (isSend != null && isSend) { + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; + // 棰勫垱寤哄彂娴佷俊鎭� + int port = zlmServerFactory.keepPort(mediaServerItem, callId, 0, ssrc1 -> { + return redisTemplate.opsForValue().get(key) != null; + }); + + // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤 + redisTemplate.opsForValue().set(key, otherRtpSendInfo); + // 璁剧疆瓒呮椂浠诲姟锛岃秴鏃舵湭浣跨敤锛屽垯鑷姩绉婚櫎锛屽苟鍏抽棴绔彛淇濇寔, 榛樿浜斿垎閽� + dynamicTask.startDelay(key, ()->{ + logger.info("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绔彛淇濇寔瓒呮椂 callId->{}", callId); + redisTemplate.delete(key); + zlmServerFactory.releasePort(mediaServerItem, callId); + }, 300000); + otherRtpSendInfo.setIp(mediaServerItem.getSdpIp()); + otherRtpSendInfo.setPort(port); + logger.info("[寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 缁撴灉锛宑allId->{}锛� {}", callId, otherRtpSendInfo); + } + return otherRtpSendInfo; } @GetMapping(value = "/receive/close") @@ -86,7 +172,9 @@ @Operation(summary = "鍏抽棴鏀舵祦") @Parameter(name = "stream", description = "娴佺殑ID", required = true) public void closeRtpServer(String stream) { - + logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鏀舵祦] stream->{}", stream); + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); + zlmServerFactory.closeRtpServer(mediaServerItem,stream); } @GetMapping(value = "/send/start") @@ -99,9 +187,46 @@ @Parameter(name = "stream", description = "寰呭彂閫佹祦Id", required = true) @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼笉浼犲垯浣跨敤闅忔満绔彛鍙戞祦", required = true) @Parameter(name = "onlyAudio", description = "鏄惁鍙湁闊抽", required = true) + @Parameter(name = "isUdp", description = "鏄惁涓篣DP", required = true) @Parameter(name = "streamType", description = "娴佺被鍨嬶紝1涓篹s娴侊紝2涓簆s娴侊紝 榛樿es娴�", required = false) - public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Integer streamType) { + public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, Integer streamType) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}", + ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS"); + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; + OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); + if (sendInfo != null) { + zlmServerFactory.releasePort(mediaServerItem, sendInfo.getCallId()); + }else { + sendInfo = new OtherRtpSendInfo(); + } + sendInfo.setPushApp(app); + sendInfo.setPushStream(stream); + sendInfo.setPushSSRC(ssrc); + Map<String, Object> param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",app); + param.put("stream",stream); + param.put("ssrc", ssrc); + + param.put("dst_url",ip); + param.put("dst_port", port); + String is_Udp = isUdp ? "1" : "0"; + param.put("is_udp", is_Udp); + param.put("src_port", sendInfo.getPort()); + param.put("use_ps", streamType==2 ? "1" : "0"); + param.put("only_audio", onlyAudio ? "1" : "0"); + + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); + if (jsonObject.getInteger("code") == 0) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦鎴愬姛锛宑allId->{}", callId); + redisTemplate.opsForValue().set(key, sendInfo); + }else { + redisTemplate.delete(key); + logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦澶辫触锛宑allId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[鍙戞祦澶辫触] " + jsonObject.getString("msg")); + } } @@ -111,7 +236,25 @@ @Operation(summary = "鍏抽棴鍙戦�佹祦") @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼笉浼犲垯浣跨敤闅忔満绔彛鍙戞祦", required = true) public void closeSendRTP(String callId) { - + logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鍙戦�佹祦] callId->{}", callId); + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; + OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); + if (sendInfo == null){ + throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈紑鍚彂娴�"); + } + Map<String, Object> param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",sendInfo.getPushApp()); + param.put("stream",sendInfo.getPushStream()); + param.put("ssrc",sendInfo.getPushSSRC()); + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); + Boolean result = zlmServerFactory.stopSendRtpStream(mediaServerItem, param); + if (!result) { + logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鍙戦�佹祦] 澶辫触 callId->{}", callId); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "鍋滄鍙戞祦澶辫触"); + }else { + logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鍙戦�佹祦] 鎴愬姛 callId->{}", callId); + } } } -- Gitblit v1.8.0