648540858
2024-05-29 764d04b497356ba6bcbb75fd42b51eca750f7223
调整上级观看消息的发送
11个文件已修改
105 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -13,7 +13,6 @@
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -108,19 +107,16 @@
            if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
                WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getRedisKey(), sendRtpItem);
                if (wvpResult.getCode() == 0) {
                    MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
                            sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(),
                            sendRtpItem.getMediaServerId());
                    messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
                    redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
                    redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, parentPlatform);
                }
            } else {
                try {
                    if (sendRtpItem.isTcpActive()) {
                        mediaServerService.startSendRtpPassive(mediaInfo, parentPlatform, sendRtpItem, null);
                        mediaServerService.startSendRtpPassive(mediaInfo,sendRtpItem, null);
                    } else {
                        mediaServerService.startSendRtp(mediaInfo, parentPlatform, sendRtpItem);
                        mediaServerService.startSendRtp(mediaInfo, sendRtpItem);
                    }
                    redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, parentPlatform);
                }catch (ControllerException e) {
                    logger.error("RTP推流失败: {}", e.getMessage());
                    playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader);
@@ -142,9 +138,9 @@
            }
            try {
                if (sendRtpItem.isTcpActive()) {
                    mediaServerService.startSendRtpPassive(mediaInfo, null, sendRtpItem, null);
                    mediaServerService.startSendRtpPassive(mediaInfo, sendRtpItem, null);
                } else {
                    mediaServerService.startSendRtp(mediaInfo, null, sendRtpItem);
                    mediaServerService.startSendRtp(mediaInfo, sendRtpItem);
                }
            }catch (ControllerException e) {
                logger.error("RTP推流失败: {}", e.getMessage());
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -466,7 +466,8 @@
                            if (sendRtpItem.isTcpActive()) {
                                MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
                                try {
                                    mediaServerService.startSendRtpPassive(mediaServer, platform, sendRtpItem, 5);
                                    mediaServerService.startSendRtpPassive(mediaServer, sendRtpItem, 5);
                                    redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, platform);
                                }catch (ControllerException e) {}
                            }
                        } catch (SipException | InvalidArgumentException | ParseException e) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java
