xiangpei
2024-10-10 5602c458099e38c6c4278334e5cb4f8029c63a50
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
@@ -7,16 +7,19 @@
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent;
import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -79,6 +82,9 @@
    @Autowired
    private MediaConfig mediaConfig;
    @Autowired
    private SendRtpPortManager sendRtpPortManager;
    /**
@@ -90,6 +96,8 @@
        if ("rtsp".equals(event.getSchema())) {
            logger.info("流变化:注册 app->{}, stream->{}", event.getApp(), event.getStream());
            addCount(event.getMediaServer().getId());
            String type = OriginType.values()[event.getMediaInfo().getOriginType()].getType();
            redisCatchStorage.addStream(event.getMediaServer(), type, event.getApp(), event.getStream(), event.getMediaInfo());
        }
    }
@@ -102,7 +110,15 @@
        if ("rtsp".equals(event.getSchema())) {
            logger.info("流变化:注销, app->{}, stream->{}", event.getApp(), event.getStream());
            removeCount(event.getMediaServer().getId());
            MediaInfo mediaInfo = redisCatchStorage.getStreamInfo(
                    event.getApp(), event.getStream(), event.getMediaServer().getId());
            if (mediaInfo == null) {
                return;
            }
            String type = OriginType.values()[mediaInfo.getOriginType()].getType();
            redisCatchStorage.removeStream(mediaInfo.getMediaServer().getId(), type, event.getApp(), event.getStream());
        }
    }
@@ -121,7 +137,7 @@
                ssrcFactory.initMediaServerSSRC(mediaServer.getId(), null);
            }
            // 查询redis是否存在此mediaServer
            String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServer.getId();
            String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + mediaServer.getId();
            Boolean hasKey = redisTemplate.hasKey(key);
            if (hasKey != null && ! hasKey) {
                redisTemplate.opsForValue().set(key, mediaServer);
@@ -132,7 +148,7 @@
    @Override
    public SSRCInfo openRTPServer(MediaServer mediaServer, String streamId, String presetSsrc, boolean ssrcCheck,
                                  boolean isPlayback, Integer port, Boolean onlyAuto, Boolean reUsePort, Integer tcpMode) {
                                  boolean isPlayback, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) {
        if (mediaServer == null || mediaServer.getId() == null) {
            logger.info("[openRTPServer] 失败, mediaServer == null || mediaServer.getId() == null");
            return null;
@@ -163,18 +179,12 @@
                logger.info("[openRTPServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
                return null;
            }
            rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, reUsePort, tcpMode);
            rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, disableAudio, reUsePort, tcpMode);
        } else {
            rtpServerPort = mediaServer.getRtpProxyPort();
        }
        return new SSRCInfo(rtpServerPort, ssrc, streamId);
    }
    @Override
    public SSRCInfo openRTPServer(MediaServer mediaServer, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback, Integer port, Boolean onlyAuto) {
        return openRTPServer(mediaServer, streamId, ssrc, ssrcCheck, isPlayback, port, onlyAuto, null, 0);
    }
    @Override
    public void closeRTPServer(MediaServer mediaServer, String streamId) {
@@ -263,7 +273,7 @@
        if (mediaServerInRedis == null || !ssrcFactory.hasMediaServerSSRC(mediaServerInDataBase.getId())) {
            ssrcFactory.initMediaServerSSRC(mediaServerInDataBase.getId(),null);
        }
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerInDataBase.getId();
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + mediaServerInDataBase.getId();
        redisTemplate.opsForValue().set(key, mediaServerInDataBase);
        if (mediaServerInDataBase.isStatus()) {
            resetOnlineServerItem(mediaServerInDataBase);
@@ -279,8 +289,8 @@
    @Override
    public List<MediaServer> getAllOnlineList() {
        List<MediaServer> result = new ArrayList<>();
        List<Object> mediaServerKeys = RedisUtil.scan(redisTemplate, String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX+ userSetting.getServerId() + "_" ));
        String onlineKey = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        List<Object> mediaServerKeys = RedisUtil.scan(redisTemplate, String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX+ userSetting.getServerId() + ":" ));
        String onlineKey = VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId();
        for (Object mediaServerKey : mediaServerKeys) {
            String key = (String) mediaServerKey;
            MediaServer mediaServer = JsonUtil.redisJsonToObject(redisTemplate, key, MediaServer.class);
@@ -328,14 +338,14 @@
    @Override
    public List<MediaServer> getAllOnline() {
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        String key = VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId();
        Set<Object> mediaServerIdSet = redisTemplate.opsForZSet().reverseRange(key, 0, -1);
        List<MediaServer> result = new ArrayList<>();
        if (mediaServerIdSet != null && mediaServerIdSet.size() > 0) {
            for (Object mediaServerId : mediaServerIdSet) {
                String mediaServerIdStr = (String) mediaServerId;
                String serverKey = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerIdStr;
                String serverKey = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + mediaServerIdStr;
                result.add((MediaServer) redisTemplate.opsForValue().get(serverKey));
            }
        }
@@ -353,7 +363,7 @@
        if (mediaServerId == null) {
            return null;
        }
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + mediaServerId;
        return JsonUtil.redisJsonToObject(redisTemplate, key, MediaServer.class);
    }
@@ -365,7 +375,7 @@
    @Override
    public void clearMediaServerForOnline() {
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        String key = VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId();
        redisTemplate.delete(key);
    }
@@ -389,6 +399,7 @@
            logger.info("[添加媒体节点] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return;
        }
        mediaServerMapper.add(mediaServer);
        if (mediaServer.isStatus()) {
            mediaNodeServerService.online(mediaServer);
@@ -403,7 +414,7 @@
    @Override
    public void resetOnlineServerItem(MediaServer serverItem) {
        // 更新缓存
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        String key = VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId();
        // 使用zset的分数作为当前并发量, 默认值设置为0
        if (redisTemplate.opsForZSet().score(key, serverItem.getId()) == null) {  // 不存在则设置默认值 已存在则重置
            redisTemplate.opsForZSet().add(key, serverItem.getId(), 0L);
@@ -426,14 +437,14 @@
        if (mediaServerId == null) {
            return;
        }
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        String key = VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId();
        redisTemplate.opsForZSet().incrementScore(key, mediaServerId, 1);
    }
    @Override
    public void removeCount(String mediaServerId) {
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        String key = VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId();
        redisTemplate.opsForZSet().incrementScore(key, mediaServerId, - 1);
    }
@@ -443,13 +454,13 @@
     */
    @Override
    public MediaServer getMediaServerForMinimumLoad(Boolean hasAssist) {
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        String key = VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId();
        Long size = redisTemplate.opsForZSet().zCard(key);
        if (size  == null || size == 0) {
            logger.info("获取负载最低的节点时无在线节点");
            return null;
        }
        logger.error("ddddddd");
        // 获取分数最低的,及并发最低的
        Set<Object> objects = redisTemplate.opsForZSet().range(key, 0, -1);
        ArrayList<Object> mediaServerObjectS = new ArrayList<>(objects);
@@ -457,7 +468,9 @@
        if (hasAssist == null) {
            String mediaServerId = (String)mediaServerObjectS.get(0);
            mediaServer = getOne(mediaServerId);
            logger.error("111");
        }else if (hasAssist) {
            logger.error("222");
            for (Object mediaServerObject : mediaServerObjectS) {
                String mediaServerId = (String)mediaServerObject;
                MediaServer serverItem = getOne(mediaServerId);
@@ -467,6 +480,7 @@
                }
            }
        }else if (!hasAssist) {
            logger.error("333");
            for (Object mediaServerObject : mediaServerObjectS) {
                String mediaServerId = (String)mediaServerObject;
                MediaServer serverItem = getOne(mediaServerId);
@@ -522,8 +536,8 @@
    @Override
    public void delete(String id) {
        mediaServerMapper.delOne(id);
        redisTemplate.opsForZSet().remove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), id);
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + id;
        redisTemplate.opsForZSet().remove(VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(), id);
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + id;
        redisTemplate.delete(key);
        // 发送节点移除通知
        MediaServerDeleteEvent event = new MediaServerDeleteEvent(this);
@@ -579,6 +593,16 @@
            return false;
        }
        return mediaNodeServerService.stopSendRtp(mediaInfo, app, stream, ssrc);
    }
    @Override
    public boolean initStopSendRtp(MediaServer mediaInfo, String app, String stream, String ssrc) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaInfo.getType());
        if (mediaNodeServerService == null) {
            logger.info("[stopSendRtp] 失败, mediaServer的类型: {},未找到对应的实现类", mediaInfo.getType());
            return false;
        }
        return mediaNodeServerService.initStopSendRtp(mediaInfo, app, stream, ssrc);
    }
    @Override
