648540858
2024-04-18 55a240bb4504baa9a75e44bc6cc597c96b80705d
临时提交
10个文件已修改
256 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 41 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java 100 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java 58 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application.yml 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
public class SendRtpItem {
    /**
@@ -89,7 +91,7 @@
    /**
     *  invite 的 callId
     */
    private String CallId;
    private String callId;
    /**
     *  invite 的 fromTag
@@ -242,11 +244,11 @@
    }
    /**
     * Returns the call ID of the INVITE this item belongs to.
     *
     * @return the stored call ID
     */
    public String getCallId() {
        // NOTE(review): diff view without +/- markers — the two returns are presumably the
        // pre-rename field (CallId) followed by its replacement (callId); only one exists in the real file.
        return CallId;
        return callId;
    }
    /**
     * Stores the call ID of the INVITE this item belongs to.
     *
     * @param callId the call ID to store
     */
    public void setCallId(String callId) {
        // NOTE(review): diff view without +/- markers — presumably the old assignment to the
        // field named CallId followed by the new one to the renamed field callId.
        CallId = callId;
        // "this." is required here because the renamed field has the same name as the parameter.
        this.callId = callId;
    }
    public InviteStreamType getPlayType() {
@@ -364,7 +366,7 @@
                ", localPort=" + localPort +
                ", mediaServerId='" + mediaServerId + '\'' +
                ", serverId='" + serverId + '\'' +
                ", CallId='" + CallId + '\'' +
                ", CallId='" + callId + '\'' +
                ", fromTag='" + fromTag + '\'' +
                ", toTag='" + toTag + '\'' +
                ", pt=" + pt +
@@ -376,4 +378,15 @@
                ", sessionName='" + sessionName + '\'' +
                '}';
    }
    public String getRedisKey() {
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX +
                serverId + "_"
                + mediaServerId + "_"
                + platformId + "_"
                + channelId + "_"
                + stream + "_"
                + callId;
        return key;
    }
}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -96,7 +96,7 @@
        logger.info("[收到ACK]: 来自->{}", fromUserId);
        SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
        if (sendRtpItem == null) {
            logger.warn("[收到ACK]:未找到来自{},目标为({})的推流信息",fromUserId, toUserId);
            logger.warn("[收到ACK]:未找到来自{},callId: {}", fromUserId, callIdHeader.getCallId());
            return;
        }
        // tcp主动时,此时是级联下级平台,在回复200ok时,本地已经请求zlm开启监听,跳过下面步骤
