648540858
2024-04-16 9c6765d44ef2ccb06fdaf525a06e564a331ab892
重构多wvp国标级联机制
9个文件已修改
7个文件已添加
5个文件已删除
1364 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java 205 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java 93 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java 87 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 45 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java 97 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java 122 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java 106 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java 99 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java 76 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java 203 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java 100 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -34,23 +34,13 @@
    @Autowired
    private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener;
    @Autowired
    private RedisPushStreamResponseListener redisPushStreamResponseListener;
    @Autowired
    private RedisCloseStreamMsgListener redisCloseStreamMsgListener;
    @Autowired
    private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener;
    @Autowired
    private RedisPlatformWaitPushStreamOnlineListener redisPlatformWaitPushStreamOnlineListener;
    @Autowired
    private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener;
    @Autowired
    private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister;
    private RedisRpcConfig redisRpcConfig;
    /**
@@ -69,12 +59,8 @@
        container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
        container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
        container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
        container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
        container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
        container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
        container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM));
        container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED));
        container.addMessageListener(redisPlatformPushStreamOnlineLister, new PatternTopic(VideoManagerConstants.PUSH_STREAM_ONLINE));
        container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY));
        return container;
    }
}
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java
New file
@@ -0,0 +1,205 @@
package com.genersoft.iot.vmp.conf.redis;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
@Component
public class RedisRpcConfig implements MessageListener {
    private final static Logger logger = LoggerFactory.getLogger(RedisRpcConfig.class);
    public final static String REDIS_REQUEST_CHANNEL_KEY = "WVP_REDIS_REQUEST_CHANNEL_KEY";
    private final Random random = new Random();
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private RedisRpcController redisRpcController;
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
    @Qualifier("taskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    @Override
    public void onMessage(Message message, byte[] pattern) {
        boolean isEmpty = taskQueue.isEmpty();
        taskQueue.offer(message);
        if (isEmpty) {
            taskExecutor.execute(() -> {
                while (!taskQueue.isEmpty()) {
                    Message msg = taskQueue.poll();
                    try {
                        RedisRpcMessage redisRpcMessage = JSON.parseObject(new String(msg.getBody()), RedisRpcMessage.class);
                        if (redisRpcMessage.getRequest() != null) {
                            handlerRequest(redisRpcMessage.getRequest());
                        } else if (redisRpcMessage.getResponse() != null){
                            handlerResponse(redisRpcMessage.getResponse());
                        } else {
                            logger.error("[redis rpc 解析失败] {}", JSON.toJSONString(redisRpcMessage));
                        }
                    } catch (Exception e) {
                        logger.error("[redis rpc 解析异常] ", e);
                    }
                }
            });
        }
    }
    private void handlerResponse(RedisRpcResponse response) {
        if (userSetting.getServerId().equals(response.getToId())) {
            return;
        }
        response(response);
    }
    private void handlerRequest(RedisRpcRequest request) {
        try {
            if (userSetting.getServerId().equals(request.getFromId())) {
                return;
            }
            Method method = getMethod(request.getUri());
            // 没有携带目标ID的可以理解为哪个wvp有结果就哪个回复,携带目标ID,但是如果是不存在的uri则直接回复404
            if (userSetting.getServerId().equals(request.getToId())) {
                if (method == null) {
                    // 回复404结果
                    RedisRpcResponse response = request.getResponse();
                    response.setStatusCode(404);
                    sendResponse(response);
                    return;
                }
                RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
                if(response != null) {
                    sendResponse(response);
                }
            }else {
                if (method == null) {
                    return;
                }
                RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
                if (response != null) {
                    sendResponse(response);
                }
            }
        }catch (InvocationTargetException | IllegalAccessException e) {
            logger.error("[redis rpc ] 处理请求失败 ", e);
        }
    }
    private Method getMethod(String name) {
        // 启动后扫描所有的路径注解
        Method[] methods = redisRpcController.getClass().getMethods();
        for (Method method : methods) {
            if (method.getName().equals(name)) {
                return method;
            }
        }
        return null;
    }
    private void sendResponse(RedisRpcResponse response){
        response.setToId(userSetting.getServerId());
        RedisRpcMessage message = new RedisRpcMessage();
        message.setResponse(response);
        redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
    }
    private void sendRequest(RedisRpcRequest request){
        RedisRpcMessage message = new RedisRpcMessage();
        message.setRequest(request);
        redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
    }
    private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>();
    private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>();
    public RedisRpcResponse request(RedisRpcRequest request, int timeOut) {
        request.setSn((long) random.nextInt(1000) + 1);
        SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn());
        try {
            sendRequest(request);
            return subscribe.poll(timeOut, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e);
        } finally {
            this.unsubscribe(request.getSn());
        }
        return null;
    }
    public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) {
        request.setSn((long) random.nextInt(1000) + 1);
        setCallback(request.getSn(), callback);
        sendRequest(request);
    }
    public Boolean response(RedisRpcResponse response) {
        SynchronousQueue<RedisRpcResponse> queue = topicSubscribers.get(response.getSn());
        CommonCallback<RedisRpcResponse> callback = callbacks.get(response.getSn());
        if (queue != null) {
            try {
                return queue.offer(response, 2, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                logger.error("{}", e.getMessage(), e);
            }
        }else if (callback != null) {
            callback.run(response);
            callbacks.remove(response.getSn());
        }
        return false;
    }
    private void unsubscribe(long key) {
        topicSubscribers.remove(key);
    }
    private SynchronousQueue<RedisRpcResponse> subscribe(long key) {
        SynchronousQueue<RedisRpcResponse> queue = null;
        if (!topicSubscribers.containsKey(key))
            topicSubscribers.put(key, queue = new SynchronousQueue<>());
        return queue;
    }
    private void setCallback(long key, CommonCallback<RedisRpcResponse> callback)  {
        if (!callbacks.containsKey(key)) {
            callbacks.put(key, callback);
        }
    }
    public void removeCallback(long key)  {
        callbacks.remove(key);
    }
}
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java
New file
@@ -0,0 +1,24 @@
package com.genersoft.iot.vmp.conf.redis.bean;
public class RedisRpcMessage {
    private RedisRpcRequest request;
    private RedisRpcResponse response;
    public RedisRpcRequest getRequest() {
        return request;
    }
    public void setRequest(RedisRpcRequest request) {
        this.request = request;
    }
    public RedisRpcResponse getResponse() {
        return response;
    }
    public void setResponse(RedisRpcResponse response) {
        this.response = response;
    }
}
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java
New file
@@ -0,0 +1,93 @@
package com.genersoft.iot.vmp.conf.redis.bean;
/**
 * 通过redis发送请求
 */
