648540858
2022-07-11 afbec289067cc7f284dd135366b0f6febf13126b
增加推流鉴权。保护服务安全
26个文件已修改
6个文件已添加
973 ■■■■ 已修改文件
sql/update.sql 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 142 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java 98 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookParam.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/OnPlayHookParam.java 82 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/OnPublishHookParam.java 82 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java 114 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IMediaService.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IUserService.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/PushStreamStatusChangeFromRedisDto.java 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/StreamPushItemFromRedis.java 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/UserServiceImpl.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/dao/UserMapper.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
web_src/src/layout/UiHeader.vue 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sql/update.sql
@@ -52,10 +52,14 @@
alter table stream_push
    add pushTime varchar(50) default null;
alter table stream_push
    add status int DEFAULT NULL;
alter table stream_push
    add updateTime varchar(50) default null;
alter table stream_push
    change createStamp createTime varchar(50) default null;
alter table gb_stream
    drop column status;
alter table user
    add pushKey varchar(50) default null;
src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java
@@ -9,6 +9,9 @@
    private String deviceID;
    private String channelId;
    private String flv;
    private String ip;
    private String https_flv;
    private String ws_flv;
    private String wss_flv;
@@ -292,4 +295,12 @@
    public void setProgress(double progress) {
        this.progress = progress;
    }
    public String getIp() {
        return ip;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
}
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -58,6 +58,8 @@
    public static final String MEDIA_TRANSACTION_USED_PREFIX = "VMP_MEDIA_TRANSACTION_";
    public static final String MEDIA_STREAM_AUTHORITY = "MEDIA_STREAM_AUTHORITY_";
    public static final String SIP_CSEQ_PREFIX = "VMP_SIP_CSEQ_";
    public static final String SIP_SN_PREFIX = "VMP_SIP_SN_";
@@ -71,6 +73,8 @@
    public static final String SYSTEM_INFO_NET_PREFIX = "VMP_SYSTEM_INFO_NET_";
    //************************** redis 消息*********************************
    // 流变化的通知
@@ -79,9 +83,15 @@
    // 接收推流设备的GPS变化通知
    public static final String VM_MSG_GPS = "VM_MSG_GPS";
    // 接收推流设备的GPS变化通知
    public static final String VM_MSG_PUSH_STREAM_STATUS_CHANGE = "VM_MSG_PUSH_STREAM_STATUS_CHANGE";
    // redis 消息通知设备推流到平台
    public static final String VM_MSG_STREAM_PUSH_REQUESTED = "VM_MSG_STREAM_PUSH_REQUESTED";
    // redis 消息请求所有的在线通道
    public static final String VM_MSG_GET_ALL_ONLINE_REQUESTED = "VM_MSG_GET_ALL_ONLINE_REQUESTED";
    // 移动位置订阅通知
    public static final String VM_MSG_SUBSCRIBE_MOBILE_POSITION = "mobileposition";
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -1,7 +1,8 @@
package com.genersoft.iot.vmp.media.zlm;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.alibaba.fastjson.JSON;
import com.genersoft.iot.vmp.common.StreamInfo;
@@ -21,6 +22,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -78,6 +80,9 @@
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private IUserService userService;
    @Autowired
    private VideoStreamSessionManager sessionManager;
