648540858
2022-09-14 ab74d1cff90cc563e0eca8deb8f154d84eb51908
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -32,6 +32,7 @@
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.service.impl.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -87,7 +88,7 @@
    private DynamicTask dynamicTask;
    @Autowired
    private SIPCommander cmder;
    private RedisPushStreamResponseListener redisPushStreamResponseListener;
    @Autowired
    private IPlayService playService;
@@ -109,6 +110,9 @@
   @Autowired
   private ZLMRESTfulUtils zlmresTfulUtils;
    @Autowired
    private ZlmHttpHookSubscribe zlmHttpHookSubscribe;
    @Autowired
    private SIPProcessorObserver sipProcessorObserver;
@@ -430,7 +434,14 @@
                        if (playTransaction != null) {
                            Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream());
                            if (!streamReady) {
                                playTransaction = null;
                                boolean hasRtpServer = mediaServerService.checkRtpServer(mediaServerItem, "rtp", playTransaction.getStream());
                                if (hasRtpServer) {
                                    logger.info("[上级点播]已经开启rtpServer但是尚未收到流,开启监听流的到来");
                                    HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", playTransaction.getStream(), true, "rtsp", mediaServerItem.getId());
                                    zlmHttpHookSubscribe.addSubscribe(hookSubscribe, hookEvent);
                                }else {
                                    playTransaction = null;
                                }
                            }
                        }
                        if (playTransaction == null) {
@@ -578,7 +589,6 @@
            otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
        }
    }
    /**
     * 通知流上线
@@ -593,7 +603,8 @@
            responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
        } else if ("push".equals(gbStream.getStreamType())) {
            if (!platform.isStartOfflinePush()) {
                responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable");
                // 平台设置中关闭了拉起离线的推流则直接回复
                responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
                return;
            }
            // 发送redis消息以使设备上线
@@ -629,7 +640,7 @@
                            app, stream, channelId, mediaTransmissionTCP);
                    if (sendRtpItem == null) {
                        logger.warn("服务器端口资源不足");
                        logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
                        try {
                            responseAck(evt, Response.BUSY_HERE);
                        } catch (SipException e) {
@@ -660,6 +671,23 @@
                            mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                }
            });
            // 添加回复的拒绝或者错误的通知
            redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
                if (response.getCode() != 0) {
                    dynamicTask.stop(callIdHeader.getCallId());
                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                    try {
                        responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
                    } catch (SipException e) {
                        throw new RuntimeException(e);
                    } catch (InvalidArgumentException e) {
                        throw new RuntimeException(e);
                    } catch (ParseException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }
    }