648540858
2021-11-24 f61051c46361c4863faf73db81062de0889900d4
优化streamchange hook以及对推流的识别
14个文件已修改
195 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IMediaService.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java
@@ -30,7 +30,7 @@
    private String rtsps;
    private String rtc;
    private String mediaServerId;
    private JSONArray tracks;
    private Object tracks;
    public static class TransactionInfo{
        public String callId;
@@ -105,11 +105,11 @@
        this.rtsp = rtsp;
    }
    public JSONArray getTracks() {
    public Object getTracks() {
        return tracks;
    }
    public void setTracks(JSONArray tracks) {
    public void setTracks(Object tracks) {
        this.tracks = tracks;
    }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -3,11 +3,13 @@
import java.util.List;
import java.util.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IMediaService;
@@ -258,12 +260,13 @@
     */
    @ResponseBody
    @PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8")
    public ResponseEntity<String> onStreamChanged(@RequestBody JSONObject json){
    public ResponseEntity<String> onStreamChanged(@RequestBody MediaItem item){
        
        if (logger.isDebugEnabled()) {
            logger.debug("ZLM HOOK on_stream_changed API调用,参数:" + json.toString());
            logger.debug("ZLM HOOK on_stream_changed API调用,参数:" + JSONObject.toJSONString(item));
        }
        String mediaServerId = json.getString("mediaServerId");
        String mediaServerId = item.getMediaServerId();
        JSONObject json = (JSONObject) JSON.toJSON(item);
        ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json);
        if (subscribe != null ) {
            MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
@@ -272,13 +275,12 @@
            }
        }
        // 流消失移除redis play
        String app = json.getString("app");
        String streamId = json.getString("stream");
        String schema = json.getString("schema");
        JSONArray tracks = json.getJSONArray("tracks");
        boolean regist = json.getBoolean("regist");
        String app = item.getApp();
        String streamId = item.getStream();
        String schema = item.getSchema();
        List<MediaItem.MediaTrack> tracks = item.getTracks();
        boolean regist = item.isRegist();
        if (tracks != null) {
            logger.info("[stream: " + streamId + "] on_stream_changed->>" + schema);
        }
@@ -298,24 +300,34 @@
                    redisCatchStorage.stopPlayback(streamInfo);
                }
            }else {
                if (!"rtp".equals(app) ){
                    // 发送流变化redis消息
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("serverId", userSetup.getServerId());
                    jsonObject.put("app", app);
                    jsonObject.put("stream", streamId);
                    jsonObject.put("register", regist);
                    jsonObject.put("mediaServerId", mediaServerId);
                    redisCatchStorage.sendStreamChangeMsg(jsonObject);
                if (!"rtp".equals(app)){
                    boolean pushChange = false;
                    MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
                    if (regist) {
                        zlmMediaListManager.addMedia(mediaServerItem, app, streamId);
                        StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);
                        redisCatchStorage.addStream(mediaServerItem, app, streamId, streamInfo);
                        if ((item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8)) {
                            pushChange = true;
                            zlmMediaListManager.addMedia(item);
                            StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);
                            redisCatchStorage.addPushStream(mediaServerItem, app, streamId, streamInfo);
                        }
                    }else {
                        zlmMediaListManager.removeMedia( app, streamId);
                        redisCatchStorage.removeStream(mediaServerItem, app, streamId);
                        int result = zlmMediaListManager.removeMedia( app, streamId);
                        redisCatchStorage.removePushStream(mediaServerItem, app, streamId);
                        if (result > 0) {
                            pushChange = true;
                        }
                    }
                    if(pushChange) {
                        // 发送流变化redis消息
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("serverId", userSetup.getServerId());
                        jsonObject.put("app", app);
                        jsonObject.put("stream", streamId);
                        jsonObject.put("register", regist);
                        jsonObject.put("mediaServerId", mediaServerId);
                        redisCatchStorage.sendStreamChangeMsg(jsonObject);
                    }
                }
            }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
