From 483d5e04a71b6632419699df1077ac5a5457ad38 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期一, 25 九月 2023 18:51:02 +0800
Subject: [PATCH] 优化拉流代理
---
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java | 21 ++++++++++++++++-----
src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java | 3 +++
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java | 24 ++++++++++++++++--------
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java | 2 +-
4 files changed, 36 insertions(+), 14 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
index 89ecb18..122bc54 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -93,7 +93,7 @@
}
if (event.getGbStreams() != null && event.getGbStreams().size() > 0){
for (GbStream gbStream : event.getGbStreams()) {
- if (gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) {
+ if (gbStream != null && gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) {
continue;
}
DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform);
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
index e58f2ae..0403eec 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
@@ -32,13 +32,20 @@
}
private OkHttpClient getClient(){
+ return getClient(null);
+ }
+
+ private OkHttpClient getClient(Integer readTimeOut){
if (client == null) {
+ if (readTimeOut == null) {
+ readTimeOut = 10;
+ }
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
//todo 鏆傛椂鍐欐瓒呮椂鏃堕棿 鍧囦负5s
// 璁剧疆杩炴帴瓒呮椂鏃堕棿
- httpClientBuilder.connectTimeout(5,TimeUnit.SECONDS);
+ httpClientBuilder.connectTimeout(8,TimeUnit.SECONDS);
// 璁剧疆璇诲彇瓒呮椂鏃堕棿
- httpClientBuilder.readTimeout(10,TimeUnit.SECONDS);
+ httpClientBuilder.readTimeout(readTimeOut,TimeUnit.SECONDS);
// 璁剧疆杩炴帴姹�
httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES));
if (logger.isDebugEnabled()) {
@@ -55,9 +62,13 @@
}
-
public JSONObject sendPost(MediaServerItem mediaServerItem, String api, Map<String, Object> param, RequestCallback callback) {
- OkHttpClient client = getClient();
+ return sendPost(mediaServerItem, api, param, callback, null);
+ }
+
+
+ public JSONObject sendPost(MediaServerItem mediaServerItem, String api, Map<String, Object> param, RequestCallback callback, Integer readTimeOut) {
+ OkHttpClient client = getClient(readTimeOut);
if (mediaServerItem == null) {
return null;
@@ -310,7 +321,7 @@
param.put("enable_mp4", enable_mp4?1:0);
param.put("enable_audio", enable_audio?1:0);
param.put("rtp_type", rtp_type);
- return sendPost(mediaServerItem, "addStreamProxy",param, null);
+ return sendPost(mediaServerItem, "addStreamProxy",param, null, 20);
}
public JSONObject closeStreams(MediaServerItem mediaServerItem, String app, String stream) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
index a4e00ad..f36cff9 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -11,6 +11,8 @@
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
@@ -160,15 +162,20 @@
callback.run(ErrorCode.ERROR100.getCode(), "淇濆瓨澶辫触", null);
return;
}
-
+ HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
+ hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
+ StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
+ mediaInfo, param.getApp(), param.getStream(), null, null);
+ callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
+ });
if (param.isEnable()) {
String talkKey = UUID.randomUUID().toString();
- dynamicTask.startCron(talkKey, ()->{
- StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
- if (streamInfo != null) {
- callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
- }
- }, 1000);
+// dynamicTask.startCron(talkKey, ()->{
+// StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
+// if (streamInfo != null) {
+// callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
+// }
+// }, 3000);
String delayTalkKey = UUID.randomUUID().toString();
dynamicTask.startDelay(delayTalkKey, ()->{
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
@@ -178,9 +185,10 @@
dynamicTask.stop(talkKey);
callback.run(ErrorCode.ERROR100.getCode(), "瓒呮椂", null);
}
- }, 5000);
+ }, 7000);
JSONObject jsonObject = addStreamProxyToZlm(param);
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
+ hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(talkKey);
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java
index 0689f42..fe6df72 100755
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java
@@ -80,6 +80,9 @@
if (ObjectUtils.isEmpty(param.getType())) {
param.setType("default");
}
+ if (ObjectUtils.isEmpty(param.getRtpType())) {
+ param.setRtpType("1");
+ }
if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbId(null);
}
--
Gitblit v1.8.0