648540858
2024-04-09 b1b6fae22c5ab3013d73df1a43cd42f7fe0fa347
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
old mode 100644 new mode 100755
@@ -1,28 +1,32 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -34,8 +38,11 @@
import com.genersoft.iot.vmp.utils.DateUtil;
import gov.nist.javax.sdp.TimeDescriptionImpl;
import gov.nist.javax.sdp.fields.TimeField;
import gov.nist.javax.sdp.fields.URIField;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -50,6 +57,8 @@
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Vector;
@@ -80,6 +89,9 @@
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IInviteStreamService inviteStreamService;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
@@ -104,7 +116,7 @@
    private IMediaServerService mediaServerService;
    @Autowired
    private ZlmHttpHookSubscribe zlmHttpHookSubscribe;
    private HookSubscribe hookSubscribe;
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
@@ -122,6 +134,9 @@
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Autowired
    private VideoStreamSessionManager streamSession;
    @Override
    public void afterPropertiesSet() throws Exception {
@@ -138,12 +153,25 @@
    public void process(RequestEvent evt) {
        //  Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令
        try {
            SIPRequest request = (SIPRequest) evt.getRequest();
            String channelId = SipUtils.getChannelIdFromRequest(request);
            SIPRequest request = (SIPRequest)evt.getRequest();
            String channelIdFromSub = SipUtils.getChannelIdFromRequest(request);
            // 解析sdp消息, 使用jainsip 自带的sdp解析方式
            String contentString = new String(request.getRawContent());
            Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString);
            SessionDescription sdp = gb28181Sdp.getBaseSdb();
            String sessionName = sdp.getSessionName().getValue();
            String channelIdFromSdp = null;
            if(StringUtils.equalsIgnoreCase("Playback", sessionName)){
                URIField uriField = (URIField)sdp.getURI();
                channelIdFromSdp = uriField.getURI().split(":")[0];
            }
            final String channelId = StringUtils.isNotBlank(channelIdFromSdp) ? channelIdFromSdp : channelIdFromSub;
            String requesterId = SipUtils.getUserIdFromFromHeader(request);
            CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
            if (requesterId == null || channelId == null) {
                logger.info("无法从FromHeader的Address中获取到平台id,返回400");
                logger.info("无法从请求中获取到平台id,返回400");
                // 参数不全, 发400,请求错误
                try {
                    responseAck(request, Response.BAD_REQUEST);
@@ -153,6 +181,8 @@
                return;
            }
            logger.info("[INVITE] requesterId: {}, callId: {}, 来自:{}:{}",
                    requesterId, callIdHeader.getCallId(), request.getRemoteAddress(), request.getRemotePort());
            // 查询请求是否来自上级平台\设备
            ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
@@ -165,7 +195,7 @@
                GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
                PlatformCatalog catalog = storager.getCatalog(requesterId, channelId);
                MediaServerItem mediaServerItem = null;
                MediaServer mediaServerItem = null;
                StreamPushItem streamPushItem = null;
                StreamProxyItem proxyByAppAndStream = null;
                // 不是通道可能是直播流
@@ -246,12 +276,6 @@
                    }
                    return;
                }
                // 解析sdp消息, 使用jainsip 自带的sdp解析方式
                String contentString = new String(request.getRawContent());
                Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString);
                SessionDescription sdp = gb28181Sdp.getBaseSdb();
                String sessionName = sdp.getSessionName().getValue();
                Long startTime = null;
                Long stopTime = null;
