From f61051c46361c4863faf73db81062de0889900d4 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 24 十一月 2021 11:19:52 +0800 Subject: [PATCH] 优化streamchannge hook以及对推流的识别 --- src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 49 ++++++----- src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java | 9 ++ src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java | 6 src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java | 5 src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java | 8 +- src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java | 4 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 58 ++++++++----- src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java | 4 src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java | 4 src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java | 13 ++ src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java | 2 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java | 26 ++++++ src/main/java/com/genersoft/iot/vmp/service/IMediaService.java | 4 src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java | 3 14 files changed, 129 insertions(+), 66 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java index c5e2a24..e16c1ad 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java @@ -30,7 +30,7 @@ private String rtsps; private String rtc; private String mediaServerId; - private JSONArray tracks; + private Object tracks; public static class TransactionInfo{ public String callId; @@ -105,11 +105,11 @@ this.rtsp = rtsp; } - public JSONArray getTracks() { + public Object getTracks() { return tracks; } - public void setTracks(JSONArray tracks) { + public void setTracks(Object tracks) { this.tracks = tracks; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index e35f059..16802cb 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -3,11 +3,13 @@ import java.util.List; import java.util.UUID; +import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; @@ -258,12 +260,13 @@ */ @ResponseBody @PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8") - public ResponseEntity<String> onStreamChanged(@RequestBody JSONObject json){ + public ResponseEntity<String> onStreamChanged(@RequestBody MediaItem item){ if (logger.isDebugEnabled()) { - logger.debug("ZLM HOOK on_stream_changed API璋冪敤锛屽弬鏁帮細" + json.toString()); + logger.debug("ZLM HOOK on_stream_changed API璋冪敤锛屽弬鏁帮細" + JSONObject.toJSONString(item)); } - String mediaServerId = json.getString("mediaServerId"); + String mediaServerId = item.getMediaServerId(); + JSONObject json = (JSONObject) JSON.toJSON(item); ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json); if (subscribe != null ) { MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); @@ -272,13 +275,12 @@ } } - // 娴佹秷澶辩Щ闄edis play - String app = json.getString("app"); - String streamId = json.getString("stream"); - String schema = json.getString("schema"); - JSONArray tracks = json.getJSONArray("tracks"); - boolean regist = json.getBoolean("regist"); + String app = item.getApp(); + String streamId = item.getStream(); + String schema = item.getSchema(); + List<MediaItem.MediaTrack> tracks = item.getTracks(); + boolean regist = item.isRegist(); if (tracks != null) { logger.info("[stream: " + streamId + "] on_stream_changed->>" + schema); } @@ -298,24 +300,34 @@ redisCatchStorage.stopPlayback(streamInfo); } }else { - if (!"rtp".equals(app) ){ - // 鍙戦�佹祦鍙樺寲redis娑堟伅 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetup.getServerId()); - jsonObject.put("app", app); - jsonObject.put("stream", streamId); - jsonObject.put("register", regist); - jsonObject.put("mediaServerId", mediaServerId); - redisCatchStorage.sendStreamChangeMsg(jsonObject); + if (!"rtp".equals(app)){ + + boolean pushChange = false; MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); if (regist) { - zlmMediaListManager.addMedia(mediaServerItem, app, streamId); - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks); - redisCatchStorage.addStream(mediaServerItem, app, streamId, streamInfo); + if ((item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8)) { + pushChange = true; + zlmMediaListManager.addMedia(item); + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks); + redisCatchStorage.addPushStream(mediaServerItem, app, streamId, streamInfo); + } }else { - zlmMediaListManager.removeMedia( app, streamId); - redisCatchStorage.removeStream(mediaServerItem, app, streamId); + int result = zlmMediaListManager.removeMedia( app, streamId); + redisCatchStorage.removePushStream(mediaServerItem, app, streamId); + if (result > 0) { + pushChange = true; + } + } + if(pushChange) { + // 鍙戦�佹祦鍙樺寲redis娑堟伅 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetup.getServerId()); + jsonObject.put("app", app); + jsonObject.put("stream", streamId); + jsonObject.put("register", regist); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(jsonObject); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index ea1123c..49fe098 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.media.zlm; import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; @@ -87,6 +88,10 @@ updateMedia(mediaServerItem, app, streamId); } + public void addMedia(MediaItem mediaItem) { + storager.updateMedia(streamPushService.transform(mediaItem)); + } + public void updateMedia(MediaServerItem mediaServerItem, String app, String streamId) { //浣跨敤寮傛鏇存柊鎺ㄦ祦 @@ -113,14 +118,16 @@ } - public void removeMedia(String app, String streamId) { + public int removeMedia(String app, String streamId) { // 鏌ユ壘鏄惁鍏宠仈浜嗗浗鏍囷紝 鍏宠仈浜嗕笉鍒犻櫎锛� 缃负绂荤嚎 StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(app, streamId); + int result = 0; if (streamProxyItem == null) { - storager.removeMedia(app, streamId); + result = storager.removeMedia(app, streamId); }else { - storager.mediaOutline(app, streamId); + result =storager.mediaOutline(app, streamId); } + return result; } // public void clearAllSessions() { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java index 6d9ceee..4685d1f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java @@ -5,6 +5,11 @@ public class MediaItem { /** + * 娉ㄥ唽/娉ㄩ攢 + */ + private boolean regist; + + /** * 搴旂敤鍚� */ private String app; @@ -54,6 +59,11 @@ private String originUrl; /** + * 鏈嶅姟鍣╥d + */ + private String mediaServerId; + + /** * GMT unix绯荤粺鏃堕棿鎴筹紝鍗曚綅绉� */ private Long createStamp; @@ -77,6 +87,14 @@ * 闊宠棰戣建閬� */ private String vhost; + + public boolean isRegist() { + return regist; + } + + public void setRegist(boolean regist) { + this.regist = regist; + } /** * 鏄惁鏄痙ocker閮ㄧ讲锛� docker閮ㄧ讲涓嶄細鑷姩鏇存柊zlm浣跨敤鐨勭鍙o紝闇�瑕佽嚜宸辨墜鍔ㄤ慨鏀� @@ -376,4 +394,12 @@ public void setDocker(boolean docker) { this.docker = docker; } + + public String getMediaServerId() { + return mediaServerId; + } + + public void setMediaServerId(String mediaServerId) { + this.mediaServerId = mediaServerId; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java index 40ba215..38e44a9 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java @@ -17,6 +17,7 @@ private boolean enable; private boolean enable_hls; private boolean enable_mp4; + private boolean enable_remove_none_reader; // 鏃犱汉瑙傜湅鏃跺垹闄� private String platformGbId; private String createTime; @@ -142,4 +143,12 @@ public void setCreateTime(String createTime) { this.createTime = createTime; } + + public boolean isEnable_remove_none_reader() { + return enable_remove_none_reader; + } + + public void setEnable_remove_none_reader(boolean enable_remove_none_reader) { + this.enable_remove_none_reader = enable_remove_none_reader; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java index 54f8315..8c05b85 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java @@ -32,7 +32,7 @@ * @param stream * @return */ - StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaServerItem, String app, String stream, JSONArray tracks); + StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaServerItem, String app, String stream, Object tracks); /** * 鏍规嵁搴旂敤鍚嶅拰娴両D鑾峰彇鎾斁鍦板潃, 鍙槸鍦板潃鎷兼帴锛岃繑鍥炵殑ip浣跨敤杩滅▼璁块棶ip锛岄�傜敤涓巣lm涓巜vp鍦ㄤ竴鍙颁富鏈虹殑鎯呭喌 @@ -40,5 +40,5 @@ * @param stream * @return */ - StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks, String addr); + StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 899da98..94e7d69 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service; import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.github.pagehelper.PageInfo; @@ -32,4 +33,6 @@ * @return */ PageInfo<StreamPushItem> getPushList(Integer page, Integer count); + + StreamPushItem transform(MediaItem item); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index a261d24..9e5221b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -31,7 +32,7 @@ @Override - public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks) { + public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks) { return getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null); } @@ -69,7 +70,7 @@ } @Override - public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks, String addr) { + public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr) { StreamInfo streamInfoResult = new StreamInfo(); streamInfoResult.setStreamId(stream); streamInfoResult.setApp(app); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index aabf35f..2820721 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -51,33 +51,38 @@ for (MediaItem item : mediaItems) { // 涓嶄繚瀛樺浗鏍囨帹鐞嗕互鍙婃媺娴佷唬鐞嗙殑娴� - if (item.getOriginType() == 3 || item.getOriginType() == 4 || item.getOriginType() == 5) { - continue; + if (item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8) { + String key = item.getApp() + "_" + item.getStream(); + StreamPushItem streamPushItem = result.get(key); + if (streamPushItem == null) { + streamPushItem = transform(item); + result.put(key, streamPushItem); + } } - String key = item.getApp() + "_" + item.getStream(); - StreamPushItem streamPushItem = result.get(key); - if (streamPushItem == null) { - streamPushItem = new StreamPushItem(); - streamPushItem.setApp(item.getApp()); - streamPushItem.setMediaServerId(mediaServerItem.getId()); - streamPushItem.setStream(item.getStream()); - streamPushItem.setAliveSecond(item.getAliveSecond()); - streamPushItem.setCreateStamp(item.getCreateStamp()); - streamPushItem.setOriginSock(item.getOriginSock()); - streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); - streamPushItem.setOriginType(item.getOriginType()); - streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); - streamPushItem.setOriginUrl(item.getOriginUrl()); - streamPushItem.setCreateStamp(item.getCreateStamp()); - streamPushItem.setAliveSecond(item.getAliveSecond()); - streamPushItem.setStatus(true); - streamPushItem.setVhost(item.getVhost()); - result.put(key, streamPushItem); - } + } return new ArrayList<>(result.values()); } + @Override + public StreamPushItem transform(MediaItem item) { + StreamPushItem streamPushItem = new StreamPushItem(); + streamPushItem.setApp(item.getApp()); + streamPushItem.setMediaServerId(item.getMediaServerId()); + streamPushItem.setStream(item.getStream()); + streamPushItem.setAliveSecond(item.getAliveSecond()); + streamPushItem.setCreateStamp(item.getCreateStamp()); + streamPushItem.setOriginSock(item.getOriginSock()); + streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); + streamPushItem.setOriginType(item.getOriginType()); + streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); + streamPushItem.setOriginUrl(item.getOriginUrl()); + streamPushItem.setCreateStamp(item.getCreateStamp()); + streamPushItem.setAliveSecond(item.getAliveSecond()); + streamPushItem.setStatus(true); + streamPushItem.setVhost(item.getVhost()); + return streamPushItem; + } @Override public PageInfo<StreamPushItem> getPushList(Integer page, Integer count) { diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 69bcd84..25743e1 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -135,7 +135,7 @@ * @param app * @param streamId */ - void addStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo); + void addPushStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo); /** * 绉婚櫎娴佷俊鎭粠redis @@ -143,5 +143,5 @@ * @param app * @param streamId */ - void removeStream(MediaServerItem mediaServerItem, String app, String streamId); + void removePushStream(MediaServerItem mediaServerItem, String app, String streamId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java index 931530a..3f4b73f 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java @@ -353,7 +353,7 @@ * @param app * @param stream */ - void removeMedia(String app, String stream); + int removeMedia(String app, String stream); /** @@ -366,7 +366,7 @@ * @param app * @param streamId */ - void mediaOutline(String app, String streamId); + int mediaOutline(String app, String streamId); /** * 璁剧疆骞冲彴鍦ㄧ嚎/绂荤嚎 diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java index 230afbc..1fdc6d8 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java @@ -53,7 +53,7 @@ @Update("UPDATE gb_stream " + "SET status=${status} " + "WHERE app=#{app} AND stream=#{stream}") - void setStatus(String app, String stream, boolean status); + int setStatus(String app, String stream, boolean status); @Select("SELECT gs.*, pgs.platformId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream WHERE mediaServerId=#{mediaServerId} ") List<GbStream> selectAllByMediaServerId(String mediaServerId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 0d0ede7..0d5b98d 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -308,13 +308,13 @@ } @Override - public void addStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo) { + public void addPushStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo) { String key = VideoManagerConstants.WVP_SERVER_STREAM_PUSH_PREFIX + app + "_" + streamId + "_" + mediaServerItem.getId(); redis.set(key, streamInfo); } @Override - public void removeStream(MediaServerItem mediaServerItem, String app, String streamId) { + public void removePushStream(MediaServerItem mediaServerItem, String app, String streamId) { String key = VideoManagerConstants.WVP_SERVER_STREAM_PUSH_PREFIX + app + "_" + streamId + "_" + mediaServerItem.getId(); redis.del(key); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index 0e24942..37905ec 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -605,8 +605,8 @@ } @Override - public void removeMedia(String app, String stream) { - streamPushMapper.del(app, stream); + public int removeMedia(String app, String stream) { + return streamPushMapper.del(app, stream); } @Override @@ -615,8 +615,8 @@ } @Override - public void mediaOutline(String app, String streamId) { - gbStreamMapper.setStatus(app, streamId, false); + public int mediaOutline(String app, String streamId) { + return gbStreamMapper.setStatus(app, streamId, false); } @Override -- Gitblit v1.8.0