648540858
2024-04-15 f9abfca003bc9515f1f6028657fa6347326a1402
临时提交
8个文件已修改
2个文件已添加
1个文件已删除
1084 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 386 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java 474 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java 81 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java 81 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -72,6 +72,8 @@
    public static final String REGISTER_EXPIRE_TASK_KEY_PREFIX = "VMP_device_register_expire_";
    public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_";
    /** Redis topic name; RedisMsgListenConfig registers RedisPlatformWaitPushStreamOnlineListener on it. */
    public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:";
    /** Redis topic name; RedisMsgListenConfig registers RedisPlatformStartSendRtpListener on it. */
    public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:";
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -32,9 +32,6 @@
    private RedisStreamMsgListener redisStreamMsgListener;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Autowired
    private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
    @Autowired
@@ -48,6 +45,12 @@
    @Autowired
    private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener;
    @Autowired
    private RedisPlatformWaitPushStreamOnlineListener redisPlatformWaitPushStreamOnlineListener;
    @Autowired
    private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener;
    /**
@@ -65,12 +68,13 @@
        container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS));
        container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
        container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
        container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
        container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
        container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
        container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
        container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
        container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
        container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.WAITE_SEND_PUSH_STREAM));
        container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM));
        return container;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -22,6 +22,11 @@
     */
    private String platformId;
    /**
     * Platform name
     */
    private String platformName;
     /**
     * 对应设备id
     */