@@ -87,6 +88,10 @@
        updateMedia(mediaServerItem, app, streamId);
    }
    public void addMedia(MediaItem mediaItem) {
        storager.updateMedia(streamPushService.transform(mediaItem));
    }
    public void updateMedia(MediaServerItem mediaServerItem, String app, String streamId) {
        //使用异步更新推流
@@ -113,14 +118,16 @@
    }
    public void removeMedia(String app, String streamId) {
    public int removeMedia(String app, String streamId) {
        // 查找是否关联了国标, 关联了不删除, 置为离线
        StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(app, streamId);
        int result = 0;
        if (streamProxyItem == null) {
            storager.removeMedia(app, streamId);
            result = storager.removeMedia(app, streamId);
        }else {
            storager.mediaOutline(app, streamId);
            result =storager.mediaOutline(app, streamId);
        }
        return result;
    }
//    public void clearAllSessions() {
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java
@@ -5,6 +5,11 @@
public class MediaItem {
    /**
     * 注册/注销
     */
    private boolean regist;
    /**
     * 应用名
     */
    private String app;
@@ -54,6 +59,11 @@
    private String originUrl;
    /**
     * 服务器id
     */
    private String mediaServerId;
    /**
     * GMT unix系统时间戳,单位秒
     */
    private Long createStamp;
@@ -77,6 +87,14 @@
     * 音视频轨道
     */
    private String vhost;
    public boolean isRegist() {
        return regist;
    }
    public void setRegist(boolean regist) {
        this.regist = regist;
    }
    /**
     * 是否是docker部署, docker部署不会自动更新zlm使用的端口,需要自己手动修改
@@ -376,4 +394,12 @@
    public void setDocker(boolean docker) {
        this.docker = docker;
    }
    public String getMediaServerId() {
        return mediaServerId;
    }
    public void setMediaServerId(String mediaServerId) {
        this.mediaServerId = mediaServerId;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java
@@ -17,6 +17,7 @@
    private boolean enable;
    private boolean enable_hls;
    private boolean enable_mp4;
    private boolean enable_remove_none_reader; // 无人观看时删除
    private String platformGbId;
    private String createTime;
@@ -142,4 +143,12 @@
    public void setCreateTime(String createTime) {
        this.createTime = createTime;
    }
    public boolean isEnable_remove_none_reader() {
        return enable_remove_none_reader;
    }
    public void setEnable_remove_none_reader(boolean enable_remove_none_reader) {
        this.enable_remove_none_reader = enable_remove_none_reader;
    }
}
src/main/java/com/genersoft/iot/vmp/service/IMediaService.java
@@ -32,7 +32,7 @@
     * @param stream
     * @return
     */
    StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaServerItem, String app, String stream, JSONArray tracks);
    StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaServerItem, String app, String stream, Object tracks);
    /**
     * 根据应用名和流ID获取播放地址, 只是地址拼接,返回的ip使用远程访问ip,适用与zlm与wvp在一台主机的情况
@@ -40,5 +40,5 @@
     * @param stream
     * @return
     */
    StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks, String addr);
    StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr);
}
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.github.pagehelper.PageInfo;
@@ -32,4 +33,6 @@
     * @return
     */
    PageInfo<StreamPushItem> getPushList(Integer page, Integer count);
    /**
     * Builds a {@link StreamPushItem} from the given media item.
     *
     * @param item source media item
     * @return the stream-push item produced from {@code item}
     */
    StreamPushItem transform(MediaItem item);
}
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
@@ -5,6 +5,7 @@
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -31,7 +32,7 @@
    @Override
    public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks) {
    public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks) {
        return getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null);
    }
