648540858
2024-05-29 764d04b497356ba6bcbb75fd42b51eca750f7223
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -19,25 +19,19 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -341,8 +335,12 @@
                    return;
                }
                String username = sdp.getOrigin().getUsername();
                String addressStr = sdp.getConnection().getAddress();
                String addressStr;
                if(StringUtils.isEmpty(platform.getSendStreamIp())){
                    addressStr = sdp.getConnection().getAddress();
                }else {
                    addressStr = platform.getSendStreamIp();
                }
                Device device = null;
                // 通过 channel 和 gbStream 是否为null 值判断来源是直播流还是国标
@@ -468,7 +466,8 @@
                            if (sendRtpItem.isTcpActive()) {
                                MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                                try {
                                    mediaServerService.startSendRtpPassive(mediaServer, platform, sendRtpItem, 5);
                                    mediaServerService.startSendRtpPassive(mediaServer, sendRtpItem, 5);
                                    redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, platform);
                                }catch (ControllerException e) {}
                            }
                        } catch (SipException | InvalidArgumentException | ParseException e) {
@@ -491,7 +490,7 @@
                        String startTimeStr = DateUtil.urlFormatter.format(start);
                        String endTimeStr = DateUtil.urlFormatter.format(end);
                        String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr;
                        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0,false,!channel.isHasAudio(), false, device.getStreamModeForParam());
                        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0,false,!channel.getHasAudio(), false, device.getStreamModeForParam());
                        sendRtpItem.setStream(stream);
                        // 写入redis, 超时时回复
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -521,7 +520,7 @@
                        }
                        sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD);
                        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false,!channel.isHasAudio(), false, device.getStreamModeForParam());
                        SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false,!channel.getHasAudio(), false, device.getStreamModeForParam());
                        sendRtpItem.setStream(ssrcInfo.getStream());
                        // 写入redis, 超时时回复
                        redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -596,11 +595,10 @@
                            // 从redis查询是否正在接收这个推流
                            StreamPushItem pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
                            if (pushListItem != null) {
                                sendRtpItem.setServerId(pushListItem.getSeverId());
                                sendRtpItem.setServerId(pushListItem.getServerId());
                                sendRtpItem.setMediaServerId(pushListItem.getMediaServerId());
                                StreamPushItem transform = streamPushService.transform(pushListItem);
                                transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
                                pushListItem.setSelf(userSetting.getServerId().equals(pushListItem.getServerId()));
                                redisCatchStorage.updateSendRTPSever(sendRtpItem);
                                // 开始推流
                                sendPushStream(sendRtpItem, mediaServerItem, platform, request);
@@ -657,9 +655,10 @@
    /**
     * 安排推流
     */
    private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
            if (streamReady != null && streamReady) {
    private void sendProxyStream(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) {
        MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
        if (mediaInfo != null) {
                // 自平台内容
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
@@ -677,7 +676,7 @@
            sendRtpItem.setStatus(1);
            sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
            SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt);
            SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
            if (response != null) {
                sendRtpItem.setToTag(response.getToTag());
            }
@@ -685,11 +684,11 @@
        }
    }
    private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
    private void sendPushStream(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) {
        // 推流
        if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
            if (streamReady != null && streamReady) {
            MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
            if (mediaInfo != null ) {
                // 自平台内容
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
                if (localPort == 0) {
@@ -726,20 +725,19 @@
    /**
     * 通知流上线
     */
    private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
    private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) {
        // TODO 控制启用以使设备上线
        logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", sendRtpItem.getApp(), sendRtpItem.getStream());
        // 监听流上线
        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId());
        zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
            OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
            logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
        Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), mediaServerItem.getId());
        hookSubscribe.addSubscribe(hook, (hookData)->{
            logger.info("[上级点播]拉流代理已经就绪, {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
            dynamicTask.stop(sendRtpItem.getCallId());
            sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
        });
        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
            logger.info("[ app={}, stream={} ] 等待拉流代理流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
            zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
            hookSubscribe.removeSubscribe(hook);
        }, userSetting.getPlatformPlayTimeout());
        boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream());
        if (!start) {
@@ -748,7 +746,7 @@
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
            }
            zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
            hookSubscribe.removeSubscribe(hook);
            dynamicTask.stop(sendRtpItem.getCallId());
        }
    }
@@ -756,7 +754,7 @@
    /**
     * 通知流上线
     */
    private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
    private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServer mediaServerItem, ParentPlatform platform, SIPRequest request) {
        // 发送redis消息以使设备上线,流上线后被
        logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", sendRtpItem.getApp(), sendRtpItem.getStream());
        MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
@@ -860,7 +858,7 @@
        redisCatchStorage.updateSendRTPSever(sendRtpItem);
    }
    public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) {
    public SIPResponse sendStreamAck(SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform) {
        String sdpIp = sendRtpItem.getLocalIp();
        if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {