648540858
2023-04-23 269ad8cedbb07ca207a6f33af23085894dab4aa6
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -1,21 +1,21 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
@@ -39,12 +39,14 @@
import org.springframework.stereotype.Component;
import javax.sdp.*;
import javax.sip.*;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
import java.util.Random;
import java.util.Vector;
/**
@@ -71,6 +73,9 @@
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
    private DynamicTask dynamicTask;
@@ -157,11 +162,6 @@
                StreamProxyItem proxyByAppAndStream =null;
                // 不是通道可能是直播流
                if (channel != null && gbStream == null) {
//                    if (channel.getStatus() == 0) {
//                        logger.info("通道离线,返回400");
//                        responseAck(request, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline");
//                        return;
//                    }
                    // 通道存在,发100,TRYING
                    try {
                        responseAck(request, Response.TRYING);
@@ -232,7 +232,7 @@
                    }
                    return;
                } else {
                    logger.info("通道不存在,返回404");
                    logger.info("通道不存在,返回404: {}", channelId);
                    try {
                        // 通道不存在,发404,资源不存在
                        responseAck(request, Response.NOT_FOUND);
@@ -318,7 +318,7 @@
                    return;
                }
                String username = sdp.getOrigin().getUsername();
                String addressStr = sdp.getOrigin().getAddress();
                String addressStr = sdp.getConnection().getAddress();
                logger.info("[上级点播]用户:{}, 通道:{}, 地址:{}:{}, ssrc:{}", username, channelId, addressStr, port, ssrc);
                Device device = null;
@@ -345,8 +345,7 @@
                        return;
                    }
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                            device.getDeviceId(), channelId,
                            mediaTransmissionTCP);
                            device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
                    if (tcpActive != null) {
                        sendRtpItem.setTcpActive(tcpActive);
@@ -385,7 +384,12 @@
                        } else {
                            content.append("t=0 0\r\n");
                        }
                        content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n");
                        int localPort = sendRtpItem.getLocalPort();
                        if (localPort == 0) {
                            // 非严格模式端口不统一, 增加兼容性,修改为一个不为0的端口
                            localPort = new Random().nextInt(65535) + 1;
                        }
                        content.append("m=video " + localPort + " RTP/AVP 96\r\n");
                        content.append("a=sendonly\r\n");
                        content.append("a=rtpmap:96 PS/90000\r\n");
                        content.append("y=" + sendRtpItem.getSsrc() + "\r\n");
@@ -405,27 +409,23 @@
                            }, 60 * 1000);
                            responseSdpAck(request, content.toString(), platform);
                        } catch (SipException e) {
                            e.printStackTrace();
                        } catch (InvalidArgumentException e) {
                            e.printStackTrace();
                        } catch (ParseException e) {
                            e.printStackTrace();
                        } catch (SipException | InvalidArgumentException | ParseException e) {
                            logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
                        }
                    };
                    SipSubscribe.Event errorEvent = ((event) -> {
                        // 未知错误。直接转发设备点播的错误
                        try {
                            Response response = getMessageFactory().createResponse(event.statusCode, evt.getRequest());
                            sipSender.transmitRequest(response);
                            sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
                        } catch (ParseException | SipException  e) {
                            e.printStackTrace();
                            logger.error("未处理的异常 ", e);
                        }
                    });
                    sendRtpItem.setApp("rtp");
                    if ("Playback".equalsIgnoreCase(sessionName)) {
                        sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
                        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true);
                        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
                        sendRtpItem.setStreamId(ssrcInfo.getStream());
                        // 写入redis, 超时时回复
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -439,12 +439,8 @@
                                        redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
                                        try {
                                            responseAck(request, Response.REQUEST_TIMEOUT);
                                        } catch (SipException e) {
                                            e.printStackTrace();
                                        } catch (InvalidArgumentException e) {
                                            e.printStackTrace();
                                        } catch (ParseException e) {
                                            e.printStackTrace();
                                        } catch (SipException | InvalidArgumentException | ParseException e) {
                                            logger.error("[命令发送失败] 国标级联 录像回放 发送REQUEST_TIMEOUT: {}", e.getMessage());
                                        }
                                    } else {
                                        if (result.getMediaServerItem() != null) {
@@ -473,18 +469,26 @@
                            if (mediaServerItem.isRtpEnable()) {
                                streamId = String.format("%s_%s", device.getDeviceId(), channelId);
                            }
                            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false);
                            SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
                            logger.info(JSONObject.toJSONString(ssrcInfo));
                            sendRtpItem.setStreamId(ssrcInfo.getStream());
                            sendRtpItem.setSsrc(ssrc.equals(ssrcDefault) ? ssrcInfo.getSsrc() : ssrc);
                            // 写入redis, 超时时回复
                            redisCatchStorage.updateSendRTPSever(sendRtpItem);
                            MediaServerItem finalMediaServerItem = mediaServerItem;
                            playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> {
                                logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId);
                                redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
                            }, null);
                            });
                        } else {
                            // 当前系统作为下级平台使用,当上级平台点播时不携带ssrc时,并且设备在当前系统中已经点播了。这个时候需要重新给生成一个ssrc,不使用默认的"0000000000"。
                            if (ssrc.equals(ssrcDefault)) {
                                ssrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
                                ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
                                sendRtpItem.setSsrc(ssrc);
                            }
                            sendRtpItem.setStreamId(playTransaction.getStream());
                            // 写入redis, 超时时回复
                            redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -495,6 +499,11 @@
                        }
                    }
                } else if (gbStream != null) {
                    if(ssrc.equals(ssrcDefault))
                    {
                        ssrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
                        ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
                    }
                    if("push".equals(gbStream.getStreamType())) {
                        if (streamPushItem != null && streamPushItem.isPushIng()) {
                            // 推流状态
@@ -506,21 +515,17 @@
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        }
                    }else if ("proxy".equals(gbStream.getStreamType())){
                        if(null != proxyByAppAndStream &&proxyByAppAndStream.isStatus()){
                            pushProxyStream(evt, request, gbStream,  platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        }else{
                            //开启代理拉流
                            boolean start1 = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
                            if(start1) {
                        if (null != proxyByAppAndStream) {
                            if(proxyByAppAndStream.isStatus()){
                                pushProxyStream(evt, request, gbStream,  platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            }else{
                                //失败后通知
                                //开启代理拉流
                                notifyStreamOnline(evt, request,gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            }
                        }
                    }
                }
@@ -528,7 +533,7 @@
        } catch (SdpParseException e) {
            logger.error("sdp解析错误", e);
        } catch (SdpException e) {
            e.printStackTrace();
            logger.error("未处理的异常 ", e);
        }
    }
@@ -543,8 +548,7 @@
            if (streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId,
                        mediaTransmissionTCP);
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
@@ -583,8 +587,7 @@
            if (streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId,
                        mediaTransmissionTCP);
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
                if (sendRtpItem == null) {
                    logger.warn("服务器端口资源不足");
@@ -633,15 +636,38 @@
        if ("proxy".equals(gbStream.getStreamType())) {
            // TODO 控制启用以使设备上线
            logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
            try {
                responseAck(request, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
            // 监听流上线
            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
            zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, responseJSON) -> {
                String app = responseJSON.getString("app");
                String stream = responseJSON.getString("stream");
                logger.info("[上级点播]拉流代理已经就绪, {}/{}", app, stream);
                dynamicTask.stop(callIdHeader.getCallId());
                pushProxyStream(evt, request, gbStream,  platform, callIdHeader, mediaServerItem, port, tcpActive,
                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
            });
            dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
                logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream());
                zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
            }, userSetting.getPlatformPlayTimeout());
            boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
            if (!start) {
                try {
                    responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
                }
                zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
                dynamicTask.stop(callIdHeader.getCallId());
            }
        } else if ("push".equals(gbStream.getStreamType())) {
            if (!platform.isStartOfflinePush()) {
                // 平台设置中关闭了拉起离线的推流则直接回复
                try {
                    logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream());
                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
@@ -662,11 +688,11 @@
                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                    responseAck(request, Response.REQUEST_TIMEOUT); // 超时
                } catch (SipException e) {
                    e.printStackTrace();
                    logger.error("未处理的异常 ", e);
                } catch (InvalidArgumentException e) {
                    e.printStackTrace();
                    logger.error("未处理的异常 ", e);
                } catch (ParseException e) {
                    e.printStackTrace();
                    logger.error("未处理的异常 ", e);
                }
            }, userSetting.getPlatformPlayTimeout());
            // 添加监听
@@ -678,18 +704,18 @@
                dynamicTask.stop(callIdHeader.getCallId());
                if (serverId.equals(userSetting.getServerId())) {
                    SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                            app, stream, channelId, mediaTransmissionTCP);
                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
                    if (sendRtpItem == null) {
                        logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
                        try {
                            responseAck(request, Response.BUSY_HERE);
                        } catch (SipException e) {
                            e.printStackTrace();
                            logger.error("未处理的异常 ", e);
                        } catch (InvalidArgumentException e) {
                            e.printStackTrace();
                            logger.error("未处理的异常 ", e);
                        } catch (ParseException e) {
                            e.printStackTrace();
                            logger.error("未处理的异常 ", e);
                        }
                        return;
                    }
@@ -740,18 +766,18 @@
        // 发送redis消息
        redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
                streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
                channelId, mediaTransmissionTCP, null, responseSendItemMsg -> {
                channelId, mediaTransmissionTCP, platform.isRtcp(),null, responseSendItemMsg -> {
                    SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
                    if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
                        logger.warn("服务器端口资源不足");
                        try {
                            responseAck(request, Response.BUSY_HERE);
                        } catch (SipException e) {
                            e.printStackTrace();
                            logger.error("未处理的异常 ", e);
                        } catch (InvalidArgumentException e) {
                            e.printStackTrace();
                            logger.error("未处理的异常 ", e);
                        } catch (ParseException e) {
                            e.printStackTrace();
                            logger.error("未处理的异常 ", e);
                        }
                        return;
                    }
@@ -804,7 +830,13 @@
        content.append("s=Play\r\n");
        content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
        content.append("t=0 0\r\n");
        content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n");
        // 非严格模式端口不统一, 增加兼容性,修改为一个不为0的端口
        int localPort = sendRtpItem.getLocalPort();
        if(localPort == 0)
        {
            localPort = new Random().nextInt(65535) + 1;
        }
        content.append("m=video " + localPort + " RTP/AVP 96\r\n");
        content.append("a=sendonly\r\n");
        content.append("a=rtpmap:96 PS/90000\r\n");
        if (sendRtpItem.isTcp()) {
@@ -821,11 +853,11 @@
        try {
            return responseSdpAck(request, content.toString(), platform);
        } catch (SipException e) {
            e.printStackTrace();
            logger.error("未处理的异常 ", e);
        } catch (InvalidArgumentException e) {
            e.printStackTrace();
            logger.error("未处理的异常 ", e);
        } catch (ParseException e) {
            e.printStackTrace();
            logger.error("未处理的异常 ", e);
        }
        return null;
    }
@@ -898,7 +930,7 @@
                    return;
                }
                String username = sdp.getOrigin().getUsername();
                String addressStr = sdp.getOrigin().getAddress();
                String addressStr = sdp.getConnection().getAddress();
                logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc);
            } catch (SdpException e) {
                logger.error("[SDP解析异常]", e);