@@ -377,19 +401,22 @@
                    Long finalStopTime = stopTime;
                    ErrorCallback<Object> hookEvent = (code, msg, data) -> {
                        StreamInfo streamInfo = (StreamInfo)data;
                        MediaServerItem mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId());
                        MediaServer mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId());
                        logger.info("[上级Invite]下级已经开始推流。 回复200OK(SDP), {}/{}", streamInfo.getApp(), streamInfo.getStream());
                        //     * 0 等待设备推流上来
                        //     * 1 下级已经推流,等待上级平台回复ack
                        //     * 2 推流中
                        sendRtpItem.setStatus(1);
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
                        String sdpIp = mediaServerItemInUSe.getSdpIp();
                        if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
                            sdpIp = platform.getSendStreamIp();
                        }
                        StringBuffer content = new StringBuffer(200);
                        content.append("v=0\r\n");
                        content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n");
                        content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n");
                        content.append("s=" + sessionName + "\r\n");
                        content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n");
                        content.append("c=IN IP4 " + sdpIp + "\r\n");
                        if ("Playback".equalsIgnoreCase(sessionName)) {
                            content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n");
                        } else {
@@ -400,11 +427,21 @@
                            // 非严格模式端口不统一, 增加兼容性,修改为一个不为0的端口
                            localPort = new Random().nextInt(65535) + 1;
                        }
                        content.append("m=video " + localPort + " RTP/AVP 96\r\n");
                        if (sendRtpItem.isTcp()) {
                            content.append("m=video " + localPort + " TCP/RTP/AVP 96\r\n");
                            if (!sendRtpItem.isTcpActive()) {
                                content.append("a=setup:active\r\n");
                            } else {
                                content.append("a=setup:passive\r\n");
                            }
                        }else {
                            content.append("m=video " + localPort + " RTP/AVP 96\r\n");
                        }
                        content.append("a=sendonly\r\n");
                        content.append("a=rtpmap:96 PS/90000\r\n");
                        content.append("y=" + sendRtpItem.getSsrc() + "\r\n");
                        content.append("f=\r\n");
                        try {
                            // 超时未收到Ack应该回复bye,当前等待时间为10秒
@@ -418,8 +455,34 @@
                                    logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
                                }
                            }, 60 * 1000);
                             responseSdpAck(request, content.toString(), platform);
                            responseSdpAck(request, content.toString(), platform);
                            // tcp主动模式,回复sdp后开启监听
                            if (sendRtpItem.isTcpActive()) {
                                MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                                Map<String, Object> param = new HashMap<>(12);
                                param.put("vhost","__defaultVhost__");
                                param.put("app",sendRtpItem.getApp());
                                param.put("stream",sendRtpItem.getStream());
                                param.put("ssrc", sendRtpItem.getSsrc());
                                if (!sendRtpItem.isTcpActive()) {
                                    param.put("dst_url",sendRtpItem.getIp());
                                    param.put("dst_port", sendRtpItem.getPort());
                                }
                                String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
                                param.put("is_udp", is_Udp);
                                param.put("src_port", localPort);
                                param.put("pt", sendRtpItem.getPt());
                                param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
                                param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
                                if (!sendRtpItem.isTcp()) {
                                    // 开启rtcp保活
                                    param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
                                }
                                JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
                                if (startSendRtpStreamResult != null) {
                                    startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader);
                                }
                            }
                        } catch (SipException | InvalidArgumentException | ParseException e) {
                            logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
                        }
@@ -438,8 +501,11 @@
                    sendRtpItem.setApp("rtp");
                    if ("Playback".equalsIgnoreCase(sessionName)) {
                        sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
                        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam());
                        sendRtpItem.setStream(ssrcInfo.getStream());
                        String startTimeStr = DateUtil.urlFormatter.format(start);
                        String endTimeStr = DateUtil.urlFormatter.format(end);
                        String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr;
                        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0,false,!channel.isHasAudio(), false, device.getStreamModeForParam());
                        sendRtpItem.setStream(stream);
                        // 写入redis, 超时时回复
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
                        playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
