648540858
2024-04-15 f9abfca003bc9515f1f6028657fa6347326a1402
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -18,6 +18,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
@@ -28,7 +29,6 @@
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -127,12 +127,11 @@
    // Collaborators supplied through @Autowired field injection.
    @Autowired
    private SipConfig config;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Autowired
    private VideoStreamSessionManager streamSession;
    @Autowired
    private SendRtpPortManager sendRtpPortManager;
    @Override
@@ -577,21 +576,40 @@
                    }else {
                        ssrc = gb28181Sdp.getSsrc();
                    }
                    SendRtpItem sendRtpItem = new SendRtpItem();
                    sendRtpItem.setTcpActive(tcpActive);
                    sendRtpItem.setTcp(mediaTransmissionTCP);
                    sendRtpItem.setRtcp(platform.isRtcp());
                    sendRtpItem.setSsrc(ssrc);
                    sendRtpItem.setPlatformName(platform.getName());
                    sendRtpItem.setPlatformId(platform.getServerGBId());
                    sendRtpItem.setMediaServerId(mediaServerItem.getId());
                    sendRtpItem.setChannelId(channelId);
                    sendRtpItem.setIp(addressStr);
                    sendRtpItem.setPort(port);
                    sendRtpItem.setUsePs(true);
                    sendRtpItem.setApp(gbStream.getApp());
                    sendRtpItem.setStream(gbStream.getStream());
                    sendRtpItem.setCallId(callIdHeader.getCallId());
                    sendRtpItem.setFromTag(request.getFromTag());
                    sendRtpItem.setOnlyAudio(false);
                    sendRtpItem.setPlayType(InviteStreamType.PUSH);
                    sendRtpItem.setStatus(0);
                    if ("push".equals(gbStream.getStreamType())) {
                        if (streamPushItem != null) {
                            // 从redis查询是否正在接收这个推流
                            OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
                            sendRtpItem.setServerId(pushListItem.getSeverId());
                            if (pushListItem != null) {
                                StreamPushItem transform = streamPushService.transform(pushListItem);
                                transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
                                // 推流状态
                                pushStream(evt, request, gbStream, transform, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                                pushStream(sendRtpItem, mediaServerItem, platform, request);
                            }else {
                                // 未推流 拉起
                                notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                                notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                            }
                        }
                    } else if ("proxy".equals(gbStream.getStreamType())) {
@@ -601,8 +619,7 @@
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            } else {
                                //开启代理拉流
                                notifyStreamOnline(evt, request, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                                notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                            }
                        }
@@ -659,8 +676,9 @@
            sendRtpItem.setStatus(1);
            sendRtpItem.setCallId(callIdHeader.getCallId());
            sendRtpItem.setFromTag(request.getFromTag());
            sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
            SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
            SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
            if (response != null) {
                sendRtpItem.setToTag(response.getToTag());
            }
@@ -670,19 +688,14 @@
    }
    private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) {
    private void pushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
        // 推流
        if (streamPushItem.isSelf()) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
        if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
            if (streamReady != null && streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
                if (sendRtpItem == null) {
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
                if (localPort == 0) {
                    logger.warn("服务器端口资源不足");
                    try {
                        responseAck(request, Response.BUSY_HERE);
@@ -691,16 +704,11 @@
                    }
                    return;
                }
                if (tcpActive != null) {
                    sendRtpItem.setTcpActive(tcpActive);
                }
                sendRtpItem.setPlayType(InviteStreamType.PUSH);
                // 写入redis, 超时时回复
                sendRtpItem.setStatus(1);
                sendRtpItem.setCallId(callIdHeader.getCallId());
                sendRtpItem.setFromTag(request.getFromTag());
                SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
                sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
                SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
                if (response != null) {
                    sendRtpItem.setToTag(response.getToTag());
                }
@@ -708,25 +716,22 @@
            } else {
                // 不在线 拉起
                notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
            }
        } else {
            SendRtpItem sendRtpItem = new SendRtpItem();
            sendRtpItem.setRtcp(platform.isRtcp());
            sendRtpItem.setTcp(mediaTransmissionTCP);
            sendRtpItem.setTcpActive();
            // 其他平台内容
            otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
            otherWvpPushStream(sendRtpItem, request, platform);
        }
    }
    /**
     * 通知流上线
     */
    private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                                    CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                                    int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                                    String channelId, String addressStr, String ssrc, String requesterId) {
        if ("proxy".equals(gbStream.getStreamType())) {
    private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
            // TODO 控制启用以使设备上线
            logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
            // 监听流上线
@@ -752,7 +757,12 @@
                zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
                dynamicTask.stop(callIdHeader.getCallId());
            }
        } else if ("push".equals(gbStream.getStreamType())) {
    }
    /**
     * 通知流上线
     */
    private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
            if (!platform.isStartOfflinePush()) {
                // 平台设置中关闭了拉起离线的推流则直接回复
                try {
@@ -768,7 +778,7 @@
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
                    gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
                    platform.getName(), null, gbStream.getMediaServerId());
                platform.getName(), userSetting.getServerId(), gbStream.getMediaServerId());
            redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
            // 设置超时
            dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
