648540858
2024-04-24 c21d973977a9f1d00d26179de764687ddd0ec56c
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -19,7 +19,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
@@ -44,6 +44,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.sdp.*;
@@ -54,10 +55,7 @@
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Vector;
import java.util.*;
/**
 * SIP命令类型: INVITE请求
@@ -86,16 +84,16 @@
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IInviteStreamService inviteStreamService;
    private IRedisRpcService redisRpcService;
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private RedisPushStreamResponseListener redisPushStreamResponseListener;
    @Autowired
    private IPlayService playService;
@@ -122,9 +120,6 @@
    private UserSetting userSetting;
    @Autowired
    private ZLMMediaListManager mediaListManager;
    @Autowired
    private SipConfig config;
    @Autowired
@@ -132,6 +127,9 @@
    @Autowired
    private SendRtpPortManager sendRtpPortManager;
    @Autowired
    private RedisPushStreamResponseListener redisPushStreamResponseListener;
    @Override
@@ -568,19 +566,16 @@
                    }
                } else if (gbStream != null) {
                    String ssrc;
                    if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) {
                        // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                        ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
                    }else {
                        ssrc = gb28181Sdp.getSsrc();
                    }
                    SendRtpItem sendRtpItem = new SendRtpItem();
                    sendRtpItem.setTcpActive(tcpActive);
                    if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) {
                        sendRtpItem.setSsrc(gb28181Sdp.getSsrc());
                    }
                    if (tcpActive != null) {
                        sendRtpItem.setTcpActive(tcpActive);
                    }
                    sendRtpItem.setTcp(mediaTransmissionTCP);
                    sendRtpItem.setRtcp(platform.isRtcp());
                    sendRtpItem.setSsrc(ssrc);
                    sendRtpItem.setPlatformName(platform.getName());
                    sendRtpItem.setPlatformId(platform.getServerGBId());
                    sendRtpItem.setMediaServerId(mediaServerItem.getId());
@@ -593,37 +588,57 @@
                    sendRtpItem.setCallId(callIdHeader.getCallId());
                    sendRtpItem.setFromTag(request.getFromTag());
                    sendRtpItem.setOnlyAudio(false);
                    sendRtpItem.setPlayType(InviteStreamType.PUSH);
                    sendRtpItem.setStatus(0);
                    sendRtpItem.setSessionName(sessionName);
                    // 清理可能存在的缓存避免用到旧的数据
                    List<SendRtpItem> sendRtpItemList = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, gbStream.getStream());
                    if (!sendRtpItemList.isEmpty()) {
                        for (SendRtpItem rtpItem : sendRtpItemList) {
                            redisCatchStorage.deleteSendRTPServer(rtpItem);
                        }
                    }
                    if ("push".equals(gbStream.getStreamType())) {
                        sendRtpItem.setPlayType(InviteStreamType.PUSH);
                        if (streamPushItem != null) {
                            // 从redis查询是否正在接收这个推流
                            OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
                            sendRtpItem.setServerId(pushListItem.getSeverId());
                            if (pushListItem != null) {
                                sendRtpItem.setServerId(pushListItem.getSeverId());
                                sendRtpItem.setMediaServerId(pushListItem.getMediaServerId());
                                StreamPushItem transform = streamPushService.transform(pushListItem);
                                transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
                                // 推流状态
                                pushStream(sendRtpItem, mediaServerItem, platform, request);
                                redisCatchStorage.updateSendRTPSever(sendRtpItem);
                                // 开始推流
                                sendPushStream(sendRtpItem, mediaServerItem, platform, request);
                            }else {
                                // 未推流 拉起
                                if (!platform.isStartOfflinePush()) {
                                    // 平台设置中关闭了拉起离线的推流则直接回复
                                    try {
                                        logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream());
                                        responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
                                    } catch (SipException | InvalidArgumentException | ParseException e) {
                                        logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
                                    }
                                    return;
                                }
                                notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                            }
                        }
                    } else if ("proxy".equals(gbStream.getStreamType())) {
                        if (null != proxyByAppAndStream) {
                            if (sendRtpItem.getSsrc() == null) {
                                // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                                String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
                                sendRtpItem.setSsrc(ssrc);
                            }
                            if (proxyByAppAndStream.isStatus()) {
                                pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                                sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
                            } else {
                                //开启代理拉流
                                notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                            }
                        }
                    }
                }
            }
@@ -649,33 +664,23 @@
    /**
     * 安排推流
     */
    private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
    private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
            if (streamReady != null && streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
            if (sendRtpItem == null) {
                logger.warn("服务器端口资源不足");
                try {
                    responseAck(request, Response.BUSY_HERE);
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
                if (localPort == 0) {
                    logger.warn("服务器端口资源不足");
                    try {
                        responseAck(request, Response.BUSY_HERE);
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
                    }
                    return;
                }
                return;
            }
            if (tcpActive != null) {
                sendRtpItem.setTcpActive(tcpActive);
            }
            sendRtpItem.setPlayType(InviteStreamType.PUSH);
            sendRtpItem.setPlayType(InviteStreamType.PROXY);
            // 写入redis, 超时时回复
            sendRtpItem.setStatus(1);
            sendRtpItem.setCallId(callIdHeader.getCallId());
            sendRtpItem.setFromTag(request.getFromTag());
            sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
            SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
@@ -683,12 +688,10 @@
                sendRtpItem.setToTag(response.getToTag());
            }
            redisCatchStorage.updateSendRTPSever(sendRtpItem);
        }
    }
    private void pushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
    private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
        // 推流
        if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
