From 14d554987685d7a8cfef0b95edca6c7d744c6a53 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期五, 13 十月 2023 15:47:22 +0800
Subject: [PATCH] Merge branch 'wvp-28181-2.0' into main-dev
---
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java | 79 ++++++++++++++++++++++++++-------------
1 files changed, 52 insertions(+), 27 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
old mode 100644
new mode 100755
index d699bbc..7dfcf98
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -10,7 +10,10 @@
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
@@ -60,6 +63,9 @@
private ZLMRESTfulUtils zlmresTfulUtils;
@Autowired
+ private ZLMServerFactory zlmServerFactory;
+
+ @Autowired
private StreamProxyMapper streamProxyMapper;
@Autowired
@@ -76,12 +82,6 @@
@Autowired
private PlatformGbStreamMapper platformGbStreamMapper;
-
- @Autowired
- private EventPublisher eventPublisher;
-
- @Autowired
- private ParentPlatformMapper parentPlatformMapper;
@Autowired
private IGbStreamService gbStreamService;
@@ -143,7 +143,7 @@
dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, param.getApp(),
param.getStream());
}else {
- dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
+ dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtspPort(), param.getApp(),
param.getStream());
}
param.setDstUrl(dstUrl);
@@ -160,15 +160,14 @@
callback.run(ErrorCode.ERROR100.getCode(), "淇濆瓨澶辫触", null);
return;
}
-
+ HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
+ hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
+ StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
+ mediaInfo, param.getApp(), param.getStream(), null, null);
+ callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
+ });
if (param.isEnable()) {
String talkKey = UUID.randomUUID().toString();
- dynamicTask.startCron(talkKey, ()->{
- StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
- if (streamInfo != null) {
- callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
- }
- }, 1000);
String delayTalkKey = UUID.randomUUID().toString();
dynamicTask.startDelay(delayTalkKey, ()->{
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
@@ -178,9 +177,10 @@
dynamicTask.stop(talkKey);
callback.run(ErrorCode.ERROR100.getCode(), "瓒呮椂", null);
}
- }, 5000);
+ }, 7000);
JSONObject jsonObject = addStreamProxyToZlm(param);
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
+ hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(talkKey);
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
@@ -310,13 +310,32 @@
if (mediaServerItem == null) {
return null;
}
- if ("default".equals(param.getType())){
- result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
- param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
- }else if ("ffmpeg".equals(param.getType())) {
+ if (zlmServerFactory.isStreamReady(mediaServerItem, param.getApp(), param.getStream())) {
+ zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
+ }
+ if ("ffmpeg".equalsIgnoreCase(param.getType())){
result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl().trim(), param.getDstUrl(),
param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(),
param.getFfmpegCmdKey());
+ }else {
+ result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
+ param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
+ }
+ System.out.println("addStreamProxyToZlm====");
+ System.out.println(result);
+ if (result != null && result.getInteger("code") == 0) {
+ JSONObject data = result.getJSONObject("data");
+ if (data == null) {
+ logger.warn("[鑾峰彇鎷夋祦浠g悊鐨勭粨鏋滄暟鎹瓺ata] 澶辫触锛� {}", result );
+ return result;
+ }
+ String key = data.getString("key");
+ if (key == null) {
+ logger.warn("[鑾峰彇鎷夋祦浠g悊鐨勭粨鏋滄暟鎹瓺ata涓殑KEY] 澶辫触锛� {}", result );
+ return result;
+ }
+ param.setStreamKey(key);
+ streamProxyMapper.update(param);
}
return result;
}
@@ -327,7 +346,12 @@
return null;
}
MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
- JSONObject result = zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
+ JSONObject result = null;
+ if ("ffmpeg".equalsIgnoreCase(param.getType())){
+ result = zlmresTfulUtils.delFFmpegSource(mediaServerItem, param.getStreamKey());
+ }else {
+ result = zlmresTfulUtils.delStreamProxy(mediaServerItem, param.getStreamKey());
+ }
return result;
}
@@ -341,18 +365,19 @@
StreamProxyItem streamProxyItem = videoManagerStorager.queryStreamProxy(app, stream);
if (streamProxyItem != null) {
gbStreamService.sendCatalogMsg(streamProxyItem, CatalogEvent.DEL);
+
+ // 濡傛灉鍏宠仈浜嗗浗鏍囬偅涔堢Щ闄ゅ叧鑱�
+ platformGbStreamMapper.delByAppAndStream(app, stream);
+ gbStreamMapper.del(app, stream);
videoManagerStorager.deleteStreamProxy(app, stream);
+ redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
- // 濡傛灉鍏宠仈浜嗗浗鏍囬偅涔堢Щ闄ゅ叧鑱�
- gbStreamMapper.del(app, stream);
- platformGbStreamMapper.delByAppAndStream(app, stream);
- // TODO 濡傛灉鍏宠仈鐨勬帹娴侊紝 閭d箞鐘舵�佽缃负绂荤嚎
+ logger.info("[绉婚櫎浠g悊]锛� 浠g悊锛� {}/{}, 浠巣lm绉婚櫎鎴愬姛", app, stream);
+ }else {
+ logger.info("[绉婚櫎浠g悊]锛� 浠g悊锛� {}/{}, 浠巣lm绉婚櫎澶辫触", app, stream);
}
- redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
}
-
-
}
@Override
--
Gitblit v1.8.0