648540858
2024-04-28 1fc2916c2b4b28fbf722c4401e559805f9578573
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,37 +1,56 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.*;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import gov.nist.javax.sip.message.SIPResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.sdp.*;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Vector;
/**
 * @author lin
 */
@Service
@DS("master")
public class PlatformServiceImpl implements IPlatformService {
    private final static String REGISTER_KEY_PREFIX = "platform_register_";
@@ -45,15 +64,6 @@
    private ParentPlatformMapper platformMapper;
    @Autowired
    private PlatformCatalogMapper catalogMapper;
    @Autowired
    private PlatformChannelMapper platformChannelMapper;
    @Autowired
    private PlatformGbStreamMapper platformGbStreamMapper;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
@@ -63,13 +73,10 @@
    private IMediaServerService mediaServerService;
    @Autowired
    private SIPCommanderFroPlatform commanderForPlatform;
    private ISIPCommanderForPlatform commanderForPlatform;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private SubscribeHolder subscribeHolder;
@@ -80,6 +87,65 @@
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private VideoStreamSessionManager streamSession;
    @Autowired
    private IPlayService playService;
    @Autowired
    private IInviteStreamService inviteStreamService;
    /**
     * 流离开的处理
     */
    @Async("taskExecutor")
    @EventListener
    public void onApplicationEvent(MediaDepartureEvent event) {
        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
        if (!sendRtpItems.isEmpty()) {
            for (SendRtpItem sendRtpItem : sendRtpItems) {
                if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) {
                    String platformId = sendRtpItem.getPlatformId();
                    ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformId);
                    try {
                        if (platform != null) {
                            commanderForPlatform.streamByeCmd(platform, sendRtpItem);
                            redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
                                    sendRtpItem.getCallId(), sendRtpItem.getStream());
                        }
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
                    }
                }
            }
        }
    }
    /**
     * 发流停止
     */
    @Async("taskExecutor")
    @EventListener
    public void onApplicationEvent(MediaSendRtpStoppedEvent event) {
        List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream());
        if (sendRtpItems != null && !sendRtpItems.isEmpty()) {
            for (SendRtpItem sendRtpItem : sendRtpItems) {
                ParentPlatform parentPlatform = platformMapper.getParentPlatByServerGBId(sendRtpItem.getPlatformId());
                ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
                try {
                    commanderForPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                }
                redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
                        sendRtpItem.getCallId(), sendRtpItem.getStream());
            }
        }
    }
    @Override
@@ -143,7 +209,7 @@
        dynamicTask.stop(registerTaskKey);
        // 注销旧的
        try {
            if (parentPlatformOld.isStatus()) {
            if (parentPlatformOld.isStatus() && parentPlatformCatchOld != null) {
                logger.info("保存平台{}时发现旧平台在线,发送注销命令", parentPlatformOld.getServerGBId());
                commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> {
                    logger.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId());
@@ -181,6 +247,7 @@
                logger.error("[命令发送失败] 国标级联: {}", e.getMessage());
            }
        }
        return false;
    }