@@ -117,7 +117,7 @@
        if (parentPlatform != null) {
            Map<String, Object> param = getSendRtpParam(sendRtpItem);
            if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
                WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem);
                WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getRedisKey(), sendRtpItem);
                if (wvpResult.getCode() == 0) {
                    MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
                            sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(),
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -137,7 +137,7 @@
                if (platform != null) {
                    redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
                    if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
                        redisRpcService.stopSendRtp(sendRtpItem);
                        redisRpcService.stopSendRtp(sendRtpItem.getRedisKey());
                        redisCatchStorage.deleteSendRTPServer(null, null, sendRtpItem.getCallId(), null);
                    }else {
                        MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -44,6 +44,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.sdp.*;
@@ -84,6 +85,9 @@
    @Autowired
    private IRedisRpcService redisRpcService;
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    @Autowired
    private SSRCFactory ssrcFactory;
@@ -604,6 +608,7 @@
                                StreamPushItem transform = streamPushService.transform(pushListItem);
                                transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
                                redisCatchStorage.updateSendRTPSever(sendRtpItem);
                                // 开始推流
                                sendPushStream(sendRtpItem, mediaServerItem, platform, request);
                            }else {
@@ -766,7 +771,7 @@
        redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
        // 设置超时
        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
            redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
            redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem);
            logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
            try {
                responseAck(request, Response.REQUEST_TIMEOUT); // 超时
@@ -775,8 +780,27 @@
            }
        }, userSetting.getPlatformPlayTimeout());
        //
        redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemFromRedis) -> {
        redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> {
            dynamicTask.stop(sendRtpItem.getCallId());
            if (sendRtpItemKey == null) {
                logger.warn("[级联点播] 等待推流得到结果未空: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
                try {
                    responseAck(request, Response.BUSY_HERE);
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("未处理的异常 ", e);
                }
                return;
            }
            SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
            if (sendRtpItemFromRedis == null) {
                logger.warn("[级联点播] 等待推流, 未找到redis中缓存的发流信息: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
                try {
                    responseAck(request, Response.BUSY_HERE);
                } catch (SipException | InvalidArgumentException | ParseException e) {
                    logger.error("未处理的异常 ", e);
                }
                return;
            }
            if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
                logger.info("[级联点播] 等待的推流在本平台上线 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
@@ -784,11 +808,7 @@
                    logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
                    try {
                        responseAck(request, Response.BUSY_HERE);
                    } catch (SipException e) {
                        logger.error("未处理的异常 ", e);
                    } catch (InvalidArgumentException e) {
                        logger.error("未处理的异常 ", e);
                    } catch (ParseException e) {
                    } catch (SipException | InvalidArgumentException | ParseException e) {
                        logger.error("未处理的异常 ", e);
                    }
                    return;
@@ -814,7 +834,7 @@
        redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
            if (response.getCode() != 0) {
                dynamicTask.stop(sendRtpItem.getCallId());
                redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
                redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem);
                try {
                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
                } catch (SipException | InvalidArgumentException | ParseException e) {
@@ -831,7 +851,10 @@
     */
    private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) {
        logger.info("[级联点播] 来自其他wvp的推流 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
        sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem);
        sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem.getRedisKey());
        if (sendRtpItem == null) {
            return;
        }
        // 写入redis, 超时时回复
        sendRtpItem.setStatus(1);
        SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -539,7 +539,7 @@
                                        }
                                    }else {
                                        // 通知其他wvp停止发流
                                        redisRpcService.rtpSendStopped(sendRtpItem);
                                        redisRpcService.rtpSendStopped(sendRtpItem.getRedisKey());
                                    }
                                } catch (SipException | InvalidArgumentException | ParseException |
                                         SsrcTransactionNotFoundException e) {
src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
@@ -6,16 +6,16 @@
/**
 * Calls that this WVP node makes to other WVP nodes about send-RTP items.
 *
 * NOTE(review): this listing is a diff view without +/- markers, so each operation appears
 * twice: presumably the old signature taking a {@code SendRtpItem}, and the new one taking
 * the item's Redis key. Only one of each pair exists in the real file — confirm against the raw diff.
 */
public interface IRedisRpcService {
    // Asks for the send-RTP item; the key-based implementation returns null when the response has no body.
    SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem);
    SendRtpItem getSendRtpItem(String sendRtpItemKey);
    // Asks the node named by sendRtpItem.getServerId() to start sending RTP.
    WVPResult startSendRtp(SendRtpItem sendRtpItem);
    WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem);
    // Waits for the pushed stream to come online (old variant: callback receives the item).
    void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback);
    // Asks the node that owns the item to stop sending RTP.
    WVPResult stopSendRtp(String sendRtpItemKey);
    WVPResult stopSendRtp(SendRtpItem sendRtpItem);
    // Waits for the pushed stream to come online (new variant: callback receives the item's Redis key).
    void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback);
    // Cancels a pending wait started by waitePushStreamOnline.
    void stopWaitePushStreamOnline(SendRtpItem sendRtpItem);
    void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem);
    // Tells the node that owns the item that RTP sending has stopped.
    void rtpSendStopped(SendRtpItem sendRtpItem);
    void rtpSendStopped(String sendRtpItemKey);
}
src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
@@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
@@ -21,6 +20,7 @@
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,7 +78,14 @@
     * 获取发流的信息
     */
    public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) {
        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
        String sendRtpItemKey = request.getParam().toString();
        SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
        if (sendRtpItem == null) {
            logger.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
            RedisRpcResponse response = request.getResponse();
            response.setStatusCode(200);
            return response;
        }
        logger.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
        // 查询本级是否有这个流
        MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
@@ -103,9 +110,10 @@
            sendRtpItem.setSsrc(ssrc);
        }
        redisCatchStorage.updateSendRTPSever(sendRtpItem);
        redisTemplate.opsForValue().set(sendRtpItemKey, sendRtpItem);
        RedisRpcResponse response = request.getResponse();
        response.setStatusCode(200);
        response.setBody(sendRtpItem);
        response.setBody(sendRtpItemKey);
        return response;
    }
