From b7f2a6b25ba8bab01d7e10d5556702be8af37b80 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期三, 07 六月 2023 20:21:10 +0800
Subject: [PATCH] Merge branch '2.6.8' into wvp-28181-2.0
---
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java | 7 +
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java | 4 +
src/main/java/com/genersoft/iot/vmp/common/GeneralCallback.java | 5 +
src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java | 3
src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java | 12 ----
src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java | 44 ++++++++++++++
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java | 2
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java | 44 ++++++++++----
src/main/resources/application.yml | 2
9 files changed, 91 insertions(+), 32 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/common/GeneralCallback.java b/src/main/java/com/genersoft/iot/vmp/common/GeneralCallback.java
new file mode 100644
index 0000000..df07fac
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/common/GeneralCallback.java
@@ -0,0 +1,5 @@
+package com.genersoft.iot.vmp.common;
+
+public interface GeneralCallback<T>{
+ void run(int code, String msg, T data);
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java
index 689f843..657bb2f 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java
@@ -121,16 +121,4 @@
return redisTemplate.opsForSet().members(redisKey) != null;
}
- /**
- * 鏌ヨssrc鏄惁鍙敤
- *
- * @param mediaServerId
- * @param ssrc
- * @return
- */
- public boolean checkSsrc(String mediaServerId, String ssrc) {
- String sn = ssrc.substring(1);
- String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId;
- return redisTemplate.opsForSet().isMember(redisKey, sn) != null;
- }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java
index 83722a0..f2291f2 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java
@@ -39,6 +39,8 @@
public static final String CALLBACK_CMD_DOWNLOAD = "CALLBACK_DOWNLOAD";
+ public static final String CALLBACK_CMD_PROXY = "CALLBACK_PROXY";
+
public static final String CALLBACK_CMD_STOP = "CALLBACK_STOP";
public static final String UPLOAD_FILE_CHANNEL = "UPLOAD_FILE_CHANNEL";
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
index 8960e2c..a8b9b9a 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -452,7 +452,7 @@
}
subscribe.removeSubscribe(hookSubscribe);
});
- Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc());
+ Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc());
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
ResponseEvent responseEvent = (ResponseEvent) event.event;
@@ -568,7 +568,10 @@
});
});
- Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc());
+ Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc());
+ if (inviteStreamCallback != null) {
+ inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null,callId, "rtp", ssrcInfo.getStream()));
+ }
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
ResponseEvent responseEvent = (ResponseEvent) event.event;
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
index 9a642d8..081d919 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -6,7 +6,9 @@
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.media.zlm.dto.*;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
index 0e1c97b..c4968a7 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service;
import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
@@ -13,7 +14,7 @@
* 淇濆瓨瑙嗛浠g悊
* @param param
*/
- StreamInfo save(StreamProxyItem param);
+ void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback);
/**
* 娣诲姞瑙嗛浠g悊鍒皕lm
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
index f73392f..195f035 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -2,12 +2,16 @@
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
@@ -86,6 +90,9 @@
private IMediaServerService mediaServerService;
@Autowired
+ private ZlmHttpHookSubscribe hookSubscribe;
+
+ @Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
@@ -93,7 +100,7 @@
@Override
- public StreamInfo save(StreamProxyItem param) {
+ public void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback) {
MediaServerItem mediaInfo;
if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){
mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null);
@@ -107,7 +114,6 @@
String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
param.getStream() );
param.setDstUrl(dstUrl);
- StringBuffer resultMsg = new StringBuffer();
param.setMediaServerId(mediaInfo.getId());
boolean saveResult;
// 鏇存柊
@@ -117,14 +123,25 @@
saveResult = addStreamProxy(param);
}
if (!saveResult) {
- throw new ControllerException(ErrorCode.ERROR100.getCode(),"淇濆瓨澶辫触");
+ callback.run(ErrorCode.ERROR100.getCode(), "淇濆瓨澶辫触", null);
+ return;
}
- StreamInfo resultForStreamInfo = null;
- resultMsg.append("淇濆瓨鎴愬姛");
+
+ HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
+ hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
+ StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
+ mediaInfo, param.getApp(), param.getStream(), null, null);
+ callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
+ });
+
if (param.isEnable()) {
JSONObject jsonObject = addStreamProxyToZlm(param);
- if (jsonObject == null || jsonObject.getInteger("code") != 0) {
- resultMsg.append(", 浣嗘槸鍚敤澶辫触锛岃妫�鏌ユ祦鍦板潃鏄惁鍙敤");
+ if (jsonObject != null && jsonObject.getInteger("code") == 0) {
+ hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
+ StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
+ mediaInfo, param.getApp(), param.getStream(), null, null);
+ callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
+ }else {
param.setEnable(false);
// 鐩存帴绉婚櫎
if (param.isEnableRemoveNoneReader()) {
@@ -132,14 +149,15 @@
}else {
updateStreamProxy(param);
}
-
- }else {
- resultForStreamInfo = mediaService.getStreamInfoByAppAndStream(
- mediaInfo, param.getApp(), param.getStream(), null, null);
-
+ if (jsonObject == null){
+ callback.run(ErrorCode.ERROR100.getCode(), "璁板綍宸蹭繚瀛橈紝鍚敤澶辫触", null);
+ return;
+ }else {
+ callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null);
+ return;
+ }
}
}
- return resultForStreamInfo;
}
/**
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java
index 4a8522b..0689f42 100644
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java
@@ -1,13 +1,18 @@
package com.genersoft.iot.vmp.vmanager.streamProxy;
import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
+import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
+import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
@@ -18,6 +23,9 @@
import org.springframework.stereotype.Controller;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import java.util.UUID;
@SuppressWarnings("rawtypes")
/**
@@ -36,6 +44,12 @@
@Autowired
private IStreamProxyService streamProxyService;
+
+ @Autowired
+ private DeferredResultHolder resultHolder;
+
+ @Autowired
+ private UserSetting userSetting;
@Operation(summary = "鍒嗛〉鏌ヨ娴佷唬鐞�")
@@ -58,7 +72,7 @@
})
@PostMapping(value = "/save")
@ResponseBody
- public StreamContent save(@RequestBody StreamProxyItem param){
+ public DeferredResult<Object> save(@RequestBody StreamProxyItem param){
logger.info("娣诲姞浠g悊锛� " + JSONObject.toJSONString(param));
if (ObjectUtils.isEmpty(param.getMediaServerId())) {
param.setMediaServerId("auto");
@@ -69,7 +83,33 @@
if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbId(null);
}
- return new StreamContent(streamProxyService.save(param));
+
+ RequestMessage requestMessage = new RequestMessage();
+ String key = DeferredResultHolder.CALLBACK_CMD_PROXY + param.getApp() + param.getStream();
+ requestMessage.setKey(key);
+ String uuid = UUID.randomUUID().toString();
+ requestMessage.setId(uuid);
+ DeferredResult<Object> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
+ // 褰曞儚鏌ヨ浠hannelId浣滀负deviceId鏌ヨ
+ resultHolder.put(key, uuid, result);
+ result.onTimeout(()->{
+ WVPResult<StreamInfo> wvpResult = new WVPResult<>();
+ wvpResult.setCode(ErrorCode.ERROR100.getCode());
+ wvpResult.setMsg("瓒呮椂");
+ requestMessage.setData(wvpResult);
+ resultHolder.invokeAllResult(requestMessage);
+ });
+
+ streamProxyService.save(param, (code, msg, streamInfo) -> {
+ logger.info("[鎷夋祦浠g悊] {}", code == ErrorCode.SUCCESS.getCode()? "鎴愬姛":"澶辫触锛� " + msg);
+ if (code == ErrorCode.SUCCESS.getCode()) {
+ requestMessage.setData(new StreamContent(streamInfo));
+ }else {
+ requestMessage.setData(WVPResult.fail(code, msg));
+ }
+ resultHolder.invokeAllResult(requestMessage);
+ });
+ return result;
}
@GetMapping(value = "/ffmpeg_cmd/list")
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 3f47844..c160a2f 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -2,4 +2,4 @@
application:
name: wvp
profiles:
- active: local
\ No newline at end of file
+ active: local268
\ No newline at end of file
--
Gitblit v1.8.0