xiangpei
2024-10-10 5602c458099e38c6c4278334e5cb4f8029c63a50
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.conf.DynamicTask;
@@ -19,22 +18,16 @@
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.RecordInfo;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.CloudRecordUtils;
@@ -89,9 +82,6 @@
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private IInviteStreamService inviteStreamService;
    @Autowired
@@ -117,9 +107,6 @@
    @Autowired
    private ISIPCommanderForPlatform commanderForPlatform;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Autowired
    private SSRCFactory ssrcFactory;
@@ -263,7 +250,7 @@
            );
            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(event.getMediaServer(), event.getStream(), null,
                    device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam());
                    device.isSsrcCheck(), true, 0, false, !deviceChannel.getHasAudio(), false, device.getStreamModeForParam());
            playBack(event.getMediaServer(), ssrcInfo, deviceId, channelId, startTime, endTime, null);
        }
    }
@@ -275,7 +262,6 @@
            logger.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
        }
        Device device = redisCatchStorage.getDevice(deviceId);
        if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) {
            logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
@@ -289,6 +275,8 @@
        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
        if (inviteInfo != null ) {
            if (inviteInfo.getStreamInfo() == null) {
                // 释放生成的ssrc,使用上一次申请的
                ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
                // 点播发起了但是尚未成功, 仅注册回调等待结果即可
                inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
                logger.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
@@ -306,8 +294,7 @@
                }
                String mediaServerId = streamInfo.getMediaServerId();
                MediaServer mediaInfo = mediaServerService.getOne(mediaServerId);
                Boolean ready = zlmServerFactory.isStreamReady(mediaInfo, "rtp", streamId);
                Boolean ready = mediaServerService.isStreamReady(mediaInfo, "rtp", streamId);
                if (ready != null && ready) {
                    callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
                    inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
@@ -325,7 +312,7 @@
            }
        }
        String streamId = String.format("%s_%s", device.getDeviceId(), channelId);
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(),  false, 0, false, false, device.getStreamModeForParam());
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(),  false, 0, false, !channel.getHasAudio(), false, device.getStreamModeForParam());
        if (ssrcInfo == null) {
            callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
            inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
@@ -395,28 +382,15 @@
            }
        }, userSetting.getPlayTimeout());
        Map<String, Object> param = new HashMap<>(12);
        param.put("vhost","__defaultVhost__");
        param.put("app", sendRtpItem.getApp());
        param.put("stream", sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        param.put("src_port", sendRtpItem.getLocalPort());
        param.put("pt", sendRtpItem.getPt());
        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
        param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1");
        param.put("recv_stream_id", sendRtpItem.getReceiveStream());
        param.put("close_delay_ms", userSetting.getPlayTimeout() * 1000);
        zlmServerFactory.startSendRtpPassive(mediaServerItem, param, jsonObject -> {
            if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
                mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
                logger.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
                audioEvent.call("失败, " + jsonObject.getString("msg"));
                // 查看是否已经建立了通道,存在则发送bye
                stopTalk(device, channelId);
            }
        });
        try {
            mediaServerService.startSendRtpPassive(mediaServerItem, sendRtpItem, userSetting.getPlayTimeout() * 1000);
        }catch (ControllerException e) {
            mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
            logger.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
            audioEvent.call("失败, " + e.getMessage());
            // 查看是否已经建立了通道,存在则发送bye
            stopTalk(device, channelId);
        }
        // 查看设备是否已经在推流
@@ -485,9 +459,11 @@
                     ErrorCallback<Object> callback) {
        if (mediaServerItem == null || ssrcInfo == null) {
            callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
                    InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
                    null);
            if (callback != null) {
                callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
                        InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
                        null);
            }
            return;
        }
        logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 码流:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
@@ -499,8 +475,9 @@
            // 释放ssrc
            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
            streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
            callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
            if (callback != null) {
                callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
            }
            inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
                    InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
            return;
@@ -520,8 +497,9 @@
                logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}",
                        device.getDeviceId(), channel.getChannelId(), channel.getStreamIdentification(),
                        ssrcInfo.getPort(), ssrcInfo.getSsrc());
                callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
                if (callback != null) {
                    callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
                }
                inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
                        InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
                inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId());
