648540858
2024-04-28 1fc2916c2b4b28fbf722c4401e559805f9578573
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
old mode 100644 new mode 100755
@@ -1,13 +1,8 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.exception.ServiceException;
@@ -17,26 +12,26 @@
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.RecordInfo;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.InviteErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.CloudRecordUtils;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@@ -46,9 +41,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
@@ -57,6 +51,7 @@
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import java.io.File;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.ParseException;
@@ -64,6 +59,7 @@
@SuppressWarnings(value = {"rawtypes", "unchecked"})
@Service
@DS("master")
public class PlayServiceImpl implements IPlayService {
    private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class);
@@ -72,7 +68,7 @@
    private IVideoManagerStorage storager;
    @Autowired
    private SIPCommander cmder;
    private ISIPCommander cmder;
    @Autowired
    private AudioBroadcastManager audioBroadcastManager;
@@ -87,22 +83,13 @@
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    @Autowired
    private IInviteStreamService inviteStreamService;
    @Autowired
    private DeferredResultHolder resultHolder;
    private HookSubscribe subscribe;
    @Autowired
    private ZLMRESTfulUtils zlmresTfulUtils;
    @Autowired
    private AssistRESTfulUtils assistRESTfulUtils;
    @Autowired
    private IMediaService mediaService;
    private SendRtpPortManager sendRtpPortManager;
    @Autowired
    private IMediaServerService mediaServerService;
