648540858
2022-08-19 1a9e49d9ff210e39f6297150db758906a4f02e6f
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -8,7 +8,6 @@
import java.util.Map;
import java.util.Set;
import com.genersoft.iot.vmp.media.zlm.ZLMRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -36,7 +35,6 @@
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.redis.JedisUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
@@ -54,9 +52,6 @@
    @Autowired
    private SipConfig sipConfig;
    @Autowired
    private ZLMRunner zlmRunner;
    @Value("${server.ssl.enabled:false}")
    private boolean sslEnabled;
@@ -85,14 +80,9 @@
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private EventPublisher publisher;
    @Autowired
    JedisUtil jedisUtil;
    /**
     * 初始化
@@ -108,12 +98,12 @@
            if (mediaServerItem.getSsrcConfig() == null) {
                SsrcConfig ssrcConfig = new SsrcConfig(mediaServerItem.getId(), null, sipConfig.getDomain());
                mediaServerItem.setSsrcConfig(ssrcConfig);
                redisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(), mediaServerItem);
                RedisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(), mediaServerItem);
            }
            // 查询redis是否存在此mediaServer
            String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId();
            if (!redisUtil.hasKey(key)) {
                redisUtil.set(key, mediaServerItem);
            if (!RedisUtil.hasKey(key)) {
                RedisUtil.set(key, mediaServerItem);
            }
        }
@@ -155,7 +145,7 @@
            if (mediaServerItem.isRtpEnable()) {
                rtpServerPort = zlmrtpServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck?Integer.parseInt(ssrc):0, port);
            }
            redisUtil.set(key, mediaServerItem);
            RedisUtil.set(key, mediaServerItem);
            return new SSRCInfo(rtpServerPort, ssrc, streamId);
        }
    }
@@ -188,7 +178,7 @@
        ssrcConfig.releaseSsrc(ssrc);
        mediaServerItem.setSsrcConfig(ssrcConfig);
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId();
        redisUtil.set(key, mediaServerItem);
        RedisUtil.set(key, mediaServerItem);
    }
    /**
@@ -197,7 +187,7 @@
    @Override
    public void clearRTPServer(MediaServerItem mediaServerItem) {
        mediaServerItem.setSsrcConfig(new SsrcConfig(mediaServerItem.getId(), null, sipConfig.getDomain()));
        redisUtil.zAdd(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), mediaServerItem.getId(), 0);
        RedisUtil.zAdd(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), mediaServerItem.getId(), 0);
    }
@@ -219,19 +209,19 @@
            );
        }
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItemInDataBase.getId();
        redisUtil.set(key, mediaServerItemInDataBase);
        RedisUtil.set(key, mediaServerItemInDataBase);
    }
    @Override
    public List<MediaServerItem> getAll() {
        List<MediaServerItem> result = new ArrayList<>();
        List<Object> mediaServerKeys = redisUtil.scan(String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX+ userSetting.getServerId() + "_" ));
        List<Object> mediaServerKeys = RedisUtil.scan(String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX+ userSetting.getServerId() + "_" ));
        String onlineKey = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        for (Object mediaServerKey : mediaServerKeys) {
            String key = (String) mediaServerKey;
            MediaServerItem mediaServerItem = (MediaServerItem) redisUtil.get(key);
            MediaServerItem mediaServerItem = (MediaServerItem) RedisUtil.get(key);
            // 检查状态
            Double aDouble = redisUtil.zScore(onlineKey, mediaServerItem.getId());
            Double aDouble = RedisUtil.zScore(onlineKey, mediaServerItem.getId());
            if (aDouble != null) {
                mediaServerItem.setStatus(true);
            }
@@ -257,13 +247,13 @@
    @Override
    public List<MediaServerItem> getAllOnline() {
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        Set<String> mediaServerIdSet = redisUtil.zRevRange(key, 0, -1);
        Set<String> mediaServerIdSet = RedisUtil.zRevRange(key, 0, -1);
        List<MediaServerItem> result = new ArrayList<>();
        if (mediaServerIdSet != null && mediaServerIdSet.size() > 0) {
            for (String mediaServerId : mediaServerIdSet) {
                String serverKey = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
                result.add((MediaServerItem) redisUtil.get(serverKey));
                result.add((MediaServerItem) RedisUtil.get(serverKey));
            }
        }
        Collections.reverse(result);
@@ -281,13 +271,7 @@
            return null;
        }
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
        MediaServerItem serverItem=(MediaServerItem)redisUtil.get(key);
        if(null==serverItem){
            //zlm服务不在线,启动重连
            reloadZlm();
            serverItem=(MediaServerItem)redisUtil.get(key);
        }
        return serverItem;
        return (MediaServerItem)RedisUtil.get(key);
    }
    @Override
@@ -299,7 +283,7 @@
    @Override
    public void clearMediaServerForOnline() {
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        redisUtil.del(key);
        RedisUtil.del(key);
    }
    @Override
@@ -407,16 +391,15 @@
        }
        mediaServerMapper.update(serverItem);
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + zlmServerConfig.getGeneralMediaServerId();
        if (redisUtil.get(key) == null) {
        if (RedisUtil.get(key) == null) {
            SsrcConfig ssrcConfig = new SsrcConfig(zlmServerConfig.getGeneralMediaServerId(), null, sipConfig.getDomain());
            serverItem.setSsrcConfig(ssrcConfig);
        }else {
            MediaServerItem mediaServerItemInRedis = (MediaServerItem)redisUtil.get(key);
            MediaServerItem mediaServerItemInRedis = (MediaServerItem)RedisUtil.get(key);
            serverItem.setSsrcConfig(mediaServerItemInRedis.getSsrcConfig());
        }
        redisUtil.set(key, serverItem);
        RedisUtil.set(key, serverItem);
        resetOnlineServerItem(serverItem);
        updateMediaServerKeepalive(serverItem.getId(), null);
        if (serverItem.isAutoConfig()) {
            setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable()));
        }
@@ -436,15 +419,15 @@
        // 更新缓存
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        // 使用zset的分数作为当前并发量, 默认值设置为0
        if (redisUtil.zScore(key, serverItem.getId()) == null) {  // 不存在则设置默认值 已存在则重置
            redisUtil.zAdd(key, serverItem.getId(), 0L);
        if (RedisUtil.zScore(key, serverItem.getId()) == null) {  // 不存在则设置默认值 已存在则重置
            RedisUtil.zAdd(key, serverItem.getId(), 0L);
            // 查询服务流数量
            zlmresTfulUtils.getMediaList(serverItem, null, null, "rtmp",(mediaList ->{
                Integer code = mediaList.getInteger("code");
                if (code == 0) {
                    JSONArray data = mediaList.getJSONArray("data");
                    if (data != null) {
                        redisUtil.zAdd(key, serverItem.getId(), data.size());
                        RedisUtil.zAdd(key, serverItem.getId(), data.size());
                    }
                }
            }));
@@ -461,14 +444,14 @@
            return;
        }
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        redisUtil.zIncrScore(key, mediaServerId, 1);
        RedisUtil.zIncrScore(key, mediaServerId, 1);
    }
    @Override
    public void removeCount(String mediaServerId) {
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        redisUtil.zIncrScore(key, mediaServerId, - 1);
        RedisUtil.zIncrScore(key, mediaServerId, - 1);
    }
    /**
@@ -479,18 +462,15 @@
    public MediaServerItem getMediaServerForMinimumLoad() {
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        if (redisUtil.zSize(key)  == null || redisUtil.zSize(key) == 0) {
            logger.info("获取负载最低的节点时无在线节点,启动重连机制");
            //启动重连
            reloadZlm();
            if (redisUtil.zSize(key)  == null || redisUtil.zSize(key) == 0) {
        if (RedisUtil.zSize(key)  == null || RedisUtil.zSize(key) == 0) {
            if (RedisUtil.zSize(key)  == null || RedisUtil.zSize(key) == 0) {
                logger.info("获取负载最低的节点时无在线节点");
                return null;
            }
        }
        // 获取分数最低的,及并发最低的
        Set<Object> objects = redisUtil.ZRange(key, 0, -1);
        Set<Object> objects = RedisUtil.ZRange(key, 0, -1);
        ArrayList<Object> mediaServerObjectS = new ArrayList<>(objects);
        String mediaServerId = (String)mediaServerObjectS.get(0);
@@ -633,9 +613,9 @@
    @Override
    public void delete(String id) {
        redisUtil.zRemove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), id);
        RedisUtil.zRemove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), id);
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + id;
        redisUtil.del(key);
        RedisUtil.del(key);
    }
    @Override
    public void deleteDb(String id){
@@ -647,9 +627,14 @@
    public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) {
        MediaServerItem mediaServerItem = getOne(mediaServerId);
        if (mediaServerItem == null) {
            // 缓存不存在,从数据库查询,如果数据库不存在则是错误的
            MediaServerItem mediaServerItemFromDatabase = getOneFromDatabase(mediaServerId);
            if (mediaServerItemFromDatabase == null) {
                return;
            }
            // zlm连接重试
            logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息,尝试重连zlm");
            reloadZlm();
//            reloadZlm();
            mediaServerItem = getOne(mediaServerId);
            if (mediaServerItem == null) {
                // zlm连接重试
@@ -659,7 +644,11 @@
        }
        String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
        int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2;
        redisUtil.set(key, data, hookAliveInterval);
        RedisUtil.set(key, data, hookAliveInterval);
    }
    private MediaServerItem getOneFromDatabase(String mediaServerId) {
        return mediaServerMapper.queryOne(mediaServerId);
    }
    @Override
@@ -675,15 +664,6 @@
            if (!mediaServerItemMap.containsKey(mediaServerItem.getId())) {
                delete(mediaServerItem.getId());
            }
        }
    }
    public void reloadZlm(){
        try {
            zlmRunner.run();
            Thread.sleep(500);//延迟0.5秒缓冲时间
        } catch (Exception e) {
            logger.warn("尝试重连zlm失败!",e);
        }
    }
}