@@ -468,7 +534,7 @@
                        }
                        sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD);
                        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam());
                        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false,!channel.isHasAudio(), false, device.getStreamModeForParam());
                        sendRtpItem.setStream(ssrcInfo.getStream());
                        // 写入redis, 超时时回复
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -485,10 +551,13 @@
                                        errorEvent.run(code, msg, data);
                                    }
                                });
                    }else {
                    } else {
                        sendRtpItem.setPlayType(InviteStreamType.PLAY);
                        String streamId = String.format("%s_%s", device.getDeviceId(), channelId);
                        sendRtpItem.setStream(streamId);
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
                        SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> {
                            if (code == InviteErrorCode.SUCCESS.getCode()){
                            if (code == InviteErrorCode.SUCCESS.getCode()) {
                                hookEvent.run(code, msg, data);
                            } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
                                logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId);
@@ -498,14 +567,6 @@
                                errorEvent.run(code, msg, data);
                            }
                        }));
                        sendRtpItem.setPlayType(InviteStreamType.PLAY);
                        String streamId = null;
                        if (mediaServerItem.isRtpEnable()) {
                            streamId = String.format("%s_%s", device.getDeviceId(), channelId);
                        }else {
                            streamId = String.format("%08x", Integer.parseInt(ssrcInfo.getSsrc())).toUpperCase();
                        }
                        sendRtpItem.setStream(streamId);
                        sendRtpItem.setSsrc(ssrcInfo.getSsrc());
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -521,14 +582,20 @@
                    }
                    if ("push".equals(gbStream.getStreamType())) {
                        if (streamPushItem != null && streamPushItem.isPushIng()) {
                            // 推流状态
                            pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        } else {
                            // 未推流 拉起
                            notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        if (streamPushItem != null) {
                            // 从redis查询是否正在接收这个推流
                            OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
                            if (pushListItem != null) {
                                StreamPushItem transform = streamPushService.transform(pushListItem);
                                transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
                                // 推流状态
                                pushStream(evt, request, gbStream, transform, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            }else {
                                // 未推流 拉起
                                notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            }
                        }
                    } else if ("proxy".equals(gbStream.getStreamType())) {
                        if (null != proxyByAppAndStream) {
@@ -553,11 +620,23 @@
        }
    }
    private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
                                        JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
        if (jsonObject == null) {
            logger.error("下级TCP被动启动监听失败: 请检查ZLM服务");
        } else if (jsonObject.getInteger("code") == 0) {
            logger.info("调用ZLM-TCP被动推流接口, 结果: {}",  jsonObject);
            logger.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
        } else {
            logger.error("启动监听TCP被动推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));
        }
    }
    /**
     * 安排推流
     */
    private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            CallIdHeader callIdHeader, MediaServer mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
@@ -595,7 +674,7 @@
    }
    private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            CallIdHeader callIdHeader, MediaServer mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) {
        // 推流
@@ -647,24 +726,23 @@
     * 通知流上线
     */
    private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                                    CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                                    CallIdHeader callIdHeader, MediaServer mediaServerItem,
                                    int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                                    String channelId, String addressStr, String ssrc, String requesterId) {
        if ("proxy".equals(gbStream.getStreamType())) {
            // TODO 控制启用以使设备上线
            logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
            // 监听流上线
            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
            zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
                OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
                logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
            Hook hook = Hook.getInstance(HookType.on_media_arrival, gbStream.getApp(), gbStream.getStream(), mediaServerItem.getId());
            this.hookSubscribe.addSubscribe(hook, (hookData) -> {
                logger.info("[上级点播]拉流代理已经就绪, {}/{}", hookData.getApp(), hookData.getStream());
                dynamicTask.stop(callIdHeader.getCallId());
                pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
            });
            dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
                logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream());
                zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
                this.hookSubscribe.removeSubscribe(hook);
            }, userSetting.getPlatformPlayTimeout());
            boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
            if (!start) {
@@ -673,11 +751,9 @@
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
                }
                zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
                this.hookSubscribe.removeSubscribe(hook);
                dynamicTask.stop(callIdHeader.getCallId());
            }
        } else if ("push".equals(gbStream.getStreamType())) {
            if (!platform.isStartOfflinePush()) {
                // 平台设置中关闭了拉起离线的推流则直接回复
@@ -700,13 +776,10 @@
            dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
                logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
                try {
                    redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                    responseAck(request, Response.REQUEST_TIMEOUT); // 超时
                } catch (SipException e) {
                    logger.error("未处理的异常 ", e);
                } catch (InvalidArgumentException e) {
                    logger.error("未处理的异常 ", e);
                } catch (ParseException e) {
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("未处理的异常 ", e);
                }
            }, userSetting.getPlatformPlayTimeout());
