648540858
2023-08-04 988dc36fa56a47cc4f331ab48c07577805a71425
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
@@ -12,7 +14,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
@@ -47,6 +49,8 @@
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Vector;
@@ -91,7 +95,7 @@
    private SIPSender sipSender;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private IMediaServerService mediaServerService;
@@ -153,7 +157,7 @@
                // 查询平台下是否有该通道
                DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
                GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
                PlatformCatalog catalog = storager.getCatalog(channelId);
                PlatformCatalog catalog = storager.getCatalog(requesterId, channelId);
                MediaServerItem mediaServerItem = null;
                StreamPushItem streamPushItem = null;
@@ -181,16 +185,11 @@
                            return;
                        } else {
                            streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
                            if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) {
                                logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                                try {
                                    responseAck(request, Response.GONE);
                                } catch (SipException | InvalidArgumentException | ParseException e) {
                                    logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
                                }
                                return;
                            }else {
                                 // TODO 可能漏回复消息
                            if (streamPushItem != null) {
                                mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
                            }
                            if (mediaServerItem == null) {
                                mediaServerItem = mediaServerService.getDefaultMediaServer();
                            }
                        }
                    } else {
@@ -350,7 +349,7 @@
                        streamTypeStr = "UDP";
                    }
                    logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                    SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                            device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
                    if (tcpActive != null) {
@@ -401,6 +400,7 @@
                        content.append("y=" + sendRtpItem.getSsrc() + "\r\n");
                        content.append("f=\r\n");
                        try {
                            // 超时未收到Ack应该回复bye,当前等待时间为10秒
                            dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
@@ -414,7 +414,33 @@
                                }
                            }, 60 * 1000);
                            responseSdpAck(request, content.toString(), platform);
                            // tcp主动模式,回复sdp后开启监听
                            if (sendRtpItem.isTcpActive()) {
                                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                                Map<String, Object> param = new HashMap<>(12);
                                param.put("vhost","__defaultVhost__");
                                param.put("app",sendRtpItem.getApp());
                                param.put("stream",sendRtpItem.getStreamId());
                                param.put("ssrc", sendRtpItem.getSsrc());
                                if (!sendRtpItem.isTcpActive()) {
                                    param.put("dst_url",sendRtpItem.getIp());
                                    param.put("dst_port", sendRtpItem.getPort());
                                }
                                String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
                                param.put("is_udp", is_Udp);
                                param.put("src_port", localPort);
                                param.put("pt", sendRtpItem.getPt());
                                param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
                                param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
                                if (!sendRtpItem.isTcp()) {
                                    // 开启rtcp保活
                                    param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
                                }
                                JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStreamForPassive(mediaInfo, param);
                                if (startSendRtpStreamResult != null) {
                                    startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader);
                                }
                            }
                        } catch (SipException | InvalidArgumentException | ParseException e) {
                            logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
                        }
@@ -481,16 +507,8 @@
                                    }
                                });
                    }else {
                        sendRtpItem.setPlayType(InviteStreamType.PLAY);
                        String streamId = null;
                        if (mediaServerItem.isRtpEnable()) {
                            streamId = String.format("%s_%s", device.getDeviceId(), channelId);
                        }else {
                            streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase();
                        }
                        sendRtpItem.setStreamId(streamId);
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
                        playService.play(mediaServerItem, device.getDeviceId(), channelId,false, ((code, msg, data) -> {
                        SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> {
                            if (code == InviteErrorCode.SUCCESS.getCode()){
                                hookEvent.run(code, msg, data);
                            }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){
@@ -501,6 +519,16 @@
                                errorEvent.run(code, msg, data);
                            }
                        }));
                        sendRtpItem.setPlayType(InviteStreamType.PLAY);
                        String streamId = null;
                        if (mediaServerItem.isRtpEnable()) {
                            streamId = String.format("%s_%s", device.getDeviceId(), channelId);
                        }else {
                            streamId = String.format("%08x", Integer.parseInt(ssrcInfo.getSsrc())).toUpperCase();
                        }
                        sendRtpItem.setStreamId(streamId);
                        sendRtpItem.setSsrc(ssrcInfo.getSsrc());
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
                    }
                } else if (gbStream != null) {
@@ -546,6 +574,18 @@
        }
    }
    private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
                                        JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
        if (jsonObject == null) {
            logger.error("下级TCP被动启动监听失败: 请检查ZLM服务");
        } else if (jsonObject.getInteger("code") == 0) {
            logger.info("调用ZLM-TCP被动推流接口, 结果: {}",  jsonObject);
            logger.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
        } else {
            logger.error("启动监听TCP被动推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));
        }
    }
    /**
     * 安排推流
     */
@@ -553,10 +593,10 @@
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) {
            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            if (streamReady != null && streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
                if (sendRtpItem == null) {
@@ -592,10 +632,10 @@
                            String channelId, String addressStr, String ssrc, String requesterId) {
        // 推流
        if (streamPushItem.isSelf()) {
            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            if (streamReady != null && streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
                if (sendRtpItem == null) {
@@ -711,7 +751,7 @@
            mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
                dynamicTask.stop(callIdHeader.getCallId());
                if (serverId.equals(userSetting.getServerId())) {
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                    SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
                    if (sendRtpItem == null) {