648540858
2022-09-02 7db2bf7b51015a7e989ea5b0f4d9486dfd99d4bd
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -13,13 +13,15 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -38,7 +40,6 @@
import javax.sdp.*;
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
@@ -65,6 +66,8 @@
    @Autowired
    private IStreamPushService streamPushService;
    @Autowired
    private IStreamProxyService streamProxyService;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
@@ -120,10 +123,7 @@
        //  Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令
        try {
            Request request = evt.getRequest();
            SipURI sipUri = (SipURI) request.getRequestURI();
            //从subject读取channelId,不再从request-line读取。 有些平台request-line是平台国标编码,不是设备国标编码。
            //String channelId = sipURI.getUser();
            String channelId = SipUtils.getChannelIdFromHeader(request);
            String channelId = SipUtils.getChannelIdFromRequest(request);
            String requesterId = SipUtils.getUserIdFromFromHeader(request);
            CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
            if (requesterId == null || channelId == null) {
@@ -145,6 +145,7 @@
                MediaServerItem mediaServerItem = null;
                StreamPushItem streamPushItem = null;
                StreamProxyItem proxyByAppAndStream =null;
                // 不是通道可能是直播流
                if (channel != null && gbStream == null) {
                    if (channel.getStatus() == 0) {
@@ -174,6 +175,13 @@
                        if ("push".equals(gbStream.getStreamType())) {
                            streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
                            if (streamPushItem == null) {
                                logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                                responseAck(evt, Response.GONE);
                                return;
                            }
                        }else if("proxy".equals(gbStream.getStreamType())){
                            proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream());
                            if (proxyByAppAndStream == null) {
                                logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
                                responseAck(evt, Response.GONE);
                                return;
@@ -240,16 +248,16 @@
                        String protocol = media.getProtocol();
                        // 区分TCP发流还是udp, 当前默认udp
                        if ("TCP/RTP/AVP".equals(protocol)) {
                        if ("TCP/RTP/AVP".equalsIgnoreCase(protocol)) {
                            String setup = mediaDescription.getAttribute("setup");
                            if (setup != null) {
                                mediaTransmissionTCP = true;
                                if ("active".equals(setup)) {
                                if ("active".equalsIgnoreCase(setup)) {
                                    tcpActive = true;
                                    // 不支持tcp主动
                                    responseAck(evt, Response.NOT_IMPLEMENTED, "tcp active not support"); // 目录不支持点播
                                    return;
                                } else if ("passive".equals(setup)) {
                                } else if ("passive".equalsIgnoreCase(setup)) {
                                    tcpActive = false;
                                }
                            }
@@ -294,11 +302,11 @@
                        return;
                    }
                    sendRtpItem.setCallId(callIdHeader.getCallId());
                    sendRtpItem.setPlayType("Play".equals(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK);
                    sendRtpItem.setPlayType("Play".equalsIgnoreCase(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK);
                    Long finalStartTime = startTime;
                    Long finalStopTime = stopTime;
                    ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> {
                    ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> {
                        String app = responseJSON.getString("app");
                        String stream = responseJSON.getString("stream");
                        logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", app, stream);
@@ -313,7 +321,7 @@
                        content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n");
                        content.append("s=" + sessionName + "\r\n");
                        content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n");
                        if ("Playback".equals(sessionName)) {
                        if ("Playback".equalsIgnoreCase(sessionName)) {
                            content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n");
                        } else {
                            content.append("t=0 0\r\n");
@@ -357,7 +365,7 @@
                        }
                    });
                    sendRtpItem.setApp("rtp");
                    if ("Playback".equals(sessionName)) {
                    if ("Playback".equalsIgnoreCase(sessionName)) {
                        sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
                        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true);
                        sendRtpItem.setStreamId(ssrcInfo.getStream());
@@ -401,6 +409,7 @@
                                streamId = String.format("%s_%s", device.getDeviceId(), channelId);
                            }
                            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false);
                            logger.info(JSONObject.toJSONString(ssrcInfo));
                            sendRtpItem.setStreamId(ssrcInfo.getStream());
                            // 写入redis, 超时时回复
                            redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -419,14 +428,33 @@
                        }
                    }
                } else if (gbStream != null) {
                    if (streamPushItem != null && streamPushItem.isStatus()) {
                        // 在线状态
                        pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                    } else {
                        // 不在线 拉起
                        notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                    if("push".equals(gbStream.getStreamType())) {
                        if (streamPushItem != null && streamPushItem.isPushIng()) {
                            // 推流状态
                            pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        } else {
                            // 未推流 拉起
                            notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        }
                    }else if ("proxy".equals(gbStream.getStreamType())){
                        if(null != proxyByAppAndStream &&proxyByAppAndStream.isStatus()){
                            pushProxyStream(evt, gbStream,  platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        }else{
                            //开启代理拉流
                            boolean start1 = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
                            if(start1) {
                                pushProxyStream(evt, gbStream,  platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            }else{
                                //失败后通知
                                notifyStreamOnline(evt, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            }
                        }
                    }
                }
            }
@@ -445,13 +473,45 @@
    /**
     * 安排推流
     */
    private void pushProxyStream(RequestEvent evt, GbStream gbStream, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            if (streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId,
                        mediaTransmissionTCP);
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
                    responseAck(evt, Response.BUSY_HERE);
                    return;
                }
                if (tcpActive != null) {
                    sendRtpItem.setTcpActive(tcpActive);
                }
                sendRtpItem.setPlayType(InviteStreamType.PUSH);
                // 写入redis, 超时时回复
                sendRtpItem.setStatus(1);
                sendRtpItem.setCallId(callIdHeader.getCallId());
                byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
                sendRtpItem.setDialog(dialogByteArray);
                byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
                sendRtpItem.setTransaction(transactionByteArray);
                redisCatchStorage.updateSendRTPSever(sendRtpItem);
                sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
        }
    }
    private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
        // 推流
        if (streamPushItem.getServerId().equals(userSetting.getServerId())) {
        if (streamPushItem.isSelf()) {
            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            if (streamReady) {
                // 自平台内容
@@ -490,7 +550,6 @@
        }
    }
    /**
     * 通知流上线
     */
@@ -500,7 +559,7 @@
                                    String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
        if ("proxy".equals(gbStream.getStreamType())) {
            // TODO 控制启用以使设备上线
            logger.info("[ app={}, stream={} ]通道离线,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
            logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
            responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
        } else if ("push".equals(gbStream.getStreamType())) {
            if (!platform.isStartOfflinePush()) {
@@ -508,7 +567,7 @@
                return;
            }
            // 发送redis消息以使设备上线
            logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream());
            logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream());
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
                    gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
@@ -518,7 +577,7 @@
            dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
                logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
                try {
                    mediaListManager.removedChannelOnlineEventLister(gbStream.getGbId());
                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                    responseAck(evt, Response.REQUEST_TIMEOUT); // 超时
                } catch (SipException e) {
                    e.printStackTrace();
@@ -533,7 +592,7 @@
            Boolean finalTcpActive = tcpActive;
            // 添加在本机上线的通知
            mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream, serverId) -> {
            mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
                dynamicTask.stop(callIdHeader.getCallId());
                if (serverId.equals(userSetting.getServerId())) {
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
@@ -621,7 +680,7 @@
                            // 离线
                            // 查询是否在本机上线了
                            StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
                            if (currentStreamPushItem.isStatus()) {
                            if (currentStreamPushItem.isPushIng()) {
                                // 在线状态
                                pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);