648540858
2024-03-05 5bf87ca330b446695de7e9d8149ec53e79a453c5
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -18,10 +18,7 @@
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -33,8 +30,10 @@
import com.genersoft.iot.vmp.utils.DateUtil;
import gov.nist.javax.sdp.TimeDescriptionImpl;
import gov.nist.javax.sdp.fields.TimeField;
import gov.nist.javax.sdp.fields.URIField;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -78,6 +77,9 @@
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IInviteStreamService inviteStreamService;
    @Autowired
    private SSRCFactory ssrcFactory;
@@ -133,11 +135,24 @@
        //  Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令
        try {
            SIPRequest request = (SIPRequest)evt.getRequest();
            String channelId = SipUtils.getChannelIdFromRequest(request);
            String channelIdFromSub = SipUtils.getChannelIdFromRequest(request);
            // 解析sdp消息, 使用jainsip 自带的sdp解析方式
            String contentString = new String(request.getRawContent());
            Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString);
            SessionDescription sdp = gb28181Sdp.getBaseSdb();
            String sessionName = sdp.getSessionName().getValue();
            String channelIdFromSdp = null;
            if(StringUtils.equalsIgnoreCase("Playback", sessionName)){
                URIField uriField = (URIField)sdp.getURI();
                channelIdFromSdp = uriField.getURI().split(":")[0];
            }
            final String channelId = StringUtils.isNotBlank(channelIdFromSdp) ? channelIdFromSdp : channelIdFromSub;
            String requesterId = SipUtils.getUserIdFromFromHeader(request);
            CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
            if (requesterId == null || channelId == null) {
                logger.info("无法从FromHeader的Address中获取到平台id,返回400");
                logger.info("无法从请求中获取到平台id,返回400");
                // 参数不全, 发400,请求错误
                try {
                    responseAck(request, Response.BAD_REQUEST);
@@ -147,6 +162,8 @@
                return;
            }
            logger.info("[INVITE] requesterId: {}, callId: {}, 来自:{}:{}",
                    requesterId, callIdHeader.getCallId(), request.getRemoteAddress(), request.getRemotePort());
            // 查询请求是否来自上级平台\设备
            ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
@@ -240,12 +257,6 @@
                    }
                    return;
                }
                // 解析sdp消息, 使用jainsip 自带的sdp解析方式
                String contentString = new String(request.getRawContent());
                Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString);
                SessionDescription sdp = gb28181Sdp.getBaseSdb();
                String sessionName = sdp.getSessionName().getValue();
                Long startTime = null;
                Long stopTime = null;
@@ -394,7 +405,16 @@
                            // 非严格模式端口不统一, 增加兼容性,修改为一个不为0的端口
                            localPort = new Random().nextInt(65535) + 1;
                        }
                        content.append("m=video " + localPort + " RTP/AVP 96\r\n");
                        if (sendRtpItem.isTcp()) {
                            content.append("m=video " + localPort + " TCP/RTP/AVP 96\r\n");
                            if (!sendRtpItem.isTcpActive()) {
                                content.append("a=setup:active\r\n");
                            } else {
                                content.append("a=setup:passive\r\n");
                            }
                        }else {
                            content.append("m=video " + localPort + " RTP/AVP 96\r\n");
                        }
                        content.append("a=sendonly\r\n");
                        content.append("a=rtpmap:96 PS/90000\r\n");
                        content.append("y=" + sendRtpItem.getSsrc() + "\r\n");
