648540858
2022-07-19 28b5cc39d0a2d9939f70b4c980a31d9b27fc1e4c
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -4,10 +4,9 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.SsrcConfig;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
@@ -15,12 +14,12 @@
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.redis.JedisUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
@@ -29,10 +28,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.security.core.parameters.P;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
@@ -40,7 +36,9 @@
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
/**
 * 媒体服务器节点管理
@@ -60,7 +58,7 @@
    private Integer serverPort;
    @Autowired
    private UserSetup userSetup;
    private UserSetting userSetting;
    @Autowired
    private ZLMRESTfulUtils zlmresTfulUtils;
@@ -84,7 +82,7 @@
    private RedisUtil redisUtil;
    @Autowired
    private IVideoManagerStorager storager;
    private IVideoManagerStorage storager;
    @Autowired
    private IStreamProxyService streamProxyService;
@@ -95,14 +93,12 @@
    @Autowired
    JedisUtil jedisUtil;
    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    /**
     * 初始化
     */
    @Override
    public void updateVmServer(List<MediaServerItem>  mediaServerItemList) {
        logger.info("[缓存初始化] Media Server ");
        logger.info("[zlm] 缓存初始化 ");
        for (MediaServerItem mediaServerItem : mediaServerItemList) {
            if (StringUtils.isEmpty(mediaServerItem.getId())) {
                continue;
@@ -111,10 +107,10 @@
            if (mediaServerItem.getSsrcConfig() == null) {
                SsrcConfig ssrcConfig = new SsrcConfig(mediaServerItem.getId(), null, sipConfig.getDomain());
                mediaServerItem.setSsrcConfig(ssrcConfig);
                redisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + mediaServerItem.getId(), mediaServerItem);
                redisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(), mediaServerItem);
            }
            // 查询redis是否存在此mediaServer
            String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + mediaServerItem.getId();
            String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId();
            if (!redisUtil.hasKey(key)) {
                redisUtil.set(key, mediaServerItem);
            }
@@ -123,17 +119,17 @@
    }
    @Override
    public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId) {
        return openRTPServer(mediaServerItem, streamId, false);
    public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean ssrcCheck, boolean isPlayback) {
        return openRTPServer(mediaServerItem, streamId, null, ssrcCheck,isPlayback);
    }
    @Override
    public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean isPlayback) {
    public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck, boolean isPlayback) {
        if (mediaServerItem == null || mediaServerItem.getId() == null) {
            return null;
        }
        // 获取mediaServer可用的ssrc
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + mediaServerItem.getId();
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId();
        SsrcConfig ssrcConfig = mediaServerItem.getSsrcConfig();
        if (ssrcConfig == null) {
@@ -141,10 +137,14 @@
            return null;
        }else {
            String ssrc = null;
            if (isPlayback) {
                ssrc = ssrcConfig.getPlayBackSsrc();
            if (presetSsrc != null) {
                ssrc = presetSsrc;
            }else {
                ssrc = ssrcConfig.getPlaySsrc();
                if (isPlayback) {
                    ssrc = ssrcConfig.getPlayBackSsrc();
                }else {
                    ssrc = ssrcConfig.getPlaySsrc();
                }
            }
            if (streamId == null) {
@@ -152,7 +152,7 @@
            }
            int rtpServerPort = mediaServerItem.getRtpProxyPort();
            if (mediaServerItem.isRtpEnable()) {
                rtpServerPort = zlmrtpServerFactory.createRTPServer(mediaServerItem, streamId);
                rtpServerPort = zlmrtpServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck?Integer.parseInt(ssrc):0);
            }
            redisUtil.set(key, mediaServerItem);
            return new SSRCInfo(rtpServerPort, ssrc, streamId);
@@ -181,7 +181,7 @@
        SsrcConfig ssrcConfig = mediaServerItem.getSsrcConfig();
        ssrcConfig.releaseSsrc(ssrc);
        mediaServerItem.setSsrcConfig(ssrcConfig);
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + mediaServerItem.getId();
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId();
        redisUtil.set(key, mediaServerItem);
    }
@@ -191,7 +191,8 @@
    @Override
    public void clearRTPServer(MediaServerItem mediaServerItem) {
        mediaServerItem.setSsrcConfig(new SsrcConfig(mediaServerItem.getId(), null, sipConfig.getDomain()));
        redisUtil.zAdd(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId(), mediaServerItem.getId(), 0);
        redisUtil.zAdd(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), mediaServerItem.getId(), 0);
    }