public class RedisRpcRequest {
    /**
     * 来自的WVP ID
     */
    private String fromId;
    /**
     * 目标的WVP ID
     */
    private String toId;
    /**
     * 序列号
     */
    private long sn;
    /**
     * 访问的路径
     */
    private String uri;
    /**
     * 参数
     */
    private Object param;
    public String getFromId() {
        return fromId;
    }
    public void setFromId(String fromId) {
        this.fromId = fromId;
    }
    public String getToId() {
        return toId;
    }
    public void setToId(String toId) {
        this.toId = toId;
    }
    public String getUri() {
        return uri;
    }
    public void setUri(String uri) {
        this.uri = uri;
    }
    public Object getParam() {
        return param;
    }
    public void setParam(Object param) {
        this.param = param;
    }
    public long getSn() {
        return sn;
    }
    public void setSn(long sn) {
        this.sn = sn;
    }
    @Override
    public String toString() {
        return "RedisRpcRequest{" +
                "fromId='" + fromId + '\'' +
                ", toId='" + toId + '\'' +
                ", sn='" + sn + '\'' +
                ", uri='" + uri + '\'' +
                ", param=" + param +
                '}';
    }
    public RedisRpcResponse getResponse() {
        RedisRpcResponse response = new RedisRpcResponse();
        response.setFromId(fromId);
        response.setToId(toId);
        response.setSn(sn);
        response.setUri(uri);
        return response;
    }
}
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java
New file
@@ -0,0 +1,87 @@
package com.genersoft.iot.vmp.conf.redis.bean;
/**
 * 通过redis发送回复
 */
public class RedisRpcResponse {

    /**
     * ID of the WVP instance recorded as the sender
     */
    private String fromId;

