648540858
2024-01-10 c25a99d60bef3d3bbd59fee895bd658928fd00db
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
old mode 100644 new mode 100755
@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
@@ -12,13 +14,11 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -46,6 +46,8 @@
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Vector;
@@ -75,6 +77,9 @@
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IInviteStreamService inviteStreamService;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
@@ -90,7 +95,7 @@
    private SIPSender sipSender;
    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private IMediaServerService mediaServerService;
@@ -132,7 +137,7 @@
            String requesterId = SipUtils.getUserIdFromFromHeader(request);
            CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
            if (requesterId == null || channelId == null) {
                logger.info("无法从FromHeader的Address中获取到平台id,返回400");
                logger.info("无法从请求中获取到平台id,返回400");
                // 参数不全, 发400,请求错误
                try {
                    responseAck(request, Response.BAD_REQUEST);
@@ -152,7 +157,7 @@
                // 查询平台下是否有该通道
                DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
                GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
                PlatformCatalog catalog = storager.getCatalog(channelId);
                PlatformCatalog catalog = storager.getCatalog(requesterId, channelId);
                MediaServerItem mediaServerItem = null;
                StreamPushItem streamPushItem = null;
@@ -180,16 +185,11 @@
                            return;
                        } else {
                            streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
                            if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) {
                                logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                                try {
                                    responseAck(request, Response.GONE);
                                } catch (SipException | InvalidArgumentException | ParseException e) {
                                    logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
                                }
                                return;
                            }else {
                                 // TODO 可能漏回复消息
                            if (streamPushItem != null) {
                                mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
                            }
                            if (mediaServerItem == null) {
                                mediaServerItem = mediaServerService.getDefaultMediaServer();
                            }
                        }
                    } else {
@@ -349,7 +349,7 @@
                        streamTypeStr = "UDP";
                    }
                    logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                    SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                            device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
                    if (tcpActive != null) {
@@ -400,6 +400,7 @@
                        content.append("y=" + sendRtpItem.getSsrc() + "\r\n");
                        content.append("f=\r\n");
                        try {
                            // 超时未收到Ack应该回复bye,当前等待时间为10秒
                            dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
@@ -413,7 +414,33 @@
                                }
                            }, 60 * 1000);
                            responseSdpAck(request, content.toString(), platform);
                            // tcp主动模式,回复sdp后开启监听
                            if (sendRtpItem.isTcpActive()) {
                                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                                Map<String, Object> param = new HashMap<>(12);
                                param.put("vhost","__defaultVhost__");
                                param.put("app",sendRtpItem.getApp());
                                param.put("stream",sendRtpItem.getStreamId());
                                param.put("ssrc", sendRtpItem.getSsrc());
                                if (!sendRtpItem.isTcpActive()) {
                                    param.put("dst_url",sendRtpItem.getIp());
                                    param.put("dst_port", sendRtpItem.getPort());
                                }
                                String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
                                param.put("is_udp", is_Udp);
                                param.put("src_port", localPort);
                                param.put("pt", sendRtpItem.getPt());
                                param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
                                param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
                                if (!sendRtpItem.isTcp()) {
                                    // 开启rtcp保活
                                    param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
                                }
                                JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStreamForPassive(mediaInfo, param);
                                if (startSendRtpStreamResult != null) {
                                    startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader);
                                }
                            }
                        } catch (SipException | InvalidArgumentException | ParseException e) {
                            logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
                        }