@@ -151,12 +156,14 @@
     */
    @ResponseBody
    @PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8")
    public ResponseEntity<String> onPlay(@RequestBody JSONObject json){
    public ResponseEntity<String> onPlay(@RequestBody OnPlayHookParam param){
        JSONObject json = (JSONObject)JSON.toJSON(param);
        if (logger.isDebugEnabled()) {
            logger.debug("[ ZLM HOOK ]on_play API调用,参数:" + json.toString());
            logger.debug("[ ZLM HOOK ]on_play API调用,参数:" + JSON.toJSONString(param));
        }
        String mediaServerId = json.getString("mediaServerId");
        String mediaServerId = param.getMediaServerId();
        ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json);
        if (subscribe != null ) {
            MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
@@ -165,9 +172,20 @@
            }
        }
        JSONObject ret = new JSONObject();
        if (!"rtp".equals(param.getApp())) {
            Map<String, String> paramMap = urlParamToMap(param.getParams());
            StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream());
            if (streamAuthorityInfo == null
                    || (streamAuthorityInfo.getCallId() != null && !streamAuthorityInfo.getCallId().equals(paramMap.get("callId")))) {
                ret.put("code", 401);
                ret.put("msg", "Unauthorized");
                return new ResponseEntity<>(ret.toString(),HttpStatus.OK);
            }
        }
        ret.put("code", 0);
        ret.put("msg", "success");
        return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
        return new ResponseEntity<>(ret.toString(),HttpStatus.OK);
    }
    
    /**
@@ -176,16 +194,49 @@
     */
    @ResponseBody
    @PostMapping(value = "/on_publish", produces = "application/json;charset=UTF-8")
    public ResponseEntity<String> onPublish(@RequestBody JSONObject json) {
    public ResponseEntity<String> onPublish(@RequestBody OnPublishHookParam param) {
        JSONObject json = (JSONObject) JSON.toJSON(param);
        logger.info("[ ZLM HOOK ]on_publish API调用,参数:" + json.toString());
        JSONObject ret = new JSONObject();
        if (!"rtp".equals(param.getApp())) {
            // 推流鉴权
            if (param.getParams() == null) {
                logger.info("推流鉴权失败: 缺少不要参数:sign=md5(user表的pushKey)");
                ret.put("code", 401);
                ret.put("msg", "Unauthorized");
                return new ResponseEntity<>(ret.toString(), HttpStatus.OK);
            }
            Map<String, String> paramMap = urlParamToMap(param.getParams());
            String sign = paramMap.get("sign");
            if (sign == null) {
                logger.info("推流鉴权失败: 缺少不要参数:sign=md5(user表的pushKey)");
                ret.put("code", 401);
                ret.put("msg", "Unauthorized");
                return new ResponseEntity<>(ret.toString(), HttpStatus.OK);
            }
            // 推流自定义播放鉴权码
            String callId = paramMap.get("callId");
            // 鉴权配置
            boolean hasAuthority = userService.checkPushAuthority(callId, sign);
            if (!hasAuthority) {
                logger.info("推流鉴权失败: sign 无权限: callId={}. sign={}", callId, sign);
                ret.put("code", 401);
                ret.put("msg", "Unauthorized");
                return new ResponseEntity<>(ret.toString(), HttpStatus.OK);
            }
            StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
            streamAuthorityInfo.setCallId(callId);
            streamAuthorityInfo.setSign(sign);
            // 鉴权通过
            redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
        }
        ret.put("code", 0);
        ret.put("msg", "success");
        ret.put("enable_hls", true);
        if (json.getInteger("originType") == 1
                || json.getInteger("originType") == 2
                || json.getInteger("originType") == 3) {
        if (!"rtp".equals(param.getApp())) {
            ret.put("enable_audio", true);
        }
@@ -200,14 +251,13 @@
                ret.put("msg", "zlm not register");
            }
        }
         String app = json.getString("app");
         String stream = json.getString("stream");
        if ("rtp".equals(app)) {
        if ("rtp".equals(param.getApp())) {
            ret.put("enable_mp4", userSetting.getRecordSip());
        }else {
            ret.put("enable_mp4", userSetting.isRecordPushLive());
        }
        List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, stream);
        List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, param.getStream());
        if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) {
            String deviceId = ssrcTransactionForAll.get(0).getDeviceId();
            String channelId = ssrcTransactionForAll.get(0).getChannelId();
@@ -221,13 +271,14 @@
                ret.put("enable_mp4", true);
                ret.put("enable_audio", true);
            }
        }
        return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
    }
    /**
     * 录制mp4完成后通知事件;此事件对回复不敏感。
     *  
@@ -312,9 +363,6 @@
        if (logger.isDebugEnabled()) {
            logger.debug("[ ZLM HOOK ]on_shell_login API调用,参数:" + json.toString());
        }
        // TODO 如果是带有rtpstream则开启按需拉流
        // String app = json.getString("app");
        // String stream = json.getString("stream");
        String mediaServerId = json.getString("mediaServerId");
        ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_shell_login, json);
        if (subscribe != null ) {
@@ -351,12 +399,24 @@
        }
        // 流消失移除redis play
        String app = item.getApp();
        String streamId = item.getStream();
        String stream = item.getStream();
        String schema = item.getSchema();
        List<MediaItem.MediaTrack> tracks = item.getTracks();
        boolean regist = item.isRegist();
        if (regist) {
            StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
            if (streamAuthorityInfo == null) {
                streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(item);
            }else {
                streamAuthorityInfo.setOriginType(item.getOriginType());
                streamAuthorityInfo.setOriginTypeStr(item.getOriginTypeStr());
            }
            redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo);
        }else {
            redisCatchStorage.removeStreamAuthorityInfo(app, stream);
        }
        if ("rtmp".equals(schema)){
            logger.info("on_stream_changed:注册->{}, app->{}, stream->{}", regist, app, streamId);
            logger.info("on_stream_changed:注册->{}, app->{}, stream->{}", regist, app, stream);
            if (regist) {
                mediaServerService.addCount(mediaServerId);
            }else {
@@ -365,15 +425,15 @@
            if (item.getOriginType() == OriginType.PULL.ordinal()
                    || item.getOriginType() == OriginType.FFMPEG_PULL.ordinal()) {
                // 设置拉流代理上线/离线
                streamProxyService.updateStatus(regist, app, streamId);
                streamProxyService.updateStatus(regist, app, stream);
            }
            if ("rtp".equals(app) && !regist ) {
                StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
                StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(stream);
                if (streamInfo!=null){
                    redisCatchStorage.stopPlay(streamInfo);
                    storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
                }else{
                    streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);
                    streamInfo = redisCatchStorage.queryPlayback(null, null, stream, null);
                    if (streamInfo != null) {
                        redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(),
                                streamInfo.getStream(), null);
@@ -387,10 +447,12 @@
                    if (mediaServerItem != null){
                        if (regist) {
                            StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);
                            StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
                            StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaServerItem,
                                    app, stream, tracks, streamAuthorityInfo.getCallId());
                            item.setStreamInfo(streamInfoByAppAndStream);
                            redisCatchStorage.addStream(mediaServerItem, type, app, streamId, item);
                            redisCatchStorage.addStream(mediaServerItem, type, app, stream, item);
                            if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
                                    || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
                                    || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
@@ -413,23 +475,23 @@
                        }else {
                            // 兼容流注销时类型从redis记录获取
                            MediaItem mediaItem = redisCatchStorage.getStreamInfo(app, streamId, mediaServerId);
                            MediaItem mediaItem = redisCatchStorage.getStreamInfo(app, stream, mediaServerId);
                            if (mediaItem != null) {
                                type = OriginType.values()[mediaItem.getOriginType()].getType();
                                redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId);
                                redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, stream);
                            }
                            GbStream gbStream = storager.getGbStream(app, streamId);
                            GbStream gbStream = storager.getGbStream(app, stream);
                            if (gbStream != null) {
//                                eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
                            }
                            zlmMediaListManager.removeMedia(app, streamId);
                            zlmMediaListManager.removeMedia(app, stream);
                        }
                        if (type != null) {
                            // 发送流变化redis消息
                            JSONObject jsonObject = new JSONObject();
                            jsonObject.put("serverId", userSetting.getServerId());
                            jsonObject.put("app", app);
                            jsonObject.put("stream", streamId);
                            jsonObject.put("stream", stream);
                            jsonObject.put("register", regist);
                            jsonObject.put("mediaServerId", mediaServerId);
                            redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
@@ -565,4 +627,22 @@
        ret.put("msg", "success");
        return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
    }
    private Map<String, String> urlParamToMap(String params) {
        HashMap<String, String> map = new HashMap<>();
        if (StringUtils.isEmpty(params)) {
            return map;
        }
        String[] paramsArray = params.split("&");
        if (paramsArray.length == 0) {
            return map;
        }
        for (String param : paramsArray) {
            String[] paramArray = param.split("=");
            if (paramArray.length == 2){
                map.put(paramArray[0], paramArray[1]);
            }
        }
        return map;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
@@ -115,64 +115,42 @@
    public StreamPushItem addPush(MediaItem mediaItem) {
        // 查找此直播流是否存在redis预设gbId
        StreamPushItem transform = streamPushService.transform(mediaItem);
        // 从streamId取出查询关键值
        Pattern pattern = Pattern.compile(userSetting.getThirdPartyGBIdReg());
        Matcher matcher = pattern.matcher(mediaItem.getStream());// 指定要匹配的字符串
        String queryKey = null;
        if (matcher.find()) { //此处find()每次被调用后,会偏移到下一个匹配
            queryKey = matcher.group();
        }
        if (queryKey != null) {
            ThirdPartyGB thirdPartyGB = redisCatchStorage.queryMemberNoGBId(queryKey);
            if (thirdPartyGB != null && !StringUtils.isEmpty(thirdPartyGB.getNationalStandardNo())) {
                transform.setGbId(thirdPartyGB.getNationalStandardNo());
                transform.setName(thirdPartyGB.getName());
            }
        }
        if (!StringUtils.isEmpty(transform.getGbId())) {
            // 如果这个国标ID已经给了其他推流且流已离线,则移除其他推流
            List<GbStream> gbStreams = gbStreamMapper.selectByGBId(transform.getGbId());
            if (gbStreams.size() > 0) {
                for (GbStream gbStream : gbStreams) {
                    // 出现使用相同国标Id的视频流时,使用新流替换旧流,
                    if (queryKey != null && gbStream.getApp().equals(mediaItem.getApp())) {
                        Matcher matcherForStream = pattern.matcher(gbStream.getStream());
                        String queryKeyForStream = null;
                        if (matcherForStream.find()) { //此处find()每次被调用后,会偏移到下一个匹配
                            queryKeyForStream = matcherForStream.group();
                        }
                        if (queryKeyForStream == null || !queryKeyForStream.equals(queryKey)) {
                            // 此时不是同一个流
                            gbStreamMapper.del(gbStream.getApp(), gbStream.getStream());
                            if (!gbStream.isStatus()) {
                                streamPushMapper.del(gbStream.getApp(), gbStream.getStream());
                            }
                        }
                    }
                }
            }
            List<GbStream> gbStreamList = gbStreamMapper.selectByGBId(transform.getGbId());
            if (gbStreamList != null && gbStreamList.size() == 1) {
                transform.setGbStreamId(gbStreamList.get(0).getGbStreamId());
                transform.setPlatformId(gbStreamList.get(0).getPlatformId());
                transform.setCatalogId(gbStreamList.get(0).getCatalogId());
                transform.setGbId(gbStreamList.get(0).getGbId());
                gbStreamMapper.update(transform);
                streamPushMapper.del(gbStreamList.get(0).getApp(), gbStreamList.get(0).getStream());
            }else {
                transform.setCreateTime(DateUtil.getNow());
                transform.setUpdateTime(DateUtil.getNow());
                gbStreamMapper.add(transform);
            }
            if (transform != null) {
                if (channelOnlineEvents.get(transform.getGbId()) != null)  {
                    channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream(), transform.getServerId());
                    channelOnlineEvents.remove(transform.getGbId());
                }
            }
        StreamPushItem pushInDb = streamPushService.getPush(mediaItem.getApp(), mediaItem.getStream());
        transform.setUpdateTime(DateUtil.getNow());
        transform.setPushTime(DateUtil.getNow());
        if (pushInDb == null) {
            transform.setCreateTime(DateUtil.getNow());
            streamPushMapper.add(transform);
        }else {
            streamPushMapper.update(transform);
//            if (!StringUtils.isEmpty(pushInDb.getGbId())) {
//                List<GbStream> gbStreamList = gbStreamMapper.selectByGBId(transform.getGbId());
//                if (gbStreamList != null && gbStreamList.size() == 1) {
//                    transform.setGbStreamId(gbStreamList.get(0).getGbStreamId());
//                    transform.setPlatformId(gbStreamList.get(0).getPlatformId());
//                    transform.setCatalogId(gbStreamList.get(0).getCatalogId());
//                    transform.setGbId(gbStreamList.get(0).getGbId());
//                    gbStreamMapper.update(transform);
//                    streamPushMapper.del(gbStreamList.get(0).getApp(), gbStreamList.get(0).getStream());
//                }else {
//                    transform.setCreateTime(DateUtil.getNow());
//                    transform.setUpdateTime(DateUtil.getNow());
//                    gbStreamMapper.add(transform);
//                }
                // 通知通道上线
//            if (transform != null) {
//                if (channelOnlineEvents.get(transform.getGbId()) != null)  {
//                    channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream(), transform.getServerId());
//                    channelOnlineEvents.remove(transform.getGbId());
//                }
//            }
//            }
        }
        storager.updateMedia(transform);
        return transform;
    }
@@ -206,13 +184,13 @@
    public int removeMedia(String app, String streamId) {
        // 查找是否关联了国标, 关联了不删除, 置为离线
        StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(app, streamId);
        int result = 0;
        if (streamProxyItem == null) {
        GbStream gbStream = gbStreamMapper.selectOne(app, streamId);
        int result;
        if (gbStream == null) {
            result = storager.removeMedia(app, streamId);
        }else {
            // TODO 暂不设置为离线
            result =storager.mediaOutline(app, streamId);
            result =storager.mediaOffline(app, streamId);
        }
        return result;
    }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -66,7 +66,7 @@
        String stream = UUID.randomUUID().toString();
        param.put("enable_tcp", 1);
        param.put("stream_id", stream);
        param.put("port", 0);
//        param.put("port", 0);
        JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
        if (openRtpServerResultJson != null) {
@@ -101,9 +101,10 @@
        }
        Map<String, Object> param = new HashMap<>();
        // 推流端口设置0则使用随机端口
        param.put("enable_tcp", 1);
        param.put("stream_id", streamId);
        // 推流端口设置0则使用随机端口
        param.put("port", 0);
        param.put("ssrc", ssrc);
        JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookParam.java
New file
@@ -0,0 +1,17 @@
package com.genersoft.iot.vmp.media.zlm.dto;
/**
 * zlm hook事件的参数
 * @author lin
 */