@@ -226,18 +293,19 @@
                        try {
                            commanderForPlatform.keepalive(parentPlatform, eventResult -> {
                                // 心跳失败
                                if (eventResult.type == SipSubscribe.EventResultType.timeout) {
                                    // 心跳超时
                                    ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
                                    // 此时是第三次心跳超时, 平台离线
                                    if (platformCatch.getKeepAliveReply()  == 2) {
                                        // 设置平台离线,并重新注册
                                        logger.info("[国标级联] 三次心跳超时, 平台{}({})离线", parentPlatform.getName(), parentPlatform.getServerGBId());
                                        offline(parentPlatform, false);
                                    }
                                }else {
                                if (eventResult.type != SipSubscribe.EventResultType.timeout) {
                                    logger.warn("[国标级联]发送心跳收到错误,code: {}, msg: {}", eventResult.statusCode, eventResult.msg);
                                }
                                // 心跳失败
                                ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
                                // 此时是第三次心跳超时, 平台离线
                                if (platformCatch.getKeepAliveReply()  == 2) {
                                    // 设置平台离线,并重新注册
                                    logger.info("[国标级联] 三次心跳失败, 平台{}({})离线", parentPlatform.getName(), parentPlatform.getServerGBId());
                                    offline(parentPlatform, false);
                                }else {
                                    platformCatch.setKeepAliveReply(platformCatch.getKeepAliveReply() + 1);
                                    redisCatchStorage.updatePlatformCatchInfo(platformCatch);
                                }
                            }, eventResult -> {
@@ -256,6 +324,32 @@
                    },
                    (parentPlatform.getKeepTimeout())*1000);
        }
        if (parentPlatform.isAutoPushChannel()) {
            if (subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()) == null) {
                logger.info("[国标级联]:{}, 添加自动通道推送模拟订阅信息", parentPlatform.getServerGBId());
                addSimulatedSubscribeInfo(parentPlatform);
            }
        }else {
            SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId());
            if (catalogSubscribe != null && catalogSubscribe.getExpires() == -1) {
                subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId());
            }
        }
    }
    @Override
    public void addSimulatedSubscribeInfo(ParentPlatform parentPlatform) {
        // 自动添加一条模拟的订阅信息
        SubscribeInfo subscribeInfo = new SubscribeInfo();
        subscribeInfo.setId(parentPlatform.getServerGBId());
        subscribeInfo.setExpires(-1);
        subscribeInfo.setEventType("Catalog");
        int random = (int) Math.floor(Math.random() * 10000);
        subscribeInfo.setEventId(random + "");
        subscribeInfo.setSimulatedCallId(UUID.randomUUID().toString().replace("-", "") + "@" + parentPlatform.getServerIP());
        subscribeInfo.setSimulatedFromTag(UUID.randomUUID().toString().replace("-", ""));
        subscribeInfo.setSimulatedToTag(UUID.randomUUID().toString().replace("-", ""));
        subscribeHolder.putCatalogSubscribe(parentPlatform.getServerGBId(), subscribeInfo);
    }
    private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){
@@ -310,9 +404,16 @@
            // 清除心跳任务
            dynamicTask.stop(keepaliveTaskKey);
        }
        // 停止目录订阅回复
        logger.info("[平台离线] {}, 停止订阅回复", parentPlatform.getServerGBId());
        subscribeHolder.removeAllSubscribe(parentPlatform.getServerGBId());
        // 停止订阅回复
        SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId());
        if (catalogSubscribe != null) {
            if (catalogSubscribe.getExpires() > 0) {
                logger.info("[平台离线] {}, 停止目录订阅回复", parentPlatform.getServerGBId());
                subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId());
            }
        }
        logger.info("[平台离线] {}, 停止移动位置订阅回复", parentPlatform.getServerGBId());
        subscribeHolder.removeMobilePositionSubscribe(parentPlatform.getServerGBId());
        // 发起定时自动重新注册
        if (!stopRegister) {
            // 设置为60秒自动尝试重新注册
@@ -323,7 +424,6 @@
                        ()-> registerTask(platform, null),
                        userSetting.getRegisterAgainAfterTime() * 1000);
            }
        }
    }
@@ -333,12 +433,8 @@
            for (SendRtpItem sendRtpItem : sendRtpItems) {
                ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
                redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                Map<String, Object> param = new HashMap<>(3);
                param.put("vhost", "__defaultVhost__");
                param.put("app", sendRtpItem.getApp());
                param.put("stream", sendRtpItem.getStreamId());
                zlmServerFactory.stopSendRtpStream(mediaInfo, param);
                MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
            }
        }
    }
