648540858
2023-05-25 30ae9e929fad80f624ab632c53081db3d2dc9aec
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,12 +1,14 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
@@ -14,6 +16,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.IPlayService;
@@ -66,6 +69,9 @@
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
@@ -96,6 +102,8 @@
    @Autowired
    private IPlayService playService;
    @Autowired
    private IInviteStreamService inviteStreamService;
    @Override
@@ -198,6 +206,7 @@
            // 保存时启用就发送注册
            // 注册成功时由程序直接调用了online方法
            try {
                logger.info("[国标级联] 平台注册 {}", parentPlatform.getDeviceGBId());
                commanderForPlatform.register(parentPlatform, eventResult -> {
                    logger.info("[国标级联] {},添加向上级注册失败,请确定上级平台可用时重新保存", parentPlatform.getServerGBId());
                }, null);
@@ -349,6 +358,7 @@
        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(platformId);
        if (sendRtpItems != null && sendRtpItems.size() > 0) {
            for (SendRtpItem sendRtpItem : sendRtpItems) {
                ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
                redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Map<String, Object> param = new HashMap<>(3);
@@ -420,20 +430,22 @@
            logger.info("[国标级联] 语音喊话未找到可用的zlm. platform: {}", platform.getServerGBId());
            return;
        }
        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(platform.getServerGBId(), channelId);
        if (streamInfo != null) {
        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platform.getServerGBId(), channelId);
        if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
            // 如果zlm不存在这个流,则删除数据即可
            MediaServerItem mediaServerItemForStreamInfo = mediaServerService.getOne(streamInfo.getMediaServerId());
            MediaServerItem mediaServerItemForStreamInfo = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
            if (mediaServerItemForStreamInfo != null) {
                Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemForStreamInfo, streamInfo.getApp(), streamInfo.getStream());
                Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemForStreamInfo, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream());
                if (!ready) {
                    // 错误存在于redis中的数据
                    redisCatchStorage.stopPlay(streamInfo);
                    inviteStreamService.removeInviteInfo(inviteInfo);
                }else {
                    // 流确实尚在推流,直接回调结果
                    JSONObject json = new JSONObject();
                    json.put("app", streamInfo.getApp());
                    json.put("stream", streamInfo.getStream());
                    json.put("app", inviteInfo.getStreamInfo().getApp());
                    json.put("stream", inviteInfo.getStreamInfo().getStream());
                    hookEvent.response(mediaServerItemForStreamInfo, json);
                    return;
                }
@@ -449,7 +461,11 @@
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true);
        if (ssrcInfo == null || ssrcInfo.getPort() < 0) {
            logger.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel: {}", platform.getServerGBId(), channelId);
            errorEvent.response(new SipSubscribe.EventResult(-1, "端口监听失败"));
            SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
            eventResult.statusCode = -1;
            eventResult.msg = "端口监听失败";
            eventResult.type = SipSubscribe.EventResultType.failedToGetPort;
            errorEvent.response(eventResult);
            return;
        }
        logger.info("[国标级联] 语音喊话,发起Invite消息 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
@@ -458,7 +474,8 @@
        String timeOutTaskKey = UUID.randomUUID().toString();
        dynamicTask.startDelay(timeOutTaskKey, () -> {
            // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
            if (redisCatchStorage.queryPlayByDevice(platform.getServerGBId(), channelId) == null) {
            InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, platform.getServerGBId(), channelId, null);
            if (inviteInfoForBroadcast == null) {
                logger.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
                // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
                try {
@@ -501,7 +518,7 @@
                if (!mediaServerItem.isRtpEnable()) {
                    logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
                    if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
                    if (!ssrcFactory.checkSsrc(mediaServerItem.getId(), ssrcInResponse)) {
                        // ssrc 不可用
                        // 释放ssrc
                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());