@@ -742,7 +766,7 @@
            calld = streamAuthorityInfo.getCallId();
        }
        List<StreamInfo> streamInfoList = getMediaList(mediaInfo, app, stream, calld);
        if (streamInfoList.isEmpty()) {
        if (streamInfoList == null || streamInfoList.isEmpty()) {
            return null;
        }else {
            return streamInfoList.get(0);
@@ -770,13 +794,132 @@
        String callIdParam = ObjectUtils.isEmpty(callId)?"":"?callId=" + callId;
        streamInfoResult.setRtmp(addr, mediaServer.getRtmpPort(),mediaServer.getRtmpSSlPort(), app,  stream, callIdParam);
        streamInfoResult.setRtsp(addr, mediaServer.getRtspPort(),mediaServer.getRtspSSLPort(), app,  stream, callIdParam);
        streamInfoResult.setFlv(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app,  stream, callIdParam);
        if ("abl".equals(mediaServer.getType())) {
            String flvFile = String.format("%s/%s.flv%s", app, stream, callIdParam);
            streamInfoResult.setFlv(addr, mediaServer.getFlvPort(),mediaServer.getFlvSSLPort(), flvFile);
            streamInfoResult.setWsFlv(addr, mediaServer.getWsFlvPort(),mediaServer.getWsFlvSSLPort(), flvFile);
        }else {
            String flvFile = String.format("%s/%s.live.flv%s", app, stream, callIdParam);
            streamInfoResult.setFlv(addr, mediaServer.getFlvPort(),mediaServer.getFlvSSLPort(), flvFile);
            streamInfoResult.setWsFlv(addr, mediaServer.getWsFlvPort(),mediaServer.getWsFlvSSLPort(), flvFile);
        }
        streamInfoResult.setFmp4(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app,  stream, callIdParam);
        streamInfoResult.setHls(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app,  stream, callIdParam);
        streamInfoResult.setTs(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app,  stream, callIdParam);
        streamInfoResult.setRtc(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app,  stream, callIdParam, isPlay);
        streamInfoResult.setMediaInfo(mediaInfo);
        if (!"broadcast".equalsIgnoreCase(app) && !ObjectUtils.isEmpty(mediaServer.getTranscodeSuffix()) && !"null".equalsIgnoreCase(mediaServer.getTranscodeSuffix())) {
            String newStream = stream + "_" + mediaServer.getTranscodeSuffix();
            mediaServer.setTranscodeSuffix(null);
            StreamInfo transcodeStreamInfo = getStreamInfoByAppAndStream(mediaServer, app, newStream, null, addr, callId, isPlay);
            streamInfoResult.setTranscodeStream(transcodeStreamInfo);
        }
        return streamInfoResult;
    }
    @Override
    public Boolean isStreamReady(MediaServer mediaServer, String rtp, String streamId) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[isStreamReady] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return false;
        }
        MediaInfo mediaInfo = mediaNodeServerService.getMediaInfo(mediaServer, rtp, streamId);
        return mediaInfo != null;
    }
    @Override
    public void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[startSendRtpPassive] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类");
        }
        mediaNodeServerService.startSendRtpPassive(mediaServer, sendRtpItem, timeout);
    }
    @Override
    public void startSendRtp(MediaServer mediaServer, SendRtpItem sendRtpItem) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[startSendRtpStream] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类");
        }
        logger.info("[开始推流] rtp/{}, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(),
                sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
        mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem);
    }
    @Override
    public SendRtpItem createSendRtpItem(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean isTcp, boolean rtcp) {
        int localPort = sendRtpPortManager.getNextPort(mediaServer);
        if (localPort == 0) {
            return null;
        }
        SendRtpItem sendRtpItem = new SendRtpItem();
        sendRtpItem.setIp(ip);
        sendRtpItem.setPort(port);
        sendRtpItem.setSsrc(ssrc);
        sendRtpItem.setPlatformId(deviceId);
        sendRtpItem.setDeviceId(deviceId);
        sendRtpItem.setChannelId(channelId);
        sendRtpItem.setTcp(isTcp);
        sendRtpItem.setRtcp(rtcp);
        sendRtpItem.setApp("rtp");
        sendRtpItem.setLocalPort(localPort);
        sendRtpItem.setServerId(userSetting.getServerId());
        sendRtpItem.setMediaServerId(mediaServer.getId());
        return sendRtpItem;
    }
    @Override
    public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
                                         String app, String stream, String channelId, boolean tcp, boolean rtcp){
        int localPort = sendRtpPortManager.getNextPort(serverItem);
        if (localPort == 0) {
            return null;
        }
        SendRtpItem sendRtpItem = new SendRtpItem();
        sendRtpItem.setIp(ip);
        sendRtpItem.setPort(port);
        sendRtpItem.setSsrc(ssrc);
        sendRtpItem.setApp(app);
        sendRtpItem.setStream(stream);
        sendRtpItem.setPlatformId(platformId);
        sendRtpItem.setChannelId(channelId);
        sendRtpItem.setTcp(tcp);
        sendRtpItem.setLocalPort(localPort);
        sendRtpItem.setServerId(userSetting.getServerId());
        sendRtpItem.setMediaServerId(serverItem.getId());
        sendRtpItem.setRtcp(rtcp);
        return sendRtpItem;
    }
    @Override
    public MediaServer getMediaServerByAppAndStream(String app, String stream) {
        List<MediaServer> mediaServerList = getAll();
        for (MediaServer mediaServer : mediaServerList) {
            MediaInfo mediaInfo = getMediaInfo(mediaServer, app, stream);
            if (mediaInfo != null) {
                return mediaServer;
            }
        }
        return null;
    }
    @Override
    public Long updateDownloadProcess(MediaServer mediaServer, String app, String stream) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[updateDownloadProcess] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类");
        }
        return mediaNodeServerService.updateDownloadProcess(mediaServer, app, stream);
    }
}