@@ -60,6 +65,11 @@
     * 是否为tcp主动模式
     */
    private boolean tcpActive;
    /**
     * IP address this server itself uses when pushing (sending) the stream
     */
    private String localIp;
    /**
     * 自己推流使用的端口
@@ -306,6 +316,22 @@
        this.receiveStream = receiveStream;
    }
    public String getPlatformName() {
        return platformName;
    }
    public void setPlatformName(String platformName) {
        this.platformName = platformName;
    }
    public String getLocalIp() {
        return localIp;
    }
    public void setLocalIp(String localIp) {
        this.localIp = localIp;
    }
    @Override
    public String toString() {
        return "SendRtpItem{" +
@@ -313,6 +339,7 @@
                ", port=" + port +
                ", ssrc='" + ssrc + '\'' +
                ", platformId='" + platformId + '\'' +
                ", platformName='" + platformName + '\'' +
                ", deviceId='" + deviceId + '\'' +
                ", app='" + app + '\'' +
                ", channelId='" + channelId + '\'' +
@@ -320,6 +347,7 @@
                ", stream='" + stream + '\'' +
                ", tcp=" + tcp +
                ", tcpActive=" + tcpActive +
                ", localIp=" + localIp +
                ", localPort=" + localPort +
                ", mediaServerId='" + mediaServerId + '\'' +
                ", serverId='" + serverId + '\'' +
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -18,6 +18,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
@@ -28,7 +29,6 @@
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -127,12 +127,11 @@
    @Autowired
    private SipConfig config;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Autowired
    private VideoStreamSessionManager streamSession;
    @Autowired
    private SendRtpPortManager sendRtpPortManager;
    @Override
@@ -577,21 +576,40 @@
                    }else {
                        ssrc = gb28181Sdp.getSsrc();
                    }
                    SendRtpItem sendRtpItem = new SendRtpItem();
                    sendRtpItem.setTcpActive(tcpActive);
                    sendRtpItem.setTcp(mediaTransmissionTCP);
                    sendRtpItem.setRtcp(platform.isRtcp());
                    sendRtpItem.setSsrc(ssrc);
                    sendRtpItem.setPlatformName(platform.getName());
                    sendRtpItem.setPlatformId(platform.getServerGBId());
                    sendRtpItem.setMediaServerId(mediaServerItem.getId());
                    sendRtpItem.setChannelId(channelId);
                    sendRtpItem.setIp(addressStr);
                    sendRtpItem.setPort(port);
                    sendRtpItem.setUsePs(true);
                    sendRtpItem.setApp(gbStream.getApp());
                    sendRtpItem.setStream(gbStream.getStream());
                    sendRtpItem.setCallId(callIdHeader.getCallId());
                    sendRtpItem.setFromTag(request.getFromTag());
                    sendRtpItem.setOnlyAudio(false);
                    sendRtpItem.setPlayType(InviteStreamType.PUSH);
                    sendRtpItem.setStatus(0);
                    if ("push".equals(gbStream.getStreamType())) {
                        if (streamPushItem != null) {
                            // 从redis查询是否正在接收这个推流
                            OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
                            sendRtpItem.setServerId(pushListItem.getSeverId());
                            if (pushListItem != null) {
                                StreamPushItem transform = streamPushService.transform(pushListItem);
                                transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
                                // 推流状态
                                pushStream(evt, request, gbStream, transform, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                                pushStream(sendRtpItem, mediaServerItem, platform, request);
                            }else {
                                // 未推流 拉起
                                notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                                notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                            }
                        }
                    } else if ("proxy".equals(gbStream.getStreamType())) {
@@ -601,8 +619,7 @@
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                            } else {
                                //开启代理拉流
                                notifyStreamOnline(evt, request, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                                notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                            }
                        }
@@ -659,8 +676,9 @@
            sendRtpItem.setStatus(1);
            sendRtpItem.setCallId(callIdHeader.getCallId());
            sendRtpItem.setFromTag(request.getFromTag());
            sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
            SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
            SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
            if (response != null) {
                sendRtpItem.setToTag(response.getToTag());
            }
@@ -670,19 +688,14 @@
    }
    private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) {
    private void pushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
        // 推流
        if (streamPushItem.isSelf()) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
        if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
            if (streamReady != null && streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
                if (sendRtpItem == null) {
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
                if (localPort == 0) {
                    logger.warn("服务器端口资源不足");
                    try {
                        responseAck(request, Response.BUSY_HERE);
@@ -691,16 +704,11 @@
                    }
                    return;
                }
                if (tcpActive != null) {
                    sendRtpItem.setTcpActive(tcpActive);
                }
                sendRtpItem.setPlayType(InviteStreamType.PUSH);
                // 写入redis, 超时时回复
                sendRtpItem.setStatus(1);
                sendRtpItem.setCallId(callIdHeader.getCallId());
                sendRtpItem.setFromTag(request.getFromTag());
                SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
                sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
                SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
                if (response != null) {
                    sendRtpItem.setToTag(response.getToTag());
                }
@@ -708,210 +716,168 @@
            } else {
                // 不在线 拉起
                notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
            }
        } else {
            SendRtpItem sendRtpItem = new SendRtpItem();
            sendRtpItem.setRtcp(platform.isRtcp());
            sendRtpItem.setTcp(mediaTransmissionTCP);
            sendRtpItem.setTcpActive();
            // 其他平台内容
            otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
            otherWvpPushStream(sendRtpItem, request, platform);
        }
    }
    /**
     * 通知流上线
     */
    private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                                    CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                                    int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                                    String channelId, String addressStr, String ssrc, String requesterId) {
        if ("proxy".equals(gbStream.getStreamType())) {
            // TODO 控制启用以使设备上线
            logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
            // 监听流上线
            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
            zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
                OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
                logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
                dynamicTask.stop(callIdHeader.getCallId());
                pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
            });
            dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
                logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream());
                zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
            }, userSetting.getPlatformPlayTimeout());
            boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
            if (!start) {
                try {
                    responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
                }
                zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
                dynamicTask.stop(callIdHeader.getCallId());
    private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
        // TODO 控制启用以使设备上线
        logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
        // 监听流上线
        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
        zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
            OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
            logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
            dynamicTask.stop(callIdHeader.getCallId());
            pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
        });
        dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
            logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream());
            zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
        }, userSetting.getPlatformPlayTimeout());
        boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
        if (!start) {
            try {
                responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
            }
        } else if ("push".equals(gbStream.getStreamType())) {
            if (!platform.isStartOfflinePush()) {
                // 平台设置中关闭了拉起离线的推流则直接回复
                try {
                    logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream());
                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
                }
                return;
            }
            // 发送redis消息以使设备上线
            logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream());
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
                    gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
                    platform.getName(), null, gbStream.getMediaServerId());
            redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
            // 设置超时
            dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
                logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
                try {
                    redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                    responseAck(request, Response.REQUEST_TIMEOUT); // 超时
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("未处理的异常 ", e);
                }
            }, userSetting.getPlatformPlayTimeout());
            // 添加监听
            int finalPort = port;
            Boolean finalTcpActive = tcpActive;
            // 添加在本机上线的通知
            mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
                dynamicTask.stop(callIdHeader.getCallId());
                redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
                if (serverId.equals(userSetting.getServerId())) {
                    SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
                            app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
                    if (sendRtpItem == null) {
                        logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
                        try {
                            responseAck(request, Response.BUSY_HERE);
                        } catch (SipException e) {
                            logger.error("未处理的异常 ", e);
                        } catch (InvalidArgumentException e) {
                            logger.error("未处理的异常 ", e);
                        } catch (ParseException e) {
                            logger.error("未处理的异常 ", e);
                        }
                        return;
                    }
                    if (finalTcpActive != null) {
                        sendRtpItem.setTcpActive(finalTcpActive);
                    }
                    sendRtpItem.setPlayType(InviteStreamType.PUSH);
                    // 写入redis, 超时时回复
                    sendRtpItem.setStatus(1);
                    sendRtpItem.setCallId(callIdHeader.getCallId());
                    sendRtpItem.setFromTag(request.getFromTag());
                    SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
                    if (response != null) {
                        sendRtpItem.setToTag(response.getToTag());
                    }
                    redisCatchStorage.updateSendRTPSever(sendRtpItem);
                } else {
                    // 其他平台内容
                    otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                            mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                }
            });
            // 添加回复的拒绝或者错误的通知
            redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
                if (response.getCode() != 0) {
                    dynamicTask.stop(callIdHeader.getCallId());
                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                    try {
                        responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage());
                    }
                }
            });
            zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
            dynamicTask.stop(callIdHeader.getCallId());
        }
    }
    /**
     * 来自其他wvp的推流
     * 通知流上线
     */
    private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
                                    CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                                    int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                                    String channelId, String addressStr, String ssrc, String requesterId) {
        logger.info("[级联点播]直播流来自其他平台,发送redis消息");
        // 发送redis消息
        redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
                streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
                channelId, mediaTransmissionTCP, platform.isRtcp(),platform.getName(), responseSendItemMsg -> {
                    SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
                    if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
                        logger.warn("服务器端口资源不足");
                        try {
                            responseAck(request, Response.BUSY_HERE);
                        } catch (SipException e) {
                            logger.error("未处理的异常 ", e);
                        } catch (InvalidArgumentException e) {
                            logger.error("未处理的异常 ", e);
                        } catch (ParseException e) {
                            logger.error("未处理的异常 ", e);
                        }
                        return;
                    }
                    // 收到sendItem
                    if (tcpActive != null) {
                        sendRtpItem.setTcpActive(tcpActive);
                    }
                    sendRtpItem.setPlayType(InviteStreamType.PUSH);
                    // 写入redis, 超时时回复
                    sendRtpItem.setStatus(1);
                    sendRtpItem.setCallId(callIdHeader.getCallId());
    private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
        if (!platform.isStartOfflinePush()) {
            // 平台设置中关闭了拉起离线的推流则直接回复
            try {
                logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream());
                responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
            }
            return;
        }
        // 发送redis消息以使设备上线
        logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream());
                    sendRtpItem.setFromTag(request.getFromTag());
                    SIPResponse response = sendStreamAck(responseSendItemMsg.getMediaServerItem(), request, sendRtpItem, platform, evt);
                    if (response != null) {
                        sendRtpItem.setToTag(response.getToTag());
                    }
                    redisCatchStorage.updateSendRTPSever(sendRtpItem);
                }, (wvpResult) -> {
        MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
                gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
                platform.getName(), userSetting.getServerId(), gbStream.getMediaServerId());
        redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
        // 设置超时
        dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
            logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
            try {
                redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
                mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                responseAck(request, Response.REQUEST_TIMEOUT); // 超时
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("未处理的异常 ", e);
            }
        }, userSetting.getPlatformPlayTimeout());
        // 写入redis待发流信息,供其他wvp读取并生成发流信息
        SendRtpItem sendRtpItemTemp = new SendRtpItem();
        sendRtpItemTemp.setIp(addressStr);
        sendRtpItemTemp.setPort(port);
        sendRtpItemTemp.setSsrc(ssrc);
        sendRtpItemTemp.setPlatformId(requesterId);
        sendRtpItemTemp.setPlatformName(platform.getName());
        sendRtpItemTemp.setTcp(mediaTransmissionTCP);
        sendRtpItemTemp.setRtcp(platform.isRtcp());
        sendRtpItemTemp.setTcpActive(tcpActive);
        sendRtpItemTemp.setPlayType(InviteStreamType.PUSH);
        redisCatchStorage.addWaiteSendRtpItem(sendRtpItemTemp, userSetting.getPlatformPlayTimeout());
        // 添加上线的通知
        mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (sendRtpItemFromRedis) -> {
            dynamicTask.stop(callIdHeader.getCallId());
            redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
            if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
                    // 错误
                    if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
                        // 离线
                        // 查询是否在本机上线了
                        StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
                        if (currentStreamPushItem.isPushIng()) {
                            // 在线状态
                            pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        } else {
                            // 不在线 拉起
                            notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                        }
                    }
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
                if (localPort == 0) {
                    logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
                    try {
                        responseAck(request, Response.BUSY_HERE);
                    } catch (InvalidArgumentException | ParseException | SipException e) {
                        logger.error("[命令发送失败] 国标级联 点播回复 BUSY_HERE: {}", e.getMessage());
                    } catch (SipException e) {
                        logger.error("未处理的异常 ", e);
                    } catch (InvalidArgumentException e) {
                        logger.error("未处理的异常 ", e);
                    } catch (ParseException e) {
                        logger.error("未处理的异常 ", e);
                    }
                });
                    return;
                }
                sendRtpItemTemp.setLocalPort(localPort);
                sendRtpItemTemp.setLocalIp(ObjectUtils.isEmpty(platform.getSendStreamIp()): );
                // 写入redis, 超时时回复
                sendRtpItemTemp.setStatus(1);
                sendRtpItemTemp.setCallId(callIdHeader.getCallId());
                sendRtpItemTemp.setFromTag(request.getFromTag());
                SIPResponse response = sendStreamAck(request, sendRtpItemTemp, platform);
                if (response != null) {
                    sendRtpItemTemp.setToTag(response.getToTag());
                }
                redisCatchStorage.updateSendRTPSever(sendRtpItemTemp);
            } else {
                // 其他平台内容
                otherWvpPushStream(sendRtpItemFromRedis, request, platform);
            }
        });
        // 添加回复的拒绝或者错误的通知
        redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
            if (response.getCode() != 0) {
                dynamicTask.stop(callIdHeader.getCallId());
                mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                try {
                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage());
                }
            }
        });
    }
    public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) {
        String sdpIp = mediaServerItem.getSdpIp();
    /**
     * 来自其他wvp的推流
     */
    private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) {
        logger.info("[级联点播]直播流来自其他平台,发送redis消息");
        // 发送redis消息
        redisCatchStorage.sendStartSendRtp(sendRtpItem);
        // 写入redis, 超时时回复
        sendRtpItem.setStatus(1);
        sendRtpItem.setCallId(request.getCallIdHeader().getCallId());
        sendRtpItem.setFromTag(request.getFromTag());
        SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
        if (response != null) {
            sendRtpItem.setToTag(response.getToTag());
        }
        redisCatchStorage.updateSendRTPSever(sendRtpItem);
    }
    public SIPResponse sendStreamAck(SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform) {
        String sdpIp = sendRtpItem.getLocalIp();
        if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
            sdpIp = platform.getSendStreamIp();
        }
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import java.text.ParseException;
/**
@@ -7,5 +9,5 @@
 */
