From 130dc5d82da0e89241eefd4ea91ba7d861de866d Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: 星期六, 30 九月 2023 06:09:16 +0800
Subject: [PATCH] 优化拉流代理逻辑,修复ffmpeg拉流代理鉴权
---
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 7 +++
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java | 9 ++++
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java | 6 +++
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java | 7 ++-
sql/初始化.sql | 1
sql/2.6.9更新.sql | 3 +
src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java | 14 +++++++
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java | 65 +++++++++++++++++++++-----------
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java | 5 ++
9 files changed, 91 insertions(+), 26 deletions(-)
diff --git "a/sql/2.6.9\346\233\264\346\226\260.sql" "b/sql/2.6.9\346\233\264\346\226\260.sql"
index 514b39e..769004d 100644
--- "a/sql/2.6.9\346\233\264\346\226\260.sql"
+++ "b/sql/2.6.9\346\233\264\346\226\260.sql"
@@ -3,3 +3,6 @@
alter table wvp_platform
add auto_push_channel bool default false
+
+alter table wvp_stream_proxy
+ add stream_key varying(255)
diff --git "a/sql/\345\210\235\345\247\213\345\214\226.sql" "b/sql/\345\210\235\345\247\213\345\214\226.sql"
index 64c404f..b95dd87 100644
--- "a/sql/\345\210\235\345\247\213\345\214\226.sql"
+++ "b/sql/\345\210\235\345\247\213\345\214\226.sql"
@@ -244,6 +244,7 @@
create_time character varying(50),
name character varying(255),
update_time character varying(50),
+ stream_key character varying(255),
enable_disable_none_reader bool default false,
constraint uk_stream_proxy_app_stream unique (app, stream)
);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
index 122bc54..0448cd2 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -93,7 +93,10 @@
}
if (event.getGbStreams() != null && event.getGbStreams().size() > 0){
for (GbStream gbStream : event.getGbStreams()) {
- if (gbStream != null && gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) {
+ if (gbStream != null
+ && gbStream.getStreamType() != null
+ && gbStream.getStreamType().equals("push")
+ && !userSetting.isUsePushingAsStatus()) {
continue;
}
DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform);
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 217d00a..3c76883 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -199,6 +199,13 @@
}
// 鎺ㄦ祦閴存潈鐨勫鐞�
if (!"rtp".equals(param.getApp())) {
+ StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
+ if (stream != null) {
+ HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
+ result.setEnable_audio(stream.isEnableAudio());
+ result.setEnable_mp4(stream.isEnableMp4());
+ return result;
+ }
if (userSetting.getPushAuthority()) {
// 鎺ㄦ祦閴存潈
if (param.getParams() == null) {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
index 0403eec..84e9e7e 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
@@ -272,6 +272,12 @@
return sendPost(mediaServerItem, "delFFmpegSource",param, null);
}
+ public JSONObject delStreamProxy(MediaServerItem mediaServerItem, String key){
+ Map<String, Object> param = new HashMap<>();
+ param.put("key", key);
+ return sendPost(mediaServerItem, "delStreamProxy",param, null);
+ }
+
public JSONObject getMediaServerConfig(MediaServerItem mediaServerItem){
return sendPost(mediaServerItem, "getServerConfig",null, null);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java
index dd517e3..0486d00 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java
@@ -41,6 +41,9 @@
@Schema(description = "鏄惁 鏃犱汉瑙傜湅鏃惰嚜鍔ㄥ仠鐢�")
private boolean enableDisableNoneReader;
+ @Schema(description = "鎷夋祦浠g悊鏃秡lm杩斿洖鐨刱ey锛岀敤浜庡仠姝㈡媺娴佷唬鐞�")
+ private String streamKey;
+
public String getType() {
return type;
}
@@ -167,5 +170,11 @@
this.enableAudio = enable_audio;
}
+ public String getStreamKey() {
+ return streamKey;
+ }
+ public void setStreamKey(String streamKey) {
+ this.streamKey = streamKey;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
index f36cff9..7fbe769 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -10,6 +10,7 @@
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
@@ -60,6 +61,9 @@
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
+
+ @Autowired
+ private ZLMServerFactory zlmServerFactory;
@Autowired
private StreamProxyMapper streamProxyMapper;
@@ -145,7 +149,7 @@
dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, param.getApp(),
param.getStream());
}else {
- dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
+ dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtspPort(), param.getApp(),
param.getStream());
}
param.setDstUrl(dstUrl);
@@ -170,12 +174,6 @@
});
if (param.isEnable()) {
String talkKey = UUID.randomUUID().toString();
-// dynamicTask.startCron(talkKey, ()->{
-// StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
-// if (streamInfo != null) {
-// callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
-// }
-// }, 3000);
String delayTalkKey = UUID.randomUUID().toString();
dynamicTask.startDelay(delayTalkKey, ()->{
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
@@ -318,13 +316,32 @@
if (mediaServerItem == null) {
return null;
}
- if ("default".equals(param.getType())){
- result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
- param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
- }else if ("ffmpeg".equals(param.getType())) {
+ if (zlmServerFactory.isStreamReady(mediaServerItem, param.getApp(), param.getStream())) {
+ zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
+ }
+ if ("ffmpeg".equalsIgnoreCase(param.getType())){
result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl().trim(), param.getDstUrl(),
param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(),
param.getFfmpegCmdKey());
+ }else {
+ result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
+ param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
+ }
+ System.out.println("addStreamProxyToZlm====");
+ System.out.println(result);
+ if (result != null && result.getInteger("code") == 0) {
+ JSONObject data = result.getJSONObject("data");
+ if (data == null) {
+ logger.warn("[鑾峰彇鎷夋祦浠g悊鐨勭粨鏋滄暟鎹瓺ata] 澶辫触锛� {}", result );
+ return result;
+ }
+ String key = data.getString("key");
+ if (key == null) {
+ logger.warn("[鑾峰彇鎷夋祦浠g悊鐨勭粨鏋滄暟鎹瓺ata涓殑KEY] 澶辫触锛� {}", result );
+ return result;
+ }
+ param.setStreamKey(key);
+ streamProxyMapper.update(param);
}
return result;
}
@@ -335,7 +352,12 @@
return null;
}
MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
- JSONObject result = zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
+ JSONObject result = null;
+ if ("ffmpeg".equalsIgnoreCase(param.getType())){
+ result = zlmresTfulUtils.delFFmpegSource(mediaServerItem, param.getStreamKey());
+ }else {
+ result = zlmresTfulUtils.delStreamProxy(mediaServerItem, param.getStreamKey());
+ }
return result;
}
@@ -350,19 +372,18 @@
if (streamProxyItem != null) {
gbStreamService.sendCatalogMsg(streamProxyItem, CatalogEvent.DEL);
- JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
- if (jsonObject != null && jsonObject.getInteger("code") == 0) {
- // 濡傛灉鍏宠仈浜嗗浗鏍囬偅涔堢Щ闄ゅ叧鑱�
- int i = platformGbStreamMapper.delByAppAndStream(app, stream);
- gbStreamMapper.del(app, stream);
- System.out.println();
- // TODO 濡傛灉鍏宠仈鐨勬帹娴侊紝 閭d箞鐘舵�佽缃负绂荤嚎
- }
+ // 濡傛灉鍏宠仈浜嗗浗鏍囬偅涔堢Щ闄ゅ叧鑱�
+ platformGbStreamMapper.delByAppAndStream(app, stream);
+ gbStreamMapper.del(app, stream);
videoManagerStorager.deleteStreamProxy(app, stream);
redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
+ JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
+ if (jsonObject != null && jsonObject.getInteger("code") == 0) {
+ logger.info("[绉婚櫎浠g悊]锛� 浠g悊锛� {}/{}, 浠巣lm绉婚櫎鎴愬姛", app, stream);
+ }else {
+ logger.info("[绉婚櫎浠g悊]锛� 浠g悊锛� {}/{}, 浠巣lm绉婚櫎澶辫触", app, stream);
+ }
}
-
-
}
@Override
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
index 34a0673..6ad36ce 100755
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
@@ -12,9 +12,9 @@
public interface StreamProxyMapper {
@Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, src_url, dst_url, " +
- "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" +
+ "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, stream_key, enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" +
"(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " +
- "#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, " +
+ "#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " +
"#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} )")
int add(StreamProxyItem streamProxyDto);
@@ -33,6 +33,7 @@
"enable_audio=#{enableAudio}, " +
"enable=#{enable}, " +
"status=#{status}, " +
+ "stream_key=#{streamKey}, " +
"enable_remove_none_reader=#{enableRemoveNoneReader}, " +
"enable_disable_none_reader=#{enableDisableNoneReader}, " +
"enable_mp4=#{enableMp4} " +
@@ -45,7 +46,7 @@
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream order by st.create_time desc")
List<StreamProxyItem> selectAll();
- @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=#{enable} order by st.create_time desc")
+ @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude, 'proxy' as streamType FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=#{enable} order by st.create_time desc")
List<StreamProxyItem> selectForEnable(boolean enable);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.app=#{app} AND st.stream=#{stream} order by st.create_time desc")
diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java
index fe6df72..e28ca11 100755
--- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java
+++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java
@@ -67,6 +67,16 @@
return streamProxyService.getAll(page, count);
}
+ @Operation(summary = "鏌ヨ娴佷唬鐞�")
+ @Parameter(name = "app", description = "搴旂敤鍚�")
+ @Parameter(name = "stream", description = "娴両d")
+ @GetMapping(value = "/one")
+ @ResponseBody
+ public StreamProxyItem one(String app, String stream){
+
+ return streamProxyService.getStreamProxyByAppAndStream(app, stream);
+ }
+
@Operation(summary = "淇濆瓨浠g悊", parameters = {
@Parameter(name = "param", description = "浠g悊鍙傛暟", required = true),
})
@@ -86,6 +96,10 @@
if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbId(null);
}
+ StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
+ if (streamProxyItem != null) {
+ streamProxyService.del(param.getApp(), param.getStream());
+ }
RequestMessage requestMessage = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_PROXY + param.getApp() + param.getStream();
--
Gitblit v1.8.0