648540858
2023-08-18 b63a89a0a83dfddab0d714bb9aad90114ab9d514
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -3,6 +3,7 @@
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
@@ -19,16 +20,15 @@
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.vmanager.bean.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
@@ -104,6 +104,9 @@
    @Autowired
    private AssistRESTfulUtils assistRESTfulUtils;
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    @Qualifier("taskExecutor")
    @Autowired
@@ -219,10 +222,7 @@
        HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
        if (!"rtp".equals(param.getApp())) {
            result.setEnable_audio(true);
        }
        result.setEnable_audio(true);
        taskExecutor.execute(() -> {
            ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
            if (subscribe != null) {
@@ -250,10 +250,10 @@
            // Recording-download sessions: set mp4_max_second to 10 and enable audio and MP4 on the hook result
            if (ssrcTransactionForAll.get(0).getType() == VideoStreamSessionManager.SessionType.download) {
                result.setMp4_max_second(10);
                result.setEnable_audio(true);
                result.setEnable_mp4(true);
            }
        }
        if (mediaInfo.getRecordAssistPort() > 0 && userSetting.getRecordPath() == null) {
            logger.info("推流时发现尚未设置录像路径,从assist服务中读取");
            JSONObject info = assistRESTfulUtils.getInfo(mediaInfo, null);
@@ -272,6 +272,17 @@
                }
            }
        }
        if (param.getApp().equalsIgnoreCase("rtp")) {
            String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + param.getStream();
            OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(receiveKey);
            String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + param.getStream();
            OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(receiveKeyForPS);
            if (otherRtpSendInfo != null || otherPsSendInfo != null) {
                result.setEnable_mp4(true);
            }
        }
        logger.info("[ZLM HOOK]推流鉴权 响应:{}->{}->>>>{}", param.getMediaServerId(), param, result);
        return result;
    }
@@ -282,7 +293,7 @@
    @ResponseBody
    @PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8")
    public HookResult onStreamChanged(@RequestBody OnStreamChangedHookParam param) {
        System.out.println(JSON.toJSONString(param));
        if (param.isRegist()) {
            logger.info("[ZLM HOOK] 流注册, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
        } else {
@@ -304,13 +315,11 @@
            List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks();
            // TODO 重构此处逻辑
            boolean isPush = false;
            if (param.isRegist()) {
                // 处理流注册的鉴权信息
                if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
                        || param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
                        || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
                    isPush = true;
                    StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream());
                    if (streamAuthorityInfo == null) {
                        streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
@@ -324,7 +333,7 @@
                redisCatchStorage.removeStreamAuthorityInfo(param.getApp(), param.getStream());
            }
            if ("rtsp".equals(param.getSchema())) {
            if ("rtmp".equals(param.getSchema())) {
                // 更新流媒体负载信息
                if (param.isRegist()) {
                    mediaServerService.addCount(param.getMediaServerId());
@@ -363,6 +372,8 @@
                            StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaInfo,
                                    param.getApp(), param.getStream(), tracks, callId);
                            param.setStreamInfo(new StreamContent(streamInfoByAppAndStream));
                            // 如果是拉流代理产生的,不需要写入推流
                            redisCatchStorage.addStream(mediaInfo, type, param.getApp(), param.getStream(), param);
                            if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
                                    || param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
@@ -386,7 +397,9 @@
                        }
                        GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
                        if (gbStream != null) {
                            eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF);
                            if (userSetting.isUsePushingAsStatus()) {
                                eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF);
                            }
                        }
                        if (type != null) {
                            // 发送流变化redis消息
@@ -463,6 +476,13 @@
                            }
                            redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
                                    sendRtpItem.getCallId(), sendRtpItem.getStreamId());
                            if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
                                MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
                                        sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
                                        sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
                                messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
                                redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
                            }
                        }
                    }
                }
@@ -533,7 +553,7 @@
                }
                return ret;
            }
            // TODO: a pushed stream is driven by the pusher itself, so nothing is done for it here for now
//         StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);
//         if (streamPushItem != null) {
//            // TODO 发送停止