648540858
2024-04-02 8b90fade9eb3a62b428f23f2306cb1911c98d355
src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java
@@ -6,14 +6,12 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.HookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@@ -108,12 +106,11 @@
        }
        // 注册回调如果rtp收流超时则通过回调发送通知
        if (callBack != null) {
            HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId());
            Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServerItem.getId());
            // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
            hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
                    (mediaServerItemInUse, hookParam)->{
                        OnRtpServerTimeoutHookParam serverTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam;
                        if (stream.equals(serverTimeoutHookParam.getStream_id())) {
            hookSubscribe.addSubscribe(hook,
                    (hookData)->{
                        if (stream.equals(hookData.getStream())) {
                            logger.info("[第三方PS服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
                            // 将信息写入redis中,以备后用
                            redisTemplate.delete(receiveKey);
@@ -126,7 +123,7 @@
                            } catch (IOException e) {
                                logger.error("[第三方PS服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
                            }
                            hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
                            hookSubscribe.removeSubscribe(hook);
                        }
                    });
        }
@@ -241,18 +238,18 @@
        }else {
            logger.info("[第三方PS服务对接->发送流] 流不存在,等待流上线,callId->{}", callId);
            String uuid = UUID.randomUUID().toString();
            HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId());
            Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServerItem.getId());
            dynamicTask.startDelay(uuid, ()->{
                logger.info("[第三方PS服务对接->发送流] 等待流上线超时 callId->{}", callId);
                redisTemplate.delete(key);
                hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
                hookSubscribe.removeSubscribe(hook);
            }, 10000);
            // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
            OtherPsSendInfo finalSendInfo = sendInfo;
            hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
            hookSubscribe.addSubscribe(hookSubscribeForStreamChange,
                    (mediaServerItemInUse, response)->{
            hookSubscribe.removeSubscribe(hook);
            hookSubscribe.addSubscribe(hook,
                    (hookData)->{
                        dynamicTask.stop(uuid);
                        logger.info("[第三方PS服务对接->发送流] 流上线,开始发流 callId->{}", callId);
                        try {
@@ -269,7 +266,7 @@
                            logger.info("[第三方PS服务对接->发送流] 视频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg"));
                            throw new ControllerException(ErrorCode.ERROR100.getCode(), "[视频流发流失败] " + jsonObject.getString("msg"));
                        }
                        hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
                        hookSubscribe.removeSubscribe(hook);
                    });
        }
    }