    /**
     * ID of the WVP instance recorded as the target
     */
    private String toId;

    /**
     * Serial number
     */
    private long sn;

    /**
     * Status code
     */
    private int statusCode;

    /**
     * Path that was requested
     */
    private String uri;

    /**
     * Response body
     */
    private Object body;

    public String getFromId() {
        return fromId;
    }

    public void setFromId(String fromId) {
        this.fromId = fromId;
    }

    public String getToId() {
        return toId;
    }

    public void setToId(String toId) {
        this.toId = toId;
    }

    public long getSn() {
        return sn;
    }

    public void setSn(long sn) {
        this.sn = sn;
    }

    public int getStatusCode() {
        return statusCode;
    }

    public void setStatusCode(int statusCode) {
        this.statusCode = statusCode;
    }

    public String getUri() {
        return uri;
    }

    public void setUri(String uri) {
        this.uri = uri;
    }

    public Object getBody() {
        return body;
    }

    public void setBody(Object body) {
        this.body = body;
    }

    /**
     * Readable representation for log output, mirroring the one on the request bean.
     */
    @Override
    public String toString() {
        return "RedisRpcResponse{" +
                "fromId='" + fromId + '\'' +
                ", toId='" + toId + '\'' +
                ", sn=" + sn +
                ", statusCode=" + statusCode +
                ", uri='" + uri + '\'' +
                ", body=" + body +
                '}';
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -15,9 +15,11 @@
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -54,6 +56,8 @@
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IRedisRpcService redisRpcService;
    @Autowired
    private UserSetting userSetting;
@@ -113,7 +117,15 @@
        if (parentPlatform != null) {
            Map<String, Object> param = getSendRtpParam(sendRtpItem);
            if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
                redisCatchStorage.sendStartSendRtp(sendRtpItem);
//                redisCatchStorage.sendStartSendRtp(sendRtpItem);
                WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem);
                if (wvpResult.getCode() == 0) {
                    MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
                            sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(),
                            sendRtpItem.getMediaServerId());
                    messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
                    redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
                }
            } else {
                JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param);
                if (startSendRtpStreamResult != null) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -18,7 +18,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
@@ -97,6 +97,9 @@
    @Autowired
    private IStreamPushService pushService;
    @Autowired
    private IRedisRpcService redisRpcService;
    @Override
    public void afterPropertiesSet() throws Exception {
@@ -134,13 +137,8 @@
            if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
                // 查询这路流是否是本平台的
                StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream());
                if (push!= null && !push.isSelf()) {
                    // 不是本平台的就发送redis消息让其他wvp停止发流
                    ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
                    if (platform != null) {
                        RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId());
//                        redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg);
                    }
                if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
                    redisRpcService.stopSendRtp(sendRtpItem);
                }else {
                    MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                    redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -19,7 +19,7 @@
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
@@ -29,7 +29,6 @@
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -86,16 +85,13 @@
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IInviteStreamService inviteStreamService;
    private IRedisRpcService redisRpcService;
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
    private DynamicTask dynamicTask;
    @Autowired
    private RedisPushStreamResponseListener redisPushStreamResponseListener;
    @Autowired
    private IPlayService playService;
@@ -120,9 +116,6 @@
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private RedisPlatformPushStreamOnlineLister mediaListManager;
    @Autowired
    private SipConfig config;
@@ -594,15 +587,17 @@
                    sendRtpItem.setSessionName(sessionName);
                    if ("push".equals(gbStream.getStreamType())) {
                        sendRtpItem.setPlayType(InviteStreamType.PUSH);
                        if (streamPushItem != null) {
                            // 从redis查询是否正在接收这个推流
                            OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
                            if (pushListItem != null) {
                                sendRtpItem.setServerId(pushListItem.getSeverId());
                                sendRtpItem.setMediaServerId(pushListItem.getMediaServerId());
                                StreamPushItem transform = streamPushService.transform(pushListItem);
                                transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
                                // 推流状态
                                // 开始推流
                                sendPushStream(sendRtpItem, mediaServerItem, platform, request);
                            }else {
                                if (!platform.isStartOfflinePush()) {
@@ -702,8 +697,6 @@
                }
                // 写入redis, 超时时回复
                sendRtpItem.setStatus(1);
                sendRtpItem.setFromTag(request.getFromTag());
                sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
                SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
                if (response != null) {
                    sendRtpItem.setToTag(response.getToTag());
@@ -714,7 +707,6 @@
                    sendRtpItem.setSsrc(ssrc);
                }
                redisCatchStorage.updateSendRTPSever(sendRtpItem);
            } else {
                // 不在线 拉起
                notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
@@ -769,18 +761,14 @@
        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
            logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
            try {
                redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream());
                mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
                responseAck(request, Response.REQUEST_TIMEOUT); // 超时
            } catch (SipException | InvalidArgumentException | ParseException e) {
                logger.error("未处理的异常 ", e);
            }
        }, userSetting.getPlatformPlayTimeout());
        redisCatchStorage.addWaiteSendRtpItem(sendRtpItem, userSetting.getPlatformPlayTimeout());
        // 添加上线的通知
        mediaListManager.addChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream(), (sendRtpItemFromRedis) -> {
        //
        redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemFromRedis) -> {
            dynamicTask.stop(sendRtpItem.getCallId());
            redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream());
            if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
@@ -813,19 +801,7 @@
                // 其他平台内容
                otherWvpPushStream(sendRtpItemFromRedis, request, platform);
            }
        });
        // 添加回复的拒绝或者错误的通知
        redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
            if (response.getCode() != 0) {
                dynamicTask.stop(sendRtpItem.getCallId());
                mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
                try {
                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage());
                }
            }
        });
    }