@@ -114,48 +101,188 @@
    private UserSetting userSetting;
    @Autowired
    private SipConfig sipConfig;
    private IDeviceChannelService channelService;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private ZlmHttpHookSubscribe subscribe;
    @Autowired
    private ISIPCommanderForPlatform commanderForPlatform;
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Autowired
    private ZlmHttpHookSubscribe hookSubscribe;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    /**
     * 流到来的处理
     */
    @Async("taskExecutor")
    @org.springframework.context.event.EventListener
    public void onApplicationEvent(MediaArrivalEvent event) {
        if ("broadcast".equals(event.getApp())) {
            if (event.getStream().indexOf("_") > 0) {
                String[] streamArray = event.getStream().split("_");
                if (streamArray.length == 2) {
                    String deviceId = streamArray[0];
                    String channelId = streamArray[1];
                    Device device = deviceService.getDevice(deviceId);
                    if (device == null) {
                        logger.info("[语音对讲/喊话] 未找到设备:{}", deviceId);
                        return;
                    }
                    if ("broadcast".equals(event.getApp())) {
                        if (audioBroadcastManager.exit(deviceId, channelId)) {
                            stopAudioBroadcast(deviceId, channelId);
                        }
                        // 开启语音对讲通道
                        try {
                            audioBroadcastCmd(device, channelId, event.getMediaServer(),
                                    event.getApp(), event.getStream(), 60, false, (msg) -> {
                                        logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
                                    });
                        } catch (InvalidArgumentException | ParseException | SipException e) {
                            logger.error("[命令发送失败] 语音对讲: {}", e.getMessage());
                        }
                    }else if ("talk".equals(event.getApp())) {
                        // 开启语音对讲通道
                        talkCmd(device, channelId, event.getMediaServer(), event.getStream(), (msg) -> {
                            logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
                        });
                    }
                }
            }
        }
    }
    /**
     * 流离开的处理
     */
    @Async("taskExecutor")
    @EventListener
    public void onApplicationEvent(MediaDepartureEvent event) {
        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
        if (!sendRtpItems.isEmpty()) {
            for (SendRtpItem sendRtpItem : sendRtpItems) {
                if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) {
                    String platformId = sendRtpItem.getPlatformId();
                    Device device = deviceService.getDevice(platformId);
                    try {
                        if (device != null) {
                            cmder.streamByeCmd(device, sendRtpItem.getChannelId(), event.getStream(), sendRtpItem.getCallId());
                            if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
                                    || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
                                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                if (audioBroadcastCatch != null) {
                                    // 来自上级平台的停止对讲
                                    logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                    audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                                }
                            }
                        }
                    } catch (SipException | InvalidArgumentException | ParseException |
                             SsrcTransactionNotFoundException e) {
                        logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
                    }
                }
            }
        }
        if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) {
            if (event.getStream().indexOf("_") > 0) {
                String[] streamArray = event.getStream().split("_");
                if (streamArray.length == 2) {
                    String deviceId = streamArray[0];
                    String channelId = streamArray[1];
                    Device device = deviceService.getDevice(deviceId);
                    if (device == null) {
                        logger.info("[语音对讲/喊话] 未找到设备:{}", deviceId);
                        return;
                    }
                    if ("broadcast".equals(event.getApp())) {
                        stopAudioBroadcast(deviceId, channelId);
                    }else if ("talk".equals(event.getApp())) {
                        stopTalk(device, channelId, false);
                    }
                }
            }
        }
    }
    /**
     * 流未找到的处理
     */
    @Async("taskExecutor")
    @EventListener
    public void onApplicationEvent(MediaNotFoundEvent event) {
        if (!"rtp".equals(event.getApp())) {
            return;
        }
        String[] s = event.getStream().split("_");
        if ((s.length != 2 && s.length != 4)) {
            return;
        }
        String deviceId = s[0];
        String channelId = s[1];
        Device device = redisCatchStorage.getDevice(deviceId);
        if (device == null || !device.isOnLine()) {
            return;
        }
        DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
        if (deviceChannel == null) {
            return;
        }
        if (s.length == 2) {
            logger.info("[ZLM HOOK] 预览流未找到, 发起自动点播:{}->{}->{}/{}", event.getMediaServer().getId(), event.getSchema(), event.getApp(), event.getStream());
            play(event.getMediaServer(), deviceId, channelId, null, null);
        } else if (s.length == 4) {
            // 此时为录像回放, 录像回放格式为> 设备ID_通道ID_开始时间_结束时间
            String startTimeStr = s[2];
            String endTimeStr = s[3];
            if (startTimeStr == null || endTimeStr == null || startTimeStr.length() != 14 || endTimeStr.length() != 14) {
                return;
            }
            String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr);
            String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr);
            logger.info("[ZLM HOOK] 回放流未找到, 发起自动点播:{}->{}->{}/{}-{}-{}",
                    event.getMediaServer().getId(), event.getSchema(),
                    event.getApp(), event.getStream(),
                    startTime, endTime
            );
            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(event.getMediaServer(), event.getStream(), null,
                    device.isSsrcCheck(), true, 0, false, !deviceChannel.isHasAudio(), false, device.getStreamModeForParam());
            playBack(event.getMediaServer(), ssrcInfo, deviceId, channelId, startTime, endTime, null);
        }
    }
    @Override
    public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, InviteErrorCallback<Object> callback) {
    public SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) {
        if (mediaServerItem == null) {
            logger.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
        }
        Device device = redisCatchStorage.getDevice(deviceId);
        if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) {
            logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流");
        }
        DeviceChannel channel = channelService.getOne(deviceId, channelId);
        if (channel == null) {
            logger.warn("[点播] 未找到通道 deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到通道");
        }
        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
        if (inviteInfo != null ) {
            if (inviteInfo.getStreamInfo() == null) {
                // 点播发起了但是尚未成功, 仅注册回调等待结果即可
                inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
                logger.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
                return inviteInfo.getSsrcInfo();
            }else {
                StreamInfo streamInfo = inviteInfo.getStreamInfo();
@@ -169,15 +296,15 @@
                    return inviteInfo.getSsrcInfo();
                }
                String mediaServerId = streamInfo.getMediaServerId();
                MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
                Boolean ready = zlmrtpServerFactory.isStreamReady(mediaInfo, "rtp", streamId);
                MediaServer mediaInfo = mediaServerService.getOne(mediaServerId);
                Boolean ready = mediaServerService.isStreamReady(mediaInfo, "rtp", streamId);
                if (ready != null && ready) {
                    callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
                    inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
                            InviteErrorCode.SUCCESS.getCode(),
                            InviteErrorCode.SUCCESS.getMsg(),
                            streamInfo);
                    logger.info("[点播已存在] 直接返回, deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
                    return inviteInfo.getSsrcInfo();
                }else {
                    // 点播发起了但是尚未成功, 仅注册回调等待结果即可
@@ -187,12 +314,8 @@
                }
            }
        }
        String streamId = null;
        if (mediaServerItem.isRtpEnable()) {
            streamId = String.format("%s_%s", device.getDeviceId(), channelId);
        }
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(),  false, 0,  false, false, device.getStreamModeForParam());
        String streamId = String.format("%s_%s", device.getDeviceId(), channelId);
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(),  false, 0, false, !channel.isHasAudio(), false, device.getStreamModeForParam());
        if (ssrcInfo == null) {
            callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
            inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
@@ -201,13 +324,12 @@
                    null);
            return null;
        }
        // TODO 记录点播的状态
        play(mediaServerItem, ssrcInfo, device, channelId, callback);
        play(mediaServerItem, ssrcInfo, device, channel, callback);
        return ssrcInfo;
    }
    private void talk(MediaServerItem mediaServerItem, Device device, String channelId, String stream,
                      ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
    private void talk(MediaServer mediaServerItem, Device device, String channelId, String stream,
                      HookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
                      Runnable timeoutCallback, AudioBroadcastEvent audioEvent) {
        String playSsrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
@@ -234,8 +356,8 @@
        sendRtpItem.setUsePs(false);
        sendRtpItem.setReceiveStream(stream + "_talk");
        int port = zlmrtpServerFactory.keepPort(mediaServerItem, playSsrc);
        String callId = SipUtils.getNewCallId();
        int port = sendRtpPortManager.getNextPort(mediaServerItem);
        //端口获取失败的ssrcInfo 没有必要发送点播指令
        if (port <= 0) {
            logger.info("[语音对讲] 端口分配异常,deviceId={},channelId={}", device.getDeviceId(), channelId);
@@ -263,41 +385,25 @@
            }
        }, userSetting.getPlayTimeout());
        String callId = SipUtils.getNewCallId();
        zlmrtpServerFactory.releasePort(mediaServerItem, playSsrc);
        Map<String, Object> param = new HashMap<>(12);
        param.put("vhost","__defaultVhost__");
        param.put("app", sendRtpItem.getApp());
        param.put("stream", sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        param.put("src_port", sendRtpItem.getLocalPort());
        param.put("pt", sendRtpItem.getPt());
        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
        param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1");
        param.put("recv_stream_id", sendRtpItem.getReceiveStream());
        param.put("close_delay_ms", userSetting.getPlayTimeout() * 1000);
        zlmrtpServerFactory.startSendRtpPassive(mediaServerItem, param, jsonObject -> {
            if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
                mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
                logger.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
                audioEvent.call("失败, " + jsonObject.getString("msg"));
                // 查看是否已经建立了通道,存在则发送bye
                stopTalk(device, channelId);
            }
        });
        try {
            mediaServerService.startSendRtpPassive(mediaServerItem, null, sendRtpItem, userSetting.getPlayTimeout() * 1000);
        }catch (ControllerException e) {
            mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
            logger.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
            audioEvent.call("失败, " + e.getMessage());
            // 查看是否已经建立了通道,存在则发送bye
            stopTalk(device, channelId);
        }
        // 查看设备是否已经在推流
        try {
            cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
                logger.info("[语音对讲] 流已生成, 开始推流: " + response.toJSONString());
            cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (hookData) -> {
                logger.info("[语音对讲] 流已生成, 开始推流: " + hookData);
                dynamicTask.stop(timeOutTaskKey);
                // TODO 暂不做处理
            }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> {
                logger.info("[语音对讲] 设备开始推流: " + json.toJSONString());
            }, (hookData) -> {
                logger.info("[语音对讲] 设备开始推流: " + hookData);
                dynamicTask.stop(timeOutTaskKey);
            }, (event) -> {
@@ -352,8 +458,8 @@
    @Override
    public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
                     InviteErrorCallback<Object> callback) {
    public void play(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel,
                     ErrorCallback<Object> callback) {
        if (mediaServerItem == null || ssrcInfo == null) {
            callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
@@ -361,23 +467,24 @@
                    null);
            return;
        }
        logger.info("[点播开始] deviceId: {}, channelId: {},收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
        logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 码流:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
                device.getDeviceId(), channel.getChannelId(), channel.getStreamIdentification(), ssrcInfo.getPort(), ssrcInfo.getStream(),
                device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
        //端口获取失败的ssrcInfo 没有必要发送点播指令
        if (ssrcInfo.getPort() <= 0) {
            logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
            logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channel.getChannelId(), ssrcInfo);
            // 释放ssrc
            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
            streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
            callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
            inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
            inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
                    InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
            return;
        }
        // 初始化redis中的invite消息状态
        InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
        InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream(), ssrcInfo,
                mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
                InviteSessionStatus.ready);
        inviteStreamService.updateInviteInfo(inviteInfo);
@@ -385,219 +492,82 @@
        String timeOutTaskKey = UUID.randomUUID().toString();
        dynamicTask.startDelay(timeOutTaskKey, () -> {
            // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
            InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
            InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId());
            if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) {
                logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
                // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
//                InviteInfo inviteInfoForTimeout = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.play, device.getDeviceId(), channelId);
//                if (inviteInfoForTimeout == null) {
//                    return;
//                }
//                if (InviteSessionStatus.ok == inviteInfoForTimeout.getStatus() ) {
//                    // TODO 发送bye
//                }else {
//                    // TODO 发送cancel
//                }
                callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
                inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
                        InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
                logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}",
                        device.getDeviceId(), channel.getChannelId(), channel.getStreamIdentification(),
                        ssrcInfo.getPort(), ssrcInfo.getSsrc());
                inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
                callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
                inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
                        InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
                inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId());
                try {
                    cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
                } catch (InvalidArgumentException | ParseException | SipException |
                         SsrcTransactionNotFoundException e) {
                    cmder.streamByeCmd(device, channel.getChannelId(), ssrcInfo.getStream(), null);
                } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
                    logger.error("[点播超时], 发送BYE失败 {}", e.getMessage());
                } finally {
                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                    streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                    streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                    // 取消订阅消息监听
                    HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
                    subscribe.removeSubscribe(hookSubscribe);
                    subscribe.removeSubscribe(Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()));
                }
            }else {
                logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}",
                        device.getDeviceId(), channel.getChannelId(), channel.getStreamIdentification(),
                        ssrcInfo.getPort(), ssrcInfo.getSsrc());
                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                mediaServerService.closeRTPServer(mediaServerItem.getId(), ssrcInfo.getStream());
                streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
            }
        }, userSetting.getPlayTimeout());
        try {
            cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
                logger.info("收到订阅消息: " + response.toJSONString());
            cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (hookData ) -> {
                logger.info("收到订阅消息: " + hookData);
                dynamicTask.stop(timeOutTaskKey);
                // hook响应
                StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId);
                StreamInfo streamInfo = onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), device.getDeviceId(), channel.getChannelId());
                if (streamInfo == null){
                    callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
                            InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
                    inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
                    inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
                            InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
                            InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
                    return;
                }
                callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
                inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
                inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
                        InviteErrorCode.SUCCESS.getCode(),
                        InviteErrorCode.SUCCESS.getMsg(),
                        streamInfo);
                logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
                String streamUrl;
                if (mediaServerItemInuse.getRtspPort() != 0) {
                    streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp",  ssrcInfo.getStream());
                }else {
                    streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp",  ssrcInfo.getStream());
                }
                String path = "snap";
                String fileName = device.getDeviceId() + "_" + channelId + ".jpg";
                // 请求截图
                logger.info("[请求截图]: " + fileName);
                zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
                logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channel.getChannelId(),
                        channel.getStreamIdentification());
                snapOnPlay(hookData.getMediaServer(), device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
            }, (eventResult) -> {
                // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
                InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel.getChannelId(),
                        timeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAY);
            }, (event) -> {
                inviteInfo.setStatus(InviteSessionStatus.ok);
                ResponseEvent responseEvent = (ResponseEvent) event.event;
                String contentString = new String(responseEvent.getResponse().getRawContent());
                // 获取ssrc
                int ssrcIndex = contentString.indexOf("y=");
                // 检查是否有y字段
                if (ssrcIndex >= 0) {
                    //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
                    String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12).trim();
                    // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
                    if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
                        if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
                            String substring = contentString.substring(0, contentString.indexOf("y="));
                            try {
                                SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
                                int port = -1;
                                Vector mediaDescriptions = sdp.getMediaDescriptions(true);
                                for (Object description : mediaDescriptions) {
                                    MediaDescription mediaDescription = (MediaDescription) description;
                                    Media media = mediaDescription.getMedia();
                                    Vector mediaFormats = media.getMediaFormats(false);
                                    if (mediaFormats.contains("96")) {
                                        port = media.getMediaPort();
                                        break;
                                    }
                                }
                                logger.info("[点播-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
                                JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
                                logger.info("[点播-TCP主动连接对方] 结果: {}", jsonObject);
                            } catch (SdpException e) {
                                logger.error("[点播-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
                                dynamicTask.stop(timeOutTaskKey);
                                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                                // 释放ssrc
                                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                                streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                                callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
                                        InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
                                inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
                                        InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
                                        InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
                            }
                        }
                        return;
                    }
                    logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
                    if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
                        logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
                        if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) {
                            // ssrc 不可用
                            logger.info("[点播消息] SSRC修正时发现ssrc不可使用 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
                            // 释放ssrc
                            ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                            callback.run(InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
                                    InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
                            inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
                                    InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
                                    InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
                            return;
                        }
                        // 单端口模式streamId也有变化,重新设置监听即可
                        if (!mediaServerItem.isRtpEnable()) {
                            // 添加订阅
                            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
                            subscribe.removeSubscribe(hookSubscribe);
                            String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
                            hookSubscribe.getContent().put("stream", stream);
                            inviteInfo.setStream(stream);
                            subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
                                logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
                                dynamicTask.stop(timeOutTaskKey);
                                // hook响应
                                StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId);
                                if (streamInfo == null){
                                    callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
                                            InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
                                    inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
                                            InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
                                            InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
                                    return;
                                }
                                callback.run(InviteErrorCode.SUCCESS.getCode(),
                                        InviteErrorCode.SUCCESS.getMsg(), null);
                                inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
                                        InviteErrorCode.SUCCESS.getCode(),
                                        InviteErrorCode.SUCCESS.getMsg(),
                                        streamInfo);
                            });
                            return;
                        }
                        // 更新ssrc
                        Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
                        if (!result) {
                            try {
                                logger.warn("[点播] 更新ssrc失败,停止点播 {}/{}", device.getDeviceId(), channelId);
                                cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
                            } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
                                logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
                            }
                            dynamicTask.stop(timeOutTaskKey);
                            // 释放ssrc
                            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                            callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
                                    "下级自定义了ssrc,重新设置收流信息失败", null);
                            inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
                                    InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
                                    "下级自定义了ssrc,重新设置收流信息失败", null);
                        }else {
                            ssrcInfo.setSsrc(ssrcInResponse);
                            inviteInfo.setSsrcInfo(ssrcInfo);
                            inviteInfo.setStream(ssrcInfo.getStream());
                        }
                    }else {
                        logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
                    }
                }
                inviteStreamService.updateInviteInfo(inviteInfo);
            }, (event) -> {
                logger.info("[点播失败] deviceId: {}, channelId:{}, {}: {}", device.getDeviceId(), channel.getChannelId(), event.statusCode, event.msg);
                dynamicTask.stop(timeOutTaskKey);
                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                // 释放ssrc
                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
                callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
                        String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
                inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
                callback.run(event.statusCode, event.msg, null);
                inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
                        InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
                        String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
                inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
                inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId());
            });
        } catch (InvalidArgumentException | SipException | ParseException e) {
@@ -607,21 +577,106 @@
            // 释放ssrc
            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
            streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
            callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
                    InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
            inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
            inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
                    InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
                    InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
            inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
            inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId());
        }
    }
    @Override
    public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) {
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
    private void tcpActiveHandler(Device device, String channelId, String contentString,
                                  MediaServer mediaServerItem,
                                  String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback<Object> callback){
        if (!device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
            return;
        }
        String substring;
        if (contentString.indexOf("y=") > 0) {
            substring = contentString.substring(0, contentString.indexOf("y="));
        }else {
            substring = contentString;
        }
        try {
            SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
            int port = -1;
            Vector mediaDescriptions = sdp.getMediaDescriptions(true);
            for (Object description : mediaDescriptions) {
                MediaDescription mediaDescription = (MediaDescription) description;
                Media media = mediaDescription.getMedia();
                Vector mediaFormats = media.getMediaFormats(false);
                if (mediaFormats.contains("96")) {
                    port = media.getMediaPort();
                    break;
                }
            }
            logger.info("[TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
            Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
            logger.info("[TCP主动连接对方] 结果: {}" , result);
            if (!result) {
                // 主动连接失败,结束流程, 清理数据
                dynamicTask.stop(timeOutTaskKey);
                mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                // 释放ssrc
                mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
                        InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
                inviteStreamService.call(InviteSessionType.BROADCAST, device.getDeviceId(), channelId, null,
                        InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
                        InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
            }
        } catch (SdpException e) {
            logger.error("[TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
            dynamicTask.stop(timeOutTaskKey);
            mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
            // 释放ssrc
            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
            callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
                    InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
            inviteStreamService.call(InviteSessionType.BROADCAST, device.getDeviceId(), channelId, null,
                    InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
                    InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
        }
    }
    /**
     * 点播成功时调用截图.
     *
     * @param mediaServerItemInuse media
     * @param deviceId             设备 ID
     * @param channelId            通道 ID
     * @param stream               ssrc
     */
    private void snapOnPlay(MediaServer mediaServerItemInuse, String deviceId, String channelId, String stream) {
        String streamUrl;
        if (mediaServerItemInuse.getRtspPort() != 0) {
            streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", stream);
        } else {
            streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp", stream);
        }
        String path = "snap";
        String fileName = deviceId + "_" + channelId + ".jpg";
        // 请求截图
        logger.info("[请求截图]: " + fileName);
        mediaServerService.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
    }
    public StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId) {
        StreamInfo streamInfo = null;
        Device device = redisCatchStorage.getDevice(deviceId);
        streamInfo = onPublishHandler(mediaServerItem, mediaInfo, deviceId, channelId);
        if (streamInfo != null) {
            DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
            if (deviceChannel != null) {
@@ -639,9 +694,8 @@
    }
    private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String startTime, String endTime) {
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
    private StreamInfo onPublishHandlerForPlayback(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId, String startTime, String endTime) {
        StreamInfo streamInfo = onPublishHandler(mediaServerItem, mediaInfo, deviceId, channelId);
        if (streamInfo != null) {
            streamInfo.setStartTime(startTime);
            streamInfo.setEndTime(endTime);
@@ -650,7 +704,7 @@
                deviceChannel.setStreamId(streamInfo.getStream());
                storager.startPlay(deviceId, channelId, streamInfo.getStream());
            }
            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, deviceId, channelId);
            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, mediaInfo.getStream());
            if (inviteInfo != null) {
                inviteInfo.setStatus(InviteSessionStatus.ok);
@@ -663,11 +717,11 @@
    }
    @Override
    public MediaServerItem getNewMediaServerItem(Device device) {
    public MediaServer getNewMediaServerItem(Device device) {
        if (device == null) {
            return null;
        }
        MediaServerItem mediaServerItem;
        MediaServer mediaServerItem;
        if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
            mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
        } else {
@@ -680,38 +734,40 @@
    }
    @Override
    public MediaServerItem getNewMediaServerItemHasAssist(Device device) {
        if (device == null) {
            return null;
        }
        MediaServerItem mediaServerItem;
        if (ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
            mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(true);
        } else {
            mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
        }
        if (mediaServerItem == null) {
            logger.warn("[获取可用的ZLM节点]未找到可使用的ZLM...");
        }
        return mediaServerItem;
    }
    @Override
    public void playBack(String deviceId, String channelId, String startTime,
                                                          String endTime, InviteErrorCallback<Object> callback) {
                         String endTime, ErrorCallback<Object> callback) {
        Device device = storager.queryVideoDevice(deviceId);
        if (device == null) {
            return;
            logger.warn("[录像回放] 未找到设备 deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到设备:" + deviceId);
        }
        MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(),  true, 0, false,false, device.getStreamModeForParam());
        DeviceChannel channel = channelService.getOne(deviceId, channelId);
        if (channel == null) {
            logger.warn("[录像回放] 未找到通道 deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到通道:" + channelId);
        }
        MediaServer newMediaServerItem = getNewMediaServerItem(device);
        if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && ! newMediaServerItem.isRtpEnable()) {
            logger.warn("[录像回放] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流");
        }
        String startTimeStr = startTime.replace("-", "")
                .replace(":", "")
                .replace(" ", "");
        String endTimeTimeStr = endTime.replace("-", "")
                .replace(":", "")
                .replace(" ", "");
        String stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr;
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(),  true, 0, false,  !channel.isHasAudio(),  false, device.getStreamModeForParam());
        playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback);
    }
    @Override
    public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
                                                          String deviceId, String channelId, String startTime,
                                                          String endTime, InviteErrorCallback<Object> callback) {
    public void playBack(MediaServer mediaServerItem, SSRCInfo ssrcInfo,
                         String deviceId, String channelId, String startTime,
                         String endTime, ErrorCallback<Object> callback) {
        if (mediaServerItem == null || ssrcInfo == null) {
            callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
                    InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
@@ -727,7 +783,7 @@
                device.getDeviceId(), channelId, startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(),
                ssrcInfo.getSsrc(), device.isSsrcCheck());
        // 初始化redis中的invite消息状态
        InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
        InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
                mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK,
                InviteSessionStatus.ready);
        inviteStreamService.updateInviteInfo(inviteInfo);
@@ -760,10 +816,10 @@
            inviteStreamService.removeInviteInfo(inviteInfo);
        };
        ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> {
            logger.info("收到回放订阅消息: " + jsonObject);
        HookSubscribe.Event hookEvent = (hookData) -> {
            logger.info("收到回放订阅消息: " + hookData);
            dynamicTask.stop(playBackTimeOutTaskKey);
            StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime);
            StreamInfo streamInfo = onPublishHandlerForPlayback(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime);
            if (streamInfo == null) {
                logger.warn("设备回放API调用失败!");
                callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
@@ -777,118 +833,12 @@
        try {
            cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime,
                    hookEvent, eventResult -> {
                        inviteInfo.setStatus(InviteSessionStatus.ok);
                        ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
                        String contentString = new String(responseEvent.getResponse().getRawContent());
                        // 获取ssrc
                        int ssrcIndex = contentString.indexOf("y=");
                        // 检查是否有y字段
                        if (ssrcIndex >= 0) {
                            //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
                            String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
                            // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
                            if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
                                if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
                                    String substring = contentString.substring(0, contentString.indexOf("y="));
                                    try {
                                        SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
                                        int port = -1;
                                        Vector mediaDescriptions = sdp.getMediaDescriptions(true);
                                        for (Object description : mediaDescriptions) {
                                            MediaDescription mediaDescription = (MediaDescription) description;
                                            Media media = mediaDescription.getMedia();
                                            Vector mediaFormats = media.getMediaFormats(false);
                                            if (mediaFormats.contains("96")) {
                                                port = media.getMediaPort();
                                                break;
                                            }
                                        }
                                        logger.info("[录像回放-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
                                        JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
                                        logger.info("[录像回放-TCP主动连接对方] 结果: {}", jsonObject);
                                    } catch (SdpException e) {
                                        logger.error("[录像回放-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
                                        dynamicTask.stop(playBackTimeOutTaskKey);
                                        mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                                        // 释放ssrc
                                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                                        streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                                        callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
                                                InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
                                        inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
                                                InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
                                                InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
                                    }
                                }
                                return;
                            }
                            logger.info("[录像回放] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
                            if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
                                logger.info("[录像回放] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
                                if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) {
                                    // ssrc 不可用
                                    logger.info("[录像回放] SSRC修正时发现ssrc不可使用 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
                                    // 释放ssrc
                                    dynamicTask.stop(playBackTimeOutTaskKey);
                                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                                    streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                                    callback.run(InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
                                            InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
                                    return;
                                }
                                // 单端口模式streamId也有变化,需要重新设置监听
                                if (!mediaServerItem.isRtpEnable()) {
                                    // 添加订阅
                                    HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
                                    subscribe.removeSubscribe(hookSubscribe);
                                    String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
                                    hookSubscribe.getContent().put("stream", stream);
                                    inviteInfo.setStream(stream);
                                    subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
                                        logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
                                        dynamicTask.stop(playBackTimeOutTaskKey);
                                        // hook响应
                                        hookEvent.response(mediaServerItemInUse, response);
                                    });
                                }
                                // 更新ssrc
                                Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
                                if (!result) {
                                    try {
                                        logger.warn("[录像回放] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId);
                                        cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
                                    } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
                                        logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
                                    }
                                    dynamicTask.stop(playBackTimeOutTaskKey);
                                    // 释放ssrc
                                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                                    streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                                    callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
                                            "下级自定义了ssrc,重新设置收流信息失败", null);
                                }else {
                                    ssrcInfo.setSsrc(ssrcInResponse);
                                    inviteInfo.setSsrcInfo(ssrcInfo);
                                    inviteInfo.setStream(ssrcInfo.getStream());
                                }
                            }else {
                                logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
                            }
                        }
                        inviteStreamService.updateInviteInfo(inviteInfo);
                        // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
                        InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId,
                                playBackTimeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAYBACK);
                    }, errorEvent);
        } catch (InvalidArgumentException | SipException | ParseException e) {
            logger.error("[命令发送失败] 回放: {}", e.getMessage());
            logger.error("[命令发送失败] 录像回放: {}", e.getMessage());
            SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
            eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
@@ -899,26 +849,121 @@
    }
    private void InviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, MediaServer mediaServerItem,
                                 Device device, String channelId, String timeOutTaskKey, ErrorCallback<Object> callback,
                                 InviteInfo inviteInfo, InviteSessionType inviteSessionType){
        inviteInfo.setStatus(InviteSessionStatus.ok);
        ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
        String contentString = new String(responseEvent.getResponse().getRawContent());
        String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString);
        // 兼容回复的消息中缺少ssrc(y字段)的情况
        if (ssrcInResponse == null) {
            ssrcInResponse = ssrcInfo.getSsrc();
        }
        if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
            // ssrc 一致
            if (mediaServerItem.isRtpEnable()) {
                // 多端口
                if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
                    tcpActiveHandler(device, channelId, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback);
                }
            }else {
                // 单端口
                if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
                    logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
                }
            }
        }else {
            logger.info("[Invite 200OK] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
            // ssrc 不一致
            if (mediaServerItem.isRtpEnable()) {
                // 多端口
                if (device.isSsrcCheck()) {
                    // ssrc检验
                    // 更新ssrc
                    logger.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
                    // 释放ssrc
                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                    Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
                    if (!result) {
                        try {
                            logger.warn("[Invite 200OK] 更新ssrc失败,停止点播 {}/{}", device.getDeviceId(), channelId);
                            cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
                        } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
                            logger.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
                        }
                        dynamicTask.stop(timeOutTaskKey);
                        // 释放ssrc
                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                        streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                        callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
                                "下级自定义了ssrc,重新设置收流信息失败", null);
                        inviteStreamService.call(inviteSessionType, device.getDeviceId(), channelId, null,
                                InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
                                "下级自定义了ssrc,重新设置收流信息失败", null);
                    }else {
                        ssrcInfo.setSsrc(ssrcInResponse);
                        inviteInfo.setSsrcInfo(ssrcInfo);
                        inviteInfo.setStream(ssrcInfo.getStream());
                        if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
                            if (mediaServerItem.isRtpEnable()) {
                                tcpActiveHandler(device, channelId, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback);
                            }else {
                                logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
                            }
                        }
                        inviteStreamService.updateInviteInfo(inviteInfo);
                    }
                }
            }else {
                if (ssrcInResponse != null) {
                    // 单端口
                    // 重新订阅流上线
                    SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getDeviceId(),
                            inviteInfo.getChannelId(), null, inviteInfo.getStream());
                    streamSession.remove(inviteInfo.getDeviceId(),
                            inviteInfo.getChannelId(), inviteInfo.getStream());
                    inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
                    streamSession.put(device.getDeviceId(), channelId, ssrcTransaction.getCallId(),
                            inviteInfo.getStream(), ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType);
                }
            }
        }
    }
    /**
     * Starts a record download for one channel of a device.
     *
     * <p>Looks up the device (and the channel), selects a media server, opens an RTP server on it
     * and then delegates to the {@code download} overload that takes the media server and the
     * SSRC info.
     *
     * <p>When no media server is available the failure is reported through {@code callback} with
     * {@code InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY}.
     * NOTE(review): when the device or the channel is not found the method returns without
     * invoking {@code callback} - confirm callers do not wait for it.
     * NOTE(review): this span shows two consecutive signature lines and two declarations each of
     * {@code newMediaServerItem} and {@code ssrcInfo}; it looks like two revisions side by side.
     *
     * @param deviceId      id passed to {@code storager.queryVideoDevice}
     * @param channelId     id of the channel to download from
     * @param startTime     start of the requested record range (forwarded unchanged)
     * @param endTime       end of the requested record range (forwarded unchanged)
     * @param downloadSpeed download speed forwarded to the delegate overload
     * @param callback      receives the error code and message when the download cannot be started
     */
    @Override
    public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteErrorCallback<Object> callback) {
    public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
        Device device = storager.queryVideoDevice(deviceId);
        if (device == null) {
            return;
        }
        MediaServerItem newMediaServerItem = getNewMediaServerItemHasAssist(device);
        DeviceChannel channel = channelService.getOne(deviceId, channelId);
        if (channel == null) {
            return;
        }
        MediaServer newMediaServerItem = this.getNewMediaServerItem(device);
        if (newMediaServerItem == null) {
            callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(),
                    InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getMsg(),
                    null);
            return;
        }
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(),  true, 0, false,false, device.getStreamModeForParam());
        // Record downloads do not use a fixed stream address: with a fixed address, files would be
        // wrongly merged together when the start time and the end time are identical.
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(),  true, 0, false,!channel.isHasAudio(), false, device.getStreamModeForParam());
        download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, callback);
    }
    @Override
    public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteErrorCallback<Object> callback) {
    public void download(MediaServer mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
        if (mediaServerItem == null || ssrcInfo == null) {
            callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
                    InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
@@ -934,7 +979,7 @@
        }
        logger.info("[录像下载] deviceId: {}, channelId: {}, 下载速度:{}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
        // 初始化redis中的invite消息状态
        InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
        InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
                mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD,
                InviteSessionStatus.ready);
        inviteStreamService.updateInviteInfo(inviteInfo);
@@ -964,10 +1009,10 @@
            streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
            inviteStreamService.removeInviteInfo(inviteInfo);
        };
        ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> {
            logger.info("[录像下载]收到订阅消息: " + jsonObject);
        HookSubscribe.Event hookEvent = (hookData) -> {
            logger.info("[录像下载]收到订阅消息: " + hookData);
            dynamicTask.stop(downLoadTimeOutTaskKey);
            StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime);
            StreamInfo streamInfo = onPublishHandlerForDownload(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime);
            if (streamInfo == null) {
                logger.warn("[录像下载] 获取流地址信息失败");
                callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
@@ -980,109 +1025,27 @@
        try {
            cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed,
                    hookEvent, errorEvent, eventResult ->{
                        inviteInfo.setStatus(InviteSessionStatus.ok);
                        ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
                        String contentString = new String(responseEvent.getResponse().getRawContent());
                        // 获取ssrc
                        int ssrcIndex = contentString.indexOf("y=");
                        // 检查是否有y字段
                        if (ssrcIndex >= 0) {
                            //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
                            String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
                            // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
                            if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
                                if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
                                    String substring = contentString.substring(0, contentString.indexOf("y="));
                                    try {
                                        SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
                                        int port = -1;
                                        Vector mediaDescriptions = sdp.getMediaDescriptions(true);
                                        for (Object description : mediaDescriptions) {
                                            MediaDescription mediaDescription = (MediaDescription) description;
                                            Media media = mediaDescription.getMedia();
                        // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
                        InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId,
                                downLoadTimeOutTaskKey, callback, inviteInfo, InviteSessionType.DOWNLOAD);
                                            Vector mediaFormats = media.getMediaFormats(false);
                                            if (mediaFormats.contains("96")) {
                                                port = media.getMediaPort();
                                                break;
                                            }
                                        }
                                        logger.info("[录像下载-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
                                        JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
                                        logger.info("[录像下载-TCP主动连接对方] 结果: {}", jsonObject);
                                    } catch (SdpException e) {
                                        logger.error("[录像下载-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
                                        dynamicTask.stop(downLoadTimeOutTaskKey);
                                        mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                                        // 释放ssrc
                                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                                        streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                                        callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
                                                InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
                                        inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
                                                InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
                                                InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
                                    }
                                }
                                return;
                            }
                            logger.info("[录像下载] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
                            if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
                                logger.info("[录像下载] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
                                if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) {
                                    // ssrc 不可用
                                    // 释放ssrc
                                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                                    streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                                    callback.run(InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
                                            InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
                                    return;
                                }
                                // 单端口模式streamId也有变化,需要重新设置监听
                                if (!mediaServerItem.isRtpEnable()) {
                                    // 添加订阅
                                    HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
                                    subscribe.removeSubscribe(hookSubscribe);
                                    hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
                                    subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
                                        logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
                                        dynamicTask.stop(downLoadTimeOutTaskKey);
                                        hookEvent.response(mediaServerItemInUse, response);
                                    });
                                }
                                // 更新ssrc
                                Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
                                if (!result) {
                                    try {
                                        logger.warn("[录像下载] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId);
                                        cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null);
                                    } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
                                        logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
                                    }
                                    dynamicTask.stop(downLoadTimeOutTaskKey);
                                    // 释放ssrc
                                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                                    streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
                                    callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
                                            "下级自定义了ssrc,重新设置收流信息失败", null);
                                }else {
                                    ssrcInfo.setSsrc(ssrcInResponse);
                                    inviteInfo.setSsrcInfo(ssrcInfo);
                                    inviteInfo.setStream(ssrcInfo.getStream());
                                }
                            }else {
                                logger.info("[录像下载] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
                            }
                        }
                        // 注册录像回调事件,录像下载结束后写入下载地址
                        HookSubscribe.Event hookEventForRecord = (hookData) -> {
                            logger.info("[录像下载] 收到录像写入磁盘消息: , {}/{}-{}",
                                    inviteInfo.getDeviceId(), inviteInfo.getChannelId(), ssrcInfo.getStream());
                            logger.info("[录像下载] 收到录像写入磁盘消息内容: " + hookData);
                            RecordInfo recordInfo = hookData.getRecordInfo();
                            String filePath = recordInfo.getFilePath();
                            DownloadFileInfo downloadFileInfo = CloudRecordUtils.getDownloadFilePath(mediaServerItem, filePath);
                            InviteInfo inviteInfoForNew = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId()
                                    , inviteInfo.getChannelId(), inviteInfo.getStream());
                            inviteInfoForNew.getStreamInfo().setDownLoadFilePath(downloadFileInfo);
                            inviteStreamService.updateInviteInfo(inviteInfoForNew);
                        };
                        Hook hook = Hook.getInstance(HookType.on_record_mp4, "rtp", ssrcInfo.getStream(), mediaServerItem.getId());
                        // 设置过期时间,下载失败时自动处理订阅数据
                        hook.setExpireTime(System.currentTimeMillis() + 24 * 60 * 60 * 1000);
                        subscribe.addSubscribe(hook, hookEventForRecord);
                    });
        } catch (InvalidArgumentException | SipException | ParseException e) {
            logger.error("[命令发送失败] 录像下载: {}", e.getMessage());
@@ -1098,51 +1061,58 @@
    /**
     * Returns the stream info of a record download together with its current progress.
     *
     * <p>The DOWNLOAD invite info is looked up by device, channel and stream. A stream info whose
     * progress already equals 1 is returned as is. Otherwise the progress is recomputed from the
     * downloaded duration and the requested time range, stored via
     * {@code inviteStreamService.updateInviteInfo} and returned.
     *
     * <p>NOTE(review): this span appears to contain two revisions of the body side by side (one
     * querying the assist service, one querying the media server); the comments below describe
     * each part only as far as the visible lines show.
     *
     * @param deviceId  device the download belongs to
     * @param channelId channel the download belongs to
     * @param stream    stream id of the download
     * @return the stream info with updated progress, or {@code null} when the invite info, the
     *         media server, the SSRC transaction or the media info cannot be found
     */
    @Override
    public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {
        InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
        if (inviteInfo == null || inviteInfo.getStreamInfo() == null) {
            logger.warn("[获取下载进度] 未查询到录像下载的信息");
            return null;
        }
        if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
            if (inviteInfo.getStreamInfo().getProgress() == 1) {
                return inviteInfo.getStreamInfo();
            }
            // Fetch the duration that has been downloaded so far.
            String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId();
            MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
            if (mediaServerItem == null) {
                logger.warn("查询录像信息时发现节点已离线");
                return null;
            }
            if (mediaServerItem.getRecordAssistPort() > 0) {
                JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream(), null);
                if (jsonObject == null) {
                    throw new ControllerException(ErrorCode.ERROR100.getCode(), "连接Assist服务失败");
                }
                if (jsonObject.getInteger("code") == 0) {
                    long duration = jsonObject.getLong("data");
                    if (duration == 0) {
                        inviteInfo.getStreamInfo().setProgress(0);
                    } else {
                        String startTime = inviteInfo.getStreamInfo().getStartTime();
                        String endTime = inviteInfo.getStreamInfo().getEndTime();
                        long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
                        long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
                        // Progress = (duration / 1000) / (end - start), rounded to two decimals.
                        BigDecimal currentCount = new BigDecimal(duration / 1000);
                        BigDecimal totalCount = new BigDecimal(end - start);
                        BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
                        double process = divide.doubleValue();
                        inviteInfo.getStreamInfo().setProgress(process);
                    }
                    inviteStreamService.updateInviteInfo(inviteInfo);
                }
            }
        // A finished download (progress == 1) needs no further lookup.
        if (inviteInfo.getStreamInfo().getProgress() == 1) {
            return inviteInfo.getStreamInfo();
        }
        return null;
        // Fetch the duration that has been downloaded so far.
        String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId();
        MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
        if (mediaServerItem == null) {
            logger.warn("[获取下载进度] 查询录像信息时发现节点不存在");
            return null;
        }
        // No SSRC transaction for this stream is reported as "download finished".
        SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream);
        if (ssrcTransaction == null) {
            logger.warn("[获取下载进度] 下载已结束");
            return null;
        }
        String app = "rtp";
        MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, app, stream);
        if (mediaInfo == null) {
            logger.warn("[获取下载进度] 查询进度失败, 节点Id: {}, {}/{}", mediaServerId, app, stream);
            return null;
        }
        if (mediaInfo.getDuration() == 0) {
            inviteInfo.getStreamInfo().setProgress(0);
        } else {
            String startTime = inviteInfo.getStreamInfo().getStartTime();
            String endTime = inviteInfo.getStreamInfo().getEndTime();
            // start and end are in seconds at this point
            long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
            long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
            // Progress = duration / ((end - start) * 1000), rounded to two decimals.
            // NOTE(review): a zero-length range (end == start) makes the divisor zero - confirm it cannot occur.
            BigDecimal currentCount = new BigDecimal(mediaInfo.getDuration());
            BigDecimal totalCount = new BigDecimal((end - start) * 1000);
            BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
            double process = divide.doubleValue();
            // Anything above 0.999 is reported as complete.
            if (process > 0.999) {
                process = 1.0;
            }
            inviteInfo.getStreamInfo().setProgress(process);
        }
        inviteStreamService.updateInviteInfo(inviteInfo);
        return inviteInfo.getStreamInfo();
    }
    private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, JSONObject response, String deviceId, String channelId, String startTime, String endTime) {
        StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, response, deviceId, channelId);
    private StreamInfo onPublishHandlerForDownload(MediaServer mediaServerItemInuse, MediaInfo mediaInfo, String deviceId, String channelId, String startTime, String endTime) {
        StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, mediaInfo, deviceId, channelId);
        if (streamInfo != null) {
            streamInfo.setProgress(0);
            streamInfo.setStartTime(startTime);
@@ -1159,14 +1129,13 @@
    }
    public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
        String streamId = resonse.getString("stream");
        JSONArray tracks = resonse.getJSONArray("tracks");
        StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null);
    public StreamInfo onPublishHandler(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId) {
        StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", mediaInfo.getStream(), mediaInfo, null);
        streamInfo.setDeviceID(deviceId);
        streamInfo.setChannelId(channelId);
        return streamInfo;
    }
    @Override
    public void zlmServerOffline(String mediaServerId) {
@@ -1217,7 +1186,7 @@
            logger.warn("开启语音广播的时候未找到通道: {}", channelId);
            return null;
        }
        MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
        MediaServer mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
        if (broadcastMode == null) {
            broadcastMode = true;
        }