@@ -69,7 +70,7 @@
    }
    @Override
    public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks, String addr) {
    public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr) {
        StreamInfo streamInfoResult = new StreamInfo();
        streamInfoResult.setStreamId(stream);
        streamInfoResult.setApp(app);
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -51,33 +51,38 @@
        for (MediaItem item : mediaItems) {
            // 不保存国标推流以及拉流代理的流
            if (item.getOriginType() == 3 || item.getOriginType() == 4 || item.getOriginType() == 5) {
                continue;
            if (item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8) {
                String key = item.getApp() + "_" + item.getStream();
                StreamPushItem streamPushItem = result.get(key);
                if (streamPushItem == null) {
                    streamPushItem = transform(item);
                    result.put(key, streamPushItem);
                }
            }
            String key = item.getApp() + "_" + item.getStream();
            StreamPushItem streamPushItem = result.get(key);
            if (streamPushItem == null) {
                streamPushItem = new StreamPushItem();
                streamPushItem.setApp(item.getApp());
                streamPushItem.setMediaServerId(mediaServerItem.getId());
                streamPushItem.setStream(item.getStream());
                streamPushItem.setAliveSecond(item.getAliveSecond());
                streamPushItem.setCreateStamp(item.getCreateStamp());
                streamPushItem.setOriginSock(item.getOriginSock());
                streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
                streamPushItem.setOriginType(item.getOriginType());
                streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
                streamPushItem.setOriginUrl(item.getOriginUrl());
                streamPushItem.setCreateStamp(item.getCreateStamp());
                streamPushItem.setAliveSecond(item.getAliveSecond());
                streamPushItem.setStatus(true);
                streamPushItem.setVhost(item.getVhost());
                result.put(key, streamPushItem);
            }
        }
        return new ArrayList<>(result.values());
    }
    @Override
    public StreamPushItem transform(MediaItem item) {
        StreamPushItem streamPushItem = new StreamPushItem();
        streamPushItem.setApp(item.getApp());
        streamPushItem.setMediaServerId(item.getMediaServerId());
        streamPushItem.setStream(item.getStream());
        streamPushItem.setAliveSecond(item.getAliveSecond());
        streamPushItem.setCreateStamp(item.getCreateStamp());
        streamPushItem.setOriginSock(item.getOriginSock());
        streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
        streamPushItem.setOriginType(item.getOriginType());
        streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
        streamPushItem.setOriginUrl(item.getOriginUrl());
        streamPushItem.setCreateStamp(item.getCreateStamp());
        streamPushItem.setAliveSecond(item.getAliveSecond());
        streamPushItem.setStatus(true);
        streamPushItem.setVhost(item.getVhost());
        return streamPushItem;
    }
    @Override
    public PageInfo<StreamPushItem> getPushList(Integer page, Integer count) {
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -135,7 +135,7 @@
     * @param app
     * @param streamId
     */
    void addStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo);
    void addPushStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo);
    /**
     * 移除流信息从redis
@@ -143,5 +143,5 @@
     * @param app
     * @param streamId
     */
    void removeStream(MediaServerItem mediaServerItem, String app, String streamId);
    void removePushStream(MediaServerItem mediaServerItem, String app, String streamId);
}
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java
@@ -353,7 +353,7 @@
     * @param app
     * @param stream
     */
    void removeMedia(String app, String stream);
    int removeMedia(String app, String stream);
    /**
@@ -366,7 +366,7 @@
     * @param app
     * @param streamId
     */
    void mediaOutline(String app, String streamId);
    int mediaOutline(String app, String streamId);
    /**
     * 设置平台在线/离线
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -53,7 +53,7 @@
    @Update("UPDATE gb_stream " +
            "SET status=${status} " +
            "WHERE app=#{app} AND stream=#{stream}")
    void setStatus(String app, String stream, boolean status);
    int setStatus(String app, String stream, boolean status);
    @Select("SELECT gs.*, pgs.platformId FROM gb_stream gs LEFT JOIN  platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream WHERE mediaServerId=#{mediaServerId} ")
    List<GbStream> selectAllByMediaServerId(String mediaServerId);
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -308,13 +308,13 @@
    }
    @Override
    public void addStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo) {
    public void addPushStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo) {
        String key = VideoManagerConstants.WVP_SERVER_STREAM_PUSH_PREFIX + app + "_" + streamId + "_" + mediaServerItem.getId();
        redis.set(key, streamInfo);
    }
    @Override
    public void removeStream(MediaServerItem mediaServerItem, String app, String streamId) {
    public void removePushStream(MediaServerItem mediaServerItem, String app, String streamId) {
        String key = VideoManagerConstants.WVP_SERVER_STREAM_PUSH_PREFIX + app + "_" + streamId + "_" + mediaServerItem.getId();
        redis.del(key);
    }
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
@@ -605,8 +605,8 @@
    }
    @Override
    public void removeMedia(String app, String stream) {
        streamPushMapper.del(app, stream);
    public int removeMedia(String app, String stream) {
        return streamPushMapper.del(app, stream);
    }
    @Override
@@ -615,8 +615,8 @@
    }
    @Override
    public void mediaOutline(String app, String streamId) {
        gbStreamMapper.setStatus(app, streamId, false);
    public int mediaOutline(String app, String streamId) {
        return gbStreamMapper.setStatus(app, streamId, false);
    }
    @Override