public class HookParam {
    private String mediaServerId;
    public String getMediaServerId() {
        return mediaServerId;
    }
    public void setMediaServerId(String mediaServerId) {
        this.mediaServerId = mediaServerId;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/OnPlayHookParam.java
New file
@@ -0,0 +1,82 @@
package com.genersoft.iot.vmp.media.zlm.dto;
/**
 * zlm hook事件中的on_play事件的参数
 * @author lin
 */
public class OnPlayHookParam extends HookParam{
    private String id;
    private String app;
    private String stream;
    private String ip;
    private String params;
    private int port;
    private String schema;
    private String vhost;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getApp() {
        return app;
    }
    public void setApp(String app) {
        this.app = app;
    }
    public String getStream() {
        return stream;
    }
    public void setStream(String stream) {
        this.stream = stream;
    }
    public String getIp() {
        return ip;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
    public String getParams() {
        return params;
    }
    public void setParams(String params) {
        this.params = params;
    }
    public int getPort() {
        return port;
    }
    public void setPort(int port) {
        this.port = port;
    }
    public String getSchema() {
        return schema;
    }
    public void setSchema(String schema) {
        this.schema = schema;
    }
    public String getVhost() {
        return vhost;
    }
    public void setVhost(String vhost) {
        this.vhost = vhost;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/OnPublishHookParam.java
New file
@@ -0,0 +1,82 @@
package com.genersoft.iot.vmp.media.zlm.dto;
/**
 * zlm hook事件中的on_publish事件的参数
 * @author lin
 */
public class OnPublishHookParam extends HookParam{
    private String id;
    private String app;
    private String stream;
    private String ip;
    private String params;
    private int port;
    private String schema;
    private String vhost;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getApp() {
        return app;
    }
    public void setApp(String app) {
        this.app = app;
    }
    public String getStream() {
        return stream;
    }
    public void setStream(String stream) {
        this.stream = stream;
    }
    public String getIp() {
        return ip;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
    public String getParams() {
        return params;
    }
    public void setParams(String params) {
        this.params = params;
    }
    public int getPort() {
        return port;
    }
    public void setPort(int port) {
        this.port = port;
    }
    public String getSchema() {
        return schema;
    }
    public void setSchema(String schema) {
        this.schema = schema;
    }
    public String getVhost() {
        return vhost;
    }
    public void setVhost(String vhost) {
        this.vhost = vhost;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java
New file
@@ -0,0 +1,114 @@
package com.genersoft.iot.vmp.media.zlm.dto;
/**
 * 流的鉴权信息
 * @author lin
 */
public class StreamAuthorityInfo {
    private String id;
    private String app;
    private String stream;
    /**
     * 产生源类型,
     * unknown = 0,
     * rtmp_push=1,
     * rtsp_push=2,
     * rtp_push=3,
     * pull=4,
     * ffmpeg_pull=5,
     * mp4_vod=6,
     * device_chn=7
     */
    private int originType;
    /**
     * 产生源类型的字符串描述
     */
    private String originTypeStr;
    /**
     * 推流时自定义的播放鉴权ID
     */
    private String callId;
    /**
     * 推流的鉴权签名
     */
    private String sign;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getApp() {
        return app;
    }
    public void setApp(String app) {
        this.app = app;
    }
    public String getStream() {
        return stream;
    }
    public void setStream(String stream) {
        this.stream = stream;
    }
    public int getOriginType() {
        return originType;
    }
    public void setOriginType(int originType) {
        this.originType = originType;
    }
    public String getOriginTypeStr() {
        return originTypeStr;
    }
    public void setOriginTypeStr(String originTypeStr) {
        this.originTypeStr = originTypeStr;
    }
    public String getCallId() {
        return callId;
    }
    public void setCallId(String callId) {
        this.callId = callId;
    }
    public String getSign() {
        return sign;
    }
    public void setSign(String sign) {
        this.sign = sign;
    }
    public static StreamAuthorityInfo getInstanceByHook(OnPublishHookParam hookParam) {
        StreamAuthorityInfo streamAuthorityInfo = new StreamAuthorityInfo();
        streamAuthorityInfo.setApp(hookParam.getApp());
        streamAuthorityInfo.setStream(hookParam.getStream());
        streamAuthorityInfo.setId(hookParam.getId());
        return streamAuthorityInfo;
    }
    public static StreamAuthorityInfo getInstanceByHook(MediaItem mediaItem) {
        StreamAuthorityInfo streamAuthorityInfo = new StreamAuthorityInfo();
        streamAuthorityInfo.setApp(mediaItem.getApp());
        streamAuthorityInfo.setStream(mediaItem.getStream());
        streamAuthorityInfo.setId(mediaItem.getMediaServerId());
        streamAuthorityInfo.setOriginType(mediaItem.getOriginType());
        streamAuthorityInfo.setOriginTypeStr(mediaItem.getOriginTypeStr());
        return streamAuthorityInfo;
    }
}
src/main/java/com/genersoft/iot/vmp/service/IMediaService.java
@@ -15,7 +15,7 @@
     * @param stream
     * @return
     */
    StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId,String addr);
    StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId,String addr, boolean authority);
    /**
@@ -24,7 +24,7 @@
     * @param stream
     * @return
     */
    StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId);
    StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, boolean authority);
    /**
     * 根据应用名和流ID获取播放地址, 只是地址拼接
@@ -32,7 +32,7 @@
     * @param stream
     * @return
     */
    StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaServerItem, String app, String stream, Object tracks);
    StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaServerItem, String app, String stream, Object tracks, String callId);
    /**
     * 根据应用名和流ID获取播放地址, 只是地址拼接,返回的ip使用远程访问ip,适用与zlm与wvp在一台主机的情况
@@ -40,5 +40,5 @@
     * @param stream
     * @return
     */
    StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr);
    StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr, String callId);
}
src/main/java/com/genersoft/iot/vmp/service/IUserService.java
@@ -19,4 +19,6 @@
    List<User> getAllUsers();
    int updateUsers(User user);
    boolean checkPushAuthority(String callId, String sign);
}
src/main/java/com/genersoft/iot/vmp/service/bean/PushStreamStatusChangeFromRedisDto.java
New file
@@ -0,0 +1,41 @@
package com.genersoft.iot.vmp.service.bean;
import java.util.List;
/**
 * 收到redis通知修改推流通道状态
 * @author lin
 */
