From 483d5e04a71b6632419699df1077ac5a5457ad38 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期一, 25 九月 2023 18:51:02 +0800 Subject: [PATCH] 优化拉流代理 --- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java | 21 ++++++++++++++++----- src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java | 3 +++ src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java | 24 ++++++++++++++++-------- src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java | 2 +- 4 files changed, 36 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 89ecb18..122bc54 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -93,7 +93,7 @@ } if (event.getGbStreams() != null && event.getGbStreams().size() > 0){ for (GbStream gbStream : event.getGbStreams()) { - if (gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) { + if (gbStream != null && gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) { continue; } DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index e58f2ae..0403eec 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -32,13 +32,20 @@ } private OkHttpClient getClient(){ + return getClient(null); + } + + private OkHttpClient getClient(Integer readTimeOut){ if (client == null) { + if (readTimeOut == null) { + readTimeOut = 10; + } OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); //todo 鏆傛椂鍐欐瓒呮椂鏃堕棿 鍧囦负5s // 璁剧疆杩炴帴瓒呮椂鏃堕棿 - httpClientBuilder.connectTimeout(5,TimeUnit.SECONDS); + httpClientBuilder.connectTimeout(8,TimeUnit.SECONDS); // 璁剧疆璇诲彇瓒呮椂鏃堕棿 - httpClientBuilder.readTimeout(10,TimeUnit.SECONDS); + httpClientBuilder.readTimeout(readTimeOut,TimeUnit.SECONDS); // 璁剧疆杩炴帴姹� httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES)); if (logger.isDebugEnabled()) { @@ -55,9 +62,13 @@ } - public JSONObject sendPost(MediaServerItem mediaServerItem, String api, Map<String, Object> param, RequestCallback callback) { - OkHttpClient client = getClient(); + return sendPost(mediaServerItem, api, param, callback, null); + } + + + public JSONObject sendPost(MediaServerItem mediaServerItem, String api, Map<String, Object> param, RequestCallback callback, Integer readTimeOut) { + OkHttpClient client = getClient(readTimeOut); if (mediaServerItem == null) { return null; @@ -310,7 +321,7 @@ param.put("enable_mp4", enable_mp4?1:0); param.put("enable_audio", enable_audio?1:0); param.put("rtp_type", rtp_type); - return sendPost(mediaServerItem, "addStreamProxy",param, null); + return sendPost(mediaServerItem, "addStreamProxy",param, null, 20); } public JSONObject closeStreams(MediaServerItem mediaServerItem, String app, String stream) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index a4e00ad..f36cff9 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -11,6 +11,8 @@ import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; @@ -160,15 +162,20 @@ callback.run(ErrorCode.ERROR100.getCode(), "淇濆瓨澶辫触", null); return; } - + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); + hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + mediaInfo, param.getApp(), param.getStream(), null, null); + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + }); if (param.isEnable()) { String talkKey = UUID.randomUUID().toString(); - dynamicTask.startCron(talkKey, ()->{ - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false); - if (streamInfo != null) { - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - } - }, 1000); +// dynamicTask.startCron(talkKey, ()->{ +// StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false); +// if (streamInfo != null) { +// callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); +// } +// }, 3000); String delayTalkKey = UUID.randomUUID().toString(); dynamicTask.startDelay(delayTalkKey, ()->{ StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false); @@ -178,9 +185,10 @@ dynamicTask.stop(talkKey); callback.run(ErrorCode.ERROR100.getCode(), "瓒呮椂", null); } - }, 5000); + }, 7000); JSONObject jsonObject = addStreamProxyToZlm(param); if (jsonObject != null && jsonObject.getInteger("code") == 0) { + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); dynamicTask.stop(talkKey); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, param.getApp(), param.getStream(), null, null); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java index 0689f42..fe6df72 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java @@ -80,6 +80,9 @@ if (ObjectUtils.isEmpty(param.getType())) { param.setType("default"); } + if (ObjectUtils.isEmpty(param.getRtpType())) { + param.setRtpType("1"); + } if (ObjectUtils.isEmpty(param.getGbId())) { param.setGbId(null); } -- Gitblit v1.8.0