@@ -152,7 +152,7 @@
                                }else {
                                    // 发流
                                    try {
                                        mediaServerService.startSendRtp(hookData.getMediaServer(),null, sendRtpItem);
                                        mediaServerService.startSendRtp(hookData.getMediaServer(), sendRtpItem);
                                    }catch (ControllerException e) {
                                        logger.info("[语音喊话] 推流失败, 结果: {}", e.getMessage());
                                        return;
src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java
@@ -2,7 +2,6 @@
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
@@ -141,9 +140,9 @@
    Boolean isStreamReady(MediaServer mediaServer, String rtp, String streamId);
    // NOTE(review): this is a diff view with the +/- markers stripped. In each pair below the
    // declaration that takes a ParentPlatform appears to be the removed (old) form and the one
    // without it the added (new) form — confirm against the raw patch.

    /** Old form (appears removed): start sending RTP, with the upstream platform passed in. */
    void startSendRtpPassive(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem, Integer timeout);
    /** New form (appears added): same operation, without the platform argument. */
    void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout);
    /** Old form (appears removed): start sending RTP for sendRtpItem, with the upstream platform passed in. */
    void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem);
    /** New form (appears added): same operation, without the platform argument. */
    void startSendRtp(MediaServer mediaServer, SendRtpItem sendRtpItem);
    SendRtpItem createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean mediaTransmissionTCP, boolean rtcp);
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
@@ -7,8 +7,6 @@
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
@@ -24,7 +22,6 @@
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
@@ -827,18 +824,17 @@
    }
    /**
     * Starts sending RTP for {@code sendRtpItem} by delegating to the media-node service
     * registered in {@code nodeServerServiceMap} for {@code mediaServer.getType()}.
     *
     * NOTE(review): this is a diff view with the +/- markers stripped. The two consecutive
     * signatures below appear to be the old (with ParentPlatform) and new (without) forms,
     * and the trailing sendPlatformStartPlayMsg(platform, sendRtpItem) call appears to be a
     * removed line — confirm against the raw patch.
     *
     * @param mediaServer media server whose type selects the node implementation
     * @param sendRtpItem handed unchanged to the node implementation
     * @param timeout     handed unchanged to the node implementation
     * @throws ControllerException with ErrorCode.ERROR100 when no implementation is
     *                             registered for the media server's type
     */
    @Override
    public void startSendRtpPassive(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem, Integer timeout) {
    public void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout) {
        // Look up the implementation that matches this media server's type.
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            // No implementation for this type: log it and fail the call.
            logger.info("[startSendRtpPassive] 失败, mediaServer的类型: {}，未找到对应的实现类", mediaServer.getType());
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类");
        }
        mediaNodeServerService.startSendRtpPassive(mediaServer, sendRtpItem, timeout);
        // Old-form line (appears removed): the start-play notice was sent from here.
        sendPlatformStartPlayMsg(platform, sendRtpItem);
    }
    @Override
    public void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem) {
    public void startSendRtp(MediaServer mediaServer, SendRtpItem sendRtpItem) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[startSendRtpStream] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
@@ -847,21 +843,6 @@
        logger.info("[开始推流] rtp/{}, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(),
                sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
        mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem);
        if (platform != null) {
            sendPlatformStartPlayMsg(platform, sendRtpItem);
        }
    }
    /**
     * Builds a MessageForPushChannel from the send item and the platform and passes it to
     * redisCatchStorage.sendPlatformStartPlayMsg. Does nothing unless the item's play type
     * is InviteStreamType.PUSH and {@code platform} is non-null.
     *
     * NOTE(review): the hunk header (-847,21 +843,6) suggests this whole helper is deleted
     * by the commit; an equivalent body shows up in RedisCatchStorageImpl — confirm against
     * the raw patch.
     *
     * @param platform    upstream platform; a null value makes this a no-op
     * @param sendRtpItem source of app, stream, channel id and media server id for the message
     */
    private void sendPlatformStartPlayMsg(ParentPlatform platform, SendRtpItem sendRtpItem) {
        if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && platform  != null) {
            // The meaning of the leading 0 argument is not visible here — TODO confirm.
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
                    sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(),
                    sendRtpItem.getMediaServerId());
            messageForPushChannel.setPlatFormIndex(platform.getId());
            redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
        }
    }
    @Override
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -383,7 +383,7 @@
        }, userSetting.getPlayTimeout());
        try {
            mediaServerService.startSendRtpPassive(mediaServerItem, null, sendRtpItem, userSetting.getPlayTimeout() * 1000);
            mediaServerService.startSendRtpPassive(mediaServerItem, sendRtpItem, userSetting.getPlayTimeout() * 1000);
        }catch (ControllerException e) {
            mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
            logger.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
@@ -1412,10 +1412,11 @@
        if (mediaInfo != null) {
            try {
                if (sendRtpItem.isTcpActive()) {
                    mediaServerService.startSendRtpPassive(mediaInfo, platform, sendRtpItem, null);
                    mediaServerService.startSendRtpPassive(mediaInfo, sendRtpItem, null);
                } else {
                    mediaServerService.startSendRtp(mediaInfo, platform, sendRtpItem);
                    mediaServerService.startSendRtp(mediaInfo, sendRtpItem);
                }
                redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, platform);
            }catch (ControllerException e) {
                logger.error("RTP推流失败: {}", e.getMessage());
                startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
@@ -209,7 +209,7 @@
            return response;
        }
        try {
            mediaServerService.startSendRtp(mediaServer, null, sendRtpItem);
            mediaServerService.startSendRtp(mediaServer, sendRtpItem);
        }catch (ControllerException exception) {
            logger.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}:{}, {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getMsg());
            WVPResult wvpResult = WVPResult.fail(exception.getCode(), exception.getMsg());
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -208,7 +208,7 @@
    void sendChannelAddOrDelete(String deviceId, String channelId, boolean add);
    // NOTE(review): diff view with +/- markers stripped; the first declaration appears to be
    // the removed form and the second its replacement — confirm against the raw patch.
    /** Old form (appears removed): send a start-play notice from a prebuilt message. */
    void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel);
    /** New form (appears added): send a start-play notice from the send item and the platform. */
    void sendPlatformStartPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform);
    void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform);
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -656,10 +656,16 @@
    }
    /**
     * Publishes a start-play notice on the Redis channel named by
     * VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY via redisTemplate.convertAndSend.
     *
     * NOTE(review): this is a diff view with the +/- markers stripped, so old and new lines
     * are interleaved. The signature taking MessageForPushChannel and the two statements that
     * use {@code msg} appear to be the removed form; the signature taking
     * (SendRtpItem, ParentPlatform), the if-block and the statements that use
     * {@code messageForPushChannel} appear to be the added form — confirm against the raw patch.
     *
     * @param sendRtpItem (new form) source of app, stream, channel id and media server id
     * @param platform    (new form) upstream platform; when null nothing is sent
     */
    @Override
    public void sendPlatformStartPlayMsg(MessageForPushChannel msg) {
    public void sendPlatformStartPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) {
        // New form: only PUSH-type items with a non-null platform produce a notice.
        if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && platform  != null) {
            // The meaning of the leading 0 argument is not visible here — TODO confirm.
            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
                    sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(),
                    sendRtpItem.getMediaServerId());
            messageForPushChannel.setPlatFormIndex(platform.getId());
        // Redis channel the notice is published on.
        String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY;
        // Old form (appears removed): log and publish the caller-supplied message.
        logger.info("[redis发送通知] 发送 推流被上级平台观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
        redisTemplate.convertAndSend(key, JSON.toJSON(msg));
            // New form (appears added): log and publish the message built above as JSON.
            logger.info("[redis发送通知] 发送 推流被上级平台观看 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId());
            redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel));
        }
    }
    @Override