public class PushStreamStatusChangeFromRedisDto {
    private boolean setAllOffline;
    private List<StreamPushItemFromRedis> onlineStreams;
    private List<StreamPushItemFromRedis> offlineStreams;
    public boolean isSetAllOffline() {
        return setAllOffline;
    }
    public void setSetAllOffline(boolean setAllOffline) {
        this.setAllOffline = setAllOffline;
    }
    public List<StreamPushItemFromRedis> getOnlineStreams() {
        return onlineStreams;
    }
    public void setOnlineStreams(List<StreamPushItemFromRedis> onlineStreams) {
        this.onlineStreams = onlineStreams;
    }
    public List<StreamPushItemFromRedis> getOfflineStreams() {
        return offlineStreams;
    }
    public void setOfflineStreams(List<StreamPushItemFromRedis> offlineStreams) {
        this.offlineStreams = offlineStreams;
    }
}
src/main/java/com/genersoft/iot/vmp/service/bean/StreamPushItemFromRedis.java
New file
@@ -0,0 +1,34 @@
package com.genersoft.iot.vmp.service.bean;
public class StreamPushItemFromRedis {
    private String app;
    private String stream;
    private long timeStamp;
    public String getApp() {
        return app;
    }
    public void setApp(String app) {
        this.app = app;
    }
    public String getStream() {
        return stream;
    }
    public void setStream(String stream) {
        this.stream = stream;
    }
    public long getTimeStamp() {
        return timeStamp;
    }
    public void setTimeStamp(long timeStamp) {
        this.timeStamp = timeStamp;
    }
}
src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java
@@ -149,9 +149,9 @@
        if (gbStream.getGbId() != null) {
            gbStreams.add(gbStream);
        }else {
            StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(gbStream.getApp(), gbStream.getStream());
            if (streamProxyItem != null && streamProxyItem.getGbId() != null){
                gbStreams.add(streamProxyItem);
            GbStream gbStreamIndb  = gbStreamMapper.selectOne(gbStream.getApp(), gbStream.getStream());
            if (gbStreamIndb != null && gbStreamIndb.getGbId() != null){
                gbStreams.add(gbStreamIndb);
            }
        }
        sendCatalogMsgs(gbStreams, type);
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
@@ -7,12 +7,15 @@
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.OnPublishHookParam;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.service.IMediaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
@Service
public class MediaServiceImpl implements IMediaService {
@@ -36,18 +39,22 @@
    @Override
    public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks) {
        return getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null);
    public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String callId) {
        return getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null, callId);
    }
    @Override
    public StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, String addr) {
    public StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, String addr, boolean authority) {
        StreamInfo streamInfo = null;
        if (mediaServerId == null) {
            mediaServerId = mediaConfig.getId();
        }
        MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);;
        MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
        if (mediaInfo == null) {
            return null;
        }
        StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
        if (streamAuthorityInfo == null) {
            return null;
        }
        JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, app, stream);
@@ -59,7 +66,12 @@
                }
                JSONObject mediaJSON = JSON.parseObject(JSON.toJSONString(data.get(0)), JSONObject.class);
                JSONArray tracks = mediaJSON.getJSONArray("tracks");
                streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks);
                if (authority) {
                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, streamAuthorityInfo.getCallId());
                }else {
                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null);
                }
            }
        }
        return streamInfo;
