648540858
2024-04-16 b4168c02cba462571dd3f5bdc1d0b1ffddbc938a
优化多wvp国标级联推流
14个文件已修改
1个文件已添加
2个文件已删除
743 ■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 188 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java 138 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java 97 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java 77 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java 97 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -74,6 +74,7 @@
    public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_";
    public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:";
    public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:";
    public static final String PUSH_STREAM_ONLINE = "VMP_PUSH_STREAM_ONLINE:";
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -29,9 +29,6 @@
    private RedisAlarmMsgListener redisAlarmMsgListener;
    @Autowired
    private RedisStreamMsgListener redisStreamMsgListener;
    @Autowired
    private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
    @Autowired
@@ -52,6 +49,9 @@
    @Autowired
    private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener;
    @Autowired
    private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister;
    /**
     * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
@@ -67,14 +67,14 @@
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS));
        container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
        container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
        container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
        container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
        container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
        container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
        container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
        container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.WAITE_SEND_PUSH_STREAM));
        container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM));
        container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED));
        container.addMessageListener(redisPlatformPushStreamOnlineLister, new PatternTopic(VideoManagerConstants.PUSH_STREAM_ONLINE));
        return container;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -132,6 +132,11 @@
     */
    private String receiveStream;
    /**
     * Play type requested by the upstream (parent) platform
     */
    private String sessionName;
    public String getIp() {
        return ip;
    }
@@ -332,6 +337,14 @@
        this.localIp = localIp;
    }
    /**
     * Returns the session name stored on this item.
     *
     * @return the current value of the {@code sessionName} field
     */
    public String getSessionName() {
        return sessionName;
    }
    /**
     * Stores the session name on this item.
     *
     * @param sessionName value to assign to the {@code sessionName} field
     */
    public void setSessionName(String sessionName) {
        this.sessionName = sessionName;
    }
    @Override
    public String toString() {
        return "SendRtpItem{" +
@@ -347,7 +360,7 @@
                ", stream='" + stream + '\'' +
                ", tcp=" + tcp +
                ", tcpActive=" + tcpActive +
                ", localIp=" + localIp +
                ", localIp='" + localIp + '\'' +
                ", localPort=" + localPort +
                ", mediaServerId='" + mediaServerId + '\'' +
                ", serverId='" + serverId + '\'' +
@@ -360,6 +373,7 @@
                ", rtcp=" + rtcp +
                ", playType=" + playType +
                ", receiveStream='" + receiveStream + '\'' +
                ", sessionName='" + sessionName + '\'' +
                '}';
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -16,7 +16,6 @@
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
@@ -78,9 +77,6 @@
    private DynamicTask dynamicTask;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Autowired
    private IPlayService playService;
@@ -117,13 +113,7 @@
        if (parentPlatform != null) {
            Map<String, Object> param = getSendRtpParam(sendRtpItem);
            if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
                RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
                        sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
                        sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
                        sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
                redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
                    playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader);
                });
                redisCatchStorage.sendStartSendRtp(sendRtpItem);
            } else {
                JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param);
                if (startSendRtpStreamResult != null) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -19,7 +19,6 @@
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
@@ -98,8 +97,6 @@
    @Autowired
    private IStreamPushService pushService;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Override
    public void afterPropertiesSet() throws Exception {
@@ -142,7 +139,7 @@
                    ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
                    if (platform != null) {
                        RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId());
                        redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg);
//                        redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg);
                    }
                }else {
                    MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -19,7 +19,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
@@ -122,7 +122,7 @@
    private UserSetting userSetting;
    @Autowired
    private ZLMMediaListManager mediaListManager;
    private RedisPlatformPushStreamOnlineLister mediaListManager;
    @Autowired
    private SipConfig config;
@@ -568,19 +568,16 @@
                    }
                } else if (gbStream != null) {
                    String ssrc;
                    if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) {
                        // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                        ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
                    }else {
                        ssrc = gb28181Sdp.getSsrc();
                    }
                    SendRtpItem sendRtpItem = new SendRtpItem();
                    sendRtpItem.setTcpActive(tcpActive);
                    if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) {
                        sendRtpItem.setSsrc(gb28181Sdp.getSsrc());
                    }
                    if (tcpActive != null) {
                        sendRtpItem.setTcpActive(tcpActive);
                    }
                    sendRtpItem.setTcp(mediaTransmissionTCP);
                    sendRtpItem.setRtcp(platform.isRtcp());
                    sendRtpItem.setSsrc(ssrc);
                    sendRtpItem.setPlatformName(platform.getName());
                    sendRtpItem.setPlatformId(platform.getServerGBId());
                    sendRtpItem.setMediaServerId(mediaServerItem.getId());