@@ -479,47 +499,55 @@
                                        errorEvent.run(code, msg, data);
                                    }
                                });
                    } else if ("Download".equalsIgnoreCase(sessionName)) {
                        // 获取指定的下载速度
                        Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true);
                        MediaDescription mediaDescription = null;
                        String downloadSpeed = "1";
                        if (sdpMediaDescriptions.size() > 0) {
                            mediaDescription = (MediaDescription) sdpMediaDescriptions.get(0);
                        }
                        if (mediaDescription != null) {
                            downloadSpeed = mediaDescription.getAttribute("downloadspeed");
                        }
                        sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD);
                        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
                        sendRtpItem.setStreamId(ssrcInfo.getStream());
                        // 写入redis, 超时时回复
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
                        playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
                                DateUtil.formatter.format(end), Integer.parseInt(downloadSpeed),
                                (code, msg, data) -> {
                                    if (code == InviteErrorCode.SUCCESS.getCode()) {
                                        hookEvent.run(code, msg, data);
                                    } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
                                        logger.info("[录像下载]超时, 用户:{}, 通道:{}", username, channelId);
                                        redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
                                        errorEvent.run(code, msg, data);
                                    } else {
                                        errorEvent.run(code, msg, data);
                                    }
                                });
                    } else {
                        sendRtpItem.setPlayType(InviteStreamType.PLAY);
                        sendRtpItem.setPlayType(InviteStreamType.PLAY);
//                        SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
                        StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
                        if (streamInfo != null) {
                            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, streamInfo.getApp(), streamInfo.getStream());
                            if (!streamReady) {
                                redisCatchStorage.stopPlay(streamInfo);
                                storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
                                streamInfo = null;
                            }
                        }
                        if (streamInfo == null) {
                            String streamId = null;
                            if (mediaServerItem.isRtpEnable()) {
                                streamId = String.format("%s_%s", device.getDeviceId(), channelId);
                            }
                            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
                            logger.info(JSONObject.toJSONString(ssrcInfo));
                            sendRtpItem.setStreamId(ssrcInfo.getStream());
                            sendRtpItem.setSsrc(ssrc);
                            // 写入redis, 超时时回复
                            redisCatchStorage.updateSendRTPSever(sendRtpItem);
                            MediaServerItem finalMediaServerItem = mediaServerItem;
                            playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> {
                        String streamId = String.format("%s_%s", device.getDeviceId(), channelId);
                        sendRtpItem.setStreamId(streamId);
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
                        SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> {
                            if (code == InviteErrorCode.SUCCESS.getCode()) {
                                hookEvent.run(code, msg, data);
                            } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
                                logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId);
                                redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
                            });
                        } else {
                            // 当前系统作为下级平台使用,当上级平台点播时不携带ssrc时,并且设备在当前系统中已经点播了。这个时候需要重新给生成一个ssrc,不使用默认的"0000000000"。
                            sendRtpItem.setSsrc(ssrc);
                            sendRtpItem.setStreamId(playTransaction.getStream());
                            // 写入redis, 超时时回复
                            redisCatchStorage.updateSendRTPSever(sendRtpItem);
                            JSONObject jsonObject = new JSONObject();
                            jsonObject.put("app", sendRtpItem.getApp());
                            jsonObject.put("stream", sendRtpItem.getStreamId());
                            hookEvent.response(mediaServerItem, jsonObject);
                        }
                                errorEvent.run(code, msg, data);
                            } else {
                                errorEvent.run(code, msg, data);
                            }
                        }));
                        sendRtpItem.setSsrc(ssrcInfo.getSsrc());
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
                    }
                } else if (gbStream != null) {
@@ -698,9 +726,6 @@
                zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
                dynamicTask.stop(callIdHeader.getCallId());
            }
        } else if ("push".equals(gbStream.getStreamType())) {
            if (!platform.isStartOfflinePush()) {
                // 平台设置中关闭了拉起离线的推流则直接回复
@@ -723,13 +748,10 @@
            dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
                logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
                try {
                    redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                    responseAck(request, Response.REQUEST_TIMEOUT); // 超时
                } catch (SipException e) {
                    logger.error("未处理的异常 ", e);
                } catch (InvalidArgumentException e) {
                    logger.error("未处理的异常 ", e);
                } catch (ParseException e) {
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("未处理的异常 ", e);
                }
            }, userSetting.getPlatformPlayTimeout());
@@ -740,6 +762,7 @@
            // 添加在本机上线的通知
            mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
                dynamicTask.stop(callIdHeader.getCallId());
                redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
                if (serverId.equals(userSetting.getServerId())) {
                    SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
@@ -804,7 +827,7 @@
        // 发送redis消息
        redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
                streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
                channelId, mediaTransmissionTCP, platform.isRtcp(),null, responseSendItemMsg -> {
                channelId, mediaTransmissionTCP, platform.isRtcp(),platform.getName(), responseSendItemMsg -> {
                    SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
                    if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
                        logger.warn("服务器端口资源不足");