648540858
2024-04-30 b6e604f2449bb65dfaafb0f0741ba54ff0d2f9c2
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1,11 +1,8 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.exception.ServiceException;
@@ -18,24 +15,21 @@
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.IMediaServerService;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.RecordInfo;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRecordMp4;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
import com.genersoft.iot.vmp.utils.CloudRecordUtils;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
@@ -46,9 +40,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
@@ -89,22 +82,13 @@
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private IInviteStreamService inviteStreamService;
    @Autowired
    private ZlmHttpHookSubscribe subscribe;
    private HookSubscribe subscribe;
    @Autowired
    private SendRtpPortManager sendRtpPortManager;
    @Autowired
    private ZLMRESTfulUtils zlmresTfulUtils;
    @Autowired
    private IMediaService mediaService;
    @Autowired
    private IMediaServerService mediaServerService;
@@ -119,42 +103,165 @@
    private IDeviceChannelService channelService;
    @Autowired
    private SipConfig sipConfig;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private CloudRecordServiceMapper cloudRecordServiceMapper;
    @Autowired
    private ISIPCommanderForPlatform commanderForPlatform;
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Autowired
    private ZlmHttpHookSubscribe hookSubscribe;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    /**
     * 流到来的处理
     */
    @Async("taskExecutor")
    @org.springframework.context.event.EventListener
    public void onApplicationEvent(MediaArrivalEvent event) {
        if ("broadcast".equals(event.getApp())) {
            if (event.getStream().indexOf("_") > 0) {
                String[] streamArray = event.getStream().split("_");
                if (streamArray.length == 2) {
                    String deviceId = streamArray[0];
                    String channelId = streamArray[1];
                    Device device = deviceService.getDevice(deviceId);
                    if (device == null) {
                        logger.info("[语音对讲/喊话] 未找到设备:{}", deviceId);
                        return;
                    }
                    if ("broadcast".equals(event.getApp())) {
                        if (audioBroadcastManager.exit(deviceId, channelId)) {
                            stopAudioBroadcast(deviceId, channelId);
                        }
                        // 开启语音对讲通道
                        try {
                            audioBroadcastCmd(device, channelId, event.getMediaServer(),
                                    event.getApp(), event.getStream(), 60, false, (msg) -> {
                                        logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
                                    });
                        } catch (InvalidArgumentException | ParseException | SipException e) {
                            logger.error("[命令发送失败] 语音对讲: {}", e.getMessage());
                        }
                    }else if ("talk".equals(event.getApp())) {
                        // 开启语音对讲通道
                        talkCmd(device, channelId, event.getMediaServer(), event.getStream(), (msg) -> {
                            logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
                        });
                    }
                }
            }
        }
    }
    /**
     * 流离开的处理
     */
    @Async("taskExecutor")
    @EventListener
    public void onApplicationEvent(MediaDepartureEvent event) {
        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
        if (!sendRtpItems.isEmpty()) {
            for (SendRtpItem sendRtpItem : sendRtpItems) {
                if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) {
                    String platformId = sendRtpItem.getPlatformId();
                    Device device = deviceService.getDevice(platformId);
                    try {
                        if (device != null) {
                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), event.getStream(), sendRtpItem.getCallId());
                            if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
                                    || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
                                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                if (audioBroadcastCatch != null) {
                                    // 来自上级平台的停止对讲
                                    logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                }
                            }
                        }
                    } catch (SipException | InvalidArgumentException | ParseException |
                             SsrcTransactionNotFoundException e) {
                        logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
                    }
                }
            }
        }
        if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) {
            if (event.getStream().indexOf("_") > 0) {
                String[] streamArray = event.getStream().split("_");
                if (streamArray.length == 2) {
                    String deviceId = streamArray[0];
                    String channelId = streamArray[1];
                    Device device = deviceService.getDevice(deviceId);
                    if (device == null) {
                        logger.info("[语音对讲/喊话] 未找到设备:{}", deviceId);
                        return;
                    }
                    if ("broadcast".equals(event.getApp())) {
                        stopAudioBroadcast(deviceId, channelId);
                    }else if ("talk".equals(event.getApp())) {
                        stopTalk(device, channelId, false);
                    }
                }
            }
        }
    }
    /**
     * 流未找到的处理
     */
    @Async("taskExecutor")
    @EventListener
    public void onApplicationEvent(MediaNotFoundEvent event) {
        if (!"rtp".equals(event.getApp())) {
            return;
        }
        String[] s = event.getStream().split("_");
        if ((s.length != 2 && s.length != 4)) {
            return;
        }
        String deviceId = s[0];
        String channelId = s[1];
        Device device = redisCatchStorage.getDevice(deviceId);
        if (device == null || !device.isOnLine()) {
            return;
        }
        DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
        if (deviceChannel == null) {
            return;
        }
        if (s.length == 2) {
            logger.info("[ZLM HOOK] 预览流未找到, 发起自动点播:{}->{}->{}/{}", event.getMediaServer().getId(), event.getSchema(), event.getApp(), event.getStream());
            play(event.getMediaServer(), deviceId, channelId, null, null);
        } else if (s.length == 4) {
            // 此时为录像回放, 录像回放格式为> 设备ID_通道ID_开始时间_结束时间
            String startTimeStr = s[2];
            String endTimeStr = s[3];
            if (startTimeStr == null || endTimeStr == null || startTimeStr.length() != 14 || endTimeStr.length() != 14) {
                return;
            }
            String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr);
            String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr);
            logger.info("[ZLM HOOK] 回放流未找到, 发起自动点播:{}->{}->{}/{}-{}-{}",
                    event.getMediaServer().getId(), event.getSchema(),
                    event.getApp(), event.getStream(),
                    startTime, endTime
            );
            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(event.getMediaServer(), event.getStream(), null,
                    device.isSsrcCheck(), true, 0, false, !deviceChannel.getHasAudio(), false, device.getStreamModeForParam());
            playBack(event.getMediaServer(), ssrcInfo, deviceId, channelId, startTime, endTime, null);
        }
    }
    @Override
    public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) {
    public SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) {
        if (mediaServerItem == null) {
            logger.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
        }
        Device device = redisCatchStorage.getDevice(deviceId);
        if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) {
            logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
@@ -168,6 +275,8 @@
        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
        if (inviteInfo != null ) {
            if (inviteInfo.getStreamInfo() == null) {
                // 释放生成的ssrc,使用上一次申请的
                ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
                // 点播发起了但是尚未成功, 仅注册回调等待结果即可
                inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
                logger.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
@@ -184,9 +293,8 @@
                    return inviteInfo.getSsrcInfo();
                }
                String mediaServerId = streamInfo.getMediaServerId();
                MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
                Boolean ready = zlmServerFactory.isStreamReady(mediaInfo, "rtp", streamId);
                MediaServer mediaInfo = mediaServerService.getOne(mediaServerId);
                Boolean ready = mediaServerService.isStreamReady(mediaInfo, "rtp", streamId);
                if (ready != null && ready) {
                    callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
                    inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
@@ -203,8 +311,8 @@
                }
            }
        }
        String streamId = String.format("%s_%s", device.getDeviceId(), channelId);;
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(),  false, 0, false, false, device.getStreamModeForParam());
        String streamId = String.format("%s_%s", device.getDeviceId(), channelId);
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(),  false, 0, false, !channel.getHasAudio(), false, device.getStreamModeForParam());
        if (ssrcInfo == null) {
            callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
            inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
@@ -217,8 +325,8 @@
        return ssrcInfo;
    }
    private void talk(MediaServerItem mediaServerItem, Device device, String channelId, String stream,
                      ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
    private void talk(MediaServer mediaServerItem, Device device, String channelId, String stream,
                      HookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                      Runnable timeoutCallback, AudioBroadcastEvent audioEvent) {
        String playSsrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
@@ -274,38 +382,25 @@
            }
        }, userSetting.getPlayTimeout());
        Map<String, Object> param = new HashMap<>(12);
        param.put("vhost","__defaultVhost__");
        param.put("app", sendRtpItem.getApp());
        param.put("stream", sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        param.put("src_port", sendRtpItem.getLocalPort());
        param.put("pt", sendRtpItem.getPt());
        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
        param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1");
        param.put("recv_stream_id", sendRtpItem.getReceiveStream());
        param.put("close_delay_ms", userSetting.getPlayTimeout() * 1000);
        zlmServerFactory.startSendRtpPassive(mediaServerItem, param, jsonObject -> {
            if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
                mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
                logger.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
                audioEvent.call("失败, " + jsonObject.getString("msg"));
                // 查看是否已经建立了通道,存在则发送bye
                stopTalk(device, channelId);
            }
        });
        try {
            mediaServerService.startSendRtpPassive(mediaServerItem, null, sendRtpItem, userSetting.getPlayTimeout() * 1000);
        }catch (ControllerException e) {
            mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
            logger.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
            audioEvent.call("失败, " + e.getMessage());
            // 查看是否已经建立了通道,存在则发送bye
            stopTalk(device, channelId);
        }
        // 查看设备是否已经在推流
        try {
            cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (mediaServerItemInuse, hookParam) -> {
                logger.info("[语音对讲] 流已生成, 开始推流: " + hookParam);
            cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (hookData) -> {
                logger.info("[语音对讲] 流已生成, 开始推流: " + hookData);
                dynamicTask.stop(timeOutTaskKey);
                // TODO 暂不做处理
            }, (mediaServerItemInuse, hookParam) -> {
                logger.info("[语音对讲] 设备开始推流: " + hookParam);
            }, (hookData) -> {
                logger.info("[语音对讲] 设备开始推流: " + hookData);
                dynamicTask.stop(timeOutTaskKey);
            }, (event) -> {
@@ -360,7 +455,7 @@
    @Override
    public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel,
    public void play(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel,
                     ErrorCallback<Object> callback) {
        if (mediaServerItem == null || ssrcInfo == null) {
@@ -415,8 +510,7 @@
                    streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                    // 取消订阅消息监听
                    HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
                    subscribe.removeSubscribe(hookSubscribe);
                    subscribe.removeSubscribe(Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()));
                }
            }else {
                logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}",
@@ -431,11 +525,11 @@
        }, userSetting.getPlayTimeout());
        try {
            cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (mediaServerItemInuse, hookParam ) -> {
                logger.info("收到订阅消息: " + hookParam);
            cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (hookData ) -> {
                logger.info("收到订阅消息: " + hookData);
                dynamicTask.stop(timeOutTaskKey);
                // hook响应
                StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channel.getChannelId());
                StreamInfo streamInfo = onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), device.getDeviceId(), channel.getChannelId());
                if (streamInfo == null){
                    callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
                            InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
@@ -451,7 +545,7 @@
                        streamInfo);
                logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channel.getChannelId(),
                        channel.getStreamIdentification());
                snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
                snapOnPlay(hookData.getMediaServer(), device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
            }, (eventResult) -> {
                // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
                InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel.getChannelId(),
@@ -465,8 +559,7 @@
                streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
                callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
                        String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
                callback.run(event.statusCode, event.msg, null);
                inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
                        InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
                        String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
@@ -494,7 +587,7 @@
    }
    private void tcpActiveHandler(Device device, String channelId, String contentString,
                                  MediaServerItem mediaServerItem,
                                  MediaServer mediaServerItem,
                                  String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback<Object> callback){
        if (!device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
            return;
@@ -521,9 +614,9 @@
                }
            }
            logger.info("[TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
            JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
            logger.info("[TCP主动连接对方] 结果: {}" , jsonObject);
            if (jsonObject.getInteger("code") != 0) {
            Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
            logger.info("[TCP主动连接对方] 结果: {}" , result);
            if (!result) {
                // 主动连接失败,结束流程, 清理数据
                dynamicTask.stop(timeOutTaskKey);
                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
@@ -563,7 +656,7 @@
     * @param channelId            通道 ID
     * @param stream               ssrc
     */
    private void snapOnPlay(MediaServerItem mediaServerItemInuse, String deviceId, String channelId, String stream) {
    private void snapOnPlay(MediaServer mediaServerItemInuse, String deviceId, String channelId, String stream) {
        String streamUrl;
        if (mediaServerItemInuse.getRtspPort() != 0) {
            streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", stream);
@@ -574,14 +667,13 @@
        String fileName = deviceId + "_" + channelId + ".jpg";
        // 请求截图
        logger.info("[请求截图]: " + fileName);
        zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
        mediaServerService.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
    }
    public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId) {
    public StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId) {
        StreamInfo streamInfo = null;
        Device device = redisCatchStorage.getDevice(deviceId);
        OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
        streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
        streamInfo = onPublishHandler(mediaServerItem, mediaInfo, deviceId, channelId);
        if (streamInfo != null) {
            DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
            if (deviceChannel != null) {
@@ -599,9 +691,8 @@
    }
    private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, HookParam param, String deviceId, String channelId, String startTime, String endTime) {
        OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) param;
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
    private StreamInfo onPublishHandlerForPlayback(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId, String startTime, String endTime) {
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, mediaInfo, deviceId, channelId);
        if (streamInfo != null) {
            streamInfo.setStartTime(startTime);
            streamInfo.setEndTime(endTime);
@@ -610,7 +701,7 @@
                deviceChannel.setStreamId(streamInfo.getStream());
                storager.startPlay(deviceId, channelId, streamInfo.getStream());
            }
            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, ((OnStreamChangedHookParam) param).getStream());
            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, mediaInfo.getStream());
            if (inviteInfo != null) {
                inviteInfo.setStatus(InviteSessionStatus.ok);
@@ -623,11 +714,11 @@
    }
    @Override
    public MediaServerItem getNewMediaServerItem(Device device) {
    public MediaServer getNewMediaServerItem(Device device) {
        if (device == null) {
            return null;
        }
        MediaServerItem mediaServerItem;
        MediaServer mediaServerItem;
        if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
            mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
        } else {
@@ -648,7 +739,13 @@
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到设备:" + deviceId);
        }
        MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
        DeviceChannel channel = channelService.getOne(deviceId, channelId);
        if (channel == null) {
            logger.warn("[录像回放] 未找到通道 deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到通道:" + channelId);
        }
        MediaServer newMediaServerItem = getNewMediaServerItem(device);
        if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && ! newMediaServerItem.isRtpEnable()) {
            logger.warn("[录像回放] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流");
@@ -660,12 +757,12 @@
                .replace(":", "")
                .replace(" ", "");
        String stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr;
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(),  true, 0, false,  false, device.getStreamModeForParam());
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(),  true, 0, false,  !channel.getHasAudio(),  false, device.getStreamModeForParam());
        playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback);
    }
    @Override
    public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
    public void playBack(MediaServer mediaServerItem, SSRCInfo ssrcInfo,
                         String deviceId, String channelId, String startTime,
                         String endTime, ErrorCallback<Object> callback) {
        if (mediaServerItem == null || ssrcInfo == null) {
@@ -716,10 +813,10 @@
            inviteStreamService.removeInviteInfo(inviteInfo);
        };
        ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
            logger.info("收到回放订阅消息: " + hookParam);
        HookSubscribe.Event hookEvent = (hookData) -> {
            logger.info("收到回放订阅消息: " + hookData);
            dynamicTask.stop(playBackTimeOutTaskKey);
            StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
            StreamInfo streamInfo = onPublishHandlerForPlayback(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime);
            if (streamInfo == null) {
                logger.warn("设备回放API调用失败!");
                callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
@@ -749,7 +846,7 @@
    }
    private void InviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, MediaServerItem mediaServerItem,
    private void InviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, MediaServer mediaServerItem,
                                 Device device, String channelId, String timeOutTaskKey, ErrorCallback<Object> callback,
                                 InviteInfo inviteInfo, InviteSessionType inviteSessionType){
        inviteInfo.setStatus(InviteSessionStatus.ok);
@@ -845,7 +942,11 @@
        if (device == null) {
            return;
        }
        MediaServerItem newMediaServerItem = this.getNewMediaServerItem(device);
        DeviceChannel channel = channelService.getOne(deviceId, channelId);
        if (channel == null) {
            return;
        }
        MediaServer newMediaServerItem = this.getNewMediaServerItem(device);
        if (newMediaServerItem == null) {
            callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(),
                    InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getMsg(),
@@ -853,13 +954,13 @@
            return;
        }
        // 录像下载不使用固定流地址,固定流地址会导致如果开始时间与结束时间一致时文件错误的叠加在一起
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(),  true, 0, false,false, device.getStreamModeForParam());
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(),  true, 0, false,!channel.getHasAudio(), false, device.getStreamModeForParam());
        download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, callback);
    }
    @Override
    public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
    public void download(MediaServer mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
        if (mediaServerItem == null || ssrcInfo == null) {
            callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
                    InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
@@ -905,10 +1006,10 @@
            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
            inviteStreamService.removeInviteInfo(inviteInfo);
        };
        ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
            logger.info("[录像下载]收到订阅消息: " + hookParam);
        HookSubscribe.Event hookEvent = (hookData) -> {
            logger.info("[录像下载]收到订阅消息: " + hookData);
            dynamicTask.stop(downLoadTimeOutTaskKey);
            StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
            StreamInfo streamInfo = onPublishHandlerForDownload(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime);
            if (streamInfo == null) {
                logger.warn("[录像下载] 获取流地址信息失败");
                callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
@@ -926,26 +1027,22 @@
                                downLoadTimeOutTaskKey, callback, inviteInfo, InviteSessionType.DOWNLOAD);
                        // 注册录像回调事件,录像下载结束后写入下载地址
                        ZlmHttpHookSubscribe.Event hookEventForRecord = (mediaServerItemInuse, hookParam) -> {
                        HookSubscribe.Event hookEventForRecord = (hookData) -> {
                            logger.info("[录像下载] 收到录像写入磁盘消息: , {}/{}-{}",
                                    inviteInfo.getDeviceId(), inviteInfo.getChannelId(), ssrcInfo.getStream());
                            logger.info("[录像下载] 收到录像写入磁盘消息内容: " + hookParam);
                            OnRecordMp4HookParam recordMp4HookParam = (OnRecordMp4HookParam)hookParam;
                            String filePath = recordMp4HookParam.getFile_path();
                            logger.info("[录像下载] 收到录像写入磁盘消息内容: " + hookData);
                            RecordInfo recordInfo = hookData.getRecordInfo();
                            String filePath = recordInfo.getFilePath();
                            DownloadFileInfo downloadFileInfo = CloudRecordUtils.getDownloadFilePath(mediaServerItem, filePath);
                            InviteInfo inviteInfoForNew = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId()
                                    , inviteInfo.getChannelId(), inviteInfo.getStream());
                            inviteInfoForNew.getStreamInfo().setDownLoadFilePath(downloadFileInfo);
                            inviteStreamService.updateInviteInfo(inviteInfoForNew);
                        };
                        HookSubscribeForRecordMp4 hookSubscribe = HookSubscribeFactory.on_record_mp4(
                                mediaServerItem.getId(), "rtp", ssrcInfo.getStream());
                        Hook hook = Hook.getInstance(HookType.on_record_mp4, "rtp", ssrcInfo.getStream(), mediaServerItem.getId());
                        // 设置过期时间,下载失败时自动处理订阅数据
//                        long difference = DateUtil.getDifference(startTime, endTime)/1000;
//                        Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(difference * 2));
//                        hookSubscribe.setExpires(expiresInstant);
                        subscribe.addSubscribe(hookSubscribe, hookEventForRecord);
                        hook.setExpireTime(System.currentTimeMillis() + 24 * 60 * 60 * 1000);
                        subscribe.addSubscribe(hook, hookEventForRecord);
                    });
        } catch (InvalidArgumentException | SipException | ParseException e) {
            logger.error("[命令发送失败] 录像下载: {}", e.getMessage());
@@ -972,7 +1069,7 @@
        // 获取当前已下载时长
        String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId();
        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
        MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
        if (mediaServerItem == null) {
            logger.warn("[获取下载进度] 查询录像信息时发现节点不存在");
            return null;
@@ -983,30 +1080,13 @@
            logger.warn("[获取下载进度] 下载已结束");
            return null;
        }
        JSONObject mediaListJson= zlmresTfulUtils.getMediaList(mediaServerItem, "rtp", stream);
        if (mediaListJson == null) {
            logger.warn("[获取下载进度] 从zlm查询进度失败");
        String app = "rtp";
        MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, app, stream);
        if (mediaInfo == null) {
            logger.warn("[获取下载进度] 查询进度失败, 节点Id: {}, {}/{}", mediaServerId, app, stream);
            return null;
        }
        if (mediaListJson.getInteger("code") != 0) {
            logger.warn("[获取下载进度] 从zlm查询进度出现错误: {}", mediaListJson.getString("msg"));
            return null;
        }
        JSONArray data = mediaListJson.getJSONArray("data");
        if (data == null) {
            logger.warn("[获取下载进度] 从zlm查询进度时未返回数据");
            return null;
        }
        JSONObject mediaJSON = data.getJSONObject(0);
        JSONArray tracks = mediaJSON.getJSONArray("tracks");
        if (tracks.isEmpty()) {
            logger.warn("[获取下载进度] 从zlm查询进度时未返回数据");
            return null;
        }
        JSONObject jsonObject = tracks.getJSONObject(0);
        long duration = jsonObject.getLongValue("duration");
        if (duration == 0) {
        if (mediaInfo.getDuration() == 0) {
            inviteInfo.getStreamInfo().setProgress(0);
        } else {
            String startTime = inviteInfo.getStreamInfo().getStartTime();
@@ -1015,7 +1095,7 @@
            long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
            long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
            BigDecimal currentCount = new BigDecimal(duration);
            BigDecimal currentCount = new BigDecimal(mediaInfo.getDuration());
            BigDecimal totalCount = new BigDecimal((end - start) * 1000);
            BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
            double process = divide.doubleValue();
@@ -1028,9 +1108,8 @@
        return inviteInfo.getStreamInfo();
    }
    private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) {
        OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
        StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, streamChangedHookParam, deviceId, channelId);
    private StreamInfo onPublishHandlerForDownload(MediaServer mediaServerItemInuse, MediaInfo mediaInfo, String deviceId, String channelId, String startTime, String endTime) {
        StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, mediaInfo, deviceId, channelId);
        if (streamInfo != null) {
            streamInfo.setProgress(0);
            streamInfo.setStartTime(startTime);
@@ -1047,8 +1126,8 @@
    }
    public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) {
        StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), hookParam.getTracks(), null);
    public StreamInfo onPublishHandler(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId) {
        StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", mediaInfo.getStream(), mediaInfo, null);
        streamInfo.setDeviceID(deviceId);
        streamInfo.setChannelId(channelId);
        return streamInfo;
@@ -1104,7 +1183,7 @@
            logger.warn("开启语音广播的时候未找到通道: {}", channelId);
            return null;
        }
        MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
        MediaServer mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
        if (broadcastMode == null) {
            broadcastMode = true;
        }