@@ -211,15 +212,15 @@
                    )
            );
        }
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + mediaServerItemInDataBase.getId();
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItemInDataBase.getId();
        redisUtil.set(key, mediaServerItemInDataBase);
    }
    @Override
    public List<MediaServerItem> getAll() {
        List<MediaServerItem> result = new ArrayList<>();
        List<Object> mediaServerKeys = redisUtil.scan(String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX+ userSetup.getServerId() + "_" ));
        String onlineKey = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId();
        List<Object> mediaServerKeys = redisUtil.scan(String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX+ userSetting.getServerId() + "_" ));
        String onlineKey = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        for (Object mediaServerKey : mediaServerKeys) {
            String key = (String) mediaServerKey;
            MediaServerItem mediaServerItem = (MediaServerItem) redisUtil.get(key);
@@ -232,11 +233,10 @@
        }
        result.sort((serverItem1, serverItem2)->{
            int sortResult = 0;
            try {
                sortResult = format.parse(serverItem1.getCreateTime()).compareTo(format.parse(serverItem2.getCreateTime()));
            } catch (ParseException e) {
                e.printStackTrace();
            }
            LocalDateTime localDateTime1 = LocalDateTime.parse(serverItem1.getCreateTime(), DateUtil.formatter);
            LocalDateTime localDateTime2 = LocalDateTime.parse(serverItem2.getCreateTime(), DateUtil.formatter);
            sortResult = localDateTime1.compareTo(localDateTime2);
            return  sortResult;
        });
        return result;
@@ -250,13 +250,13 @@
    @Override
    public List<MediaServerItem> getAllOnline() {
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId();
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        Set<String> mediaServerIdSet = redisUtil.zRevRange(key, 0, -1);
        List<MediaServerItem> result = new ArrayList<>();
        if (mediaServerIdSet != null && mediaServerIdSet.size() > 0) {
            for (String mediaServerId : mediaServerIdSet) {
                String serverKey = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + mediaServerId;
                String serverKey = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
                result.add((MediaServerItem) redisUtil.get(serverKey));
            }
        }
@@ -274,7 +274,7 @@
        if (mediaServerId == null) {
            return null;
        }
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + mediaServerId;
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
        return (MediaServerItem)redisUtil.get(key);
    }