@@ -781,19 +791,26 @@
                    logger.error("未处理的异常 ", e);
                }
            }, userSetting.getPlatformPlayTimeout());
            // 添加监听
            int finalPort = port;
            Boolean finalTcpActive = tcpActive;
            // 添加在本机上线的通知
            mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
        // 写入redis待发流信息,供其他wvp读取并生成发流信息
        SendRtpItem sendRtpItemTemp = new SendRtpItem();
        sendRtpItemTemp.setIp(addressStr);
        sendRtpItemTemp.setPort(port);
        sendRtpItemTemp.setSsrc(ssrc);
        sendRtpItemTemp.setPlatformId(requesterId);
        sendRtpItemTemp.setPlatformName(platform.getName());
        sendRtpItemTemp.setTcp(mediaTransmissionTCP);
        sendRtpItemTemp.setRtcp(platform.isRtcp());
        sendRtpItemTemp.setTcpActive(tcpActive);
        sendRtpItemTemp.setPlayType(InviteStreamType.PUSH);
        redisCatchStorage.addWaiteSendRtpItem(sendRtpItemTemp, userSetting.getPlatformPlayTimeout());
        // 添加上线的通知
        mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (sendRtpItemFromRedis) -> {
                dynamicTask.stop(callIdHeader.getCallId());
                redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
                if (serverId.equals(userSetting.getServerId())) {
                    SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
            if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
                    if (sendRtpItem == null) {
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
                if (localPort == 0) {
                        logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
                        try {
                            responseAck(request, Response.BUSY_HERE);
@@ -806,24 +823,21 @@
                        }
                        return;
                    }
                    if (finalTcpActive != null) {
                        sendRtpItem.setTcpActive(finalTcpActive);
                    }
                    sendRtpItem.setPlayType(InviteStreamType.PUSH);
                sendRtpItemTemp.setLocalPort(localPort);
                sendRtpItemTemp.setLocalIp(ObjectUtils.isEmpty(platform.getSendStreamIp()): );
                    // 写入redis, 超时时回复
                    sendRtpItem.setStatus(1);
                    sendRtpItem.setCallId(callIdHeader.getCallId());
                sendRtpItemTemp.setStatus(1);
                sendRtpItemTemp.setCallId(callIdHeader.getCallId());
                    sendRtpItem.setFromTag(request.getFromTag());
                    SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
                sendRtpItemTemp.setFromTag(request.getFromTag());
                SIPResponse response = sendStreamAck(request, sendRtpItemTemp, platform);
                    if (response != null) {
                        sendRtpItem.setToTag(response.getToTag());
                    sendRtpItemTemp.setToTag(response.getToTag());
                    }
                    redisCatchStorage.updateSendRTPSever(sendRtpItem);
                redisCatchStorage.updateSendRTPSever(sendRtpItemTemp);
                } else {
                    // 其他平台内容
                    otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                            mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                otherWvpPushStream(sendRtpItemFromRedis, request, platform);
                }
            });
