From f33c3a36302749d8552b281de3dbe37f672bba86 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期四, 27 一月 2022 18:08:19 +0800 Subject: [PATCH] 添加重启后拉流代理自动恢复 --- src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java | 5 +- src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 6 +++ web_src/src/components/StreamProxyList.vue | 8 ++++ src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java | 1 src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java | 10 +++++ sql/mysql.sql | 1 src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java | 18 ++++++--- src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java | 12 ++++++ sql/update.sql | 1 web_src/src/components/dialog/StreamProxyEdit.vue | 6 +- src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java | 34 +++++++++++++++- src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java | 4 +- 12 files changed, 90 insertions(+), 16 deletions(-) diff --git a/sql/mysql.sql b/sql/mysql.sql index 6569375..26ea595 100644 --- a/sql/mysql.sql +++ b/sql/mysql.sql @@ -444,6 +444,7 @@ `enable_hls` bit(1) DEFAULT NULL, `enable_mp4` bit(1) DEFAULT NULL, `enable` bit(1) NOT NULL, + `status` bit(1) NOT NULL, `enable_remove_none_reader` bit(1) NOT NULL, `createTime` varchar(50) NOT NULL, PRIMARY KEY (`app`,`stream`) diff --git a/sql/update.sql b/sql/update.sql new file mode 100644 index 0000000..d6386ea --- /dev/null +++ b/sql/update.sql @@ -0,0 +1 @@ +ALTER TABLE stream_proxy ADD status bit(1) not null; \ No newline at end of file diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 5919619..6e6b7b5 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -332,6 +332,11 @@ }else { mediaServerService.removeCount(mediaServerId); } + if (item.getOriginType() == OriginType.PULL.ordinal() + || item.getOriginType() == OriginType.FFMPEG_PULL.ordinal()) { + // 璁剧疆鎷夋祦浠g悊涓婄嚎/绂荤嚎 + streamProxyService.updateStatus(regist, app, streamId); + } if ("rtp".equals(app) && !regist ) { StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); if (streamInfo!=null){ @@ -355,6 +360,7 @@ || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { streamPushItem = zlmMediaListManager.addPush(item); } + List<GbStream> gbStreams = new ArrayList<>(); if (streamPushItem == null || streamPushItem.getGbId() == null) { GbStream gbStream = storager.getGbStream(app, streamId); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java index 38e44a9..39685b0 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java @@ -151,4 +151,5 @@ public void setEnable_remove_none_reader(boolean enable_remove_none_reader) { this.enable_remove_none_reader = enable_remove_none_reader; } + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java index 40b2c9a..ac10000 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java @@ -56,6 +56,16 @@ boolean start(String app, String stream); /** + * 鏇存柊鐘舵�� + * @param status 鐘舵�� + * @param app + * @param stream + */ + int updateStatus(boolean status, String app, String stream); + + + + /** * 鍋滅敤鐢ㄨ棰戜唬鐞� * @param app * @param stream diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index cd5f8ab..ccb2520 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -14,8 +14,11 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.storager.dao.MediaServerMapper; import com.genersoft.iot.vmp.utils.redis.JedisUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; @@ -69,6 +72,12 @@ @Autowired private RedisUtil redisUtil; + + @Autowired + private IVideoManagerStorager storager; + + @Autowired + private IStreamProxyService streamProxyService; @Autowired private EventPublisher publisher; @@ -231,6 +240,7 @@ public List<MediaServerItem> getAllOnline() { String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId(); Set<String> mediaServerIdSet = redisUtil.zRevRange(key, 0, -1); + List<MediaServerItem> result = new ArrayList<>(); if (mediaServerIdSet != null && mediaServerIdSet.size() > 0) { for (String mediaServerId : mediaServerIdSet) { @@ -238,6 +248,7 @@ result.add((MediaServerItem) redisUtil.get(serverKey)); } } + Collections.reverse(result); return result; } @@ -374,6 +385,7 @@ resetOnlineServerItem(serverItem); updateMediaServerKeepalive(serverItem.getId(), null); setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable())); + publisher.zlmOnlineEventPublish(serverItem.getId()); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 13277c2..afac6eb 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -59,6 +59,9 @@ private IRedisCatchStorage redisCatchStorage; @Autowired + private IVideoManagerStorager storager; + + @Autowired private UserSetup userSetup; @Autowired @@ -278,7 +281,27 @@ @Override public void zlmServerOnline(String mediaServerId) { - zlmServerOffline(mediaServerId); + // 绉婚櫎寮�鍚簡鏃犱汉瑙傜湅鑷姩绉婚櫎鐨勬祦 + List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selecAutoRemoveItemByMediaServerId(mediaServerId); + if (streamProxyItemList.size() > 0) { + gbStreamMapper.batchDel(streamProxyItemList); + } + streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId); + + // 鎭㈠娴佷唬鐞�, 鍙煡鎵捐繖涓繖涓祦濯掍綋 + List<StreamProxyItem> streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer( + mediaServerId, true, false); + for (StreamProxyItem streamProxyDto : streamProxyListForEnable) { + logger.info("鎭㈠娴佷唬鐞嗭紝" + streamProxyDto.getApp() + "/" + streamProxyDto.getStream()); + JSONObject jsonObject = addStreamProxyToZlm(streamProxyDto); + if (jsonObject == null) { + // 璁剧疆涓虹绾� + logger.info("鎭㈠娴佷唬鐞嗗け璐�" + streamProxyDto.getApp() + "/" + streamProxyDto.getStream()); + updateStatus(false, streamProxyDto.getApp(), streamProxyDto.getStream()); + }else { + updateStatus(true, streamProxyDto.getApp(), streamProxyDto.getStream()); + } + } } @Override @@ -289,8 +312,8 @@ gbStreamMapper.batchDel(streamProxyItemList); } streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId); - // 鍏朵粬鐨勬祦璁剧疆鏈惎鐢� - streamProxyMapper.updateStatus(false, mediaServerId); + // 鍏朵粬鐨勬祦璁剧疆绂荤嚎 + streamProxyMapper.updateStatusByMediaServerId(false, mediaServerId); String type = "PULL"; // 鍙戦�乺edis娑堟伅 @@ -314,4 +337,9 @@ public void clean() { } + + @Override + public int updateStatus(boolean status, String app, String stream) { + return streamProxyMapper.updateStatus(status, app, stream); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java index 038fe2b..5e2745a 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java @@ -398,10 +398,11 @@ /** * 鏍规嵁濯掍綋ID鑾峰彇鍚敤/涓嶅惎鐢ㄧ殑浠g悊鍒楄〃 * @param id 濯掍綋ID - * @param b 鍚敤/涓嶅惎鐢� + * @param enable 鍚敤/涓嶅惎鐢� + * @param status 鐘舵�� * @return */ - List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean b); + List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean enable, boolean status); /** * 鏍规嵁閫氶亾ID鑾峰彇鍏舵墍鍦ㄨ澶� diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index b6e1ba1..63cd425 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -11,9 +11,9 @@ public interface StreamProxyMapper { @Insert("INSERT INTO stream_proxy (type, app, stream,mediaServerId, url, src_url, dst_url, " + - "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable, enable_remove_none_reader, createTime) VALUES" + + "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable, status, enable_remove_none_reader, createTime) VALUES" + "('${type}','${app}', '${stream}', '${mediaServerId}','${url}', '${src_url}', '${dst_url}', " + - "'${timeout_ms}', '${ffmpeg_cmd_key}', '${rtp_type}', ${enable_hls}, ${enable_mp4}, ${enable}, " + + "'${timeout_ms}', '${ffmpeg_cmd_key}', '${rtp_type}', ${enable_hls}, ${enable_mp4}, ${enable}, ${status}, " + "${enable_remove_none_reader}, '${createTime}' )") int add(StreamProxyItem streamProxyDto); @@ -30,6 +30,7 @@ "rtp_type=#{rtp_type}, " + "enable_hls=#{enable_hls}, " + "enable=#{enable}, " + + "status=#{status}, " + "enable_remove_none_reader=#{enable_remove_none_reader}, " + "enable_mp4=#{enable_mp4} " + "WHERE app=#{app} AND stream=#{stream}") @@ -49,8 +50,8 @@ @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st " + "LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " + - "WHERE st.enable=${enable} and st.mediaServerId = '${id}' order by st.createTime desc") - List<StreamProxyItem> selectForEnableInMediaServer(String id, boolean enable); + "WHERE st.enable=${enable} and st.status=${status} and st.mediaServerId = '${id}' order by st.createTime desc") + List<StreamProxyItem> selectForEnableInMediaServer(String id, boolean enable, boolean status); @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st " + "LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " + @@ -58,9 +59,14 @@ List<StreamProxyItem> selectInMediaServer(String id); @Update("UPDATE stream_proxy " + - "SET enable=#{status} " + + "SET status=#{status} " + "WHERE mediaServerId=#{mediaServerId}") - void updateStatus(boolean status, String mediaServerId); + void updateStatusByMediaServerId(boolean status, String mediaServerId); + + @Update("UPDATE stream_proxy " + + "SET status=${status} " + + "WHERE app=#{app} AND stream=#{stream}") + int updateStatus(boolean status, String app, String stream); @Delete("DELETE FROM stream_proxy WHERE enable_remove_none_reader=true AND mediaServerId=#{mediaServerId}") void deleteAutoRemoveItemByMediaServerId(String mediaServerId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index 1de1650..f43f92f 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -860,8 +860,8 @@ } @Override - public List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean enable) { - return streamProxyMapper.selectForEnableInMediaServer(id, enable); + public List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean enable, boolean status) { + return streamProxyMapper.selectForEnableInMediaServer(id, enable, status); } diff --git a/web_src/src/components/StreamProxyList.vue b/web_src/src/components/StreamProxyList.vue index 64e994a..9ecb933 100644 --- a/web_src/src/components/StreamProxyList.vue +++ b/web_src/src/components/StreamProxyList.vue @@ -42,6 +42,14 @@ </el-table-column> <el-table-column prop="gbId" label="鍥芥爣缂栫爜" width="180" align="center" show-overflow-tooltip/> + <el-table-column label="鐘舵��" width="120" align="center"> + <template slot-scope="scope"> + <div slot="reference" class="name-wrapper"> + <el-tag size="medium" v-if="scope.row.status">鍦ㄧ嚎</el-tag> + <el-tag size="medium" type="info" v-if="!scope.row.status">绂荤嚎</el-tag> + </div> + </template> + </el-table-column> <el-table-column label="鍚敤" width="120" align="center"> <template slot-scope="scope"> <div slot="reference" class="name-wrapper"> diff --git a/web_src/src/components/dialog/StreamProxyEdit.vue b/web_src/src/components/dialog/StreamProxyEdit.vue index fa09cf8..8d3fe26 100644 --- a/web_src/src/components/dialog/StreamProxyEdit.vue +++ b/web_src/src/components/dialog/StreamProxyEdit.vue @@ -46,7 +46,6 @@ style="width: 100%" placeholder="璇烽�夋嫨鎷夋祦鑺傜偣" > - <el-option label="鑷姩閫夋嫨" value="auto"></el-option> <el-option v-for="item in mediaServerList" :key="item.id" @@ -172,7 +171,7 @@ enable_mp4: false, enable_remove_none_reader: false, platformGbId: null, - mediaServerId: "auto", + mediaServerId: null, }, mediaServerList:{}, ffmpegCmdList:{}, @@ -206,7 +205,8 @@ console.log(error); }); this.mediaServer.getOnlineMediaServerList((data)=>{ - this.mediaServerList = data; + this.mediaServerList = data.data; + this.proxyParam.mediaServerId = this.mediaServerList[0].id }) }, mediaServerIdChange:function (){ -- Gitblit v1.8.0