648540858
2022-07-27 f84eebdb75a6af1894b057b0255b01992bdd4f03
优化部分hook订阅
4个文件已修改
30 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -108,6 +108,7 @@
                subscribe.response(null, json);
            }
        }
        mediaServerService.updateMediaServerKeepalive(mediaServerId, json.getJSONObject("data"));
        JSONObject ret = new JSONObject();
        ret.put("code", 0);
@@ -619,10 +620,15 @@
                subscribe.response(null, jsonObject);
            }
        }
        ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(jsonObject, ZLMServerConfig.class);
        if (zlmServerConfig !=null ) {
            mediaServerService.zlmServerOnline(zlmServerConfig);
        }
        JSONObject ret = new JSONObject();
        ret.put("code", 0);
        ret.put("msg", "success");
        return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
        return new ResponseEntity<>(ret.toString(),HttpStatus.OK);
    }
    private Map<String, String> urlParamToMap(String params) {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
@@ -75,18 +75,8 @@
                if (startGetMedia != null) {
                    startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId());
                }
                mediaServerService.zlmServerOnline(zlmServerConfig);
            }
        });
        // 订阅 zlm保活事件, 当zlm离线时做业务的处理
        hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_keepalive,new JSONObject(),
                (MediaServerItem mediaServerItem, JSONObject response)->{
                    String mediaServerId = response.getString("mediaServerId");
                    if (mediaServerId !=null ) {
                        mediaServerService.updateMediaServerKeepalive(mediaServerId, response.getJSONObject("data"));
                    }
                });
        // 获取zlm信息
        logger.info("[zlm] 等待默认zlm中...");
@@ -113,6 +103,7 @@
                }
                startGetMedia = null;
            }
            hookSubscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started, new JSONObject());
        //  TODO 清理数据库中与redis不匹配的zlm
        }, 60 * 1000 );
    }
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -355,14 +355,15 @@
     */
    @Override
    public void zlmServerOnline(ZLMServerConfig zlmServerConfig) {
        logger.info("[ZLM] 正在连接 : {} -> {}:{}",
                zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
        MediaServerItem serverItem = mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId());
        if (serverItem == null) {
            logger.warn("[未注册的zlm] 拒接接入:{}来自{}:{}", zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() );
            logger.warn("请检查ZLM的<general.mediaServerId>配置是否与WVP的<media.id>一致");
            return;
        }else {
            logger.info("[ZLM] 正在连接 : {} -> {}:{}",
                    zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
        }
        serverItem.setHookAliveInterval(zlmServerConfig.getHookAliveInterval());
        if (serverItem.getHttpPort() == 0) {
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java
@@ -66,7 +66,7 @@
    @Override
    public void onMessage(Message message, byte[] bytes) {
        // TODO 增加队列
        logger.warn("[REDIS 消息-推流设备状态变化]: {}", new String(message.getBody()));
        logger.warn("[REDIS消息-推流设备状态变化]: {}", new String(message.getBody()));
        taskQueue.offer(message);
        if (!taskQueueHandlerRun) {
@@ -76,7 +76,7 @@
                    Message msg = taskQueue.poll();
                    PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
                    if (statusChangeFromPushStream == null) {
                        logger.warn("[REDIS 消息]推流设备状态变化消息解析失败");
                        logger.warn("[REDIS消息]推流设备状态变化消息解析失败");
                        return;
                    }
                    // 取消定时任务
@@ -106,7 +106,7 @@
        //  启动时设置所有推流通道离线,发起查询请求
        redisCatchStorage.sendStreamPushRequestedMsgForStatus();
        dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, ()->{
            logger.info("[REDIS 消息]未收到redis回复推流设备状态,执行推流设备离线");
            logger.info("[REDIS消息]未收到redis回复推流设备状态,执行推流设备离线");
            // 五秒收不到请求就设置通道离线,然后通知上级离线
            streamPushService.allStreamOffline();
        }, 5000);