@@ -557,14 +535,19 @@
                // hook响应
                StreamInfo streamInfo = onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), device.getDeviceId(), channel.getChannelId());
                if (streamInfo == null){
                    callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
                            InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
                    if (callback != null) {
                        callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
                                InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
                    }
                    inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
                            InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
                            InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
                    return;
                }
                callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
                if (callback != null) {
                    callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
                }
                inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
                        InviteErrorCode.SUCCESS.getCode(),
                        InviteErrorCode.SUCCESS.getMsg(),
@@ -584,9 +567,9 @@
                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
                callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
                        String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
                if (callback != null) {
                    callback.run(event.statusCode, event.msg, null);
                }
                inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
                        InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
                        String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
@@ -602,9 +585,10 @@
            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
            streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
            callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
                    InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
            if (callback != null) {
                callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
                        InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
            }
            inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
                    InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
                    InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
@@ -742,13 +726,16 @@
    @Override
    public MediaServer getNewMediaServerItem(Device device) {
        logger.error(device.getMediaServerId(), "wocnm");
        if (device == null) {
            return null;
        }
        MediaServer mediaServerItem;
        if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
            logger.error("进的这");
            mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
        } else {
            logger.error("进的else");
            mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
        }
        if (mediaServerItem == null) {
@@ -766,8 +753,14 @@
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到设备:" + deviceId);
        }
        DeviceChannel channel = channelService.getOne(deviceId, channelId);
        if (channel == null) {
            logger.warn("[录像回放] 未找到通道 deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到通道:" + channelId);
        }
        MediaServer newMediaServerItem = getNewMediaServerItem(device);
        if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && ! newMediaServerItem.isRtpEnable()) {
        if ("TCP-ACTIVE".equalsIgnoreCase(device.getStreamMode()) && ! newMediaServerItem.isRtpEnable()) {
            logger.warn("[录像回放] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流");
        }
@@ -778,7 +771,7 @@
                .replace(":", "")
                .replace(" ", "");
        String stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr;
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(),  true, 0, false,  false, device.getStreamModeForParam());
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(),  true, 0, false,  !channel.getHasAudio(),  false, device.getStreamModeForParam());
        playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback);
    }
@@ -963,6 +956,10 @@
        if (device == null) {
            return;
        }
        DeviceChannel channel = channelService.getOne(deviceId, channelId);
        if (channel == null) {
            return;
        }
        MediaServer newMediaServerItem = this.getNewMediaServerItem(device);
        if (newMediaServerItem == null) {
            callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(),
@@ -971,7 +968,7 @@
            return;
        }
        // 录像下载不使用固定流地址,固定流地址会导致如果开始时间与结束时间一致时文件错误的叠加在一起
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(),  true, 0, false,false, device.getStreamModeForParam());
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(),  true, 0, false,!channel.getHasAudio(), false, device.getStreamModeForParam());
        download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, callback);
    }
@@ -1020,6 +1017,7 @@
            dynamicTask.stop(downLoadTimeOutTaskKey);
            callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
                    String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg), null);
            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
            inviteStreamService.removeInviteInfo(inviteInfo);
        };
@@ -1054,13 +1052,12 @@
                            InviteInfo inviteInfoForNew = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId()
                                    , inviteInfo.getChannelId(), inviteInfo.getStream());
                            inviteInfoForNew.getStreamInfo().setDownLoadFilePath(downloadFileInfo);
                            inviteStreamService.updateInviteInfo(inviteInfoForNew);
                            // 不可以马上移除会导致后续接口拿不到下载地址
                            inviteStreamService.updateInviteInfo(inviteInfoForNew, 60*15L);
                        };
                        Hook hook = Hook.getInstance(HookType.on_record_mp4, "rtp", ssrcInfo.getStream(), mediaServerItem.getId());
                        // 设置过期时间,下载失败时自动处理订阅数据
