648540858
2021-12-10 faac93613a997e226998018e1165412b33989f32
优化启动后清理过期信息的逻辑
5个文件已修改
103 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEventListener.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEventListener.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java 88 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java
@@ -30,7 +30,7 @@
    /**
     * Creates the listener, handing the given container to the superclass constructor.
     *
     * @param listenerContainer Redis message listener container passed through to {@code super}
     */
    public ZLMKeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
        // Set the Spring Boot default config parameter to empty so the application does not try to
        // modify Redis's default configuration; Redis servers disable the CONFIG command for remote
        // users for security reasons.
        setKeyspaceNotificationsConfigParameter("");
        // NOTE(review): the commented-out call below duplicates the active call above; in this diff
        // view it looks like the post-change side of the hunk — confirm which of the two is current.
//        setKeyspaceNotificationsConfigParameter("");
    }
    /**
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOfflineEventListener.java
@@ -31,10 +31,8 @@
    @Override
    public void onApplicationEvent(ZLMOfflineEvent event) {
        if (logger.isDebugEnabled()) {
            logger.debug("ZLM离线事件触发,ID:" + event.getMediaServerId());
        }
        logger.info("ZLM离线事件触发,ID:" + event.getMediaServerId());
        // 处理ZLM离线
        mediaServerService.zlmServerOffline(event.getMediaServerId());
        streamProxyService.zlmServerOffline(event.getMediaServerId());
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMOnlineEventListener.java
@@ -37,10 +37,8 @@
    @Override
    public void onApplicationEvent(ZLMOnlineEvent event) {
        if (logger.isDebugEnabled()) {
            logger.debug("ZLM上线事件触发,ID:" + event.getMediaServerId());
        }
        logger.info("ZLM上线事件触发,ID:" + event.getMediaServerId());
        streamPushService.zlmServerOnline(event.getMediaServerId());
        streamProxyService.zlmServerOnline(event.getMediaServerId());
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -565,7 +565,6 @@
        redisUtil.zRemove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId(), id);
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + id;
        redisUtil.del(key);
        mediaServerMapper.delOne(id);
    }
    @Override
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -154,47 +154,61 @@
        if (mediaServerItem == null) {
            return;
        }
        // 数据库记录
        List<StreamPushItem> pushList = getPushList(mediaServerId);
        Map<String, StreamPushItem> pushItemMap = new HashMap<>();
        // redis记录
        List<StreamInfo> streamInfoPushList = redisCatchStorage.getStreams(mediaServerId, "PUSH");
        Map<String, StreamInfo> streamInfoPushItemMap = new HashMap<>();
        if (pushList.size() > 0) {
            Map<String, StreamPushItem> pushItemMap = new HashMap<>();
            for (StreamPushItem streamPushItem : pushList) {
                pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
            }
            zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
                if (mediaList == null) return;
                String dataStr = mediaList.getString("data");
                Integer code = mediaList.getInteger("code");
                List<StreamPushItem> streamPushItems = null;
                if (code == 0 ) {
                    if (dataStr != null) {
                        streamPushItems = handleJSON(dataStr, mediaServerItem);
                    }
                }
                if (streamPushItems != null) {
                    for (StreamPushItem streamPushItem : streamPushItems) {
                        pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
                    }
                }
                Collection<StreamPushItem> offlinePushItems = pushItemMap.values();
                if (offlinePushItems.size() > 0) {
                    String type = "PUSH";
                    streamPushMapper.delAll(new ArrayList<>(offlinePushItems));
                    for (StreamPushItem offlinePushItem : offlinePushItems) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("serverId", userSetup.getServerId());
                        jsonObject.put("app", offlinePushItem.getApp());
                        jsonObject.put("stream", offlinePushItem.getStream());
                        jsonObject.put("register", false);
                        jsonObject.put("mediaServerId", mediaServerId);
                        redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
                        // 移除redis内流的信息
                        redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlinePushItem.getApp(), offlinePushItem.getStream());
                    }
                }
            }));
        }
        if (streamInfoPushList.size() > 0) {
            for (StreamInfo streamInfo : streamInfoPushList) {
                streamInfoPushItemMap.put(streamInfo.getApp() + streamInfo.getStreamId(), streamInfo);
            }
        }
        zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
            if (mediaList == null) return;
            String dataStr = mediaList.getString("data");
            Integer code = mediaList.getInteger("code");
            List<StreamPushItem> streamPushItems = null;
            if (code == 0 ) {
                if (dataStr != null) {
                    streamPushItems = handleJSON(dataStr, mediaServerItem);
                }
            }
            if (streamPushItems != null) {
                for (StreamPushItem streamPushItem : streamPushItems) {
                    pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
                    streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
                }
            }
            Collection<StreamPushItem> offlinePushItems = pushItemMap.values();
            if (offlinePushItems.size() > 0) {
                String type = "PUSH";
                streamPushMapper.delAll(new ArrayList<>(offlinePushItems));
            }
            Collection<StreamInfo> offlineStreamInfoItems = streamInfoPushItemMap.values();
            if (offlineStreamInfoItems.size() > 0) {
                String type = "PUSH";
                for (StreamInfo offlineStreamInfoItem : offlineStreamInfoItems) {
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("serverId", userSetup.getServerId());
                    jsonObject.put("app", offlineStreamInfoItem.getApp());
                    jsonObject.put("stream", offlineStreamInfoItem.getStreamId());
                    jsonObject.put("register", false);
                    jsonObject.put("mediaServerId", mediaServerId);
                    redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
                    // 移除redis内流的信息
                    redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineStreamInfoItem.getApp(), offlineStreamInfoItem.getStreamId());
                }
            }
        }));
    }
    @Override
@@ -211,6 +225,8 @@
        List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
        if (streamInfoList.size() > 0) {
            for (StreamInfo streamInfo : streamInfoList) {
                // 移除redis内流的信息
                redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("serverId", userSetup.getServerId());
                jsonObject.put("app", streamInfo.getApp());
@@ -218,8 +234,6 @@
                jsonObject.put("register", false);
                jsonObject.put("mediaServerId", mediaServerId);
                redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
                // 移除redis内流的信息
                redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
            }
        }
    }