@@ -717,6 +790,7 @@
            // 添加在本机上线的通知
            mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
                dynamicTask.stop(callIdHeader.getCallId());
                redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
                if (serverId.equals(userSetting.getServerId())) {
                    SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
@@ -774,14 +848,14 @@
     * 来自其他wvp的推流
     */
    private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                                    CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                                    CallIdHeader callIdHeader, MediaServer mediaServerItem,
                                    int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                                    String channelId, String addressStr, String ssrc, String requesterId) {
        logger.info("[级联点播]直播流来自其他平台,发送redis消息");
        // 发送redis消息
        redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
                streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
                channelId, mediaTransmissionTCP, platform.isRtcp(), null, responseSendItemMsg -> {
                channelId, mediaTransmissionTCP, platform.isRtcp(),platform.getName(), responseSendItemMsg -> {
                    SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
                    if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
                        logger.warn("服务器端口资源不足");
@@ -837,13 +911,17 @@
                });
    }
    public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) {
    public SIPResponse sendStreamAck(MediaServer mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) {
        String sdpIp = mediaServerItem.getSdpIp();
        if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
            sdpIp = platform.getSendStreamIp();
        }
        StringBuffer content = new StringBuffer(200);
        content.append("v=0\r\n");
        content.append("o=" + sendRtpItem.getChannelId() + " 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
        content.append("o=" + sendRtpItem.getChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n");
        content.append("s=Play\r\n");
        content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
        content.append("c=IN IP4 " + sdpIp + "\r\n");
        content.append("t=0 0\r\n");
        // 非严格模式端口不统一, 增加兼容性,修改为一个不为0的端口
        int localPort = sendRtpItem.getLocalPort();
@@ -916,7 +994,10 @@
        }
        if (device != null) {
            logger.info("收到设备" + requesterId + "的语音广播Invite请求");
            String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId() + broadcastCatch.getChannelId();
            String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId();
            if (!SipUtils.isFrontEnd(device.getDeviceId())) {
                key += broadcastCatch.getChannelId();
            }
            dynamicTask.stop(key);
            try {
                responseAck(request, Response.TRYING);
@@ -926,8 +1007,6 @@
                return;
            }
            String contentString = new String(request.getRawContent());
            // jainSip不支持y=字段, 移除移除以解析。
            String ssrc = "0000000404";
            try {
                Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString);
@@ -944,7 +1023,7 @@
                    Media media = mediaDescription.getMedia();
                    Vector mediaFormats = media.getMediaFormats(false);
                    if (mediaFormats.contains("8")) {
//                    if (mediaFormats.contains("8")) {
                        port = media.getMediaPort();
                        String protocol = media.getProtocol();
                        // 区分TCP发流还是udp, 当前默认udp
@@ -960,7 +1039,7 @@
                            }
                        }
                        break;
                    }
//                    }
                }
                if (port == -1) {
                    logger.info("不支持的媒体格式,返回415");
@@ -975,10 +1054,10 @@
                    return;
                }
                String addressStr = sdp.getOrigin().getAddress();
                logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}, {}", requesterId, addressStr, port, ssrc,
                logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}, {}", requesterId, addressStr, port, gb28181Sdp.getSsrc(),
                        mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP");
                MediaServerItem mediaServerItem = broadcastCatch.getMediaServerItem();
                MediaServer mediaServerItem = broadcastCatch.getMediaServerItem();
                if (mediaServerItem == null) {
                    logger.warn("未找到语音喊话使用的zlm");
                    try {
@@ -989,11 +1068,11 @@
                    }
                    return;
                }
                logger.info("设备{}请求语音流, 收流地址:{}:{},ssrc:{}, {}, 对讲方式:{}", requesterId, addressStr, port, ssrc,
                logger.info("设备{}请求语音流, 收流地址:{}:{},ssrc:{}, {}, 对讲方式:{}", requesterId, addressStr, port, gb28181Sdp.getSsrc(),
                        mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue());
                CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
                        device.getDeviceId(), broadcastCatch.getChannelId(),
                        mediaTransmissionTCP, false);
@@ -1029,7 +1108,7 @@
                Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, broadcastCatch.getApp(), broadcastCatch.getStream());
                if (streamReady) {
                    sendOk(device, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, ssrc);
                    sendOk(device, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, gb28181Sdp.getSsrc());
                } else {
                    logger.warn("[语音通话], 未发现待推送的流,app={},stream={}", broadcastCatch.getApp(), broadcastCatch.getStream());
                    try {
@@ -1055,7 +1134,7 @@
        }
    }
    SIPResponse sendOk(Device device, SendRtpItem sendRtpItem, SessionDescription sdp, SIPRequest request, MediaServerItem mediaServerItem, boolean mediaTransmissionTCP, String ssrc) {
    SIPResponse sendOk(Device device, SendRtpItem sendRtpItem, SessionDescription sdp, SIPRequest request, MediaServer mediaServerItem, boolean mediaTransmissionTCP, String ssrc) {
        SIPResponse sipResponse = null;
        try {
            sendRtpItem.setStatus(2);
@@ -1099,9 +1178,10 @@
            audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok);
            audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse);
            audioBroadcastManager.update(audioBroadcastCatch);
            streamSession.put(device.getDeviceId(), sendRtpItem.getChannelId(), request.getCallIdHeader().getCallId(), sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(), sipResponse, InviteSessionType.BROADCAST);
            // 开启发流,大华在收到200OK后就会开始建立连接
            if (!device.isBroadcastPushAfterAck()) {
                logger.info("[语音喊话] 回复200OK后发现 BroadcastPushAfterAck为False,现在开始推流");
                playService.startPushStream(sendRtpItem, sipResponse, parentPlatform, request.getCallIdHeader());
            }