src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java
@@ -1,19 +1,17 @@
package com.genersoft.iot.vmp.vmanager.ps;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@@ -210,7 +208,7 @@
        SendRtpItem sendRtpItem = SendRtpItem.getInstance(app, stream, ssrc, dstIp, dstPort, !isUdp, sendInfo.getSendLocalPort(), null);
        Boolean streamReady = mediaServerService.isStreamReady(mediaServer, app, stream);
        if (streamReady) {
            mediaServerService.startSendRtp(mediaServer, null, sendRtpItem);
            mediaServerService.startSendRtp(mediaServer, sendRtpItem);
            logger.info("[第三方PS服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, sendRtpItem);
            redisTemplate.opsForValue().set(key, sendInfo);
        }else {
@@ -235,7 +233,7 @@
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        mediaServerService.startSendRtp(mediaServer, null,  sendRtpItem);
                        mediaServerService.startSendRtp(mediaServer, sendRtpItem);
                        logger.info("[第三方PS服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, sendRtpItem);
                        redisTemplate.opsForValue().set(key, finalSendInfo);
                        hookSubscribe.removeSubscribe(hook);
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
@@ -1,18 +1,17 @@
package com.genersoft.iot.vmp.vmanager.rtp;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@@ -31,9 +30,7 @@
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -247,12 +244,12 @@
        Boolean streamReady = mediaServerService.isStreamReady(mediaServer, app, stream);
        if (streamReady) {
            if (sendRtpItemForVideo != null) {
                mediaServerService.startSendRtp(mediaServer, null,  sendRtpItemForVideo);
                mediaServerService.startSendRtp(mediaServer,  sendRtpItemForVideo);
                logger.info("[第三方服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, sendRtpItemForVideo);
                redisTemplate.opsForValue().set(key, sendInfo);
            }
            if(sendRtpItemForAudio != null) {
                mediaServerService.startSendRtp(mediaServer, null,  sendRtpItemForAudio);
                mediaServerService.startSendRtp(mediaServer, sendRtpItemForAudio);
                logger.info("[第三方服务对接->发送流] 音频流发流成功,callId->{},param->{}", callId, sendRtpItemForAudio);
                redisTemplate.opsForValue().set(key, sendInfo);
            }
@@ -279,12 +276,12 @@
                            throw new RuntimeException(e);
                        }
                        if (sendRtpItemForVideo != null) {
                            mediaServerService.startSendRtp(mediaServer, null,  sendRtpItemForVideo);
                            mediaServerService.startSendRtp(mediaServer, sendRtpItemForVideo);
                            logger.info("[第三方服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, sendRtpItemForVideo);
                            redisTemplate.opsForValue().set(key, finalSendInfo);
                        }
                        if(sendRtpItemForAudio != null) {
                            mediaServerService.startSendRtp(mediaServer, null,  sendRtpItemForAudio);
                            mediaServerService.startSendRtp(mediaServer, sendRtpItemForAudio);
                            logger.info("[第三方服务对接->发送流] 音频流发流成功,callId->{},param->{}", callId, sendRtpItemForAudio);
                            redisTemplate.opsForValue().set(key, finalSendInfo);
                        }