@@ -113,14 +121,25 @@
     * 监听流上线
     */
    public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) {
        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
        SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
        logger.info("[redis-rpc] 监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
        // 查询本级是否有这个流
        MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
        if (mediaServerItem != null) {
            logger.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
            // 读取redis中的上级点播信息,生成sendRtpItm发送出去
            if (sendRtpItem.getSsrc() == null) {
                // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
                sendRtpItem.setSsrc(ssrc);
            }
            sendRtpItem.setMediaServerId(mediaServerItem.getId());
            sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
            sendRtpItem.setServerId(userSetting.getServerId());
            redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
            RedisRpcResponse response = request.getResponse();
            response.setBody(sendRtpItem);
            response.setBody(sendRtpItem.getRedisKey());
            response.setStatusCode(200);
        }
        // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
@@ -139,8 +158,9 @@
            sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
            sendRtpItem.setServerId(userSetting.getServerId());
            redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
            RedisRpcResponse response = request.getResponse();
            response.setBody(sendRtpItem);
            response.setBody(sendRtpItem.getRedisKey());
            response.setStatusCode(200);
            // 手动发送结果
            sendResponse(response);
@@ -153,7 +173,14 @@
     * 停止监听流上线
     */
    public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
        String sendRtpItemKey = request.getParam().toString();
        SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
        if (sendRtpItem == null) {
            logger.info("[redis-rpc] 停止监听流上线, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
            RedisRpcResponse response = request.getResponse();
            response.setStatusCode(200);
            return response;
        }
        logger.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
        // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
@@ -168,24 +195,33 @@
     * 开始发流
     */
    public RedisRpcResponse startSendRtp(RedisRpcRequest request) {
        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
        String sendRtpItemKey = request.getParam().toString();
        SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
        RedisRpcResponse response = request.getResponse();
        response.setStatusCode(200);
        if (sendRtpItem == null) {
            logger.info("[redis-rpc] 开始发流, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
            WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
            response.setBody(wvpResult);
            return response;
        }
        logger.info("[redis-rpc] 开始发流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
        MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        if (mediaServerItem == null) {
            logger.info("[redis-rpc] startSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() );
            RedisRpcResponse response = request.getResponse();
            response.setStatusCode(200);
            WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
            response.setBody(wvpResult);
            return response;
        }
        Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
        if (!streamReady) {
            logger.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
            RedisRpcResponse response = request.getResponse();
            response.setStatusCode(200);
            WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线");
            response.setBody(wvpResult);
            return response;
        }
        JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem);
        RedisRpcResponse response = request.getResponse();
        response.setStatusCode(200);
        if (jsonObject.getInteger("code") == 0) {
            logger.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
            WVPResult wvpResult = WVPResult.success();
@@ -202,43 +238,51 @@
     * 停止发流
     */
    public RedisRpcResponse stopSendRtp(RedisRpcRequest request) {
        // Handles a "stop sending RTP" request: resolves the send-RTP item, asks the media
        // server to stop the stream, and returns a WVPResult in the response body.
        // NOTE(review): diff view without +/- markers — several statements below appear in both
        // their old and new form (e.g. two declarations of sendRtpItem and of response).
        // Old form: the request parameter is the JSON-serialized item itself.
        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
        // New form: the request parameter is a Redis key and the item is read from Redis.
        String sendRtpItemKey = request.getParam().toString();
        SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
        // The status code is always 200; success or failure is carried by the WVPResult body.
        RedisRpcResponse response = request.getResponse();
        response.setStatusCode(200);
        if (sendRtpItem == null) {
            // Nothing stored under the key: report failure to the caller.
            logger.info("[redis-rpc] 停止推流, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
            WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
            response.setBody(wvpResult);
            return response;
        }
        logger.info("[redis-rpc] 停止推流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
        MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
        if (mediaServerItem == null) {
            // The media server recorded on the item is unknown here: report failure.
            logger.info("[redis-rpc] stopSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() );
            RedisRpcResponse response = request.getResponse();
            response.setStatusCode(200);
            WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
            response.setBody(wvpResult);
            return response;
        }
        JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem);
        RedisRpcResponse response = request.getResponse();
        response.setStatusCode(200);
        // A "code" of 0 in the media server reply is treated as success.
        if (jsonObject.getInteger("code") == 0) {
            logger.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
            WVPResult wvpResult = WVPResult.success();
            response.setBody(wvpResult);
            response.setBody(WVPResult.success());
            return response;
        }else {
            // Pass the media server's own code and message back to the caller.
            int code = jsonObject.getInteger("code");
            String msg = jsonObject.getString("msg");
            logger.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}:{}, code: {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), code, msg );
            WVPResult wvpResult = WVPResult.fail(code, msg);
            response.setBody(wvpResult);
            response.setBody(WVPResult.fail(code, msg));
            return response;
        }
        // NOTE(review): presumably this single return replaces the two returns inside the branches above — confirm.
        return response;
    }
    /**
     * 其他wvp通知推流已经停止了
     */
    public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) {
        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
        logger.info("[redis-rpc] 推流已经停止: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
        SendRtpItem sendRtpItemInCatch = redisCatchStorage.querySendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getStream(), sendRtpItem.getCallId());
        String sendRtpItemKey = request.getParam().toString();
        SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
        RedisRpcResponse response = request.getResponse();
        response.setStatusCode(200);
        if (sendRtpItemInCatch == null) {
        if (sendRtpItem == null) {
            logger.info("[redis-rpc] 推流已经停止, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
            return response;
        }
        logger.info("[redis-rpc] 推流已经停止: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
        String platformId = sendRtpItem.getPlatformId();
        ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
        if (platform == null) {
src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
@@ -14,10 +14,12 @@
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Service
@@ -37,6 +39,9 @@
    @Autowired
    private SSRCFactory ssrcFactory;
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;
    private RedisRpcRequest buildRequest(String uri, Object param) {
        RedisRpcRequest request = new RedisRpcRequest();
        request.setFromId(userSetting.getServerId());
@@ -46,32 +51,40 @@
    }
    /**
     * Sends a "getSendRtpItem" request and returns the resulting send-RTP item.
     *
     * NOTE(review): diff view without +/- markers — presumably the old variant sent the whole
     * item and parsed the item from the response body, while the new variant sends only the
     * Redis key and reads the item back from Redis.
     *
     * @return the item, or null when the response has no body (key-based variant)
     */
    @Override
    public SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem) {
        RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItem);
    public SendRtpItem getSendRtpItem(String sendRtpItemKey) {
        RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItemKey);
        RedisRpcResponse response = redisRpcConfig.request(request, 10);
        return JSON.parseObject(response.getBody().toString(), SendRtpItem.class);
        // An empty body means the remote side did not produce an item.
        if (response.getBody() == null) {
            return null;
        }
        // The body carries a Redis key; the item itself is loaded from Redis (may also be null).
        return (SendRtpItem)redisTemplate.opsForValue().get(response.getBody().toString());
    }
    /**
     * Asks the WVP node named by {@code sendRtpItem.getServerId()} to start sending RTP.
     *
     * NOTE(review): diff view without +/- markers — presumably the old variant passed the item
     * as the request parameter and the new one passes only its Redis key.
     *
     * @return the WVPResult parsed from the response body
     */
    @Override
    public WVPResult startSendRtp(SendRtpItem sendRtpItem) {
    public WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem) {
        logger.info("[请求其他WVP] 开始推流,wvp:{}, {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
        RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItem);
        RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItemKey);
        // Address the request to the node recorded on the item.
        request.setToId(sendRtpItem.getServerId());
        RedisRpcResponse response = redisRpcConfig.request(request, 10);
        // NOTE(review): getBody() is not null-checked here, unlike in getSendRtpItem — confirm it cannot be null.
        return JSON.parseObject(response.getBody().toString(), WVPResult.class);
    }
    /**
     * Asks the WVP node that owns a send-RTP item to stop sending RTP.
     *
     * NOTE(review): diff view without +/- markers — presumably the old variant took the item
     * directly, while the new variant takes its Redis key and loads the item first.
     *
     * @return the WVPResult parsed from the response body, or a failure result when no item
     *         is stored under the key (key-based variant)
     */
    @Override
    public WVPResult stopSendRtp(SendRtpItem sendRtpItem) {
        RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItem);
    public WVPResult stopSendRtp(String sendRtpItemKey) {
        // The item is needed locally to learn which node to address.
        SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
        if (sendRtpItem == null) {
            logger.info("[请求其他WVP] 停止推流, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
            return WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到发流信息");
        }
        logger.info("[请求其他WVP] 停止推流,wvp:{}, {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
        RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItemKey);
        request.setToId(sendRtpItem.getServerId());
        RedisRpcResponse response = redisRpcConfig.request(request, 10);
        // NOTE(review): getBody() is not null-checked before toString() — confirm it cannot be null.
        return JSON.parseObject(response.getBody().toString(), WVPResult.class);
    }
    @Override
    public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback) {
    public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback) {
        logger.info("[请求所有WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
        // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
@@ -87,36 +100,47 @@
            sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
            sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
            sendRtpItem.setServerId(userSetting.getServerId());
            redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
            if (callback != null) {
                callback.run(sendRtpItem);
                callback.run(sendRtpItem.getRedisKey());
            }
            hookSubscribe.removeSubscribe(hook);
        });
        RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem);
        request.setToId(sendRtpItem.getServerId());
        redisRpcConfig.request(request, response -> {
            SendRtpItem sendRtpItemFromOther = JSON.parseObject(response.getBody().toString(), SendRtpItem.class);
            logger.info("[请求所有WVP监听流上线] 流上线 {}/{}->{}", sendRtpItemFromOther.getApp(), sendRtpItemFromOther.getStream(), sendRtpItemFromOther);
            if (response.getBody() == null) {
                logger.info("[请求所有WVP监听流上线] 流上线,但是未找到发流信息:{}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
                return;
            }
            logger.info("[请求所有WVP监听流上线] 流上线 {}/{}->{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.toString());
            if (callback != null) {
                callback.run(sendRtpItemFromOther);
                callback.run(response.getBody().toString());
            }
        });
    }
    /**
     * Cancels a pending wait for a pushed stream to come online: removes the local
     * stream-changed hook subscription for the item's app/stream, then sends a
     * "stopWaitePushStreamOnline" request to the node named by {@code sendRtpItem.getServerId()}.
     *
     * NOTE(review): diff view without +/- markers — presumably the old variant sent the item as
     * the request parameter and the new one sends only its Redis key.
     */
    @Override
    public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) {
    public void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem) {
        logger.info("[停止WVP监听流上线] {}/{}, key:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItemKey);
        // Rebuild the hook descriptor from app/stream so the matching subscription can be removed.
        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
        hookSubscribe.removeSubscribe(hook);
        RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem);
        RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItemKey);
        request.setToId(sendRtpItem.getServerId());
        // The response is not inspected.
        redisRpcConfig.request(request, 10);
    }
    /**
     * Sends an "rtpSendStopped" request to the WVP node that owns a send-RTP item.
     *
     * NOTE(review): diff view without +/- markers — presumably the old variant took the item
     * directly, while the new variant takes its Redis key and loads the item first.
     */
    @Override
    public void rtpSendStopped(SendRtpItem sendRtpItem) {
        RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItem);
    public void rtpSendStopped(String sendRtpItemKey) {
        // The item is needed locally to learn which node to address.
        SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
        if (sendRtpItem == null) {
            // NOTE(review): the log tag matches the one used in stopWaitePushStreamOnline — looks copy-pasted; confirm.
            logger.info("[停止WVP监听流上线] 未找到redis中的发流信息, key:{}", sendRtpItemKey);
            return;
        }
        RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItemKey);
        request.setToId(sendRtpItem.getServerId());
        // The response is not inspected.
        redisRpcConfig.request(request, 10);
    }
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -141,15 +141,7 @@
    /**
     * Writes the given send-RTP item to Redis.
     *
     * NOTE(review): diff view without +/- markers — presumably the hand-built key and its
     * set(...) call are the removed lines, replaced by the single set(...) using
     * {@code sendRtpItem.getRedisKey()}.
     */
    @Override
    public void updateSendRTPSever(SendRtpItem sendRtpItem) {
        // This key uses the local node's server ID (userSetting.getServerId()) as its second segment.
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX +
                userSetting.getServerId() + "_"
                + sendRtpItem.getMediaServerId() + "_"
                + sendRtpItem.getPlatformId() + "_"
                + sendRtpItem.getChannelId() + "_"
                + sendRtpItem.getStream() + "_"
                + sendRtpItem.getCallId();
        redisTemplate.opsForValue().set(key, sendRtpItem);
        // NOTE(review): getRedisKey() presumably uses the item's own serverId rather than the
        // local node's, so the key differs when the item belongs to another node — confirm intended.
        redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
    }
    @Override
@@ -186,7 +178,7 @@
            callId = "*";
        }
        String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
                + userSetting.getServerId() + "_*_"
                + "*_*_"
                + platformGbId + "_"
                + channelId + "_"
                + streamId + "_"
@@ -292,7 +284,7 @@
     */
    @Override
    public void deleteSendRTPServer(SendRtpItem sendRtpItem) {
        // Delegates to the four-argument overload using the item's platform ID, channel ID and call ID.
        // NOTE(review): diff view without +/- markers — the two calls differ only in the last
        // argument (getServerId() vs getStream()); presumably the first is the removed line and
        // the fourth parameter is a stream ID — confirm against the overload's signature.
        deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getServerId());
        deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getStream());
    }
    @Override
src/main/resources/application.yml
@@ -2,4 +2,4 @@
  application:
    name: wvp
  profiles:
    active: local
    active: local2