public interface ChannelOnlineEvent {
    // Callback variant that receives the app name, the stream id and a server id.
    // NOTE(review): this diff view lists two run(...) signatures back to back — presumably one
    // replaces the other in the committed file; confirm before relying on either.
    void run(String app, String stream, String serverId) throws ParseException;
    // Callback variant that receives a SendRtpItem instead of separate app/stream/serverId values.
    void run(SendRtpItem sendRtpItem) throws ParseException;
}
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
@@ -61,6 +61,7 @@
        messageForPushChannel.setGbId(gbId);
        messageForPushChannel.setApp(app);
        messageForPushChannel.setStream(stream);
        messageForPushChannel.setServerId(serverId);
        messageForPushChannel.setMediaServerId(mediaServerId);
        messageForPushChannel.setPlatFormId(platFormId);
        messageForPushChannel.setPlatFormName(platFormName);
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
File was deleted
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
New file
@@ -0,0 +1,81 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * 收到消息后开始给上级发流
 * @author lin
 */
@Component
public class RedisPlatformStartSendRtpListener implements MessageListener {
    private final static Logger logger = LoggerFactory.getLogger(RedisPlatformStartSendRtpListener.class);
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private ZlmHttpHookSubscribe hookSubscribe;
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    @Override
    public void onMessage(Message message, byte[] bytes) {
        logger.info("[REDIS消息-收到上级等到设备推流的redis消息]: {}", new String(message.getBody()));
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                        MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class);
                        if (messageForPushChannel == null
                                || ObjectUtils.isEmpty(messageForPushChannel.getApp())
                                || ObjectUtils.isEmpty(messageForPushChannel.getStream())
                        || userSetting.getServerId().equals(messageForPushChannel.getServerId())){
                            continue;
                        }
                        // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
                        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
                                messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp",
                                null);
                        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
                            // 读取redis中的上级点播信息,生成sendRtpItm发送出去
                        });
                    }catch (Exception e) {
                        logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
                        logger.error("[REDIS消息-请求推流结果] 异常内容: ", e);
                    }
                }
            });
        }
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
New file
@@ -0,0 +1,81 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * 上级等到设备推流的redis消息
 * @author lin
 */