@@ -706,23 +709,21 @@
                }
                // 写入redis, 超时时回复
                sendRtpItem.setStatus(1);
                sendRtpItem.setFromTag(request.getFromTag());
                sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
                SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
                if (response != null) {
                    sendRtpItem.setToTag(response.getToTag());
                }
                if (sendRtpItem.getSsrc() == null) {
                    // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                    String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
                    sendRtpItem.setSsrc(ssrc);
                }
                redisCatchStorage.updateSendRTPSever(sendRtpItem);
            } else {
                // 不在线 拉起
                notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
            }
        } else {
            SendRtpItem sendRtpItem = new SendRtpItem();
            sendRtpItem.setRtcp(platform.isRtcp());
            sendRtpItem.setTcp(mediaTransmissionTCP);
            sendRtpItem.setTcpActive();
            // 其他平台内容
            otherWvpPushStream(sendRtpItem, request, platform);
        }
@@ -733,29 +734,28 @@
     */
    private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
        // TODO 控制启用以使设备上线
        logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
        logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", sendRtpItem.getApp(), sendRtpItem.getStream());
        // 监听流上线
        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId());
        zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
            OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
            logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
            dynamicTask.stop(callIdHeader.getCallId());
            pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
            dynamicTask.stop(sendRtpItem.getCallId());
            sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
        });
        dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
            logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream());
        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
            logger.info("[ app={}, stream={} ] 等待拉流代理流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
            zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
        }, userSetting.getPlatformPlayTimeout());
        boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
        boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream());
        if (!start) {
            try {
                responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
                responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline");
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
            }
            zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
            dynamicTask.stop(callIdHeader.getCallId());
            dynamicTask.stop(sendRtpItem.getCallId());
        }
    }
