648540858
2024-04-11 651d4a1b0e340e03ad59d5f59f89f7e3bf4507ce
优化代码调用
7个文件已修改
109 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java 44 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java
@@ -45,6 +45,8 @@
    private Long aliveSecond;
    @Schema(description = "数据产生速度,单位byte/s")
    private Long bytesSpeed;
    @Schema(description = "鉴权参数")
    private String callId;
    public static MediaInfo getInstance(JSONObject jsonObject, MediaServer mediaServer) {
        MediaInfo mediaInfo = new MediaInfo();
@@ -303,4 +305,12 @@
    public void setSchema(String schema) {
        this.schema = schema;
    }
    public String getCallId() {
        return callId;
    }
    public void setCallId(String callId) {
        this.callId = callId;
    }
}
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
@@ -12,13 +12,13 @@
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent;
import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.IInviteStreamService;
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -18,7 +18,6 @@
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -505,18 +504,18 @@
        String type = "PULL";
        // 发送redis消息
        List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, type);
        if (onStreamChangedHookParams.size() > 0) {
            for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) {
        List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type);
        if (mediaInfoList.size() > 0) {
            for (MediaInfo mediaInfo : mediaInfoList) {
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("serverId", userSetting.getServerId());
                jsonObject.put("app", onStreamChangedHookParam.getApp());
                jsonObject.put("stream", onStreamChangedHookParam.getStream());
                jsonObject.put("app", mediaInfo.getApp());
                jsonObject.put("stream", mediaInfo.getStream());
                jsonObject.put("register", false);
                jsonObject.put("mediaServerId", mediaServerId);
                redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
                // 移除redis内流的信息
                redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
                redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream());
            }
        }
    }
