648540858
2024-04-17 e85cef434595073b777cea8338543b93d3956b93
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -29,6 +29,7 @@
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -53,10 +54,7 @@
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Vector;
import java.util.*;
/**
 * SIP命令类型: INVITE请求
@@ -125,6 +123,9 @@
    @Autowired
    private SendRtpPortManager sendRtpPortManager;
    @Autowired
    private RedisPushStreamResponseListener redisPushStreamResponseListener;
    @Override
@@ -585,7 +586,13 @@
                    sendRtpItem.setOnlyAudio(false);
                    sendRtpItem.setStatus(0);
                    sendRtpItem.setSessionName(sessionName);
                    // 清理可能存在的缓存避免用到旧的数据
                    List<SendRtpItem> sendRtpItemList = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, gbStream.getStream());
                    if (!sendRtpItemList.isEmpty()) {
                        for (SendRtpItem rtpItem : sendRtpItemList) {
                            redisCatchStorage.deleteSendRTPServer(rtpItem);
                        }
                    }
                    if ("push".equals(gbStream.getStreamType())) {
                        sendRtpItem.setPlayType(InviteStreamType.PUSH);
                        if (streamPushItem != null) {
@@ -759,6 +766,7 @@
        redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
        // 设置超时
        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
            redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
            logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
            try {
                responseAck(request, Response.REQUEST_TIMEOUT); // 超时
@@ -770,7 +778,7 @@
        redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemFromRedis) -> {
            dynamicTask.stop(sendRtpItem.getCallId());
            if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
                logger.info("[级联点播] 等待的推流在本平台上线 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
                if (localPort == 0) {
                    logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
@@ -801,7 +809,18 @@
                // 其他平台内容
                otherWvpPushStream(sendRtpItemFromRedis, request, platform);
            }
        });
        // 添加回复的拒绝或者错误的通知
        redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
            if (response.getCode() != 0) {
                dynamicTask.stop(sendRtpItem.getCallId());
                redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
                try {
                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage());
                }
            }
        });
    }
@@ -811,7 +830,7 @@
     * 来自其他wvp的推流
     */
    private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) {
        logger.info("[级联点播]直播流来自其他平台,发送redis消息");
        logger.info("[级联点播] 来自其他wvp的推流 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
        sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem);
        // 写入redis, 超时时回复
        sendRtpItem.setStatus(1);