@@ -1113,13 +1192,13 @@
        AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult();
        audioBroadcastResult.setApp(app);
        audioBroadcastResult.setStream(stream);
        audioBroadcastResult.setStreamInfo(new StreamContent(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false)));
        audioBroadcastResult.setStreamInfo(new StreamContent(mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false)));
        audioBroadcastResult.setCodec("G.711");
        return audioBroadcastResult;
    }
    @Override
    public boolean audioBroadcastCmd(Device device, String channelId, MediaServerItem mediaServerItem, String app, String stream, int timeout, boolean isFromPlatform, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException {
    public boolean audioBroadcastCmd(Device device, String channelId, MediaServer mediaServerItem, String app, String stream, int timeout, boolean isFromPlatform, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException {
        if (device == null || channelId == null) {
            return false;
        }
@@ -1135,7 +1214,7 @@
            SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
            if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                // 查询流是否存在,不存在则认为是异常状态
                Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
                Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
                if (streamReady) {
                    logger.warn("语音广播已经开启: {}", channelId);
                    event.call("语音广播已经开启");
@@ -1145,18 +1224,6 @@
                }
            }
        }
//        SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
//        if (sendRtpItem != null) {
//            MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
//            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
//            if (streamReady) {
//                logger.warn("[语音对讲] 进行中: {}", channelId);
//                event.call("语音对讲进行中");
//                return false;
//            } else {
//                stopTalk(device, channelId);
//            }
//        }
        // 发送通知
        cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> {
@@ -1171,7 +1238,7 @@
            dynamicTask.startDelay(key, ()->{
                logger.info("[语音广播]等待invite消息超时:{}/{}", device.getDeviceId(), channelId);
                stopAudioBroadcast(device.getDeviceId(), channelId);
            }, 2000);
            }, 10*1000);
        }, eventResultForError -> {
            // 发送失败
            logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);