@@ -68,46 +80,48 @@
    @Override
    public StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId) {
        return getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, null);
    public StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, boolean authority) {
        return getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, null, authority);
    }
    @Override
    public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr) {
    public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr, String callId) {
        StreamInfo streamInfoResult = new StreamInfo();
        streamInfoResult.setStream(stream);
        streamInfoResult.setApp(app);
        if (addr == null) {
            addr = mediaInfo.getStreamIp();
        }
        streamInfoResult.setIp(addr);
        streamInfoResult.setMediaServerId(mediaInfo.getId());
        streamInfoResult.setRtmp(String.format("rtmp://%s:%s/%s/%s", addr, mediaInfo.getRtmpPort(), app,  stream));
        String callIdParam = StringUtils.isEmpty(callId)?"":"?callId=" + callId;
        streamInfoResult.setRtmp(String.format("rtmp://%s:%s/%s/%s%s", addr, mediaInfo.getRtmpPort(), app,  stream, callIdParam));
        if (mediaInfo.getRtmpSSlPort() != 0) {
            streamInfoResult.setRtmps(String.format("rtmps://%s:%s/%s/%s", addr, mediaInfo.getRtmpSSlPort(), app,  stream));
            streamInfoResult.setRtmps(String.format("rtmps://%s:%s/%s/%s%s", addr, mediaInfo.getRtmpSSlPort(), app,  stream, callIdParam));
        }
        streamInfoResult.setRtsp(String.format("rtsp://%s:%s/%s/%s", addr, mediaInfo.getRtspPort(), app,  stream));
        streamInfoResult.setRtsp(String.format("rtsp://%s:%s/%s/%s%s", addr, mediaInfo.getRtspPort(), app,  stream, callIdParam));
        if (mediaInfo.getRtspSSLPort() != 0) {
            streamInfoResult.setRtsps(String.format("rtsps://%s:%s/%s/%s", addr, mediaInfo.getRtspSSLPort(), app,  stream));
            streamInfoResult.setRtsps(String.format("rtsps://%s:%s/%s/%s%s", addr, mediaInfo.getRtspSSLPort(), app,  stream, callIdParam));
        }
        streamInfoResult.setFlv(String.format("http://%s:%s/%s/%s.live.flv", addr, mediaInfo.getHttpPort(), app,  stream));
        streamInfoResult.setWs_flv(String.format("ws://%s:%s/%s/%s.live.flv", addr, mediaInfo.getHttpPort(), app,  stream));
        streamInfoResult.setHls(String.format("http://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpPort(), app,  stream));
        streamInfoResult.setWs_hls(String.format("ws://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpPort(), app,  stream));
        streamInfoResult.setFmp4(String.format("http://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpPort(), app,  stream));
        streamInfoResult.setWs_fmp4(String.format("ws://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpPort(), app,  stream));
        streamInfoResult.setTs(String.format("http://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpPort(), app,  stream));
        streamInfoResult.setWs_ts(String.format("ws://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpPort(), app,  stream));
        streamInfoResult.setFlv(String.format("http://%s:%s/%s/%s.live.flv%s", addr, mediaInfo.getHttpPort(), app,  stream, callIdParam));
        streamInfoResult.setWs_flv(String.format("ws://%s:%s/%s/%s.live.flv%s", addr, mediaInfo.getHttpPort(), app,  stream, callIdParam));
        streamInfoResult.setHls(String.format("http://%s:%s/%s/%s/hls.m3u8%s", addr, mediaInfo.getHttpPort(), app,  stream, callIdParam));
        streamInfoResult.setWs_hls(String.format("ws://%s:%s/%s/%s/hls.m3u8%s", addr, mediaInfo.getHttpPort(), app,  stream, callIdParam));
        streamInfoResult.setFmp4(String.format("http://%s:%s/%s/%s.live.mp4%s", addr, mediaInfo.getHttpPort(), app,  stream, callIdParam));
        streamInfoResult.setWs_fmp4(String.format("ws://%s:%s/%s/%s.live.mp4%s", addr, mediaInfo.getHttpPort(), app,  stream, callIdParam));
        streamInfoResult.setTs(String.format("http://%s:%s/%s/%s.live.ts%s", addr, mediaInfo.getHttpPort(), app,  stream, callIdParam));
        streamInfoResult.setWs_ts(String.format("ws://%s:%s/%s/%s.live.ts%s", addr, mediaInfo.getHttpPort(), app,  stream, callIdParam));
        if (mediaInfo.getHttpSSlPort() != 0) {
            streamInfoResult.setHttps_flv(String.format("https://%s:%s/%s/%s.live.flv", addr, mediaInfo.getHttpSSlPort(), app,  stream));
            streamInfoResult.setWss_flv(String.format("wss://%s:%s/%s/%s.live.flv", addr, mediaInfo.getHttpSSlPort(), app,  stream));
            streamInfoResult.setHttps_hls(String.format("https://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpSSlPort(), app,  stream));
            streamInfoResult.setWss_hls(String.format("wss://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpSSlPort(), app,  stream));
            streamInfoResult.setHttps_fmp4(String.format("https://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpSSlPort(), app,  stream));
            streamInfoResult.setWss_fmp4(String.format("wss://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpSSlPort(), app,  stream));
            streamInfoResult.setHttps_ts(String.format("https://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpSSlPort(), app,  stream));
            streamInfoResult.setWss_ts(String.format("wss://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpSSlPort(), app,  stream));
            streamInfoResult.setWss_ts(String.format("wss://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpSSlPort(), app,  stream));
            streamInfoResult.setRtc(String.format("https://%s:%s/index/api/webrtc?app=%s&stream=%s&type=play", mediaInfo.getStreamIp(), mediaInfo.getHttpSSlPort(), app,  stream));
            streamInfoResult.setHttps_flv(String.format("https://%s:%s/%s/%s.live.flv%s", addr, mediaInfo.getHttpSSlPort(), app,  stream, callIdParam));
            streamInfoResult.setWss_flv(String.format("wss://%s:%s/%s/%s.live.flv%s", addr, mediaInfo.getHttpSSlPort(), app,  stream, callIdParam));
            streamInfoResult.setHttps_hls(String.format("https://%s:%s/%s/%s/hls.m3u8%s", addr, mediaInfo.getHttpSSlPort(), app,  stream, callIdParam));
            streamInfoResult.setWss_hls(String.format("wss://%s:%s/%s/%s/hls.m3u8%s", addr, mediaInfo.getHttpSSlPort(), app,  stream, callIdParam));
            streamInfoResult.setHttps_fmp4(String.format("https://%s:%s/%s/%s.live.mp4%s", addr, mediaInfo.getHttpSSlPort(), app,  stream, callIdParam));
            streamInfoResult.setWss_fmp4(String.format("wss://%s:%s/%s/%s.live.mp4%s", addr, mediaInfo.getHttpSSlPort(), app,  stream, callIdParam));
            streamInfoResult.setHttps_ts(String.format("https://%s:%s/%s/%s.live.ts%s", addr, mediaInfo.getHttpSSlPort(), app,  stream, callIdParam));
            streamInfoResult.setWss_ts(String.format("wss://%s:%s/%s/%s.live.ts%s", addr, mediaInfo.getHttpSSlPort(), app,  stream, callIdParam));
            streamInfoResult.setWss_ts(String.format("wss://%s:%s/%s/%s.live.ts%s", addr, mediaInfo.getHttpSSlPort(), app,  stream, callIdParam));
            streamInfoResult.setRtc(String.format("https://%s:%s/index/api/webrtc?app=%s&stream=%s&type=play%s", mediaInfo.getStreamIp(), mediaInfo.getHttpSSlPort(), app,  stream, StringUtils.isEmpty(callId)?"":"&callId=" + callId));
        }
        streamInfoResult.setTracks(tracks);
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -620,7 +620,7 @@
    public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
        String streamId = resonse.getString("stream");
        JSONArray tracks = resonse.getJSONArray("tracks");
        StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem,"rtp", streamId, tracks);
        StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem,"rtp", streamId, tracks, null);
        streamInfo.setDeviceID(deviceId);
        streamInfo.setChannelId(channelId);
        return streamInfo;
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java
@@ -25,9 +25,6 @@
    @Override
    public void onMessage(@NotNull Message message, byte[] bytes) {
        if (logger.isDebugEnabled()) {
            logger.debug("收到来自REDIS的GPS通知: {}", new String(message.getBody()));
        }
        GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class);
        redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
    }
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -319,7 +319,7 @@
        }
        streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
        // 其他的流设置离线
        streamProxyMapper.updateStatusByMediaServerId(false, mediaServerId);
        streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
        String type = "PULL";
        // 发送redis消息
@@ -346,7 +346,7 @@
    @Override
    public int updateStatus(boolean status, String app, String stream) {
        return streamProxyMapper.updateStatus(status, app, stream);
        return streamProxyMapper.updateStatus(app, stream, status);
    }
    private void syncPullStream(String mediaServerId){
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -39,6 +39,9 @@
    private StreamPushMapper streamPushMapper;
    @Autowired
    private StreamProxyMapper streamProxyMapper;
    @Autowired
    private ParentPlatformMapper parentPlatformMapper;
    @Autowired
@@ -285,7 +288,8 @@
        streamPushMapper.deleteWithoutGBId(mediaServerId);
        gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
        // 其他的流设置未启用
        gbStreamMapper.updateStatusByMediaServerId(mediaServerId, false);
        streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
        streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
        // 发送流停止消息
        String type = "PUSH";
        // 发送redis消息
src/main/java/com/genersoft/iot/vmp/service/impl/UserServiceImpl.java
@@ -5,6 +5,7 @@
import com.genersoft.iot.vmp.storager.dao.dto.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.List;
@@ -55,4 +56,12 @@
    }
    @Override
    public boolean checkPushAuthority(String callId, String sign) {
        if (StringUtils.isEmpty(callId)) {
            return userMapper.checkPushAuthorityByCallId(sign).size() > 0;
        }else {
            return userMapper.checkPushAuthorityByCallIdAndSign(callId, sign).size() > 0;
        }
    }
}
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -3,9 +3,7 @@
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -213,4 +211,26 @@
     */
    public boolean deviceIsOnline(String deviceId);
    /**
     * 存储推流的鉴权信息
     * @param app 应用名
     * @param stream 流
     * @param streamAuthorityInfo 鉴权信息
     */
    void updateStreamAuthorityInfo(String app, String stream, StreamAuthorityInfo streamAuthorityInfo);
    /**
     * 移除推流的鉴权信息
     * @param app 应用名
     * @param streamId 流
     */
    void removeStreamAuthorityInfo(String app, String streamId);
    /**
     * 获取推流的鉴权信息
     * @param app 应用名
     * @param stream 流
     * @return
     */
    StreamAuthorityInfo getStreamAuthorityInfo(String app, String stream);
}
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java
@@ -372,14 +372,16 @@
    /**
     * 设置流离线
     * @param app
     * @param streamId
     */
    int mediaOutline(String app, String streamId);
    int mediaOffline(String app, String streamId);
    /**
     * 设置流上线
     */
    int mediaOnline(String app, String streamId);
    /**
     * 设置平台在线/离线
     * @param online
     */
    void updateParentPlatformStatus(String platformGbID, boolean online);
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -68,7 +68,7 @@
    List<GbStream> selectAll(String platformId, String catalogId, String query, Boolean pushing, String mediaServerId);
    @Select("SELECT * FROM gb_stream WHERE app=#{app} AND stream=#{stream}")
    StreamProxyItem selectOne(String app, String stream);
    GbStream selectOne(String app, String stream);
    @Select("SELECT * FROM gb_stream WHERE gbId=#{gbId}")
    List<GbStream> selectByGBId(String gbId);
@@ -87,16 +87,6 @@
    @Select("SELECT gs.* FROM gb_stream gs LEFT JOIN platform_gb_stream pgs " +
            "ON gs.gbStreamId = pgs.gbStreamId WHERE pgs.gbStreamId is NULL")
    List<GbStream> queryStreamNotInPlatform();
    @Update("UPDATE gb_stream " +
            "SET status=${status} " +
            "WHERE app=#{app} AND stream=#{stream}")
    int setStatus(String app, String stream, boolean status);
    @Update("UPDATE gb_stream " +
            "SET status=${status} " +
            "WHERE mediaServerId=#{mediaServerId} ")
    void updateStatusByMediaServerId(String mediaServerId, boolean status);
    @Delete("DELETE FROM gb_stream WHERE streamType=#{type} AND gbId=NULL AND mediaServerId=#{mediaServerId}")
    void deleteWithoutGBId(String type, String mediaServerId);
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
@@ -62,12 +62,12 @@
    @Update("UPDATE stream_proxy " +
            "SET status=#{status} " +
            "WHERE mediaServerId=#{mediaServerId}")
    void updateStatusByMediaServerId(boolean status, String mediaServerId);
    void updateStatusByMediaServerId(String mediaServerId, boolean status);
    @Update("UPDATE stream_proxy " +
            "SET status=${status} " +
            "WHERE app=#{app} AND stream=#{stream}")
    int updateStatus(boolean status, String app, String stream);
    int updateStatus(String app, String stream, boolean status);
    @Delete("DELETE FROM stream_proxy WHERE enable_remove_none_reader=true AND mediaServerId=#{mediaServerId}")
    void deleteAutoRemoveItemByMediaServerId(String mediaServerId);
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
@@ -14,21 +14,23 @@
public interface StreamPushMapper {
    @Insert("INSERT INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " +
            "createStamp, aliveSecond, mediaServerId, serverId) VALUES" +
            "pushTime, aliveSecond, mediaServerId, serverId, updateTime, createTime) VALUES" +
            "('${app}', '${stream}', '${totalReaderCount}', '${originType}', '${originTypeStr}', " +
            "'${createStamp}', '${aliveSecond}', '${mediaServerId}' , '${serverId}' )")
            "'${pushTime}', '${aliveSecond}', '${mediaServerId}' , '${serverId}' , '${updateTime}' , '${createTime}' )")
    int add(StreamPushItem streamPushItem);
    @Update("UPDATE stream_push " +
            "SET app=#{app}," +
            "stream=#{stream}," +
            "mediaServerId=#{mediaServerId}," +
            "totalReaderCount=#{totalReaderCount}, " +
            "originType=#{originType}," +
            "originTypeStr=#{originTypeStr}, " +
            "createStamp=#{createStamp}, " +
            "aliveSecond=#{aliveSecond} " +
            "WHERE app=#{app} AND stream=#{stream}")
    @Update(value = {" <script>" +
            "UPDATE stream_push " +
            "SET updateTime='${updateTime}'" +
            "<if test=\"mediaServerId != null\">, mediaServerId='${mediaServerId}'</if>" +
            "<if test=\"totalReaderCount != null\">, totalReaderCount='${totalReaderCount}'</if>" +
            "<if test=\"originType != null\">, originType=${originType}</if>" +
            "<if test=\"originTypeStr != null\">, originTypeStr='${originTypeStr}'</if>" +
            "<if test=\"pushTime != null\">, pushTime='${pushTime}'</if>" +
            "<if test=\"aliveSecond != null\">, aliveSecond='${aliveSecond}'</if>" +
            "WHERE app=#{app} AND stream=#{stream}"+
            " </script>"})
    int update(StreamPushItem streamPushItem);
    @Delete("DELETE FROM stream_push WHERE app=#{app} AND stream=#{stream}")
@@ -62,7 +64,7 @@
    @Select(value = {" <script>" +
            "SELECT " +
            "st.*, " +
            "gs.gbId, gs.status, gs.name, gs.longitude, gs.latitude, gs.gbStreamId " +
            "gs.gbId, gs.name, gs.longitude, gs.latitude, gs.gbStreamId " +
            "from " +
            "stream_push st " +
            "LEFT JOIN gb_stream gs " +
@@ -70,25 +72,25 @@
            "WHERE " +
            "1=1 " +
            " <if test='query != null'> AND (st.app LIKE '%${query}%' OR st.stream LIKE '%${query}%' OR gs.gbId LIKE '%${query}%' OR gs.name LIKE '%${query}%')</if> " +
            " <if test='pushing == true' > AND (gs.gbId is null OR gs.status=1)</if>" +
            " <if test='pushing == false' > AND gs.status=0</if>" +
            " <if test='pushing == true' > AND (gs.gbId is null OR st.status=1)</if>" +
            " <if test='pushing == false' > AND st.status=0</if>" +
            " <if test='mediaServerId != null' > AND st.mediaServerId=#{mediaServerId} </if>" +
            "order by st.createStamp desc" +
            "order by st.createTime desc" +
            " </script>"})
    List<StreamPushItem> selectAllForList(String query, Boolean pushing, String mediaServerId);
    @Select("SELECT st.*, gs.gbId, gs.status, gs.name, gs.longitude, gs.latitude FROM stream_push st LEFT JOIN gb_stream gs on st.app = gs.app AND st.stream = gs.stream order by st.createStamp desc")
    @Select("SELECT st.*, gs.gbId, gs.name, gs.longitude, gs.latitude FROM stream_push st LEFT JOIN gb_stream gs on st.app = gs.app AND st.stream = gs.stream order by st.createTime desc")
    List<StreamPushItem> selectAll();
    @Select("SELECT st.*, gs.gbId, gs.status, gs.name, gs.longitude, gs.latitude FROM stream_push st LEFT JOIN gb_stream gs on st.app = gs.app AND st.stream = gs.stream WHERE st.app=#{app} AND st.stream=#{stream}")
    @Select("SELECT st.*, gs.gbId, gs.name, gs.longitude, gs.latitude FROM stream_push st LEFT JOIN gb_stream gs on st.app = gs.app AND st.stream = gs.stream WHERE st.app=#{app} AND st.stream=#{stream}")
    StreamPushItem selectOne(String app, String stream);
    @Insert("<script>"  +
            "Insert IGNORE INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " +
            "createStamp, aliveSecond, mediaServerId) " +
            "createTime, aliveSecond, mediaServerId) " +
            "VALUES <foreach collection='streamPushItems' item='item' index='index' separator=','>" +
            "( '${item.app}', '${item.stream}', '${item.totalReaderCount}', #{item.originType}, " +
            "'${item.originTypeStr}',#{item.createStamp}, #{item.aliveSecond}, '${item.mediaServerId}' )" +
            "'${item.originTypeStr}',#{item.createTime}, #{item.aliveSecond}, '${item.mediaServerId}' )" +
            " </foreach>" +
            "</script>")
    @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
@@ -106,4 +108,13 @@
    @Select("SELECT sp.* FROM stream_push sp left join gb_stream gs on gs.app = sp.app and gs.stream= sp.stream WHERE sp.mediaServerId=#{mediaServerId} and gs.gbId is null")
    List<StreamPushItem> selectAllByMediaServerIdWithOutGbID(String mediaServerId);
    @Update("UPDATE stream_push " +
            "SET status=${status} " +
            "WHERE app=#{app} AND stream=#{stream}")
    int updateStatus(String app, String stream, boolean status);
    @Update("UPDATE stream_push " +
            "SET status=#{status} " +
            "WHERE mediaServerId=#{mediaServerId}")
    void updateStatusByMediaServerId(String mediaServerId, boolean status);
}
src/main/java/com/genersoft/iot/vmp/storager/dao/UserMapper.java
@@ -49,4 +49,10 @@
    @Select("select u.*, r.id as roleID, r.name as roleName, r.authority as roleAuthority , r.createTime as roleCreateTime , r.updateTime as roleUpdateTime FROM user u, user_role r WHERE u.roleId=r.id")
    @ResultMap(value="roleMap")
    List<User> selectAll();
    @Select("select * from (select user.*, concat('${callId}_', pushKey) as str1 from user) as u where md5(u.str1) = '${sign}'")
    List<User> checkPushAuthorityByCallIdAndSign(String callId, String sign);
    @Select("select * from user where md5(pushKey) = '${sign}'")
    List<User> checkPushAuthorityByCallId(String sign);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -9,6 +9,8 @@
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.OnPublishHookParam;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
@@ -20,6 +22,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.*;
@@ -599,6 +602,26 @@
    }
    @Override
    public void updateStreamAuthorityInfo(String app, String stream, StreamAuthorityInfo streamAuthorityInfo) {
        String key = VideoManagerConstants.MEDIA_STREAM_AUTHORITY + userSetting.getServerId() + "_" + app+ "_" + stream;
        redis.set(key, streamAuthorityInfo);
    }
    @Override
    public void removeStreamAuthorityInfo(String app, String stream) {
        String key = VideoManagerConstants.MEDIA_STREAM_AUTHORITY + userSetting.getServerId() + "_" + app+ "_" + stream ;
        redis.del(key);
    }
    @Override
    public StreamAuthorityInfo getStreamAuthorityInfo(String app, String stream) {
        String key = VideoManagerConstants.MEDIA_STREAM_AUTHORITY + userSetting.getServerId() + "_" + app+ "_" + stream ;
        return (StreamAuthorityInfo) redis.get(key);
    }
    @Override
    public MediaItem getStreamInfo(String app, String streamId, String mediaServerId) {
        String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX  + userSetting.getServerId() + "_*_" + app + "_" + streamId + "_" + mediaServerId;
@@ -682,4 +705,6 @@
    public boolean deviceIsOnline(String deviceId) {
        return getDevice(deviceId).getOnline() == 1;
    }
}
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -848,7 +848,7 @@
        streamPushMapper.addAll(streamPushItems);
        // TODO 待优化
        for (int i = 0; i < streamPushItems.size(); i++) {
            int onlineResult = gbStreamMapper.setStatus(streamPushItems.get(i).getApp(), streamPushItems.get(i).getStream(), true);
            int onlineResult = mediaOnline(streamPushItems.get(i).getApp(), streamPushItems.get(i).getStream());
            if (onlineResult > 0) {
                // 发送上线通知
                eventPublisher.catalogEventPublishForStream(null, streamPushItems.get(i), CatalogEvent.ON);
@@ -856,11 +856,13 @@
        }
    }
    @Override
    public void updateMedia(StreamPushItem streamPushItem) {
        streamPushMapper.del(streamPushItem.getApp(), streamPushItem.getStream());
        streamPushMapper.add(streamPushItem);
        gbStreamMapper.setStatus(streamPushItem.getApp(), streamPushItem.getStream(), true);
        mediaOffline(streamPushItem.getApp(), streamPushItem.getStream());
        if(!StringUtils.isEmpty(streamPushItem.getGbId() )){
            // 查找开启了全部直播流共享的上级平台
@@ -897,8 +899,26 @@
    }
    @Override
    public int mediaOutline(String app, String streamId) {
        return gbStreamMapper.setStatus(app, streamId, false);
    public int mediaOffline(String app, String stream) {
        GbStream gbStream = gbStreamMapper.selectOne(app, stream);
        int result;
        if ("proxy".equals(gbStream.getStreamType())) {
            result = streamProxyMapper.updateStatus(app, stream, false);
        }else {
            result = streamPushMapper.updateStatus(app, stream, false);
        }
        return result;
    }
    public int mediaOnline(String app, String stream) {
        GbStream gbStream = gbStreamMapper.selectOne(app, stream);
        int result;
        if ("proxy".equals(gbStream.getStreamType())) {
            result = streamProxyMapper.updateStatus(app, stream, true);
        }else {
            result = streamPushMapper.updateStatus(app, stream, true);
        }
        return result;
    }
    @Override
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java
@@ -1,9 +1,14 @@
package com.genersoft.iot.vmp.vmanager.gb28181.media;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.security.SecurityUtils;
import com.genersoft.iot.vmp.conf.security.dto.LoginUser;
import com.genersoft.iot.vmp.media.zlm.dto.OnPublishHookParam;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import io.swagger.annotations.Api;
@@ -16,6 +21,8 @@
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
@Api(tags = "媒体流相关")
@Controller
@@ -26,7 +33,7 @@
    private final static Logger logger = LoggerFactory.getLogger(MediaController.class);
    @Autowired
    private IVideoManagerStorage storager;
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IStreamPushService streamPushService;
@@ -52,13 +59,47 @@
    })
    @GetMapping(value = "/stream_info_by_app_and_stream")
    @ResponseBody
    public WVPResult<StreamInfo> getStreamInfoByAppAndStream(@RequestParam String app, @RequestParam String stream, @RequestParam(required = false) String mediaServerId){
        StreamInfo streamInfoByAppAndStreamWithCheck = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId);
    public WVPResult<StreamInfo> getStreamInfoByAppAndStream(HttpServletRequest request, @RequestParam String app,
                                                             @RequestParam String stream,
                                                             @RequestParam(required = false) String mediaServerId,
                                                             @RequestParam(required = false) String callId,
                                                             @RequestParam(required = false) Boolean useSourceIpAsStreamIp){
        boolean authority = false;
        if (callId != null) {
            // 权限校验
            StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
            if (streamAuthorityInfo.getCallId().equals(callId)) {
                authority = true;
            }else {
                WVPResult<StreamInfo> result = new WVPResult<>();
                result.setCode(401);
                result.setMsg("fail");
                return result;
            }
        }else {
            // 是否登陆用户, 登陆用户返回完整信息
            LoginUser userInfo = SecurityUtils.getUserInfo();
            if (userInfo!= null) {
                authority = true;
            }
        }
        StreamInfo streamInfo;
        if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) {
            String host = request.getHeader("Host");
            String localAddr = host.split(":")[0];
            logger.info("使用{}作为返回流的ip", localAddr);
            streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, localAddr, authority);
        }else {
            streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority);
        }
        WVPResult<StreamInfo> result = new WVPResult<>();
        if (streamInfoByAppAndStreamWithCheck != null){
        if (streamInfo != null){
            result.setCode(0);
            result.setMsg("scccess");
            result.setData(streamInfoByAppAndStreamWithCheck);
            result.setData(streamInfo);
        }else {
            result.setCode(-1);
            result.setMsg("fail");
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -193,7 +193,7 @@
                JSONObject data = jsonObject.getJSONObject("data");
                if (data != null) {
                       result.put("key", data.getString("key"));
                    StreamInfo streamInfoResult = mediaService.getStreamInfoByAppAndStreamWithCheck("convert", streamId, mediaInfo.getId());
                    StreamInfo streamInfoResult = mediaService.getStreamInfoByAppAndStreamWithCheck("convert", streamId, mediaInfo.getId(), false);
                    result.put("data", streamInfoResult);
                }
            }else {
web_src/src/layout/UiHeader.vue
@@ -23,12 +23,12 @@
      <!--            <el-menu-item style="float: right;" @click="loginout">退出</el-menu-item>-->
      <el-submenu index="" style="float: right;">
        <template slot="title">欢迎,{{ this.$cookies.get("session").username }}</template>
        <el-menu-item @click="changePassword">修改密码</el-menu-item>
        <el-menu-item @click="loginout">注销</el-menu-item>
        <el-menu-item @click="openDoc">在线文档</el-menu-item>
        <el-menu-item >
          <el-switch v-model="alarmNotify" inactive-text="报警信息推送" @change="alarmNotifyChannge"></el-switch>
        </el-menu-item>
        <el-menu-item @click="changePassword">修改密码</el-menu-item>
        <el-menu-item @click="loginout">注销</el-menu-item>
      </el-submenu>
    </el-menu>
    <changePasswordDialog ref="changePasswordDialog"></changePasswordDialog>