@@ -763,89 +763,80 @@
     * 通知流上线
     */
    private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
        if (!platform.isStartOfflinePush()) {
            // 平台设置中关闭了拉起离线的推流则直接回复
            try {
                logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream());
                responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
            }
            return;
        }
        // 发送redis消息以使设备上线
        logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream());
        // 发送redis消息以使设备上线,流上线后被
        logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", sendRtpItem.getApp(), sendRtpItem.getStream());
        MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
                gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
                platform.getName(), userSetting.getServerId(), gbStream.getMediaServerId());
                sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(),
                platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
        redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
        // 设置超时
        dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
            logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
            redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
            logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
            try {
                redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
                mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                responseAck(request, Response.REQUEST_TIMEOUT); // 超时
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("未处理的异常 ", e);
            }
        }, userSetting.getPlatformPlayTimeout());
        // 写入redis待发流信息,供其他wvp读取并生成发流信息
        SendRtpItem sendRtpItemTemp = new SendRtpItem();
        sendRtpItemTemp.setIp(addressStr);
        sendRtpItemTemp.setPort(port);
        sendRtpItemTemp.setSsrc(ssrc);
        sendRtpItemTemp.setPlatformId(requesterId);
        sendRtpItemTemp.setPlatformName(platform.getName());
        sendRtpItemTemp.setTcp(mediaTransmissionTCP);
        sendRtpItemTemp.setRtcp(platform.isRtcp());
        sendRtpItemTemp.setTcpActive(tcpActive);
        sendRtpItemTemp.setPlayType(InviteStreamType.PUSH);
        redisCatchStorage.addWaiteSendRtpItem(sendRtpItemTemp, userSetting.getPlatformPlayTimeout());
        // 添加上线的通知
        mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (sendRtpItemFromRedis) -> {
            dynamicTask.stop(callIdHeader.getCallId());
            redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
        //
        long key = redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> {
            dynamicTask.stop(sendRtpItem.getCallId());
            if (sendRtpItemKey == null) {
                logger.warn("[级联点播] 等待推流得到结果未空: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
                try {
                    responseAck(request, Response.BUSY_HERE);
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("未处理的异常 ", e);
                }
                return;
            }
            SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
            if (sendRtpItemFromRedis == null) {
                logger.warn("[级联点播] 等待推流, 未找到redis中缓存的发流信息: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
                try {
                    responseAck(request, Response.BUSY_HERE);
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("未处理的异常 ", e);
                }
                return;
            }
            if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
                logger.info("[级联点播] 等待的推流在本平台上线 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
                if (localPort == 0) {
                    logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
                    try {
                        responseAck(request, Response.BUSY_HERE);
                    } catch (SipException e) {
                        logger.error("未处理的异常 ", e);
                    } catch (InvalidArgumentException e) {
                        logger.error("未处理的异常 ", e);
                    } catch (ParseException e) {
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("未处理的异常 ", e);
                    }
                    return;
                }
                sendRtpItemTemp.setLocalPort(localPort);
                sendRtpItemTemp.setLocalIp(ObjectUtils.isEmpty(platform.getSendStreamIp()): );
                // 写入redis, 超时时回复
                sendRtpItemTemp.setStatus(1);
                sendRtpItemTemp.setCallId(callIdHeader.getCallId());
                sendRtpItemTemp.setFromTag(request.getFromTag());
                SIPResponse response = sendStreamAck(request, sendRtpItemTemp, platform);
                if (response != null) {
                    sendRtpItemTemp.setToTag(response.getToTag());
                sendRtpItem.setLocalPort(localPort);
                if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
                    sendRtpItem.setLocalIp(platform.getSendStreamIp());
                }
                redisCatchStorage.updateSendRTPSever(sendRtpItemTemp);
                // 写入redis, 超时时回复
                sendRtpItem.setStatus(1);
                SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
                if (response != null) {
                    sendRtpItem.setToTag(response.getToTag());
                }
                redisCatchStorage.updateSendRTPSever(sendRtpItem);
            } else {
                // 其他平台内容
                otherWvpPushStream(sendRtpItemFromRedis, request, platform);
            }
        });
        // 添加回复的拒绝或者错误的通知
        redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
        // redis消息例如: PUBLISH VM_MSG_STREAM_PUSH_RESPONSE  '{"code":1,"msg":"失败","app":"1","stream":"2"}'
        redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
            if (response.getCode() != 0) {
                dynamicTask.stop(callIdHeader.getCallId());
                mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                dynamicTask.stop(sendRtpItem.getCallId());
                redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
                redisRpcService.removeCallback(key);
                try {
                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
                } catch (SipException | InvalidArgumentException | ParseException e) {
@@ -861,13 +852,13 @@
     * 来自其他wvp的推流
     */
    private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) {
        logger.info("[级联点播]直播流来自其他平台,发送redis消息");
        // 发送redis消息
        redisCatchStorage.sendStartSendRtp(sendRtpItem);
        logger.info("[级联点播] 来自其他wvp的推流 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
        sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem.getRedisKey());
        if (sendRtpItem == null) {
            return;
        }
        // 写入redis, 超时时回复
        sendRtpItem.setStatus(1);
        sendRtpItem.setCallId(request.getCallIdHeader().getCallId());
        sendRtpItem.setFromTag(request.getFromTag());
        SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
        if (response != null) {
            sendRtpItem.setToTag(response.getToTag());