@@ -836,12 +812,9 @@
     */
    private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) {
        logger.info("[级联点播]直播流来自其他平台,发送redis消息");
        // 发送redis消息
        redisCatchStorage.sendStartSendRtp(sendRtpItem);
        sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem);
        // 写入redis, 超时时回复
        sendRtpItem.setStatus(1);
        sendRtpItem.setCallId(request.getCallIdHeader().getCallId());
        sendRtpItem.setFromTag(request.getFromTag());
        SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
        if (response != null) {
            sendRtpItem.setToTag(response.getToTag());
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -23,7 +23,6 @@
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -99,9 +98,6 @@
    @Autowired
    private EventPublisher eventPublisher;
    @Autowired
    private RedisPlatformPushStreamOnlineLister zlmMediaListManager;
    @Autowired
    private ZlmHttpHookSubscribe subscribe;
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
@@ -365,4 +365,13 @@
        }
        return result;
    }
    public JSONObject stopSendRtpStream(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem) {
        Map<String, Object> param = new HashMap<>();
        param.put("vhost", "__defaultVhost__");
        param.put("app", sendRtpItem.getApp());
        param.put("stream", sendRtpItem.getStream());
        param.put("ssrc", sendRtpItem.getSsrc());
        return zlmresTfulUtils.startSendRtp(mediaServerItem, param);
    }
}
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
@@ -6,7 +6,6 @@
import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.bean.RecordFile;
import java.util.List;
@@ -97,4 +96,5 @@
    List<MediaServerItem> getAllWithAssistPort();
    MediaServerItem getMediaServerByAppAndStream(String app, String stream);
}
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -25,7 +25,6 @@
import com.genersoft.iot.vmp.utils.JsonUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.RecordFile;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
@@ -36,19 +35,15 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import java.io.File;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
 * 媒体服务器节点管理
@@ -751,6 +746,22 @@
    /**
     * Returns whatever {@code mediaServerMapper.queryAllWithAssistPort()} yields;
     * presumably the media servers that have an assist port configured - verify against the mapper.
     */
    @Override
    public List<MediaServerItem> getAllWithAssistPort() {
        return mediaServerMapper.queryAllWithAssistPort();
    }
    @Override
    public MediaServerItem getMediaServerByAppAndStream(String app, String stream) {
        List<MediaServerItem> mediaServerItemList = getAllOnline();
        if (mediaServerItemList.isEmpty()) {
            return null;
        }
        for (MediaServerItem mediaServerItem : mediaServerItemList) {
            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream);
            if (streamReady) {
                return mediaServerItem;
            }
        }
        return null;
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
New file
@@ -0,0 +1,16 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
/**
 * Operations related to sending RTP streams that are carried out through the redis based rpc
 * mechanism. NOTE(review): presumably each call is executed by another WVP instance - confirm
 * against the implementation.
 */
