leesam
2024-04-10 16b7e4a7ef473a6af29ec78aeb2f471fa398efdd
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
@@ -6,15 +6,13 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
@@ -54,7 +52,7 @@
    private final static Logger logger = LoggerFactory.getLogger(RtpController.class);
    @Autowired
    private ZlmHttpHookSubscribe hookSubscribe;
    private HookSubscribe hookSubscribe;
    @Autowired
    private IMediaServerService mediaServerService;
@@ -83,7 +81,7 @@
        logger.info("[第三方服务对接->开启收流和获取发流信息] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}",
                isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP被动", callBack);
        MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
        MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
        if (mediaServerItem == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer");
        }
@@ -109,12 +107,11 @@
        }
        // 注册回调如果rtp收流超时则通过回调发送通知
        if (callBack != null) {
            HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId());
            Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServerItem.getId());
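            // Subscribe to the on_rtp_server_timeout hook for this stream; when RTP receiving times out, the third-party callBack is notified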
            hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
                    (mediaServerItemInUse, hookParam)->{
                        OnRtpServerTimeoutHookParam serverTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam;
                        if (stream.equals(serverTimeoutHookParam.getStream_id())) {
            hookSubscribe.addSubscribe(hook,
                    (hookData)->{
                        if (stream.equals(hookData.getStream())) {
                            logger.info("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
                            OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
                            OkHttpClient client = httpClientBuilder.build();
@@ -125,7 +122,7 @@
                            } catch (IOException e) {
                                logger.error("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
                            }
                            hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
                            hookSubscribe.removeSubscribe(hook);
                        }
                    });
        }
@@ -162,7 +159,7 @@
    @Parameter(name = "stream", description = "流的ID", required = true)
    public void closeRtpServer(String stream) {
        logger.info("[第三方服务对接->关闭收流] stream->{}", stream);
        MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
        MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
        zlmServerFactory.closeRtpServer(mediaServerItem,stream);
        zlmServerFactory.closeRtpServer(mediaServerItem,stream + "_a");
        String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_*_"  + stream;
@@ -225,7 +222,7 @@
        if (!((dstPortForAudio > 0 && !ObjectUtils.isEmpty(dstPortForAudio) || (dstPortForVideo > 0 && !ObjectUtils.isEmpty(dstIpForVideo))))) {
            throw new ControllerException(ErrorCode.ERROR400.getCode(), "至少应该存在一组音频或视频发送参数");
        }
        MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
        MediaServer mediaServer = mediaServerService.getDefaultMediaServer();
        String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_"  + callId;
        OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
        if (sendInfo == null) {
@@ -278,10 +275,10 @@
            paramForVideo = null;
        }
        Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream);
        Boolean streamReady = mediaServerService.isStreamReady(mediaServer, app, stream);
        if (streamReady) {
            if (paramForVideo != null) {
                JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForVideo);
                JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, paramForVideo);
                if (jsonObject.getInteger("code") == 0) {
                    logger.info("[第三方服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, paramForVideo);
                    redisTemplate.opsForValue().set(key, sendInfo);
@@ -292,7 +289,7 @@
                }
            }
            if(paramForAudio != null) {
                JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForAudio);
                JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, paramForAudio);
                if (jsonObject.getInteger("code") == 0) {
                    logger.info("[第三方服务对接->发送流] 音频流发流成功,callId->{},param->{}", callId, paramForAudio);
                    redisTemplate.opsForValue().set(key, sendInfo);
@@ -305,18 +302,18 @@
        }else {
            logger.info("[第三方服务对接->发送流] 流不存在,等待流上线,callId->{}", callId);
            String uuid = UUID.randomUUID().toString();
            HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId());
            Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServer.getId());
            dynamicTask.startDelay(uuid, ()->{
                logger.info("[第三方服务对接->发送流] 等待流上线超时 callId->{}", callId);
                redisTemplate.delete(key);
                hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
                hookSubscribe.removeSubscribe(hook);
            }, 10000);
            // Subscribe to the stream-online hook for app/stream; once the stream arrives, cancel the timeout task and start sending RTP
            OtherRtpSendInfo finalSendInfo = sendInfo;
            hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
            hookSubscribe.addSubscribe(hookSubscribeForStreamChange,
                    (mediaServerItemInUse, response)->{
            hookSubscribe.removeSubscribe(hook);
            hookSubscribe.addSubscribe(hook,
                    (hookData)->{
                        dynamicTask.stop(uuid);
                        logger.info("[第三方服务对接->发送流] 流上线,开始发流 callId->{}", callId);
                        try {
@@ -325,7 +322,7 @@
                            throw new RuntimeException(e);
                        }
                        if (paramForVideo != null) {
                            JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForVideo);
                            JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, paramForVideo);
                            if (jsonObject.getInteger("code") == 0) {
                                logger.info("[第三方服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, paramForVideo);
                                redisTemplate.opsForValue().set(key, finalSendInfo);
@@ -336,7 +333,7 @@
                            }
                        }
                        if(paramForAudio != null) {
                            JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForAudio);
                            JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, paramForAudio);
                            if (jsonObject.getInteger("code") == 0) {
                                logger.info("[第三方服务对接->发送流] 音频流发流成功,callId->{},param->{}", callId, paramForAudio);
                                redisTemplate.opsForValue().set(key, finalSendInfo);
@@ -346,7 +343,7 @@
                                throw new ControllerException(ErrorCode.ERROR100.getCode(), "[音频流发流失败] " + jsonObject.getString("msg"));
                            }
                        }
                        hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
                        hookSubscribe.removeSubscribe(hook);
                    });
        }
    }
@@ -367,7 +364,7 @@
        param.put("app",sendInfo.getPushApp());
        param.put("stream",sendInfo.getPushStream());
        param.put("ssrc",sendInfo.getPushSSRC());
        MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
        MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
        Boolean result = zlmServerFactory.stopSendRtpStream(mediaServerItem, param);
        if (!result) {
            logger.info("[第三方服务对接->关闭发送流] 失败 callId->{}", callId);