//                        long difference = DateUtil.getDifference(startTime, endTime)/1000;
//                        Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(difference * 2));
//                        hookSubscribe.setExpires(expiresInstant);
                        hook.setExpireTime(System.currentTimeMillis() + 24 * 60 * 60 * 1000);
                        subscribe.addSubscribe(hook, hookEventForRecord);
                    });
        } catch (InvalidArgumentException | SipException | ParseException e) {
@@ -1093,19 +1090,9 @@
            logger.warn("[获取下载进度] 查询录像信息时发现节点不存在");
            return null;
        }
        SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream);
        if (ssrcTransaction == null) {
            logger.warn("[获取下载进度] 下载已结束");
            return null;
        }
        String app = "rtp";
        MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, app, stream);
        if (mediaInfo == null) {
            logger.warn("[获取下载进度] 查询进度失败, 节点Id: {}, {}/{}", mediaServerId, app, stream);
            return null;
        }
        if (mediaInfo.getDuration() == 0) {
        Long duration  = mediaServerService.updateDownloadProcess(mediaServerItem, app, stream);
        if (duration == null || duration == 0) {
            inviteInfo.getStreamInfo().setProgress(0);
        } else {
            String startTime = inviteInfo.getStreamInfo().getStartTime();
@@ -1114,7 +1101,7 @@
            long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
            long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
            BigDecimal currentCount = new BigDecimal(mediaInfo.getDuration());
            BigDecimal currentCount = new BigDecimal(duration);
            BigDecimal totalCount = new BigDecimal((end - start) * 1000);
            BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
            double process = divide.doubleValue();
@@ -1233,7 +1220,7 @@
            SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
            if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                // 查询流是否存在,不存在则认为是异常状态
                Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
                Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
                if (streamReady) {
                    logger.warn("语音广播已经开启: {}", channelId);
                    event.call("语音广播已经开启");
@@ -1243,18 +1230,6 @@
                }
            }
        }
//        SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
//        if (sendRtpItem != null) {
//            MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
//            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
//            if (streamReady) {
//                logger.warn("[语音对讲] 进行中: {}", channelId);
//                event.call("语音对讲进行中");
//                return false;
//            } else {
//                stopTalk(device, channelId);
//            }
//        }
        // 发送通知
        cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> {
@@ -1269,7 +1244,7 @@
            dynamicTask.startDelay(key, ()->{
                logger.info("[语音广播]等待invite消息超时:{}/{}", device.getDeviceId(), channelId);
                stopAudioBroadcast(device.getDeviceId(), channelId);
            }, 2000);
            }, 10*1000);
        }, eventResultForError -> {
            // 发送失败
            logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);