@@ -593,37 +590,48 @@
                    sendRtpItem.setCallId(callIdHeader.getCallId());
                    sendRtpItem.setFromTag(request.getFromTag());
                    sendRtpItem.setOnlyAudio(false);
                    sendRtpItem.setPlayType(InviteStreamType.PUSH);
                    sendRtpItem.setStatus(0);
                    sendRtpItem.setSessionName(sessionName);
                    if ("push".equals(gbStream.getStreamType())) {
                        if (streamPushItem != null) {
                            // 从redis查询是否正在接收这个推流
                            OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
                            sendRtpItem.setServerId(pushListItem.getSeverId());
                            if (pushListItem != null) {
                                sendRtpItem.setServerId(pushListItem.getSeverId());
                                StreamPushItem transform = streamPushService.transform(pushListItem);
                                transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
                                // 推流状态
                                pushStream(sendRtpItem, mediaServerItem, platform, request);
                                sendPushStream(sendRtpItem, mediaServerItem, platform, request);
                            }else {
                                // 未推流 拉起
                                if (!platform.isStartOfflinePush()) {
                                    // 平台设置中关闭了拉起离线的推流则直接回复
                                    try {
                                        logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream());
                                        responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
                                    } catch (SipException | InvalidArgumentException | ParseException e) {
                                        logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
                                    }
                                    return;
                                }
                                notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                            }
                        }
                    } else if ("proxy".equals(gbStream.getStreamType())) {
                        if (null != proxyByAppAndStream) {
                            if (sendRtpItem.getSsrc() == null) {
                                // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                                String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
                                sendRtpItem.setSsrc(ssrc);
                            }
                            if (proxyByAppAndStream.isStatus()) {
                                pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                                sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
                            } else {
                                //开启代理拉流
                                notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                            }
                        }
                    }
                }
            }
@@ -649,33 +657,23 @@
    /**
     * 安排推流
     */
    private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform,
                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                            String channelId, String addressStr, String ssrc, String requesterId) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
    private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
            if (streamReady != null && streamReady) {
                // 自平台内容
                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
            if (sendRtpItem == null) {
                logger.warn("服务器端口资源不足");
                try {
                    responseAck(request, Response.BUSY_HERE);
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
                if (localPort == 0) {
                    logger.warn("服务器端口资源不足");
                    try {
                        responseAck(request, Response.BUSY_HERE);
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
                    }
                    return;
                }
                return;
            }
            if (tcpActive != null) {
                sendRtpItem.setTcpActive(tcpActive);
            }
            sendRtpItem.setPlayType(InviteStreamType.PUSH);
            sendRtpItem.setPlayType(InviteStreamType.PROXY);
            // 写入redis, 超时时回复
            sendRtpItem.setStatus(1);
            sendRtpItem.setCallId(callIdHeader.getCallId());
            sendRtpItem.setFromTag(request.getFromTag());
            sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
            SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
@@ -683,12 +681,10 @@
                sendRtpItem.setToTag(response.getToTag());
            }
            redisCatchStorage.updateSendRTPSever(sendRtpItem);
        }
    }
    private void pushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
    private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
        // 推流
        if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