@@ -1226,13 +1195,13 @@
        AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult();
        audioBroadcastResult.setApp(app);
        audioBroadcastResult.setStream(stream);
        audioBroadcastResult.setStreamInfo(new StreamContent(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false)));
        audioBroadcastResult.setStreamInfo(new StreamContent(mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false)));
        audioBroadcastResult.setCodec("G.711");
        return audioBroadcastResult;
    }
    @Override
    public boolean audioBroadcastCmd(Device device, String channelId, MediaServerItem mediaServerItem, String app, String stream, int timeout, boolean isFromPlatform, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException {
    public boolean audioBroadcastCmd(Device device, String channelId, MediaServer mediaServerItem, String app, String stream, int timeout, boolean isFromPlatform, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException {
        if (device == null || channelId == null) {
            return false;
        }
@@ -1248,7 +1217,7 @@
            SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
            if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                // 查询流是否存在,不存在则认为是异常状态
                Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
                Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
                if (streamReady) {
                    logger.warn("语音广播已经开启: {}", channelId);
                    event.call("语音广播已经开启");
@@ -1258,24 +1227,21 @@
                }
            }
        }
        SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
        if (sendRtpItem != null) {
            MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
            if (streamReady) {
                logger.warn("[语音对讲] 进行中: {}", channelId);
                event.call("语音对讲进行中");
                return false;
            } else {
                stopTalk(device, channelId);
            }
        }
        // 发送通知
        cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> {
            // 发送成功
            AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, mediaServerItem, app, stream, event, AudioBroadcastCatchStatus.Ready, isFromPlatform);
            audioBroadcastManager.update(audioBroadcastCatch);
            // 等待invite消息, 超时则结束
            String key = VideoManagerConstants.BROADCAST_WAITE_INVITE +  device.getDeviceId();
            if (!SipUtils.isFrontEnd(device.getDeviceId())) {
                key += audioBroadcastCatch.getChannelId();
            }
            dynamicTask.startDelay(key, ()->{
                logger.info("[语音广播]等待invite消息超时:{}/{}", device.getDeviceId(), channelId);
                stopAudioBroadcast(device.getDeviceId(), channelId);
            }, 10*1000);
        }, eventResultForError -> {
            // 发送失败
            logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);
@@ -1291,8 +1257,8 @@
            SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
            if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                // 查询流是否存在,不存在则认为是异常状态
                MediaServerItem mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStream());
                MediaServer mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Boolean streamReady = mediaServerService.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStream());
                if (streamReady) {
                    logger.warn("语音广播通道使用中: {}", channelId);
                    return true;
@@ -1321,14 +1287,10 @@
                SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
                if (sendRtpItem != null) {
                    redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
                    MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                    Map<String, Object> param = new HashMap<>();
                    param.put("vhost", "__defaultVhost__");
                    param.put("app", sendRtpItem.getApp());
                    param.put("stream", sendRtpItem.getStream());
                    zlmresTfulUtils.stopSendRtp(mediaInfo, param);
                    MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                    mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
                    try {
                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
                        cmder.streamByeCmdForDeviceInvite(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
                    } catch (InvalidArgumentException | ParseException | SipException |
                             SsrcTransactionNotFoundException e) {
                        logger.error("[消息发送失败] 发送语音喊话BYE失败");
@@ -1402,14 +1364,19 @@
        }
        inviteInfo.getStreamInfo().setPause(true);
        inviteStreamService.updateInviteInfo(inviteInfo);
        MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
        MediaServer mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
        if (null == mediaServerItem) {
            logger.warn("mediaServer 不存在!");
            throw new ServiceException("mediaServer不存在");
        }
        // zlm 暂停RTP超时检查
        JSONObject jsonObject = zlmresTfulUtils.pauseRtpCheck(mediaServerItem, streamId);
        if (jsonObject == null || jsonObject.getInteger("code") != 0) {
        // 使用zlm中的流ID
        String streamKey = inviteInfo.getStream();
        if (!mediaServerItem.isRtpEnable()) {
            streamKey = Long.toHexString(Long.parseLong(inviteInfo.getSsrcInfo().getSsrc())).toUpperCase();
        }
        Boolean result = mediaServerService.pauseRtpCheck(mediaServerItem, streamKey);
        if (!result) {
            throw new ServiceException("暂停RTP接收失败");
        }
        Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
@@ -1425,14 +1392,19 @@
        }
        inviteInfo.getStreamInfo().setPause(false);
        inviteStreamService.updateInviteInfo(inviteInfo);
        MediaServerItem mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
        MediaServer mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
        if (null == mediaServerItem) {
            logger.warn("mediaServer 不存在!");
            throw new ServiceException("mediaServer不存在");
        }
        // zlm 暂停RTP超时检查
        JSONObject jsonObject = zlmresTfulUtils.resumeRtpCheck(mediaServerItem, streamId);
        if (jsonObject == null || jsonObject.getInteger("code") != 0) {
        // 使用zlm中的流ID
        String streamKey = inviteInfo.getStream();
        if (!mediaServerItem.isRtpEnable()) {
            streamKey = Long.toHexString(Long.parseLong(inviteInfo.getSsrcInfo().getSsrc())).toUpperCase();
        }
        boolean result = mediaServerService.resumeRtpCheck(mediaServerItem, streamKey);
        if (!result) {
            throw new ServiceException("继续RTP接收失败");
        }
        Device device = storager.queryVideoDevice(inviteInfo.getDeviceId());
@@ -1441,102 +1413,61 @@
    @Override
    public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader) {
        // 开始发流
        // 取消设置的超时任务
//         String channelId = request.getCallIdHeader().getCallId();
        String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        logger.info("收到ACK,rtp/{}开始推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(),
                sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
        Map<String, Object> param = new HashMap<>(12);
        param.put("vhost", "__defaultVhost__");
        param.put("app", sendRtpItem.getApp());
        param.put("stream", sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        param.put("src_port", sendRtpItem.getLocalPort());
        param.put("pt", sendRtpItem.getPt());
        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
        param.put("is_udp", is_Udp);
        if (!sendRtpItem.isTcp()) {
            // udp模式下开启rtcp保活
            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0");
        }
        MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        if (mediaInfo == null) {
            RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
                    sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
                    sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
                    sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
            redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
                startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader);
            RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
            redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
                startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
            });
        } else {
            // 如果是非严格模式,需要关闭端口占用
            JSONObject startSendRtpStreamResult = null;
            if (sendRtpItem.getLocalPort() != 0) {
                HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId());
                hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
                if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) {
                    if (sendRtpItem.isTcpActive()) {
                        startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
                    } else {
                        param.put("dst_url", sendRtpItem.getIp());
                        param.put("dst_port", sendRtpItem.getPort());
                        startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
                    }
                }
            } else {
            try {
                if (sendRtpItem.isTcpActive()) {
                    startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
                    mediaServerService.startSendRtpPassive(mediaInfo, platform, sendRtpItem, null);
                } else {
                    param.put("dst_url", sendRtpItem.getIp());
                    param.put("dst_port", sendRtpItem.getPort());
                    startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
                    mediaServerService.startSendRtp(mediaInfo, platform, sendRtpItem);
                }
            }catch (ControllerException e) {
                logger.error("RTP推流失败: {}", e.getMessage());
                startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
                return;
            }
            if (startSendRtpStreamResult != null) {
                startSendRtpStreamHand(sendRtpItem, platform, startSendRtpStreamResult, param, callIdHeader);
            }
            logger.info("RTP推流成功[ {}/{} ],{}, ", sendRtpItem.getApp(), sendRtpItem.getStream(),
                    sendRtpItem.isTcpActive()?"被动发流": sendRtpItem.getIp() + ":" + sendRtpItem.getPort());
        }
    }
    @Override
    public void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
                                       JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
        if (jsonObject == null) {
            logger.error("RTP推流失败: 请检查ZLM服务");
        } else if (jsonObject.getInteger("code") == 0) {
            logger.info("调用ZLM推流接口, 结果: {}", jsonObject);
            logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
        } else {
            logger.error("RTP推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSONObject.toJSONString(param));
            if (sendRtpItem.isOnlyAudio()) {
                Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
                AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
                if (audioBroadcastCatch != null) {
                    try {
                        cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
                    } catch (SipException | ParseException | InvalidArgumentException |
                             SsrcTransactionNotFoundException e) {
                        logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage());
                    }
    public void startSendRtpStreamFailHand(SendRtpItem sendRtpItem, ParentPlatform platform, CallIdHeader callIdHeader) {
        if (sendRtpItem.isOnlyAudio()) {
            Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
            AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
            if (audioBroadcastCatch != null) {
                try {
                    cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
                } catch (SipException | ParseException | InvalidArgumentException |
                         SsrcTransactionNotFoundException exception) {
                    logger.error("[命令发送失败] 停止语音对讲: {}", exception.getMessage());
                }
            } else {
            }
        } else {
            if (platform != null) {
                // 向上级平台
                try {
                    commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
                    commanderForPlatform.streamByeCmd(platform, callIdHeader.getCallId());
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                }
            }
        }
    }
    @Override
    public void talkCmd(Device device, String channelId, MediaServerItem mediaServerItem, String stream, AudioBroadcastEvent event) {
    public void talkCmd(Device device, String channelId, MediaServer mediaServerItem, String stream, AudioBroadcastEvent event) {
        if (device == null || channelId == null) {
            return;
        }
@@ -1553,8 +1484,8 @@
            SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
            if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
                // 查询流是否存在,不存在则认为是异常状态
                MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
                MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Boolean streamReady = mediaServerService.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
                if (streamReady) {
                    logger.warn("[语音对讲] 正在语音广播,无法开启语音通话: {}", channelId);
                    event.call("正在语音广播");
@@ -1567,8 +1498,8 @@
        SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, stream, null);
        if (sendRtpItem != null) {
            MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
            MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
            Boolean streamReady = mediaServerService.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
            if (streamReady) {
                logger.warn("[语音对讲] 进行中: {}", channelId);
                event.call("语音对讲进行中");
@@ -1578,7 +1509,7 @@
            }
        }
        talk(mediaServerItem, device, channelId, stream, (MediaServerItem mediaServerItem1, JSONObject response) -> {
        talk(mediaServerItem, device, channelId, stream, (hookData) -> {
            logger.info("[语音对讲] 收到设备发来的流");
        }, eventResult -> {
            logger.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg);
@@ -1612,15 +1543,10 @@
            return;
        }
        MediaServerItem mediaServer = mediaServerService.getOne(mediaServerId);
        MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
        if (streamIsReady == null || streamIsReady) {
            Map<String, Object> param = new HashMap<>();
            param.put("vhost", "__defaultVhost__");
            param.put("app", sendRtpItem.getApp());
            param.put("stream", sendRtpItem.getStream());
            param.put("ssrc", sendRtpItem.getSsrc());
            zlmrtpServerFactory.stopSendRtpStream(mediaServer, param);
            mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
        }
        ssrcFactory.releaseSsrc(mediaServerId, sendRtpItem.getSsrc());
@@ -1635,4 +1561,74 @@
        }
        redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId,null, null);
    }
    @Override
    public void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback) {
        Device device = deviceService.getDevice(deviceId);
        if (device == null) {
            errorCallback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), null);
            return;
        }
        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
        if (inviteInfo != null) {
            if (inviteInfo.getStreamInfo() != null) {
                // 已存在线直接截图
                MediaServer mediaServerItemInuse = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
                String streamUrl;
                if (mediaServerItemInuse.getRtspPort() != 0) {
                    streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp",  inviteInfo.getStreamInfo().getStream());
                }else {
                    streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp",  inviteInfo.getStreamInfo().getStream());
                }
                String path = "snap";
                // 请求截图
                logger.info("[请求截图]: " + fileName);
                mediaServerService.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
                File snapFile = new File(path + File.separator + fileName);
                if (snapFile.exists()) {
                    errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), snapFile.getAbsoluteFile());
                }else {
                    errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
                }
                return;
            }
        }
        MediaServer newMediaServerItem = getNewMediaServerItem(device);
        play(newMediaServerItem, deviceId, channelId, null, (code, msg, data)->{
            if (code == InviteErrorCode.SUCCESS.getCode()) {
                InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
                if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) {
                    getSnap(deviceId, channelId, fileName, errorCallback);
                }else {
                    errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
                }
            }else {
                errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
            }
        });
    }
    @Override
    public void stopPlay(Device device, String channelId) {
        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
        if (inviteInfo == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到");
        }
        if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
            try {
                logger.info("[停止点播] {}/{}", device.getDeviceId(), channelId);
                cmder.streamByeCmd(device, channelId, inviteInfo.getStream(), null, null);
            } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
                logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
                throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
            }
        }
        inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
        storager.stopPlay(device.getDeviceId(), channelId);
        channelService.stopPlay(device.getDeviceId(), channelId);
        if (inviteInfo.getStreamInfo() != null) {
            mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());
        }
    }
}