648540858
2024-04-17 bf6e09d231f49fb0c2cd5a81f6b31cc64d27c368
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -19,7 +19,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
@@ -86,16 +86,13 @@
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IInviteStreamService inviteStreamService;
    private IRedisRpcService redisRpcService;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private RedisPushStreamResponseListener redisPushStreamResponseListener;
    @Autowired
    private IPlayService playService;
@@ -122,9 +119,6 @@
    private UserSetting userSetting;
    @Autowired
    private RedisPlatformPushStreamOnlineLister mediaListManager;
    @Autowired
    private SipConfig config;
    @Autowired
@@ -132,6 +126,9 @@
    @Autowired
    private SendRtpPortManager sendRtpPortManager;
    @Autowired
    private RedisPushStreamResponseListener redisPushStreamResponseListener;
    @Override
@@ -594,15 +591,17 @@
                    sendRtpItem.setSessionName(sessionName);
                    if ("push".equals(gbStream.getStreamType())) {
                        sendRtpItem.setPlayType(InviteStreamType.PUSH);
                        if (streamPushItem != null) {
                            // 从redis查询是否正在接收这个推流
                            OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
                            if (pushListItem != null) {
                                sendRtpItem.setServerId(pushListItem.getSeverId());
                                sendRtpItem.setMediaServerId(pushListItem.getMediaServerId());
                                StreamPushItem transform = streamPushService.transform(pushListItem);
                                transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
                                // 推流状态
                                // 开始推流
                                sendPushStream(sendRtpItem, mediaServerItem, platform, request);
                            }else {
                                if (!platform.isStartOfflinePush()) {
@@ -702,8 +701,6 @@
                }
                // 写入redis, 超时时回复
                sendRtpItem.setStatus(1);
                sendRtpItem.setFromTag(request.getFromTag());
                sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
                SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
                if (response != null) {
                    sendRtpItem.setToTag(response.getToTag());
@@ -714,7 +711,6 @@
                    sendRtpItem.setSsrc(ssrc);
                }
                redisCatchStorage.updateSendRTPSever(sendRtpItem);
            } else {
                // 不在线 拉起
                notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
@@ -767,20 +763,17 @@
        redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
        // 设置超时
        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
            redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
            logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
            try {
                redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream());
                mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
                responseAck(request, Response.REQUEST_TIMEOUT); // 超时
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("未处理的异常 ", e);
            }
        }, userSetting.getPlatformPlayTimeout());
        redisCatchStorage.addWaiteSendRtpItem(sendRtpItem, userSetting.getPlatformPlayTimeout());
        // 添加上线的通知
        mediaListManager.addChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream(), (sendRtpItemFromRedis) -> {
        //
        redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemFromRedis) -> {
            dynamicTask.stop(sendRtpItem.getCallId());
            redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream());
            if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
@@ -814,12 +807,11 @@
                otherWvpPushStream(sendRtpItemFromRedis, request, platform);
            }
        });
        // 添加回复的拒绝或者错误的通知
        redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
            if (response.getCode() != 0) {
                dynamicTask.stop(sendRtpItem.getCallId());
                mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
                redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
                try {
                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
                } catch (SipException | InvalidArgumentException | ParseException e) {
@@ -836,12 +828,9 @@
     */
    private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) {
        logger.info("[级联点播]直播流来自其他平台,发送redis消息");
        // 发送redis消息
        redisCatchStorage.sendStartSendRtp(sendRtpItem);
        sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem);
        // 写入redis, 超时时回复
        sendRtpItem.setStatus(1);
        sendRtpItem.setCallId(request.getCallIdHeader().getCallId());
        sendRtpItem.setFromTag(request.getFromTag());
        SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
        if (response != null) {
            sendRtpItem.setToTag(response.getToTag());