@@ -712,6 +708,11 @@
                if (response != null) {
                    sendRtpItem.setToTag(response.getToTag());
                }
                if (sendRtpItem.getSsrc() == null) {
                    // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                    String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
                    sendRtpItem.setSsrc(ssrc);
                }
                redisCatchStorage.updateSendRTPSever(sendRtpItem);
            } else {
@@ -719,10 +720,6 @@
                notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
            }
        } else {
            SendRtpItem sendRtpItem = new SendRtpItem();
            sendRtpItem.setRtcp(platform.isRtcp());
            sendRtpItem.setTcp(mediaTransmissionTCP);
            sendRtpItem.setTcpActive();
            // 其他平台内容
            otherWvpPushStream(sendRtpItem, request, platform);
        }
@@ -733,29 +730,28 @@
     */
    private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
        // NOTE(review): this span looks like a diff hunk whose +/- markers were stripped.
        // Each statement that uses gbStream / callIdHeader / pushProxyStream is immediately
        // followed by an equivalent statement that uses sendRtpItem / sendProxyStream;
        // presumably the former are the removed lines. Confirm against the repository.
        //
        // Flow of the statements below: log that the channel is not streaming, subscribe to
        // the stream-changed hook for the item's app/stream, arm a delayed task keyed by the
        // call id that drops the subscription when it fires, then ask streamProxyService to
        // start the stream. If start(...) returns false, reply BUSY_HERE and undo both the
        // subscription and the delayed task.

        // TODO control enabling so that the device comes online
        logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
        logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", sendRtpItem.getApp(), sendRtpItem.getStream());
        // Listen for the stream coming online
        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId());
        zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
            OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
            logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
            // Hook fired: stop the delayed task registered under the call id, then send the stream
            dynamicTask.stop(callIdHeader.getCallId());
            pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
            dynamicTask.stop(sendRtpItem.getCallId());
            sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
        });
        // Delayed task (delay = userSetting.getPlatformPlayTimeout()): logs the timeout and
        // removes the hook subscription
        dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
            logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream());
        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
            logger.info("[ app={}, stream={} ] 等待拉流代理流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
            zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
        }, userSetting.getPlatformPlayTimeout());
        boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
        boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream());
        if (!start) {
            // Start was refused: answer the INVITE with BUSY_HERE and clean up what was registered above
            try {
                responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
                responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline");
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
            }
            zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
            dynamicTask.stop(callIdHeader.getCallId());
            dynamicTask.stop(sendRtpItem.getCallId());
        }
    }
