From 885842249fb6b264b0abf78668872d04bdc179ce Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期五, 07 七月 2023 18:17:24 +0800
Subject: [PATCH] 优化第三方对接接口
---
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 26 +++++++++++--
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java | 75 +++++++++++++++++++++++++++++--------
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java | 1
3 files changed, 81 insertions(+), 21 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
index 6eec845..ec25ea2 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -154,6 +154,7 @@
public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_";
public static final String WVP_OTHER_SEND_RTP_INFO = "VMP_OTHER_SEND_RTP_INFO_";
+ public static final String WVP_OTHER_RECEIVE_RTP_INFO = "VMP_OTHER_RECEIVE_RTP_INFO_";
/**
* Redis Const
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 33c8713..5df3be4 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -3,6 +3,7 @@
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
@@ -22,14 +23,13 @@
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
-import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
-import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
-import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
+import com.genersoft.iot.vmp.utils.redis.RedisUtil;
+import com.genersoft.iot.vmp.vmanager.bean.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
@@ -105,6 +105,9 @@
@Autowired
private AssistRESTfulUtils assistRESTfulUtils;
+
+ @Autowired
+ private RedisTemplate<Object, Object> redisTemplate;
@Qualifier("taskExecutor")
@Autowired
@@ -255,6 +258,21 @@
result.setEnable_mp4(true);
}
}
+
+ String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "*";
+ // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤
+ List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
+ if (scan.size()>0) {
+ for (Object o : scan) {
+ String key = (String) o;
+ OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
+ if (otherRtpSendInfo != null && otherRtpSendInfo.getStream().equalsIgnoreCase(param.getStream())) {
+ result.setEnable_audio(true);
+ result.setEnable_mp4(true);
+ }
+ }
+ }
+
if (mediaInfo.getRecordAssistPort() > 0 && userSetting.getRecordPath() == null) {
logger.info("鎺ㄦ祦鏃跺彂鐜板皻鏈缃綍鍍忚矾寰勶紝浠巃ssist鏈嶅姟涓鍙�");
JSONObject info = assistRESTfulUtils.getInfo(mediaInfo, null);
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
index 311a007..c06c4af 100644
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
@@ -11,6 +11,7 @@
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
@@ -34,6 +35,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
@SuppressWarnings("rawtypes")
@Tag(name = "绗笁鏂规湇鍔″鎺�")
@@ -120,12 +122,12 @@
int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode);
// 娉ㄥ唽鍥炶皟濡傛灉rtp鏀舵祦瓒呮椂鍒欓�氳繃鍥炶皟鍙戦�侀�氱煡
if (callBack != null) {
- HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, mediaServerItem.getId());
+ HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId());
// 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁�
hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
(mediaServerItemInUse, response)->{
if (stream.equals(response.getString("stream_id"))) {
- logger.info("[寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋�", callId);
+ logger.info("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋�", callId);
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
OkHttpClient client = httpClientBuilder.build();
String url = callBack + "?callId=" + callId;
@@ -133,7 +135,7 @@
try {
client.newCall(request).execute();
} catch (IOException e) {
- logger.error("[寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋冨け璐�", callId, e);
+ logger.error("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 绛夊緟鏀舵祦瓒呮椂 callId->{}, 鍙戦�佸洖璋冨け璐�", callId, e);
}
}
});
@@ -143,6 +145,9 @@
otherRtpSendInfo.setReceivePort(localPort);
otherRtpSendInfo.setCallId(callId);
otherRtpSendInfo.setStream(stream);
+ String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + stream;
+ // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤
+ redisTemplate.opsForValue().set(receiveKey, otherRtpSendInfo);
if (isSend != null && isSend) {
String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
// 棰勫垱寤哄彂娴佷俊鎭�
@@ -160,7 +165,7 @@
}, 15000);
otherRtpSendInfo.setIp(mediaServerItem.getSdpIp());
otherRtpSendInfo.setPort(port);
- logger.info("[寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 缁撴灉锛宑allId->{}锛� {}", callId, otherRtpSendInfo);
+ logger.info("[绗笁鏂规湇鍔″鎺�->寮�鍚敹娴佸拰鑾峰彇鍙戞祦淇℃伅] 缁撴灉锛宑allId->{}锛� {}", callId, otherRtpSendInfo);
}
return otherRtpSendInfo;
}
@@ -173,6 +178,9 @@
logger.info("[绗笁鏂规湇鍔″鎺�->鍏抽棴鏀舵祦] stream->{}", stream);
MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
zlmServerFactory.closeRtpServer(mediaServerItem,stream);
+ String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + stream;
+ // 灏嗕俊鎭啓鍏edis涓紝浠ュ鍚庣敤
+ redisTemplate.delete(receiveKey);
}
@GetMapping(value = "/send/start")
@@ -187,9 +195,10 @@
@Parameter(name = "onlyAudio", description = "鏄惁鍙湁闊抽", required = true)
@Parameter(name = "isUdp", description = "鏄惁涓篣DP", required = true)
@Parameter(name = "streamType", description = "娴佺被鍨嬶紝1涓篹s娴侊紝2涓簆s娴侊紝 榛樿es娴�", required = false)
- public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType) {
- logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}",
- ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS");
+ @Parameter(name = "pt", description = "rtp鐨刾t", required = true)
+ public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType, Integer pt) {
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}, pt->{}",
+ ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS", pt);
if (ObjectUtils.isEmpty(streamType)) {
streamType = 1;
}
@@ -197,7 +206,7 @@
String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
if (sendInfo != null) {
- zlmServerFactory.releasePort(mediaServerItem, sendInfo.getCallId());
+ zlmServerFactory.releasePort(mediaServerItem, callId);
}else {
sendInfo = new OtherRtpSendInfo();
}
@@ -218,19 +227,51 @@
param.put("src_port", sendInfo.getPort());
param.put("use_ps", streamType==2 ? "1" : "0");
param.put("only_audio", onlyAudio ? "1" : "0");
+ param.put("pt", pt);
- JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
- if (jsonObject.getInteger("code") == 0) {
- logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦鎴愬姛锛宑allId->{}", callId);
- redisTemplate.opsForValue().set(key, sendInfo);
+ dynamicTask.stop(key);
+ Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream);
+ if (streamReady) {
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 娴佸瓨鍦紝寮�濮嬪彂娴侊紝callId->{}", callId);
+ JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
+ if (jsonObject.getInteger("code") == 0) {
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦鎴愬姛锛宑allId->{}", callId);
+ redisTemplate.opsForValue().set(key, sendInfo);
+ }else {
+ redisTemplate.delete(key);
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦澶辫触锛宑allId->{}, {}", callId, jsonObject.getString("msg"));
+ throw new ControllerException(ErrorCode.ERROR100.getCode(), "[鍙戞祦澶辫触] " + jsonObject.getString("msg"));
+ }
}else {
- redisTemplate.delete(key);
- logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦澶辫触锛宑allId->{}, {}", callId, jsonObject.getString("msg"));
- throw new ControllerException(ErrorCode.ERROR100.getCode(), "[鍙戞祦澶辫触] " + jsonObject.getString("msg"));
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 娴佷笉瀛樺湪锛岀瓑寰呮祦涓婄嚎锛宑allId->{}", callId);
+ String uuid = UUID.randomUUID().toString();
+ HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId());
+ dynamicTask.startDelay(uuid, ()->{
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 绛夊緟娴佷笂绾胯秴鏃� callId->{}", callId);
+ redisTemplate.delete(key);
+ hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
+ }, 10000);
+
+ // 璁㈤槄 zlm鍚姩浜嬩欢, 鏂扮殑zlm涔熶細浠庤繖閲岃繘鍏ョ郴缁�
+ OtherRtpSendInfo finalSendInfo = sendInfo;
+ hookSubscribe.addSubscribe(hookSubscribeForStreamChange,
+ (mediaServerItemInUse, response)->{
+ dynamicTask.stop(uuid);
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 娴佷笂绾匡紝寮�濮嬪彂娴� callId->{}", callId);
+ JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
+ System.out.println("========鍙戞祦缁撴灉==========");
+ System.out.println(jsonObject);
+ if (jsonObject.getInteger("code") == 0) {
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦鎴愬姛锛宑allId->{}", callId);
+ redisTemplate.opsForValue().set(key, finalSendInfo);
+ }else {
+ redisTemplate.delete(key);
+ logger.info("[绗笁鏂规湇鍔″鎺�->鍙戦�佹祦] 鍙戞祦澶辫触锛宑allId->{}, {}", callId, jsonObject.getString("msg"));
+ throw new ControllerException(ErrorCode.ERROR100.getCode(), "[鍙戞祦澶辫触] " + jsonObject.getString("msg"));
+ }
+ });
}
}
-
-
@GetMapping(value = "/send/stop")
@ResponseBody
--
Gitblit v1.8.0