@@ -394,4 +490,318 @@
            }
        }
    }
    @Override
    public void broadcastInvite(ParentPlatform platform, String channelId, MediaServer mediaServerItem, HookSubscribe.Event hookEvent,
                                SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException {
        if (mediaServerItem == null) {
            logger.info("[国标级联] 语音喊话未找到可用的zlm. platform: {}", platform.getServerGBId());
            return;
        }
        InviteInfo inviteInfoForOld = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platform.getServerGBId(), channelId);
        if (inviteInfoForOld != null && inviteInfoForOld.getStreamInfo() != null) {
            // 如果zlm不存在这个流,则删除数据即可
            MediaServer mediaServerItemForStreamInfo = mediaServerService.getOne(inviteInfoForOld.getStreamInfo().getMediaServerId());
            if (mediaServerItemForStreamInfo != null) {
                Boolean ready = mediaServerService.isStreamReady(mediaServerItemForStreamInfo, inviteInfoForOld.getStreamInfo().getApp(), inviteInfoForOld.getStreamInfo().getStream());
                if (!ready) {
                    // 错误存在于redis中的数据
                    inviteStreamService.removeInviteInfo(inviteInfoForOld);
                }else {
                    // 流确实尚在推流,直接回调结果
                    HookData hookData = new HookData();
                    hookData.setApp(inviteInfoForOld.getStreamInfo().getApp());
                    hookData.setStream(inviteInfoForOld.getStreamInfo().getStream());
                    hookData.setMediaServer(mediaServerItemForStreamInfo);
                    hookEvent.response(hookData);
                    return;
                }
            }
        }
        String streamId = null;
        if (mediaServerItem.isRtpEnable()) {
            streamId = String.format("%s_%s", platform.getServerGBId(), channelId);
        }
        // 默认不进行SSRC校验, TODO 后续可改为配置
        boolean ssrcCheck = false;
        int tcpMode;
        if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-PASSIVE")) {
            tcpMode = 1;
        }else if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-ACTIVE")) {
            tcpMode = 2;
        } else {
            tcpMode = 0;
        }
        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true, false, false, tcpMode);
        if (ssrcInfo == null || ssrcInfo.getPort() < 0) {
            logger.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel: {}", platform.getServerGBId(), channelId);
            SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
            eventResult.statusCode = -1;
            eventResult.msg = "端口监听失败";
            eventResult.type = SipSubscribe.EventResultType.failedToGetPort;
            errorEvent.response(eventResult);
            return;
        }
        logger.info("[国标级联] 语音喊话,发起Invite消息 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
                platform.getServerGBId(), channelId, ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck);
        // 初始化redis中的invite消息状态
        InviteInfo inviteInfo = InviteInfo.getInviteInfo(platform.getServerGBId(), channelId, ssrcInfo.getStream(), ssrcInfo,
                mediaServerItem.getSdpIp(), ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), InviteSessionType.BROADCAST,
                InviteSessionStatus.ready);
        inviteStreamService.updateInviteInfo(inviteInfo);
        String timeOutTaskKey = UUID.randomUUID().toString();
        dynamicTask.startDelay(timeOutTaskKey, () -> {
            // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
            InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, platform.getServerGBId(), channelId, null);
            if (inviteInfoForBroadcast == null) {
                logger.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
                // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
                try {
                    commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null);
                } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
                    logger.error("[点播超时], 发送BYE失败 {}", e.getMessage());
                } finally {
                    timeoutCallback.run(1, "收流超时");
                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                    streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
                }
            }
        }, userSetting.getPlayTimeout());
        commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (hookData)->{
            logger.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId);
            dynamicTask.stop(timeOutTaskKey);
            // hook响应
            playService.onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), platform.getServerGBId(), channelId);
            // 收到流
            if (hookEvent != null) {
                hookEvent.response(hookData);
            }
        }, event -> {
            inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channelId, timeOutTaskKey,
                    null, inviteInfo, InviteSessionType.BROADCAST);