@@ -763,50 +759,28 @@
     * 通知流上线
     */
    private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
        if (!platform.isStartOfflinePush()) {
            // 平台设置中关闭了拉起离线的推流则直接回复
            try {
                logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream());
                responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
            }
            return;
        }
        // 发送redis消息以使设备上线
        logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream());
        // 发送redis消息以使设备上线,流上线后被
        logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", sendRtpItem.getApp(), sendRtpItem.getStream());
        MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
                gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
                platform.getName(), userSetting.getServerId(), gbStream.getMediaServerId());
                sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(),
                platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
        redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
        // 设置超时
        dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
            logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
            logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
            try {
                redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
                mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream());
                mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
                responseAck(request, Response.REQUEST_TIMEOUT); // 超时
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("未处理的异常 ", e);
            }
        }, userSetting.getPlatformPlayTimeout());
        // 写入redis待发流信息,供其他wvp读取并生成发流信息
        SendRtpItem sendRtpItemTemp = new SendRtpItem();
        sendRtpItemTemp.setIp(addressStr);
        sendRtpItemTemp.setPort(port);
        sendRtpItemTemp.setSsrc(ssrc);
        sendRtpItemTemp.setPlatformId(requesterId);
        sendRtpItemTemp.setPlatformName(platform.getName());
        sendRtpItemTemp.setTcp(mediaTransmissionTCP);
        sendRtpItemTemp.setRtcp(platform.isRtcp());
        sendRtpItemTemp.setTcpActive(tcpActive);
        sendRtpItemTemp.setPlayType(InviteStreamType.PUSH);
        redisCatchStorage.addWaiteSendRtpItem(sendRtpItemTemp, userSetting.getPlatformPlayTimeout());
        redisCatchStorage.addWaiteSendRtpItem(sendRtpItem, userSetting.getPlatformPlayTimeout());
        // 添加上线的通知
        mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (sendRtpItemFromRedis) -> {
            dynamicTask.stop(callIdHeader.getCallId());
            redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
        mediaListManager.addChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream(), (sendRtpItemFromRedis) -> {
            dynamicTask.stop(sendRtpItem.getCallId());
            redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream());
            if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
@@ -823,18 +797,18 @@
                    }
                    return;
                }
                sendRtpItemTemp.setLocalPort(localPort);
                sendRtpItemTemp.setLocalIp(ObjectUtils.isEmpty(platform.getSendStreamIp()): );
                // 写入redis, 超时时回复
                sendRtpItemTemp.setStatus(1);
                sendRtpItemTemp.setCallId(callIdHeader.getCallId());
                sendRtpItemTemp.setFromTag(request.getFromTag());
                SIPResponse response = sendStreamAck(request, sendRtpItemTemp, platform);
                if (response != null) {
                    sendRtpItemTemp.setToTag(response.getToTag());
                sendRtpItem.setLocalPort(localPort);
                if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
                    sendRtpItem.setLocalIp(platform.getSendStreamIp());
                }
                redisCatchStorage.updateSendRTPSever(sendRtpItemTemp);
                // 写入redis, 超时时回复
                sendRtpItem.setStatus(1);
                SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
                if (response != null) {
                    sendRtpItem.setToTag(response.getToTag());
                }
                redisCatchStorage.updateSendRTPSever(sendRtpItem);
            } else {
                // 其他平台内容
                otherWvpPushStream(sendRtpItemFromRedis, request, platform);
@@ -842,10 +816,10 @@
        });
        // 添加回复的拒绝或者错误的通知
        redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
        redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
            if (response.getCode() != 0) {
                dynamicTask.stop(callIdHeader.getCallId());
                mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                dynamicTask.stop(sendRtpItem.getCallId());
                mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
                try {
                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
                } catch (SipException | InvalidArgumentException | ParseException e) {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -18,14 +18,12 @@
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -103,7 +101,7 @@
    private EventPublisher eventPublisher;
    @Autowired
    private ZLMMediaListManager zlmMediaListManager;
    private RedisPlatformPushStreamOnlineLister zlmMediaListManager;
    @Autowired
    private ZlmHttpHookSubscribe subscribe;
@@ -129,6 +127,9 @@
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    @Autowired
    private IStreamPushService streamPushService;
    /**
     * 服务器定时上报时间,上报间隔可配置,默认10s上报一次
@@ -236,10 +237,7 @@
                // 鉴权通过
                redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
            }
        } else {
            zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
        }
        HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
        result.setEnable_audio(true);
@@ -465,8 +463,7 @@
                                    || param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
                                    || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
                                param.setSeverId(userSetting.getServerId());
                                zlmMediaListManager.addPush(param);
                                streamPushService.updatePush(param);
                                // 冗余数据,自己系统中自用
                                redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param);
                            }
@@ -483,10 +480,13 @@
                                }
                            }
                            GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
                            if (gbStream != null) {
//                                    eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
                            // 查找是否关联了国标, 关联了不删除, 置为离线
                            if (gbStream == null) {
                                storager.removeMedia(param.getApp(), param.getStream());
                            }else {
//                                eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
                                storager.mediaOffline(param.getApp(), param.getStream());
                            }
                            zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
                        }
                        GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
                        if (gbStream != null) {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
File was deleted
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -118,4 +118,5 @@
    Map<String, StreamPushItem> getAllAppAndStreamMap();
    /**
     * Records the state of a pushed stream reported by a stream-changed hook.
     * <p>
     * In the implementation visible in this change set, the hook parameter is converted
     * to a {@code StreamPushItem}; a new row is inserted when none exists for the
     * app/stream pair, otherwise the existing row is updated and the media server of the
     * associated GB stream is refreshed.
     *
     * @param param stream-changed hook payload (app, stream, media server id, regist flag)
     */
    void updatePush(OnStreamChangedHookParam param);
}
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -31,7 +31,6 @@
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
@@ -133,9 +132,6 @@
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    @Autowired
    private RedisGbPlayMsgListener redisGbPlayMsgListener;
    @Autowired
    private ZlmHttpHookSubscribe hookSubscribe;
@@ -1366,15 +1362,7 @@
            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0");
        }
        if (mediaInfo == null) {
            RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
                    sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
                    sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
                    sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
            redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
                startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader);
            });
        } else {
        if (mediaInfo != null) {
            // 如果是严格模式,需要关闭端口占用
            JSONObject startSendRtpStreamResult = null;
            if (sendRtpItem.getLocalPort() != 0) {
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -553,4 +553,21 @@
    public Map<String, StreamPushItem> getAllAppAndStreamMap() {
        // Pure delegate: the result comes straight from the mapper query.
        // NOTE(review): the map key format is defined by the mapper (not visible here) — confirm before relying on it.
        return streamPushMapper.getAllAppAndStreamMap();
    }
    @Override
    public void updatePush(OnStreamChangedHookParam param) {
        StreamPushItem transform = transform(param);
        StreamPushItem pushInDb = getPush(param.getApp(), param.getStream());
        transform.setPushIng(param.isRegist());
        transform.setUpdateTime(DateUtil.getNow());
        transform.setPushTime(DateUtil.getNow());
        transform.setSelf(userSetting.getServerId().equals(param.getSeverId()));
        if (pushInDb == null) {
            transform.setCreateTime(DateUtil.getNow());
            streamPushMapper.add(transform);
        }else {
            streamPushMapper.update(transform);
            gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId());
        }
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java
New file
@@ -0,0 +1,97 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * @author lin
 */
@Component
public class RedisPlatformPushStreamOnlineLister implements MessageListener {
    private final Logger logger = LoggerFactory.getLogger("RedisPlatformPushStreamOnlineLister");
    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    /**
     * 通过redis消息接收流上线的通知,如果本机由对这个流的监听,则回调
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class);
                    sendStreamEvent(sendRtpItem);
                }
            });
        }
    }
    private final Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>();
    public void sendStreamEvent(SendRtpItem sendRtpItem) {
        // 查看推流状态
        ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
        if (channelOnlineEventLister != null)  {
            try {
                channelOnlineEventLister.run(sendRtpItem);
            } catch (ParseException e) {
                logger.error("sendStreamEvent: ", e);
            }
            removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
        }
    }
    public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) {
        this.channelOnPublishEvents.put(app + "_" + stream, callback);
    }
    public void removedChannelOnlineEventLister(String app, String stream) {
        this.channelOnPublishEvents.remove(app + "_" + stream);
    }
    public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) {
        return this.channelOnPublishEvents.get(app + "_" + stream);
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
@@ -1,12 +1,16 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,6 +22,8 @@
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@@ -32,10 +38,10 @@
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
    @Autowired
    private UserSetting userSetting;
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private ZlmHttpHookSubscribe hookSubscribe;
    private IMediaServerService mediaServerService;
    @Qualifier("taskExecutor")
    @Autowired
@@ -52,23 +58,14 @@
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                        MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class);
                        if (messageForPushChannel == null
                                || ObjectUtils.isEmpty(messageForPushChannel.getApp())
                                || ObjectUtils.isEmpty(messageForPushChannel.getStream())
                        || userSetting.getServerId().equals(messageForPushChannel.getServerId())){
                            continue;
                        SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class);
                        sendRtpItem.getMediaServerId();
                        MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                        if (mediaServer == null) {
                            return;
                        }
                        // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
                        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
                                messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp",
                                null);
                        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
                            // 读取redis中的上级点播信息,生成sendRtpItm发送出去
                        });
                        Map<String, Object> sendRtpParam = getSendRtpParam(sendRtpItem);
                        sendRtp(sendRtpItem, mediaServer, sendRtpParam);
                    }catch (Exception e) {
                        logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
@@ -78,4 +75,48 @@
            });
        }
    }
    private Map<String, Object> getSendRtpParam(SendRtpItem sendRtpItem) {
        String isUdp = sendRtpItem.isTcp() ? "0" : "1";
        Map<String, Object> param = new HashMap<>(12);
        param.put("vhost","__defaultVhost__");
        param.put("app",sendRtpItem.getApp());
        param.put("stream",sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        param.put("dst_url",sendRtpItem.getIp());
        param.put("dst_port", sendRtpItem.getPort());
        param.put("src_port", sendRtpItem.getLocalPort());
        param.put("pt", sendRtpItem.getPt());
        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
        param.put("is_udp", isUdp);
        if (!sendRtpItem.isTcp()) {
            // udp模式下开启rtcp保活
            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
        }
        return param;
    }
    private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map<String, Object> param){
        JSONObject startSendRtpStreamResult = null;
        if (sendRtpItem.getLocalPort() != 0) {
            if (sendRtpItem.isTcpActive()) {
                startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
            }else {
                param.put("dst_url", sendRtpItem.getIp());
                param.put("dst_port", sendRtpItem.getPort());
                startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
            }
        }else {
            if (sendRtpItem.isTcpActive()) {
                startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
            }else {
                param.put("dst_url", sendRtpItem.getIp());
                param.put("dst_port", sendRtpItem.getPort());
                startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
            }
        }
        return startSendRtpStreamResult;
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
@@ -2,12 +2,15 @@
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -35,13 +38,25 @@
    private UserSetting userSetting;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private ZlmHttpHookSubscribe hookSubscribe;
    @Autowired
    private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    /**
     * 当上级点播时,这里负责监听并等待流上线;流上线后如果是在当前服务则直接回调,如果是其他wvp,则由redis消息进行通知
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        logger.info("[REDIS消息-收到上级等到设备推流的redis消息]: {}", new String(message.getBody()));
@@ -66,7 +81,17 @@
                                null);
                        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
                            // 读取redis中的上级点播信息,生成sendRtpItm发送出去
                            SendRtpItem sendRtpItem = redisCatchStorage.getWaiteSendRtpItem(messageForPushChannel.getApp(), messageForPushChannel.getStream());
                            if (sendRtpItem.getSsrc() == null) {
                                // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
                                sendRtpItem.setSsrc(ssrc);
                                sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
                                sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
                                redisPlatformPushStreamOnlineLister.sendStreamEvent(sendRtpItem);
                                // 通知其他wvp, 由RedisPlatformPushStreamOnlineLister接收此监听。
                                redisCatchStorage.sendPushStreamOnline(sendRtpItem);
                            }
                        });
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
File was deleted
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -219,5 +219,9 @@
    void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout);
    /**
     * Looks up the pending {@code SendRtpItem} stored for an app/stream pair.
     *
     * @param app    application name of the stream
     * @param stream stream id
     * @return the stored item; presumably {@code null} when nothing is stored — confirm against the implementation
     */
    SendRtpItem getWaiteSendRtpItem(String app, String stream);
    /**
     * Hands a "start sending RTP" request for the given item over to Redis.
     * NOTE(review): the Redis implementation in this change set stores the item as a JSON
     * value rather than publishing it — confirm this matches how the consumer reads it.
     *
     * @param sendRtpItem description of the RTP stream that should start sending
     */
    void sendStartSendRtp(SendRtpItem sendRtpItem);
    /**
     * Publishes a notification over Redis that the pushed stream described by the item
     * is online, so that other WVP instances can react.
     *
     * @param sendRtpItem description of the stream that came online
     */
    void sendPushStreamOnline(SendRtpItem sendRtpItem);
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -686,8 +686,21 @@
    }
    @Override
    public SendRtpItem getWaiteSendRtpItem(String app, String stream) {
        String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream;
        return (SendRtpItem)redisTemplate.opsForValue().get(key);
    }
    @Override
    public void sendStartSendRtp(SendRtpItem sendRtpItem) {
        String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
        redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem));
    }
    @Override
    public void sendPushStreamOnline(SendRtpItem sendRtpItem) {
        String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED;
        logger.info("[redis发送通知] 流上线 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
        redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
    }
}