648540858
2024-03-18 1768565ce045d4beddccc82d10b533ea3022cf8d
src/main/java/com/genersoft/iot/vmp/media/impl/MediaServerServiceImpl.java
File was renamed from src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -1,4 +1,4 @@
package com.genersoft.iot.vmp.service.impl;
package com.genersoft.iot.vmp.media.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
@@ -12,11 +12,12 @@
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.IMediaNodeServerService;
import com.genersoft.iot.vmp.media.zlm.*;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.media.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -25,30 +26,23 @@
import com.genersoft.iot.vmp.utils.JsonUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.RecordFile;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import java.io.File;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
 * 媒体服务器节点管理
@@ -83,9 +77,6 @@
    private AssistRESTfulUtils assistRESTfulUtils;
    @Autowired
    private ZLMRESTfulUtils zlmresTfulUtils;
    @Autowired
    private MediaServerMapper mediaServerMapper;
    @Autowired
@@ -113,20 +104,16 @@
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    private Map<String, IMediaNodeServerService> nodeServerServiceMap;
    /**
     * 初始化
     */
    @Override
    public void updateVmServer(List<MediaServerItem>  mediaServerItemList) {
        logger.info("[zlm] 缓存初始化 ");
    public void updateVmServer(List<MediaServerItem> mediaServerItemList) {
        logger.info("[媒体服务节点] 缓存初始化 ");
        for (MediaServerItem mediaServerItem : mediaServerItemList) {
            if (ObjectUtils.isEmpty(mediaServerItem.getId())) {
                continue;
@@ -173,7 +160,12 @@
        }
        int rtpServerPort;
        if (mediaServerItem.isRtpEnable()) {
            rtpServerPort = zlmServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, reUsePort, tcpMode);
            IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
            if (mediaNodeServerService == null) {
                logger.info("[openRTPServer] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
                return null;
            }
            rtpServerPort = mediaNodeServerService.createRTPServer(mediaServerItem, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, reUsePort, tcpMode);
        } else {
            rtpServerPort = mediaServerItem.getRtpProxyPort();
        }
@@ -191,7 +183,12 @@
        if (mediaServerItem == null) {
            return;
        }
        zlmServerFactory.closeRtpServer(mediaServerItem, streamId);
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        if (mediaNodeServerService == null) {
            logger.info("[closeRTPServer] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            return;
        }
        mediaNodeServerService.closeRtpServer(mediaServerItem, streamId);
    }
    @Override
@@ -200,21 +197,42 @@
            callback.run(false);
            return;
        }
        zlmServerFactory.closeRtpServer(mediaServerItem, streamId, callback);
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        if (mediaNodeServerService == null) {
            logger.info("[closeRTPServer] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            return;
        }
        mediaNodeServerService.closeRtpServer(mediaServerItem, streamId, callback);
    }
    @Override
    public void closeRTPServer(String mediaServerId, String streamId) {
        MediaServerItem mediaServerItem = this.getOne(mediaServerId);
        if (mediaServerItem != null && mediaServerItem.isRtpEnable()) {
        if (mediaServerItem == null) {
            return;
        }
        if (mediaServerItem.isRtpEnable()) {
            closeRTPServer(mediaServerItem, streamId);
        }
        zlmresTfulUtils.closeStreams(mediaServerItem, "rtp", streamId);
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        if (mediaNodeServerService == null) {
            logger.info("[closeRTPServer] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            return;
        }
        mediaNodeServerService.closeStreams(mediaServerItem, "rtp", streamId);
    }
    @Override
    public Boolean updateRtpServerSSRC(MediaServerItem mediaServerItem, String streamId, String ssrc) {
        return zlmServerFactory.updateRtpServerSSRC(mediaServerItem, streamId, ssrc);
        if (mediaServerItem == null) {
            return false;
        }
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        if (mediaNodeServerService == null) {
            logger.info("[updateRtpServerSSRC] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            return false;
        }
        return mediaNodeServerService.updateRtpServerSSRC(mediaServerItem, streamId, ssrc);
    }
    @Override
@@ -227,12 +245,11 @@
    }
    /**
     * zlm 重启后重置他的推流信息, TODO 给正在使用的设备发送停止命令
     * 媒体服务节点 重启后重置他的推流信息, TODO 给正在使用的设备发送停止命令
     */
    @Override
    public void clearRTPServer(MediaServerItem mediaServerItem) {
        ssrcFactory.reset(mediaServerItem.getId());
    }
@@ -301,7 +318,7 @@
    }
    /**
     * 获取单个zlm服务器
     * 获取单个媒体服务节点服务器
     * @param mediaServerId 服务id
     * @return MediaServerItem
     */
@@ -331,24 +348,20 @@
        mediaServerItem.setCreateTime(DateUtil.getNow());
        mediaServerItem.setUpdateTime(DateUtil.getNow());
        mediaServerItem.setHookAliveInterval(30f);
        JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
        if (responseJSON != null) {
            JSONArray data = responseJSON.getJSONArray("data");
            if (data != null && data.size() > 0) {
                ZLMServerConfig zlmServerConfig= JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class);
                if (mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId()) != null) {
                    throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败,媒体服务ID [ " + zlmServerConfig.getGeneralMediaServerId() + " ] 已存在,请修改媒体服务器配置");
                }
                mediaServerItem.setId(zlmServerConfig.getGeneralMediaServerId());
                zlmServerConfig.setIp(mediaServerItem.getIp());
                mediaServerMapper.add(mediaServerItem);
                zlmServerOnline(zlmServerConfig);
            }else {
                throw new ControllerException(ErrorCode.ERROR100.getCode(),"连接失败");
            }
        if (mediaServerItem.getType() == null) {
            logger.info("[添加媒体节点] 失败, mediaServerItem的类型:为空");
            return;
        }
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        if (mediaNodeServerService == null) {
            logger.info("[添加媒体节点] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            return;
        }
        if (mediaNodeServerService.checkNodeId(mediaServerItem)) {
            mediaServerMapper.add(mediaServerItem);
            mediaNodeServerService.online(mediaServerItem);
        }else {
            throw new ControllerException(ErrorCode.ERROR100.getCode(),"连接失败");
            throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败,媒体服务ID [ " + mediaServerItem.getId() + " ] 已存在,请修改媒体服务器配置");
        }
    }
@@ -364,7 +377,7 @@
            TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
            int delResult = mediaServerMapper.delDefault();
            if (delResult == 0) {
                logger.error("移除数据库默认zlm节点失败");
                logger.error("移除数据库默认媒体服务节点节点失败");
                //事务回滚
                dataSourceTransactionManager.rollback(transactionStatus);
                return 0;
@@ -378,19 +391,19 @@
    }
    /**
     * 处理zlm上线
     * @param zlmServerConfig zlm上线携带的参数
     * 处理媒体服务节点上线
     * @param zlmServerConfig 媒体服务节点上线携带的参数
     */
    @Override
    public void zlmServerOnline(ZLMServerConfig zlmServerConfig) {
        MediaServerItem serverItem = mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId());
        if (serverItem == null) {
            logger.warn("[未注册的zlm] 拒接接入:{}来自{}:{}", zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() );
            logger.warn("请检查ZLM的<general.mediaServerId>配置是否与WVP的<media.id>一致");
            logger.warn("[未注册的媒体服务节点] 拒接接入:{}来自{}:{}", zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() );
            logger.warn("请检查媒体服务节点的ID配置是否与WVP的<media.id>一致");
            return;
        }else {
            logger.info("[ZLM] 正在连接 : {} -> {}:{}",
            logger.info("[媒体服务节点] 正在连接 : {} -> {}:{}",
                    zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
        }
        serverItem.setHookAliveInterval(zlmServerConfig.getHookAliveInterval());
@@ -418,7 +431,7 @@
        serverItem.setStatus(true);
        if (ObjectUtils.isEmpty(serverItem.getId())) {
            logger.warn("[未注册的zlm] serverItem缺少ID, 无法接入:{}:{}", zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() );
            logger.warn("[未注册的媒体服务节点] serverItem缺少ID, 无法接入:{}:{}", zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() );
            return;
        }
        mediaServerMapper.update(serverItem);
@@ -436,9 +449,9 @@
        final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + serverItem.getId();
        dynamicTask.stop(zlmKeepaliveKey);
        dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (serverItem.getHookAliveInterval().intValue() + 5) * 1000);
        publisher.zlmOnlineEventPublish(serverItem.getId());
        publisher.mediaServerOnlineEventPublish(serverItem.getId());
        logger.info("[ZLM] 连接成功 {} - {}:{} ",
        logger.info("[媒体服务节点] 连接成功 {} - {}:{} ",
                zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
    }
@@ -452,12 +465,12 @@
        @Override
        public void run() {
            logger.info("[zlm心跳到期]:" + serverItem.getId());
            logger.info("[媒体服务节点心跳到期]:" + serverItem.getId());
            // 发起http请求验证zlm是否确实无法连接,如果确实无法连接则发送离线事件,否则不作处理
            JSONObject mediaServerConfig = zlmresTfulUtils.getMediaServerConfig(serverItem);
            if (mediaServerConfig != null && mediaServerConfig.getInteger("code") == 0) {
                logger.info("[zlm心跳到期]:{}验证后zlm仍在线,恢复心跳信息,请检查zlm是否可以正常向wvp发送心跳", serverItem.getId());
                // 添加zlm信息
                logger.info("[媒体服务节点心跳到期]:{}验证后媒体服务节点仍在线,恢复心跳信息,请检查媒体服务节点是否可以正常向wvp发送心跳", serverItem.getId());
                // 添加媒体服务节点信息
                updateMediaServerKeepalive(serverItem.getId(), null);
            }else {
                publisher.zlmOfflineEventPublish(serverItem.getId());
@@ -556,13 +569,13 @@
    }
    /**
     * 对zlm服务器进行基础配置
     * 对媒体服务节点服务器进行基础配置
     * @param mediaServerItem 服务ID
     * @param restart 是否重启zlm
     * @param restart 是否重启媒体服务节点
     */
    @Override
    public void setZLMConfig(MediaServerItem mediaServerItem, boolean restart) {
        logger.info("[ZLM] 正在设置 :{} -> {}:{}",
        logger.info("[媒体服务节点] 正在设置 :{} -> {}:{}",
                mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
        String protocol = sslEnabled ? "https" : "http";
        String hookPrefix = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort);
@@ -612,17 +625,17 @@
        if (responseJSON != null && responseJSON.getInteger("code") == 0) {
            if (restart) {
                logger.info("[ZLM] 设置成功,开始重启以保证配置生效 {} -> {}:{}",
                logger.info("[媒体服务节点] 设置成功,开始重启以保证配置生效 {} -> {}:{}",
                        mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
                zlmresTfulUtils.restartServer(mediaServerItem);
            }else {
                logger.info("[ZLM] 设置成功 {} -> {}:{}",
                logger.info("[媒体服务节点] 设置成功 {} -> {}:{}",
                        mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
            }
        }else {
            logger.info("[ZLM] 设置zlm失败 {} -> {}:{}",
            logger.info("[媒体服务节点] 设置媒体服务节点失败 {} -> {}:{}",
                    mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
        }
@@ -701,11 +714,11 @@
            // 缓存不存在,从数据库查询,如果数据库不存在则是错误的
            mediaServerItem = getOneFromDatabase(mediaServerId);
            if (mediaServerItem == null) {
                logger.warn("[更新ZLM 保活信息] 流媒体{}尚未加入使用,请检查节点中是否含有此流媒体 ", mediaServerId);
                logger.warn("[更新媒体服务节点 保活信息] 流媒体{}尚未加入使用,请检查节点中是否含有此流媒体 ", mediaServerId);
                return;
            }
            // zlm连接重试
            logger.warn("[更新ZLM 保活信息]尝试链接zml id {}", mediaServerId);
            // 媒体服务节点连接重试
            logger.warn("[更新媒体服务节点 保活信息]尝试链接zml id {}", mediaServerId);
            ssrcFactory.initMediaServerSSRC(mediaServerItem.getId(), null);
            String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId();
            redisTemplate.opsForValue().set(key, mediaServerItem);