@@ -286,15 +286,15 @@
    @Override
    public void clearMediaServerForOnline() {
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId();
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        redisUtil.del(key);
    }
    @Override
    public WVPResult<String> add(MediaServerItem mediaServerItem) {
        WVPResult<String> result = new WVPResult<>();
        mediaServerItem.setCreateTime(this.format.format(System.currentTimeMillis()));
        mediaServerItem.setUpdateTime(this.format.format(System.currentTimeMillis()));
        mediaServerItem.setCreateTime(DateUtil.getNow());
        mediaServerItem.setUpdateTime(DateUtil.getNow());
        mediaServerItem.setHookAliveInterval(120);
        JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
        if (responseJSON != null) {
@@ -355,7 +355,7 @@
     */
    @Override
    public void zlmServerOnline(ZLMServerConfig zlmServerConfig) {
        logger.info("[ ZLM:{} ]-[ {}:{} ]正在连接",
        logger.info("[ZLM] 正在连接 : {} -> {}:{}",
                zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
        MediaServerItem serverItem = mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId());
@@ -393,7 +393,7 @@
            return;
        }
        mediaServerMapper.update(serverItem);
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + zlmServerConfig.getGeneralMediaServerId();
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + zlmServerConfig.getGeneralMediaServerId();
        if (redisUtil.get(key) == null) {
            SsrcConfig ssrcConfig = new SsrcConfig(zlmServerConfig.getGeneralMediaServerId(), null, sipConfig.getDomain());
            serverItem.setSsrcConfig(ssrcConfig);
@@ -404,10 +404,11 @@
        redisUtil.set(key, serverItem);
        resetOnlineServerItem(serverItem);
        updateMediaServerKeepalive(serverItem.getId(), null);
        setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable()));
        if (serverItem.isAutoConfig()) {
            setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable()));
        }
        publisher.zlmOnlineEventPublish(serverItem.getId());
        logger.info("[ ZLM:{} ]-[ {}:{} ]连接成功",
        logger.info("[ZLM] 连接成功 {} - {}:{} ",
                zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
    }
@@ -420,7 +421,7 @@
    @Override
    public void resetOnlineServerItem(MediaServerItem serverItem) {
        // 更新缓存
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId();
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        // 使用zset的分数作为当前并发量, 默认值设置为0
        if (redisUtil.zScore(key, serverItem.getId()) == null) {  // 不存在则设置默认值 已存在则重置
            redisUtil.zAdd(key, serverItem.getId(), 0L);
@@ -446,14 +447,14 @@
        if (mediaServerId == null) {
            return;
        }
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId();
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        redisUtil.zIncrScore(key, mediaServerId, 1);
    }
    @Override
    public void removeCount(String mediaServerId) {
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId();
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        redisUtil.zIncrScore(key, mediaServerId, - 1);
    }
@@ -463,7 +464,7 @@
     */
    @Override
    public MediaServerItem getMediaServerForMinimumLoad() {
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId();
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        if (redisUtil.zSize(key)  == null || redisUtil.zSize(key) == 0) {
            logger.info("获取负载最低的节点时无在线节点");
@@ -485,7 +486,7 @@
     */
    @Override
    public void setZLMConfig(MediaServerItem mediaServerItem, boolean restart) {
        logger.info("[ ZLM:{} ]-[ {}:{} ]正在设置zlm",
        logger.info("[ZLM] 正在设置 :{} -> {}:{}",
                mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
        String protocol = sslEnabled ? "https" : "http";
        String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort);
@@ -497,14 +498,14 @@
        param.put("api.secret",mediaServerItem.getSecret()); // -profile:v Baseline
        param.put("ffmpeg.cmd","%s -fflags nobuffer -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264  -f flv %s");
        param.put("hook.enable","1");
        param.put("hook.on_flow_report","");
        param.put("hook.on_flow_report",String.format("%s/on_flow_report", hookPrex));
        param.put("hook.on_play",String.format("%s/on_play", hookPrex));
        param.put("hook.on_http_access","");
        param.put("hook.on_http_access",String.format("%s/on_http_access", hookPrex));
        param.put("hook.on_publish", String.format("%s/on_publish", hookPrex));
        param.put("hook.on_record_mp4",recordHookPrex != null? String.format("%s/on_record_mp4", recordHookPrex): "");
        param.put("hook.on_record_ts","");
        param.put("hook.on_rtsp_auth","");
        param.put("hook.on_rtsp_realm","");
        param.put("hook.on_record_ts",String.format("%s/on_record_ts", hookPrex));
        param.put("hook.on_rtsp_auth",String.format("%s/on_rtsp_auth", hookPrex));
        param.put("hook.on_rtsp_realm",String.format("%s/on_rtsp_realm", hookPrex));
        param.put("hook.on_server_started",String.format("%s/on_server_started", hookPrex));
        param.put("hook.on_shell_login",String.format("%s/on_shell_login", hookPrex));
        param.put("hook.on_stream_changed",String.format("%s/on_stream_changed", hookPrex));
@@ -512,7 +513,7 @@
        param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrex));
        param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrex));
        param.put("hook.timeoutSec","20");
        param.put("general.streamNoneReaderDelayMS","-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() );
        param.put("general.streamNoneReaderDelayMS",mediaServerItem.getStreamNoneReaderDelayMS()==-1?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() );
        // 推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。
        // 置0关闭此特性(推流断开会导致立即断开播放器)
        // 此参数不应大于播放器超时时间
@@ -521,22 +522,25 @@
        // 最多等待未初始化的Track时间,单位毫秒,超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流,
        // 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项
        param.put("general.wait_track_ready_ms", "3000" );
        if (mediaServerItem.isRtpEnable() && !StringUtils.isEmpty(mediaServerItem.getRtpPortRange())) {
            param.put("rtp_proxy.port_range", mediaServerItem.getRtpPortRange().replace(",", "-"));
        }
        JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param);
        if (responseJSON != null && responseJSON.getInteger("code") == 0) {
            if (restart) {
                logger.info("[ ZLM:{} ]-[ {}:{} ]设置zlm成功, 开始重启以保证配置生效",
                logger.info("[ZLM] 设置成功,开始重启以保证配置生效 {} -> {}:{}",
                        mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
                zlmresTfulUtils.restartServer(mediaServerItem);
            }else {
                logger.info("[ ZLM:{} ]-[ {}:{} ]设置zlm成功",
                logger.info("[ZLM] 设置成功 {} -> {}:{}",
                        mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
            }
        }else {
            logger.info("[ ZLM:{} ]-[ {}:{} ]设置zlm失败",
            logger.info("[ZLM] 设置zlm失败 {} -> {}:{}",
                    mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
        }
@@ -614,8 +618,8 @@
    @Override
    public void delete(String id) {
        redisUtil.zRemove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId(), id);
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + id;
        redisUtil.zRemove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), id);
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + id;
        redisUtil.del(key);
    }
    @Override
@@ -633,7 +637,7 @@
            logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息");
            return;
        }
        String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetup.getServerId() + "_" + mediaServerId;
        String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
        int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2;
        redisUtil.set(key, data, hookAliveInterval);
    }