@@ -840,78 +854,30 @@
                }
            });
        }
    }
    /**
     * Handles a push stream that originates from another WVP instance.
     *
     * <p>Logs the cascade-play notice, passes the request on through redis, and
     * answers the INVITE via {@code sendStreamAck}; when a response is returned,
     * its to-tag is copied onto the {@code SendRtpItem}.
     *
     * <p>NOTE(review): this span carries two signatures and interleaved statements
     * that look like two revisions of the same method (a long-parameter form using
     * {@code redisGbPlayMsgListener.sendMsg} with callbacks, and a short form using
     * {@code redisCatchStorage.sendStartSendRtp}). Confirm which lines belong to the
     * revision being reviewed before relying on the comments below.
     */
    private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                                    CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                                    int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                                    String channelId, String addressStr, String ssrc, String requesterId) {
    private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) {
        logger.info("[级联点播]直播流来自其他平台,发送redis消息");
        // Send the redis message
        redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
                streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
                channelId, mediaTransmissionTCP, platform.isRtcp(),platform.getName(), responseSendItemMsg -> {
                    SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
                    // No send item or no media server in the reply: answer BUSY_HERE and stop
                    if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
                        logger.warn("服务器端口资源不足");
                        try {
                            responseAck(request, Response.BUSY_HERE);
                        } catch (SipException e) {
                            logger.error("未处理的异常 ", e);
                        } catch (InvalidArgumentException e) {
                            logger.error("未处理的异常 ", e);
                        } catch (ParseException e) {
                            logger.error("未处理的异常 ", e);
                        }
                        return;
                    }
                    // sendItem received
                    if (tcpActive != null) {
                        sendRtpItem.setTcpActive(tcpActive);
                    }
                    sendRtpItem.setPlayType(InviteStreamType.PUSH);
        redisCatchStorage.sendStartSendRtp(sendRtpItem);
                    // Write to redis; reply on timeout
                    sendRtpItem.setStatus(1);
                    sendRtpItem.setCallId(callIdHeader.getCallId());
        sendRtpItem.setCallId(request.getCallIdHeader().getCallId());
                    sendRtpItem.setFromTag(request.getFromTag());
                    SIPResponse response = sendStreamAck(responseSendItemMsg.getMediaServerItem(), request, sendRtpItem, platform, evt);
        SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
                    // Record the to-tag of the response that was sent, if any
                    if (response != null) {
                        sendRtpItem.setToTag(response.getToTag());
                    }
                    redisCatchStorage.updateSendRTPSever(sendRtpItem);
                }, (wvpResult) -> {
                    // Error
                    if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
                        // Offline
                        // Check whether the stream has come online on this machine
                        StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
                        if (currentStreamPushItem.isPushIng()) {
                            // Online
                            pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        } else {
                            // Not online: ask for the stream to be brought up
                            notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        }
                    }
                    // NOTE(review): BUSY_HERE is sent after the offline branch as well, since nothing returns before this point — confirm this is intended
                    try {
                        responseAck(request, Response.BUSY_HERE);
                    } catch (InvalidArgumentException | ParseException | SipException e) {
                        logger.error("[命令发送失败] 国标级联 点播回复 BUSY_HERE: {}", e.getMessage());
                    }
                });
    }
    public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) {
    public SIPResponse sendStreamAck(SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform) {
        String sdpIp = mediaServerItem.getSdpIp();
        String sdpIp = sendRtpItem.getLocalIp();
        if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
            sdpIp = platform.getSendStreamIp();
        }