@@ -1286,7 +1261,7 @@
            if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                // 查询流是否存在,不存在则认为是异常状态
                MediaServer mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStream());
                Boolean streamReady = mediaServerService.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStream());
                if (streamReady) {
                    logger.warn("语音广播通道使用中: {}", channelId);
                    return true;
@@ -1442,100 +1417,51 @@
    @Override
    public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader) {
        // 开始发流
        String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
        MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        logger.info("[开始推流] rtp/{}, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(),
                sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
        Map<String, Object> param = new HashMap<>(12);
        param.put("vhost", "__defaultVhost__");
        param.put("app", sendRtpItem.getApp());
        param.put("stream", sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        param.put("src_port", sendRtpItem.getLocalPort());
        param.put("pt", sendRtpItem.getPt());
        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
        param.put("is_udp", is_Udp);
        if (!sendRtpItem.isTcp()) {
            // udp模式下开启rtcp保活
            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0");
        }
        if (mediaInfo == null) {
            RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
                    sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
                    sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
                    sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
            redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
                startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader);
            });
        } else {
            // 如果是严格模式,需要关闭端口占用
            JSONObject startSendRtpStreamResult = null;
            if (sendRtpItem.getLocalPort() != 0) {
        if (mediaInfo != null) {
            try {
                if (sendRtpItem.isTcpActive()) {
                    startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
                    mediaServerService.startSendRtpPassive(mediaInfo, sendRtpItem, null);
                } else {
                    param.put("dst_url", sendRtpItem.getIp());
                    param.put("dst_port", sendRtpItem.getPort());
                    startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
                    mediaServerService.startSendRtp(mediaInfo, sendRtpItem);
                }
            } else {
                if (sendRtpItem.isTcpActive()) {
                    startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
                } else {
                    param.put("dst_url", sendRtpItem.getIp());
                    param.put("dst_port", sendRtpItem.getPort());
                    startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
                }
                redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, platform);
            }catch (ControllerException e) {
                logger.error("RTP推流失败: {}", e.getMessage());
                startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
                return;
            }
            if (startSendRtpStreamResult != null) {
                startSendRtpStreamHand(sendRtpItem, platform, startSendRtpStreamResult, param, callIdHeader);
            }
            logger.info("RTP推流成功[ {}/{} ],{}, ", sendRtpItem.getApp(), sendRtpItem.getStream(),
                    sendRtpItem.isTcpActive()?"被动发流": sendRtpItem.getIp() + ":" + sendRtpItem.getPort());
        }
    }
    @Override
    public void startSendRtpStreamHand(SendRtpItem sendRtpItem, Object correlationInfo,
                                       JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
        if (jsonObject == null) {
            logger.error("RTP推流失败: 请检查ZLM服务");
        } else if (jsonObject.getInteger("code") == 0) {
            logger.info("调用ZLM推流接口, 结果: {}", jsonObject);
            logger.info("RTP推流成功[ {}/{} ],{}->{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"),
                    sendRtpItem.isTcpActive()?"被动发流": param.get("dst_url") + ":" + param.get("dst_port"));
            if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && correlationInfo instanceof ParentPlatform) {
                ParentPlatform platform = (ParentPlatform)correlationInfo;
                MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
                        sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(),
                        sendRtpItem.getMediaServerId());
                messageForPushChannel.setPlatFormIndex(platform.getId());
                redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
    public void startSendRtpStreamFailHand(SendRtpItem sendRtpItem, ParentPlatform platform, CallIdHeader callIdHeader) {
        if (sendRtpItem.isOnlyAudio()) {
            Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
            if (audioBroadcastCatch != null) {
                try {
                    cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
                } catch (SipException | ParseException | InvalidArgumentException |
                         SsrcTransactionNotFoundException exception) {
                    logger.error("[命令发送失败] 停止语音对讲: {}", exception.getMessage());
                }
            }
        } else {
            logger.error("RTP推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSONObject.toJSONString(param));
            if (sendRtpItem.isOnlyAudio()) {
                Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                if (audioBroadcastCatch != null) {
                    try {
                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
                    } catch (SipException | ParseException | InvalidArgumentException |
                             SsrcTransactionNotFoundException e) {
                        logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage());
                    }
                }
            } else {
            if (platform != null) {
                // 向上级平台
                if (correlationInfo instanceof ParentPlatform) {
                    try {
                        ParentPlatform parentPlatform = (ParentPlatform)correlationInfo;
                        commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                    }
                try {
                    commanderForPlatform.streamByeCmd(platform, callIdHeader.getCallId());
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                }
            }
        }
    }
@@ -1558,7 +1484,7 @@
            if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                // 查询流是否存在,不存在则认为是异常状态
                MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
                Boolean streamReady = mediaServerService.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
                if (streamReady) {
                    logger.warn("[语音对讲] 正在语音广播,无法开启语音通话: {}", channelId);
                    event.call("正在语音广播");
@@ -1572,7 +1498,7 @@
        SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, stream, null);
        if (sendRtpItem != null) {
            MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
            Boolean streamReady = mediaServerService.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
            if (streamReady) {
                logger.warn("[语音对讲] 进行中: {}", channelId);
                event.call("语音对讲进行中");
@@ -1619,12 +1545,7 @@
        MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
        if (streamIsReady == null || streamIsReady) {
            Map<String, Object> param = new HashMap<>();
            param.put("vhost", "__defaultVhost__");
            param.put("app", sendRtpItem.getApp());
            param.put("stream", sendRtpItem.getStream());
            param.put("ssrc", sendRtpItem.getSsrc());
            zlmServerFactory.stopSendRtpStream(mediaServer, param);
            mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
        }
        ssrcFactory.releaseSsrc(mediaServerId, sendRtpItem.getSsrc());
@@ -1687,4 +1608,26 @@
        });
    }
    @Override
    public void stopPlay(Device device, String channelId) {
        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
        if (inviteInfo == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到");
        }
        if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
            try {
                logger.info("[停止点播] {}/{}", device.getDeviceId(), channelId);
                cmder.streamByeCmd(device, channelId, inviteInfo.getStream(), null, null);
            } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
                logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
                throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
            }
        }
        inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
        storager.stopPlay(device.getDeviceId(), channelId);
        channelService.stopPlay(device.getDeviceId(), channelId);
        if (inviteInfo.getStreamInfo() != null) {
            mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());
        }
    }
}