@@ -432,7 +459,10 @@
                    sendRtpItem.setApp("rtp");
                    if ("Playback".equalsIgnoreCase(sessionName)) {
                        sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
                        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
                        String startTimeStr = DateUtil.urlFormatter.format(start);
                        String endTimeStr = DateUtil.urlFormatter.format(end);
                        String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr;
                        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
                        sendRtpItem.setStreamId(ssrcInfo.getStream());
                        // 写入redis, 超时时回复
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -449,13 +479,13 @@
                                        errorEvent.run(code, msg, data);
                                    }
                                });
                    }else if ("Download".equalsIgnoreCase(sessionName)) {
                    } else if ("Download".equalsIgnoreCase(sessionName)) {
                        // 获取指定的下载速度
                        Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true);
                        MediaDescription mediaDescription = null;
                        String downloadSpeed = "1";
                        if (sdpMediaDescriptions.size() > 0) {
                            mediaDescription = (MediaDescription)sdpMediaDescriptions.get(0);
                            mediaDescription = (MediaDescription) sdpMediaDescriptions.get(0);
                        }
                        if (mediaDescription != null) {
                            downloadSpeed = mediaDescription.getAttribute("downloadspeed");
@@ -469,37 +499,34 @@
                        playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
                                DateUtil.formatter.format(end), Integer.parseInt(downloadSpeed),
                                (code, msg, data) -> {
                                    if (code == InviteErrorCode.SUCCESS.getCode()){
                                    if (code == InviteErrorCode.SUCCESS.getCode()) {
                                        hookEvent.run(code, msg, data);
                                    }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){
                                    } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
                                        logger.info("[录像下载]超时, 用户:{}, 通道:{}", username, channelId);
                                        redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
                                        errorEvent.run(code, msg, data);
                                    }else {
                                    } else {
                                        errorEvent.run(code, msg, data);
                                    }
                                });
                    }else {
                        sendRtpItem.setPlayType(InviteStreamType.PLAY);
                        String streamId = null;
                        if (mediaServerItem.isRtpEnable()) {
                            streamId = String.format("%s_%s", device.getDeviceId(), channelId);
                        }else {
                            streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase();
                        }
                        sendRtpItem.setStreamId(streamId);
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
                        playService.play(mediaServerItem, device.getDeviceId(), channelId, ((code, msg, data) -> {
                            if (code == InviteErrorCode.SUCCESS.getCode()){
                    } else {
                        SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> {
                            if (code == InviteErrorCode.SUCCESS.getCode()) {
                                hookEvent.run(code, msg, data);
                            }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){
                            } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
                                logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId);
                                redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
                                errorEvent.run(code, msg, data);
                            }else {
                            } else {
                                errorEvent.run(code, msg, data);
                            }
                        }));
                        sendRtpItem.setPlayType(InviteStreamType.PLAY);
                        String streamId = String.format("%s_%s", device.getDeviceId(), channelId);
                        sendRtpItem.setStreamId(streamId);
                        sendRtpItem.setSsrc(ssrcInfo.getSsrc());
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
                    }
                } else if (gbStream != null) {
@@ -545,6 +572,18 @@
        }
    }
    private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
                                        JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
        if (jsonObject == null) {
            logger.error("下级TCP被动启动监听失败: 请检查ZLM服务");
        } else if (jsonObject.getInteger("code") == 0) {
            logger.info("调用ZLM-TCP被动推流接口, 结果: {}",  jsonObject);
            logger.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
        } else {
            logger.error("启动监听TCP被动推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));
        }
    }
    /**
     * 安排推流
     */
@@ -552,10 +591,10 @@
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) {
            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            if (streamReady != null && streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
                if (sendRtpItem == null) {
@@ -591,10 +630,10 @@
                            String channelId, String addressStr, String ssrc, String requesterId) {
        // 推流
        if (streamPushItem.isSelf()) {
            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            if (streamReady != null && streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
                if (sendRtpItem == null) {
@@ -646,10 +685,9 @@
            logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
            // 监听流上线
            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
            zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, responseJSON) -> {
                String app = responseJSON.getString("app");
                String stream = responseJSON.getString("stream");
                logger.info("[上级点播]拉流代理已经就绪, {}/{}", app, stream);
            zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
                OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
                logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
                dynamicTask.stop(callIdHeader.getCallId());
                pushProxyStream(evt, request, gbStream,  platform, callIdHeader, mediaServerItem, port, tcpActive,
                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
@@ -695,11 +733,7 @@
                try {
                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                    responseAck(request, Response.REQUEST_TIMEOUT); // 超时
                } catch (SipException e) {
                    logger.error("未处理的异常 ", e);
                } catch (InvalidArgumentException e) {
                    logger.error("未处理的异常 ", e);
                } catch (ParseException e) {
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("未处理的异常 ", e);
                }
            }, userSetting.getPlatformPlayTimeout());
@@ -711,7 +745,7 @@
            mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
                dynamicTask.stop(callIdHeader.getCallId());
                if (serverId.equals(userSetting.getServerId())) {
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                    SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
                    if (sendRtpItem == null) {