//            // 收到200OK 检测ssrc是否有变化,防止上级自定义了ssrc
//            ResponseEvent responseEvent = (ResponseEvent) event.event;
//            String contentString = new String(responseEvent.getResponse().getRawContent());
//            // 获取ssrc
//            int ssrcIndex = contentString.indexOf("y=");
//            // 检查是否有y字段
//            if (ssrcIndex >= 0) {
//                //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
//                String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
//                // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
//                if (ssrcInfo.getSsrc().equals(ssrcInResponse) || ssrcCheck) {
//                    tcpActiveHandler(platform, )
//                    return;
//                }
//                logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
//                if (!mediaServerItem.isRtpEnable()) {
//                    logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
//                    // 释放ssrc
//                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
//                    // 单端口模式streamId也有变化,需要重新设置监听
//                    if (!mediaServerItem.isRtpEnable()) {
//                        // 添加订阅
//                        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
//                        subscribe.removeSubscribe(hookSubscribe);
//                        hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
//                        subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
//                            logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam);
//                            dynamicTask.stop(timeOutTaskKey);
//                            // hook响应
//                            playService.onPublishHandlerForPlay(mediaServerItemInUse, hookParam, platform.getServerGBId(), channelId);
//                            hookEvent.response(mediaServerItemInUse, hookParam);
//                        });
//                    }
//                    // 关闭rtp server
//                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
//                    // 重新开启ssrc server
//                    mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, false, false, ssrcInfo.getPort(), true, false, tcpMode);
//                }
//            }
        }, eventResult -> {
            // 收到错误回复
            if (errorEvent != null) {
                errorEvent.response(eventResult);
            }
        });
    }
    private void inviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, int tcpMode, boolean ssrcCheck, MediaServer mediaServerItem,
                                 ParentPlatform platform, String channelId, String timeOutTaskKey, ErrorCallback<Object> callback,
                                 InviteInfo inviteInfo, InviteSessionType inviteSessionType){
        inviteInfo.setStatus(InviteSessionStatus.ok);
        ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
        String contentString = new String(responseEvent.getResponse().getRawContent());
        System.out.println(contentString);
        String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString);
        // 兼容回复的消息中缺少ssrc(y字段)的情况
        if (ssrcInResponse == null) {
            ssrcInResponse = ssrcInfo.getSsrc();
        }
        if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
            // ssrc 一致
            if (mediaServerItem.isRtpEnable()) {
                // 多端口
                if (tcpMode == 2) {
                    tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck,
                            timeOutTaskKey, ssrcInfo, callback);
                }
            }else {
                // 单端口
                if (tcpMode == 2) {
                    logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
                }
            }
        }else {
            logger.info("[Invite 200OK] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
            // ssrc 不一致
            if (mediaServerItem.isRtpEnable()) {
                // 多端口
                if (ssrcCheck) {
                    // ssrc检验
                    // 更新ssrc
                    logger.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
                    // 释放ssrc
                    mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                    Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
                    if (!result) {
                        try {
                            logger.warn("[Invite 200OK] 更新ssrc失败,停止喊话 {}/{}", platform.getServerGBId(), channelId);
                            commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null);
                        } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
                            logger.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
                        }
                        dynamicTask.stop(timeOutTaskKey);
                        // 释放ssrc
                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
                        streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
                        callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
                                "下级自定义了ssrc,重新设置收流信息失败", null);
                        inviteStreamService.call(inviteSessionType, platform.getServerGBId(), channelId, null,
                                InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
                                "下级自定义了ssrc,重新设置收流信息失败", null);
                    }else {
                        ssrcInfo.setSsrc(ssrcInResponse);
                        inviteInfo.setSsrcInfo(ssrcInfo);
                        inviteInfo.setStream(ssrcInfo.getStream());
                        if (tcpMode == 2) {
                            if (mediaServerItem.isRtpEnable()) {
                                tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck,
                                        timeOutTaskKey, ssrcInfo, callback);
                            }else {
                                logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
                            }
                        }
                        inviteStreamService.updateInviteInfo(inviteInfo);
                    }
                }else {
                    ssrcInfo.setSsrc(ssrcInResponse);
                    inviteInfo.setSsrcInfo(ssrcInfo);
                    inviteInfo.setStream(ssrcInfo.getStream());
                    if (tcpMode == 2) {
                        if (mediaServerItem.isRtpEnable()) {
                            tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck,
                                    timeOutTaskKey, ssrcInfo, callback);
                        }else {
                            logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
                        }
                    }
                    inviteStreamService.updateInviteInfo(inviteInfo);
                }
            }else {
                if (ssrcInResponse != null) {
                    // 单端口
                    // 重新订阅流上线
                    SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getDeviceId(),
                            inviteInfo.getChannelId(), null, inviteInfo.getStream());
                    streamSession.remove(inviteInfo.getDeviceId(),
                            inviteInfo.getChannelId(), inviteInfo.getStream());
                    inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
                    streamSession.put(platform.getServerGBId(), channelId, ssrcTransaction.getCallId(),
                            inviteInfo.getStream(), ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType);
                }
            }
        }
    }
    private void tcpActiveHandler(ParentPlatform platform, String channelId, String contentString,
                                  MediaServer mediaServerItem, int tcpMode, boolean ssrcCheck,
                                  String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback<Object> callback){
        if (tcpMode != 2) {
            return;
        }
        String substring;
        if (contentString.indexOf("y=") > 0) {
            substring = contentString.substring(0, contentString.indexOf("y="));
        }else {
            substring = contentString;
        }
        try {
            SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
            int port = -1;
            Vector mediaDescriptions = sdp.getMediaDescriptions(true);
            for (Object description : mediaDescriptions) {
                MediaDescription mediaDescription = (MediaDescription) description;
                Media media = mediaDescription.getMedia();
                Vector mediaFormats = media.getMediaFormats(false);
                if (mediaFormats.contains("8") || mediaFormats.contains("0")) {
                    port = media.getMediaPort();
                    break;
                }
            }
            logger.info("[TCP主动连接对方] serverGbId: {}, channelId: {}, 连接对方的地址:{}:{}, SSRC: {}, SSRC校验:{}",
                    platform.getServerGBId(), channelId, sdp.getConnection().getAddress(), port, ssrcInfo.getSsrc(), ssrcCheck);
            Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
            logger.info("[TCP主动连接对方] 结果: {}", result);
        } catch (SdpException e) {
            logger.error("[TCP主动连接对方] serverGbId: {}, channelId: {}, 解析200OK的SDP信息失败", platform.getServerGBId(), channelId, e);
            dynamicTask.stop(timeOutTaskKey);
            mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
            // 释放ssrc
            mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
            streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
            callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
                    InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
            inviteStreamService.call(InviteSessionType.PLAY, platform.getServerGBId(), channelId, null,
                    InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
                    InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
        }
    }
    @Override
    public void stopBroadcast(ParentPlatform platform, DeviceChannel channel, String stream, boolean sendBye, MediaServer mediaServerItem) {
        try {
            if (sendBye) {
                commanderForPlatform.streamByeCmd(platform, channel.getChannelId(), stream, null, null);
            }
        } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
            logger.warn("[消息发送失败] 停止语音对讲, 平台:{},通道:{}", platform.getId(), channel.getChannelId() );
        } finally {
            mediaServerService.closeRTPServer(mediaServerItem, stream);
            InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, platform.getServerGBId(), channel.getChannelId(), stream);
            if (inviteInfo != null) {
                // 释放ssrc
                mediaServerService.releaseSsrc(mediaServerItem.getId(), inviteInfo.getSsrcInfo().getSsrc());
                inviteStreamService.removeInviteInfo(inviteInfo);
            }
            streamSession.remove(platform.getServerGBId(), channel.getChannelId(), stream);
        }
    }
}