public interface IRedisRpcService {

    /**
     * Obtains a send-rtp item for the given one.
     * NOTE(review): callers replace their local item with the returned one; whether this can
     * return null is not visible here - confirm.
     */
    SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem);

    /**
     * Requests that sending of the RTP stream described by the item be started.
     * Callers treat a result code of 0 as success.
     */
    WVPResult startSendRtp(SendRtpItem sendRtpItem);

    /**
     * Registers interest in the stream of the given item; the callback receives a SendRtpItem.
     * NOTE(review): presumably invoked once the pushed stream comes online - confirm.
     */
    void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback);

    /**
     * Requests that sending of the RTP stream described by the item be stopped.
     */
    WVPResult stopSendRtp(SendRtpItem sendRtpItem);
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java
File was deleted
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
File was deleted
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
File was deleted
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
File was deleted
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
File was deleted
src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
New file
@@ -0,0 +1,203 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
/**
 * 其他wvp发起的rpc调用,这里的方法被 RedisRpcConfig 通过反射寻找对应的方法名称调用
 */
@Component
public class RedisRpcController {
    private final static Logger logger = LoggerFactory.getLogger(RedisRpcController.class);
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
    private IMediaServerService mediaServerService;
    @Autowired
    private SendRtpPortManager sendRtpPortManager;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private ZlmHttpHookSubscribe hookSubscribe;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    /**
     * 获取发流的信息
     */
    public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) {
        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
        logger.info("[redis-rpc] 获取发流的信息: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
        // 查询本级是否有这个流
        MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
        if (mediaServerItem == null) {
            RedisRpcResponse response = request.getResponse();
            response.setStatusCode(200);
        }
        // 自平台内容
        int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
        if (localPort == 0) {
            logger.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" );
            RedisRpcResponse response = request.getResponse();
            response.setStatusCode(200);
        }
        // 写入redis, 超时时回复
        sendRtpItem.setStatus(1);
        sendRtpItem.setServerId(userSetting.getServerId());
        sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
        if (sendRtpItem.getSsrc() == null) {
            // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
            String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
            sendRtpItem.setSsrc(ssrc);
        }
        redisCatchStorage.updateSendRTPSever(sendRtpItem);
        RedisRpcResponse response = request.getResponse();
        response.setStatusCode(200);
        response.setBody(sendRtpItem);
        return response;
    }
    /**
     * 监听流上线
     */
    public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) {
        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
        logger.info("[redis-rpc] 监听流上线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
        // 查询本级是否有这个流
        MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
        if (mediaServerItem != null) {
            logger.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
            RedisRpcResponse response = request.getResponse();
            response.setBody(sendRtpItem);
            response.setStatusCode(200);
        }
        // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
            logger.info("[redis-rpc] 监听流上线,流已上线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
            // 读取redis中的上级点播信息,生成sendRtpItm发送出去
            if (sendRtpItem.getSsrc() == null) {
                // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
                sendRtpItem.setSsrc(ssrc);
            }
            sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
            sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
            sendRtpItem.setServerId(userSetting.getServerId());
            RedisRpcResponse response = request.getResponse();
            response.setBody(sendRtpItem);
            response.setStatusCode(200);
            // 手动发送结果
            sendResponse(response);
        });
        return null;
    }
    /**
     * 开始发流
     */
    public RedisRpcResponse startSendRtp(RedisRpcRequest request) {
        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
        MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        if (mediaServerItem == null) {
            logger.info("[redis-rpc] startSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() );
            RedisRpcResponse response = request.getResponse();
            response.setStatusCode(200);
        }
        Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
        if (!streamReady) {
            logger.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
            RedisRpcResponse response = request.getResponse();
            response.setStatusCode(200);
        }
        JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem);
        RedisRpcResponse response = request.getResponse();
        response.setStatusCode(200);
        if (jsonObject.getInteger("code") == 0) {
            WVPResult wvpResult = WVPResult.success();
            response.setBody(wvpResult);
        }else {
            WVPResult wvpResult = WVPResult.fail(jsonObject.getInteger("code"), jsonObject.getString("msg"));
            response.setBody(wvpResult);
        }
        return response;
    }
    /**
     * 停止发流
     */
    public RedisRpcResponse stopSendRtp(RedisRpcRequest request) {
        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
        logger.info("[redis-rpc] 停止推流: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
        MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        if (mediaServerItem == null) {
            logger.info("[redis-rpc] stopSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() );
            RedisRpcResponse response = request.getResponse();
            response.setStatusCode(200);
        }
        JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem);
        RedisRpcResponse response = request.getResponse();
        response.setStatusCode(200);
        if (jsonObject.getInteger("code") == 0) {
            logger.info("[redis-rpc] 停止推流成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
            WVPResult wvpResult = WVPResult.success();
            response.setBody(wvpResult);
        }else {
            int code = jsonObject.getInteger("code");
            String msg = jsonObject.getString("msg");
            logger.info("[redis-rpc] 停止推流失败: {}/{}, code: {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(),code, msg );
            WVPResult wvpResult = WVPResult.fail(code, msg);
            response.setBody(wvpResult);
        }
        return response;
    }
    private void sendResponse(RedisRpcResponse response){
        response.setToId(userSetting.getServerId());
        RedisRpcMessage message = new RedisRpcMessage();
        message.setResponse(response);
        redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
    }
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
New file
@@ -0,0 +1,100 @@
package com.genersoft.iot.vmp.service.redisMsg.service;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class RedisRpcServiceImpl implements IRedisRpcService {
    @Autowired
    private RedisRpcConfig redisRpcConfig;
    @Autowired
    private UserSetting userSetting;
    @Autowired
    private ZlmHttpHookSubscribe hookSubscribe;
    @Autowired
    private SSRCFactory ssrcFactory;
    private RedisRpcRequest buildRequest(String uri, Object param) {
        RedisRpcRequest request = new RedisRpcRequest();
        request.setFromId(userSetting.getServerId());
        request.setParam(param);
        request.setUri(uri);
        return request;
    }
    @Override
    public SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem) {
        RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItem);
        RedisRpcResponse response = redisRpcConfig.request(request, 10);
        return JSON.parseObject(response.getBody().toString(), SendRtpItem.class);
    }
    @Override
    public WVPResult startSendRtp(SendRtpItem sendRtpItem) {
        RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItem);
        request.setToId(sendRtpItem.getServerId());
        RedisRpcResponse response = redisRpcConfig.request(request, 10);
        return JSON.parseObject(response.getBody().toString(), WVPResult.class);
    }
    @Override
    public WVPResult stopSendRtp(SendRtpItem sendRtpItem) {
        RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItem);
        request.setToId(sendRtpItem.getServerId());
        RedisRpcResponse response = redisRpcConfig.request(request, 10);
        return JSON.parseObject(response.getBody().toString(), WVPResult.class);
    }
    @Override
    public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback) {
        // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
            // 读取redis中的上级点播信息,生成sendRtpItm发送出去
            if (sendRtpItem.getSsrc() == null) {
                // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
                sendRtpItem.setSsrc(ssrc);
            }
            sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
            sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
            sendRtpItem.setServerId(userSetting.getServerId());
            if (callback != null) {
                callback.run(sendRtpItem);
            }
            hookSubscribe.removeSubscribe(hook);
        });
        RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem);
        request.setToId(sendRtpItem.getServerId());
        redisRpcConfig.request(request, response -> {
            SendRtpItem sendRtpItemFromOther = JSON.parseObject(response.getBody().toString(), SendRtpItem.class);
            if (callback != null) {
                callback.run(sendRtpItemFromOther);
            }
        });
    }
}
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -682,19 +682,20 @@
    @Override
    public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) {
        String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
        redisTemplate.opsForValue().set(key, platformPlayTimeout);
        redisTemplate.opsForValue().set(key, sendRtpItem);
    }
    @Override
    public SendRtpItem getWaiteSendRtpItem(String app, String stream) {
        String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream;
        return (SendRtpItem)redisTemplate.opsForValue().get(key);
        return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class);
    }
    @Override
    public void sendStartSendRtp(SendRtpItem sendRtpItem) {
        String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
        redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem));
        logger.info("[redis发送通知] 通知其他WVP推流 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
        redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
    }
    @Override