@Component
public class RedisPlatformWaitPushStreamOnlineListener implements MessageListener {
    private final static Logger logger = LoggerFactory.getLogger(RedisPlatformWaitPushStreamOnlineListener.class);
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private ZlmHttpHookSubscribe hookSubscribe;
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    @Override
    public void onMessage(Message message, byte[] bytes) {
        logger.info("[REDIS消息-收到上级等到设备推流的redis消息]: {}", new String(message.getBody()));
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                        MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class);
                        if (messageForPushChannel == null
                                || ObjectUtils.isEmpty(messageForPushChannel.getApp())
                                || ObjectUtils.isEmpty(messageForPushChannel.getStream())
                        || userSetting.getServerId().equals(messageForPushChannel.getServerId())){
                            continue;
                        }
                        // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
                        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
                                messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp",
                                null);
                        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
                            // 读取redis中的上级点播信息,生成sendRtpItm发送出去
                        });
                    }catch (Exception e) {
                        logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
                        logger.error("[REDIS消息-请求推流结果] 异常内容: ", e);
                    }
                }
            });
        }
    }
}
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -217,4 +217,7 @@
    void sendPushStreamClose(MessageForPushChannel messageForPushChannel);
    /**
     * Records in Redis that the given send-RTP item is waiting for its push stream.
     * NOTE(review): what exactly is stored, and whether the timeout is applied as an expiry,
     * is decided by the implementation — confirm there.
     *
     * @param sendRtpItem         the item whose app/stream is being waited for
     * @param platformPlayTimeout the platform play timeout; unit is not stated here — presumably milliseconds, TODO confirm
     */
    void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout);
    /**
     * Signals through Redis that RTP sending should start for the given item.
     * NOTE(review): the delivery mechanism (key write vs. pub/sub message) is decided by the
     * implementation — confirm there.
     *
     * @param sendRtpItem the item describing the stream to send
     */
    void sendStartSendRtp(SendRtpItem sendRtpItem);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -678,4 +678,16 @@
        logger.info("[redis发送通知] 发送 停止向上级推流 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
    }
    @Override
    public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) {
        String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
        redisTemplate.opsForValue().set(key, platformPlayTimeout);
    }
    @Override
    public void sendStartSendRtp(SendRtpItem sendRtpItem) {
        String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
        redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem));
    }
}