@@ -1187,8 +1254,8 @@
            SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
            if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                // 查询流是否存在,不存在则认为是异常状态
                MediaServerItem mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStream());
                MediaServer mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Boolean streamReady = mediaServerService.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStream());
                if (streamReady) {
                    logger.warn("语音广播通道使用中: {}", channelId);
                    return true;
@@ -1217,12 +1284,8 @@
                SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
                if (sendRtpItem != null) {
                    redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
                    MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                    Map<String, Object> param = new HashMap<>();
                    param.put("vhost", "__defaultVhost__");
                    param.put("app", sendRtpItem.getApp());
                    param.put("stream", sendRtpItem.getStream());
                    zlmresTfulUtils.stopSendRtp(mediaInfo, param);
                    MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                    mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
                    try {
                        cmder.streamByeCmdForDeviceInvite(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
                    } catch (InvalidArgumentException | ParseException | SipException |
@@ -1298,7 +1361,7 @@
        }
        inviteInfo.getStreamInfo().setPause(true);
        inviteStreamService.updateInviteInfo(inviteInfo);
        MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
        MediaServer mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
        if (null == mediaServerItem) {
            logger.warn("mediaServer 不存在!");
            throw new ServiceException("mediaServer不存在");
@@ -1309,8 +1372,8 @@
        if (!mediaServerItem.isRtpEnable()) {
            streamKey = Long.toHexString(Long.parseLong(inviteInfo.getSsrcInfo().getSsrc())).toUpperCase();
        }
        JSONObject jsonObject = zlmresTfulUtils.pauseRtpCheck(mediaServerItem, streamKey);
        if (jsonObject == null || jsonObject.getInteger("code") != 0) {
        Boolean result = mediaServerService.pauseRtpCheck(mediaServerItem, streamKey);
        if (!result) {
            throw new ServiceException("暂停RTP接收失败");
        }
        Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
@@ -1326,7 +1389,7 @@
        }
        inviteInfo.getStreamInfo().setPause(false);
        inviteStreamService.updateInviteInfo(inviteInfo);
        MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
        MediaServer mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
        if (null == mediaServerItem) {
            logger.warn("mediaServer 不存在!");
            throw new ServiceException("mediaServer不存在");
@@ -1337,8 +1400,8 @@
        if (!mediaServerItem.isRtpEnable()) {
            streamKey = Long.toHexString(Long.parseLong(inviteInfo.getSsrcInfo().getSsrc())).toUpperCase();
        }
        JSONObject jsonObject = zlmresTfulUtils.resumeRtpCheck(mediaServerItem, streamKey);
        if (jsonObject == null || jsonObject.getInteger("code") != 0) {
        boolean result = mediaServerService.resumeRtpCheck(mediaServerItem, streamKey);
        if (!result) {
            throw new ServiceException("继续RTP接收失败");
        }
        Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
@@ -1348,105 +1411,55 @@
    @Override
    public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader) {
        // 开始发流
        String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        logger.info("[开始推流] rtp/{}, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(),
                sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
        Map<String, Object> param = new HashMap<>(12);
        param.put("vhost", "__defaultVhost__");
        param.put("app", sendRtpItem.getApp());
        param.put("stream", sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        param.put("src_port", sendRtpItem.getLocalPort());
        param.put("pt", sendRtpItem.getPt());
        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
        param.put("is_udp", is_Udp);
        if (!sendRtpItem.isTcp()) {
            // udp模式下开启rtcp保活
            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0");
        }
        MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        if (mediaInfo == null) {
            RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
                    sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
                    sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
                    sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
            redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
                startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader);
            });
        } else {
            // 如果是严格模式,需要关闭端口占用
            JSONObject startSendRtpStreamResult = null;
            if (sendRtpItem.getLocalPort() != 0) {
        if (mediaInfo != null) {
            try {
                if (sendRtpItem.isTcpActive()) {
                    startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
                    mediaServerService.startSendRtpPassive(mediaInfo, platform, sendRtpItem, null);
                } else {
                    param.put("dst_url", sendRtpItem.getIp());
                    param.put("dst_port", sendRtpItem.getPort());
                    startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
                    mediaServerService.startSendRtp(mediaInfo, platform, sendRtpItem);
                }
            } else {
                if (sendRtpItem.isTcpActive()) {
                    startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
                } else {
                    param.put("dst_url", sendRtpItem.getIp());
                    param.put("dst_port", sendRtpItem.getPort());
                    startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
                }
            }catch (ControllerException e) {
                logger.error("RTP推流失败: {}", e.getMessage());
                startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
                return;
            }
            if (startSendRtpStreamResult != null) {
                startSendRtpStreamHand(sendRtpItem, platform, startSendRtpStreamResult, param, callIdHeader);
            }
            logger.info("RTP推流成功[ {}/{} ],{}, ", sendRtpItem.getApp(), sendRtpItem.getStream(),
                    sendRtpItem.isTcpActive()?"被动发流": sendRtpItem.getIp() + ":" + sendRtpItem.getPort());
        }
    }
    @Override
    public void startSendRtpStreamHand(SendRtpItem sendRtpItem, Object correlationInfo,
                                       JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
        if (jsonObject == null) {
            logger.error("RTP推流失败: 请检查ZLM服务");
        } else if (jsonObject.getInteger("code") == 0) {
            logger.info("调用ZLM推流接口, 结果: {}", jsonObject);
            logger.info("RTP推流成功[ {}/{} ],{}->{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"),
                    sendRtpItem.isTcpActive()?"被动发流": param.get("dst_url") + ":" + param.get("dst_port"));
            if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && correlationInfo instanceof ParentPlatform) {
                ParentPlatform platform = (ParentPlatform)correlationInfo;
                MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
                        sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(),
                        sendRtpItem.getMediaServerId());
                messageForPushChannel.setPlatFormIndex(platform.getId());
                redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
    public void startSendRtpStreamFailHand(SendRtpItem sendRtpItem, ParentPlatform platform, CallIdHeader callIdHeader) {
        if (sendRtpItem.isOnlyAudio()) {
            Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
            if (audioBroadcastCatch != null) {
                try {
                    cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
                } catch (SipException | ParseException | InvalidArgumentException |
                         SsrcTransactionNotFoundException exception) {
                    logger.error("[命令发送失败] 停止语音对讲: {}", exception.getMessage());
                }
            }
        } else {
            logger.error("RTP推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSONObject.toJSONString(param));
            if (sendRtpItem.isOnlyAudio()) {
                Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                if (audioBroadcastCatch != null) {
                    try {
                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
                    } catch (SipException | ParseException | InvalidArgumentException |
                             SsrcTransactionNotFoundException e) {
                        logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage());
                    }
                }
            } else {
            if (platform != null) {
                // 向上级平台
                if (correlationInfo instanceof ParentPlatform) {
                    try {
                        ParentPlatform parentPlatform = (ParentPlatform)correlationInfo;
                        commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                    }
                try {
                    commanderForPlatform.streamByeCmd(platform, callIdHeader.getCallId());
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                }
            }
        }
    }
    @Override
    public void talkCmd(Device device, String channelId, MediaServerItem mediaServerItem, String stream, AudioBroadcastEvent event) {
    public void talkCmd(Device device, String channelId, MediaServer mediaServerItem, String stream, AudioBroadcastEvent event) {
        if (device == null || channelId == null) {
            return;
        }
@@ -1463,8 +1476,8 @@
            SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
            if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                // 查询流是否存在,不存在则认为是异常状态
                MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
                MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Boolean streamReady = mediaServerService.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
                if (streamReady) {
                    logger.warn("[语音对讲] 正在语音广播,无法开启语音通话: {}", channelId);
                    event.call("正在语音广播");
@@ -1477,8 +1490,8 @@
        SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, stream, null);
        if (sendRtpItem != null) {
            MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
            MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            Boolean streamReady = mediaServerService.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
            if (streamReady) {
                logger.warn("[语音对讲] 进行中: {}", channelId);
                event.call("语音对讲进行中");
@@ -1488,7 +1501,7 @@
            }
        }
        talk(mediaServerItem, device, channelId, stream, (mediaServerItem1, hookParam) -> {
        talk(mediaServerItem, device, channelId, stream, (hookData) -> {
            logger.info("[语音对讲] 收到设备发来的流");
        }, eventResult -> {
            logger.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg);
@@ -1522,15 +1535,10 @@
            return;
        }
        MediaServerItem mediaServer = mediaServerService.getOne(mediaServerId);
        MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
        if (streamIsReady == null || streamIsReady) {
            Map<String, Object> param = new HashMap<>();
            param.put("vhost", "__defaultVhost__");
            param.put("app", sendRtpItem.getApp());
            param.put("stream", sendRtpItem.getStream());
            param.put("ssrc", sendRtpItem.getSsrc());
            zlmServerFactory.stopSendRtpStream(mediaServer, param);
            mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
        }
        ssrcFactory.releaseSsrc(mediaServerId, sendRtpItem.getSsrc());
@@ -1557,7 +1565,7 @@
        if (inviteInfo != null) {
            if (inviteInfo.getStreamInfo() != null) {
                // 已存在线直接截图
                MediaServerItem mediaServerItemInuse = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
                MediaServer mediaServerItemInuse = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
                String streamUrl;
                if (mediaServerItemInuse.getRtspPort() != 0) {
                    streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp",  inviteInfo.getStreamInfo().getStream());
@@ -1567,7 +1575,7 @@
                String path = "snap";
                // 请求截图
                logger.info("[请求截图]: " + fileName);
                zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
                mediaServerService.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
                File snapFile = new File(path + File.separator + fileName);
                if (snapFile.exists()) {
                    errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), snapFile.getAbsoluteFile());
@@ -1578,7 +1586,7 @@
            }
        }
        MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
        MediaServer newMediaServerItem = getNewMediaServerItem(device);
        play(newMediaServerItem, deviceId, channelId, null, (code, msg, data)->{
            if (code == InviteErrorCode.SUCCESS.getCode()) {
                InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
@@ -1593,4 +1601,26 @@
        });
    }
    @Override
    public void stopPlay(Device device, String channelId) {
        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
        if (inviteInfo == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到");
        }
        if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
            try {
                logger.info("[停止点播] {}/{}", device.getDeviceId(), channelId);
                cmder.streamByeCmd(device, channelId, inviteInfo.getStream(), null, null);
            } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
                logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
                throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
            }
        }
        inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
        storager.stopPlay(device.getDeviceId(), channelId);
        channelService.stopPlay(device.getDeviceId(), channelId);
        if (inviteInfo.getStreamInfo() != null) {
            mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());
        }
    }
}