@@ -534,8 +533,8 @@
    private void syncPullStream(String mediaServerId){
        MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
        if (mediaServer != null) {
            List<OnStreamChangedHookParam> allPullStream = redisCatchStorage.getStreams(mediaServerId, "PULL");
            if (!allPullStream.isEmpty()) {
            List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PULL");
            if (!mediaInfoList.isEmpty()) {
                List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServer, null, null, null);
                Map<String, StreamInfo> stringStreamInfoMap = new HashMap<>();
                if (mediaList != null && !mediaList.isEmpty()) {
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -156,10 +156,10 @@
    @EventListener
    public void onApplicationEvent(MediaDepartureEvent event) {
        // 兼容流注销时类型从redis记录获取
        OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(
        MediaInfo mediaInfo = redisCatchStorage.getStreamInfo(
                event.getApp(), event.getStream(), event.getMediaServer().getId());
        if (onStreamChangedHookParam != null) {
            String type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType();
        if (mediaInfo != null) {
            String type = OriginType.values()[mediaInfo.getOriginType()].getType();
            redisCatchStorage.removeStream(event.getMediaServer().getId(), type, event.getApp(), event.getStream());
            if ("PUSH".equalsIgnoreCase(type)) {
                // 冗余数据,自己系统中自用
@@ -302,8 +302,8 @@
        List<StreamPushItem> pushList = getPushList(mediaServerId);
        Map<String, StreamPushItem> pushItemMap = new HashMap<>();
        // redis记录
        List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH");
        Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>();
        List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PUSH");
        Map<String, MediaInfo> streamInfoPushItemMap = new HashMap<>();
        if (pushList.size() > 0) {
            for (StreamPushItem streamPushItem : pushList) {
                if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
@@ -311,9 +311,9 @@
                }
            }
        }
        if (onStreamChangedHookParams.size() > 0) {
            for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) {
                streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam);
        if (mediaInfoList.size() > 0) {
            for (MediaInfo mediaInfo : mediaInfoList) {
                streamInfoPushItemMap.put(mediaInfo.getApp() + mediaInfo.getStream(), mediaInfo);
            }
        }
        // 获取所有推流鉴权信息,清理过期的
@@ -352,21 +352,21 @@
            }
        }
        Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values();
        if (offlineOnStreamChangedHookParamList.size() > 0) {
        Collection<MediaInfo> mediaInfos = streamInfoPushItemMap.values();
        if (mediaInfos.size() > 0) {
            String type = "PUSH";
            for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) {
            for (MediaInfo mediaInfo : mediaInfos) {
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("serverId", userSetting.getServerId());
                jsonObject.put("app", offlineOnStreamChangedHookParam.getApp());
                jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream());
                jsonObject.put("app", mediaInfo.getApp());
                jsonObject.put("stream", mediaInfo.getStream());
                jsonObject.put("register", false);
                jsonObject.put("mediaServerId", mediaServerId);
                redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
                // 移除redis内流的信息
                redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream());
                redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", mediaInfo.getApp(), mediaInfo.getStream());
                // 冗余数据,自己系统中自用
                redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId());
                redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerItem.getId());
            }
        }
@@ -391,21 +391,21 @@
        // 发送流停止消息
        String type = "PUSH";
        // 发送redis消息
        List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
        if (streamInfoList.size() > 0) {
            for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) {
        List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type);
        if (mediaInfoList.size() > 0) {
            for (MediaInfo mediaInfo : mediaInfoList) {
                // 移除redis内流的信息
                redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
                redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream());
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("serverId", userSetting.getServerId());
                jsonObject.put("app", onStreamChangedHookParam.getApp());
                jsonObject.put("stream", onStreamChangedHookParam.getStream());
                jsonObject.put("app", mediaInfo.getApp());
                jsonObject.put("stream", mediaInfo.getStream());
                jsonObject.put("register", false);
                jsonObject.put("mediaServerId", mediaServerId);
                redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
                // 冗余数据,自己系统中自用
                redisCatchStorage.removePushListItem(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), mediaServerId);
                redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerId);
            }
        }
    }
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
@@ -73,7 +73,7 @@
                        onStreamChangedHookParam.setMediaServerId(mediaServerId);
                        onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
                        onStreamChangedHookParam.setAliveSecond(0L);
                        onStreamChangedHookParam.setTotalReaderCount("0");
                        onStreamChangedHookParam.setTotalReaderCount(0);
                        onStreamChangedHookParam.setOriginType(0);
                        onStreamChangedHookParam.setOriginTypeStr("0");
                        onStreamChangedHookParam.setOriginTypeStr("unknown");
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -6,11 +6,11 @@
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
@@ -91,7 +91,7 @@
     * @param app
     * @param streamId
     */
    void addStream(MediaServer mediaServerItem, String type, String app, String streamId, OnStreamChangedHookParam item);
    void addStream(MediaServer mediaServerItem, String type, String app, String streamId, MediaInfo item);
    /**
     * 移除流信息从redis
@@ -108,7 +108,7 @@
     */
    void removeStream(String mediaServerId, String type);
    List<OnStreamChangedHookParam> getStreams(String mediaServerId, String pull);
    List<MediaInfo> getStreams(String mediaServerId, String pull);
    /**
     * 将device信息写入redis
@@ -134,7 +134,7 @@
    void resetAllSN();
    OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId);
    MediaInfo getStreamInfo(String app, String streamId, String mediaServerId);
    void addCpuInfo(double cpuInfo);
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -9,11 +9,11 @@
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -315,14 +315,14 @@
    }
    @Override
    public void addStream(MediaServer mediaServerItem, String type, String app, String streamId, OnStreamChangedHookParam onStreamChangedHookParam) {
    public void addStream(MediaServer mediaServerItem, String type, String app, String streamId, MediaInfo mediaInfo) {
        // 查找是否使用了callID
        StreamAuthorityInfo streamAuthorityInfo = getStreamAuthorityInfo(app, streamId);
        String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX  + userSetting.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId();
        if (streamAuthorityInfo != null) {
            onStreamChangedHookParam.setCallId(streamAuthorityInfo.getCallId());
            mediaInfo.setCallId(streamAuthorityInfo.getCallId());
        }
        redisTemplate.opsForValue().set(key, onStreamChangedHookParam);
        redisTemplate.opsForValue().set(key, mediaInfo);
    }
    @Override
@@ -341,13 +341,13 @@
    }
    @Override
    public List<OnStreamChangedHookParam> getStreams(String mediaServerId, String type) {
        List<OnStreamChangedHookParam> result = new ArrayList<>();
    public List<MediaInfo> getStreams(String mediaServerId, String type) {
        List<MediaInfo> result = new ArrayList<>();
        String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_*_*_" + mediaServerId;
        List<Object> streams = RedisUtil.scan(redisTemplate, key);
        for (Object stream : streams) {
            OnStreamChangedHookParam onStreamChangedHookParam = (OnStreamChangedHookParam)redisTemplate.opsForValue().get(stream);
            result.add(onStreamChangedHookParam);
            MediaInfo mediaInfo = (MediaInfo)redisTemplate.opsForValue().get(stream);
            result.add(mediaInfo);
        }
        return result;
    }
@@ -466,14 +466,14 @@
    @Override
    public OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId) {
    public MediaInfo getStreamInfo(String app, String streamId, String mediaServerId) {
        String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX  + userSetting.getServerId() + "_*_" + app + "_" + streamId + "_" + mediaServerId;
        OnStreamChangedHookParam result = null;
        MediaInfo result = null;
        List<Object> keys = RedisUtil.scan(redisTemplate, scanKey);
        if (keys.size() > 0) {
            String key = (String) keys.get(0);
            result = JsonUtil.redisJsonToObject(redisTemplate, key, OnStreamChangedHookParam.class);
            result = JsonUtil.redisJsonToObject(redisTemplate, key, MediaInfo.class);
        }
        return result;