From 61e5226122e7816dbaa18baec907d0e5934ee170 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期四, 06 七月 2023 15:07:43 +0800
Subject: [PATCH] Merge branch '2.6.8' into wvp-28181-2.0
---
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java | 197 ++++++++++++++++++++++++++++++++++++++++++++----
1 files changed, 178 insertions(+), 19 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
index 36ef8ac..2b061b4 100644
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
@@ -1,23 +1,42 @@
package com.genersoft.iot.vmp.vmanager.rtp;
+import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.VersionInfo;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
+import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.service.*;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam;
+import com.genersoft.iot.vmp.service.IDeviceChannelService;
+import com.genersoft.iot.vmp.service.IDeviceService;
+import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
+import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.ResponseBody;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.util.ObjectUtils;
+import org.springframework.web.bind.annotation.*;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
@SuppressWarnings("rawtypes")
@Tag(name = "绗笁鏂规湇鍔″鎺�")
@@ -27,7 +46,15 @@
public class RtpController {
@Autowired
- private ZlmHttpHookSubscribe zlmHttpHookSubscribe;
+ private ZLMServerFactory zlmServerFactory;
+
+ @Autowired
+ private SendRtpPortManager sendRtpPortManager;
+
+ private final static Logger logger = LoggerFactory.getLogger(RtpController.class);
+
+ @Autowired
+ private ZlmHttpHookSubscribe hookSubscribe;
@Autowired
private IMediaServerService mediaServerService;
@@ -48,11 +75,11 @@
private IDeviceChannelService channelService;
@Autowired
- private IStreamPushService pushService;
+ private DynamicTask dynamicTask;
@Autowired
- private IStreamProxyService proxyService;
+ private RedisTemplate<Object, Object> redisTemplate;
@Value("${server.port}")
@@ -63,35 +90,167 @@
private IRedisCatchStorage redisCatchStorage;
- @GetMapping(value = "/openRtpServer")
+ @GetMapping(value = "/receive/open")
@ResponseBody
@Operation(summary = "寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅")
- @Parameter(name = "isSend", description = "鏄惁鍙戦�侊紝false鏃跺悓鏃跺彧寮�鍚敹娴�", required = true)
- @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑", required = true)
- @Parameter(name = "ssrc", description = "鏉ユ簮娴佺殑SSRC", required = false)
- @Parameter(name = "hasAudio", description = "鏄惁", required = false)
+ @Parameter(name = "isSend", description = "鏄惁鍙戦�侊紝false鏃跺彧寮�鍚敹娴侊紝 true鍚屾椂杩斿洖鎺ㄦ祦淇℃伅", required = true)
+ @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼负浜嗕笌鍚庣画鎺ュ彛鍏宠仈", required = true)
+ @Parameter(name = "ssrc", description = "鏉ユ簮娴佺殑SSRC锛屼笉浼犲垯涓嶆牎楠屾潵婧恠src", required = false)
@Parameter(name = "stream", description = "褰㈡垚鐨勬祦鐨処D", required = true)
@Parameter(name = "tcpMode", description = "鏀舵祦妯″紡锛� 0涓篣DP锛� 1涓篢CP琚姩", required = true)
- public void openRtpServer(Boolean isSend, String ssrc, String callId, Boolean hasAudio, String stream, Integer tcpMode) {
- MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
+ @Parameter(name = "callBack", description = "鍥炶皟鍦板潃锛屽鏋滄敹娴佽秴鏃朵細閫氶亾鍥炶皟閫氱煡锛屽洖璋冧负get璇锋眰锛屽弬鏁颁负callId", required = true)
+ public OtherRtpSendInfo openRtpServer(Boolean isSend, @RequestParam(required = false)String ssrc, String callId, String stream, Integer tcpMode, String callBack) {
+
+ logger.info("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}",
+ isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP琚姩", callBack);
+
+ MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
if (mediaServerItem == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(),"娌℃湁鍙敤鐨凪ediaServer");
}
+ if (stream == null) {
+ throw new ControllerException(ErrorCode.ERROR100.getCode(),"stream鍙傛暟涓嶅彲涓虹┖");
+ }
+ if (isSend != null && isSend && callId == null) {
+ throw new ControllerException(ErrorCode.ERROR100.getCode(),"isSend涓簍rue鏃讹紝CallID涓嶈兘涓虹┖");
+ }
+ int ssrcInt = 0;
+ if (ssrc != null) {
+ try {
+ ssrcInt = Integer.parseInt(ssrc);
+ }catch (NumberFormatException e) {
+ throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc鏍煎紡閿欒");
+ }
+
+ }
+ int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode);
+ // 娉ㄥ唽鍥炶皟濡傛灉rtp鏀舵祦瓒呮椂鍒欓�氳繃鍥炶皟鍙戦�侀�氱煡
+ if (callBack != null) {
+ HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, mediaServerItem.getId());
+ // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁�
+ hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
+ (mediaServerItemInUse, hookParam)->{
+ OnRtpServerTimeoutHookParam serverTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam;
+ if (stream.equals(serverTimeoutHookParam.getStream_id())) {
+ logger.info("[寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋�", callId);
+ OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
+ OkHttpClient client = httpClientBuilder.build();
+ String url = callBack + "?callId=" + callId;
+ Request request = new Request.Builder().get().url(url).build();
+ try {
+ client.newCall(request).execute();
+ } catch (IOException e) {
+ logger.error("[寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋冨け璐�", callId, e);
+ }
+ }
+ });
+ }
+ String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
+ OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo();
+ otherRtpSendInfo.setReceiveIp(mediaServerItem.getSdpIp());
+ otherRtpSendInfo.setReceivePort(localPort);
+ otherRtpSendInfo.setCallId(callId);
+ otherRtpSendInfo.setStream(stream);
+ if (isSend != null && isSend) {
+ int port = sendRtpPortManager.getNextPort(mediaServerItem.getId());
+ otherRtpSendInfo.setIp(mediaServerItem.getSdpIp());
+ otherRtpSendInfo.setPort(port);
+ logger.info("[寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 缁撴灉锛宑allId->{}锛� {}", callId, otherRtpSendInfo);
+ }
+ // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤
+ redisTemplate.opsForValue().set(key, otherRtpSendInfo, 300, TimeUnit.SECONDS);
+ return otherRtpSendInfo;
}
- @GetMapping(value = "/sendRTP")
+ @GetMapping(value = "/receive/close")
+ @ResponseBody
+ @Operation(summary = "鍏抽棴鏀舵祦")
+ @Parameter(name = "stream", description = "娴佺殑ID", required = true)
+ public void closeRtpServer(String stream) {
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鏀舵祦] stream->{}", stream);
+ MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
+ zlmServerFactory.closeRtpServer(mediaServerItem,stream);
+ }
+
+ @GetMapping(value = "/send/start")
@ResponseBody
@Operation(summary = "鍙戦�佹祦")
-
@Parameter(name = "ssrc", description = "鍙戦�佹祦鐨凷SRC", required = true)
@Parameter(name = "ip", description = "鐩爣IP", required = true)
@Parameter(name = "port", description = "鐩爣绔彛", required = true)
@Parameter(name = "app", description = "寰呭彂閫佸簲鐢ㄥ悕", required = true)
@Parameter(name = "stream", description = "寰呭彂閫佹祦Id", required = true)
- @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑", required = true)
+ @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼笉浼犲垯浣跨敤闅忔満绔彛鍙戞祦", required = true)
@Parameter(name = "onlyAudio", description = "鏄惁鍙湁闊抽", required = true)
- public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio) {
+ @Parameter(name = "isUdp", description = "鏄惁涓篣DP", required = true)
+ @Parameter(name = "streamType", description = "娴佺被鍨嬶紝1涓篹s娴侊紝2涓簆s娴侊紝 榛樿es娴�", required = false)
+ public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType) {
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}",
+ ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS");
+ if (ObjectUtils.isEmpty(streamType)) {
+ streamType = 1;
+ }
+ MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
+ String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
+ OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
+ if (sendInfo == null) {
+ sendInfo = new OtherRtpSendInfo();
+ }
+ sendInfo.setPushApp(app);
+ sendInfo.setPushStream(stream);
+ sendInfo.setPushSSRC(ssrc);
+ Map<String, Object> param = new HashMap<>(12);
+ param.put("vhost","__defaultVhost__");
+ param.put("app",app);
+ param.put("stream",stream);
+ param.put("ssrc", ssrc);
+
+ param.put("dst_url",ip);
+ param.put("dst_port", port);
+ String is_Udp = isUdp ? "1" : "0";
+ param.put("is_udp", is_Udp);
+ param.put("src_port", sendInfo.getPort());
+ param.put("use_ps", streamType==2 ? "1" : "0");
+ param.put("only_audio", onlyAudio ? "1" : "0");
+
+ JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
+ if (jsonObject.getInteger("code") == 0) {
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦鎴愬姛锛宑allId->{}", callId);
+ redisTemplate.opsForValue().set(key, sendInfo);
+ }else {
+ redisTemplate.delete(key);
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦澶辫触锛宑allId->{}, {}", callId, jsonObject.getString("msg"));
+ throw new ControllerException(ErrorCode.ERROR100.getCode(), "[鍙戞祦澶辫触] " + jsonObject.getString("msg"));
+ }
+ }
+
+
+
+ @GetMapping(value = "/send/stop")
+ @ResponseBody
+ @Operation(summary = "鍏抽棴鍙戦�佹祦")
+ @Parameter(name = "callId", description = "鏁翠釜杩囩▼鐨勫敮涓�鏍囪瘑锛屼笉浼犲垯浣跨敤闅忔満绔彛鍙戞祦", required = true)
+ public void closeSendRTP(String callId) {
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鍙戦�佹祦] callId->{}", callId);
+ String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
+ OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
+ if (sendInfo == null){
+ throw new ControllerException(ErrorCode.ERROR100.getCode(), "鏈紑鍚彂娴�");
+ }
+ Map<String, Object> param = new HashMap<>();
+ param.put("vhost","__defaultVhost__");
+ param.put("app",sendInfo.getPushApp());
+ param.put("stream",sendInfo.getPushStream());
+ param.put("ssrc",sendInfo.getPushSSRC());
+ MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
+ Boolean result = zlmServerFactory.stopSendRtpStream(mediaServerItem, param);
+ if (!result) {
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鍙戦�佹祦] 澶辫触 callId->{}", callId);
+ throw new ControllerException(ErrorCode.ERROR100.getCode(), "鍋滄鍙戞祦澶辫触");
+ }else {
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鍙戦�佹祦] 鎴愬姛 callId->{}", callId);
+ }
}
}
--
Gitblit v1.8.0