648540858
2024-03-22 150e7a31997f590eba879c3515f21821e9e68eb6
调整节点管理代码结构
14个文件已修改
1137 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java 343 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java 196 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java 189 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java 164 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java
@@ -84,6 +84,9 @@
    @Schema(description = "是否暂停(录像回放使用)")
    private boolean pause;
    @Schema(description = "产生源类型,包括 unknown = 0,rtmp_push=1,rtsp_push=2,rtp_push=3,pull=4,ffmpeg_pull=5,mp4_vod=6,device_chn=7")
    private int originType;
    public void setFlv(StreamURL flv) {
        this.flv = flv;
    }
@@ -616,4 +619,12 @@
    public void setDownLoadFilePath(DownloadFileInfo downLoadFilePath) {
        this.downLoadFilePath = downLoadFilePath;
    }
    public int getOriginType() {
        return originType;
    }
    public void setOriginType(int originType) {
        this.originType = originType;
    }
}
src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java
@@ -28,12 +28,36 @@
    private Integer audioSampleRate;
    @Schema(description = "音频采样率")
    private Long duration;
    @Schema(description = "在线")
    private Boolean online;
    @Schema(description = "unknown = 0,rtmp_push=1,rtsp_push=2,rtp_push=3,pull=4,ffmpeg_pull=5,mp4_vod=6,device_chn=7")
    private Integer originType;
    @Schema(description = "存活时间,单位秒")
    private Long aliveSecond;
    @Schema(description = "数据产生速度,单位byte/s")
    private Long bytesSpeed;
    public static MediaInfo getInstance(JSONObject jsonObject) {
        MediaInfo mediaInfo = new MediaInfo();
        Integer totalReaderCount = jsonObject.getInteger("totalReaderCount");
        Boolean online = jsonObject.getBoolean("online");
        Integer originType = jsonObject.getInteger("originType");
        Long aliveSecond = jsonObject.getLong("aliveSecond");
        Long bytesSpeed = jsonObject.getLong("bytesSpeed");
        if (totalReaderCount != null) {
            mediaInfo.setReaderCount(totalReaderCount);
        }
        if (online != null) {
            mediaInfo.setOnline(online);
        }
        if (originType != null) {
            mediaInfo.setOriginType(originType);
        }
        if (aliveSecond != null) {
            mediaInfo.setAliveSecond(aliveSecond);
        }
        if (bytesSpeed != null) {
            mediaInfo.setBytesSpeed(bytesSpeed);
        }
        JSONArray jsonArray = jsonObject.getJSONArray("tracks");
        if (jsonArray.isEmpty()) {
@@ -90,6 +114,10 @@
        List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks();
        MediaInfo mediaInfo = new MediaInfo();
        mediaInfo.setReaderCount(param.getTotalReaderCount());
        mediaInfo.setOnline(param.isRegist());
        mediaInfo.setOriginType(param.getOriginType());
        mediaInfo.setAliveSecond(param.getAliveSecond());
        mediaInfo.setBytesSpeed(param.getBytesSpeed());
        for (OnStreamChangedHookParam.MediaTrack mediaTrack : tracks) {
            switch (mediaTrack.getCodec_id()) {
                case 0:
@@ -187,4 +215,36 @@
    public void setDuration(Long duration) {
        this.duration = duration;
    }
    public Boolean getOnline() {
        return online;
    }
    public void setOnline(Boolean online) {
        this.online = online;
    }
    public Integer getOriginType() {
        return originType;
    }
    public void setOriginType(Integer originType) {
        this.originType = originType;
    }
    public Long getAliveSecond() {
        return aliveSecond;
    }
    public void setAliveSecond(Long aliveSecond) {
        this.aliveSecond = aliveSecond;
    }
    public Long getBytesSpeed() {
        return bytesSpeed;
    }
    public void setBytesSpeed(Long bytesSpeed) {
        this.bytesSpeed = bytesSpeed;
    }
}
src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java
@@ -4,39 +4,53 @@
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import java.util.List;
import java.util.Map;
public interface IMediaNodeServerService {
    int createRTPServer(MediaServer mediaServerItem, String streamId, long ssrc, Integer port, Boolean onlyAuto, Boolean reUsePort, Integer tcpMode);
    int createRTPServer(MediaServer mediaServer, String streamId, long ssrc, Integer port, Boolean onlyAuto, Boolean reUsePort, Integer tcpMode);
    void closeRtpServer(MediaServer mediaServerItem, String streamId);
    void closeRtpServer(MediaServer mediaServer, String streamId);
    void closeRtpServer(MediaServer mediaServerItem, String streamId, CommonCallback<Boolean> callback);
    void closeRtpServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback);
    void closeStreams(MediaServer mediaServerItem, String app, String stream);
    void closeStreams(MediaServer mediaServer, String app, String stream);
    Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String stream, String ssrc);
    Boolean updateRtpServerSSRC(MediaServer mediaServer, String stream, String ssrc);
    boolean checkNodeId(MediaServer mediaServerItem);
    boolean checkNodeId(MediaServer mediaServer);
    void online(MediaServer mediaServerItem);
    void online(MediaServer mediaServer);
    MediaServer checkMediaServer(String ip, int port, String secret);
    boolean stopSendRtp(MediaServer mediaInfo, String app, String stream, String ssrc);
    boolean deleteRecordDirectory(MediaServer mediaServerItem, String app, String stream, String date, String fileName);
    boolean deleteRecordDirectory(MediaServer mediaServer, String app, String stream, String date, String fileName);
    List<StreamInfo> getMediaList(MediaServer mediaServerItem, String app, String stream, String callId);
    List<StreamInfo> getMediaList(MediaServer mediaServer, String app, String stream, String callId);
    Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String stream);
    Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String stream);
    void getSnap(MediaServer mediaServerItem, String streamUrl, int timeoutSec, int expireSec, String path, String fileName);
    void getSnap(MediaServer mediaServer, String streamUrl, int timeoutSec, int expireSec, String path, String fileName);
    MediaInfo getMediaInfo(MediaServer mediaServerItem, String app, String stream);
    MediaInfo getMediaInfo(MediaServer mediaServer, String app, String stream);
    Boolean pauseRtpCheck(MediaServer mediaServerItem, String streamKey);
    Boolean pauseRtpCheck(MediaServer mediaServer, String streamKey);
    Boolean resumeRtpCheck(MediaServer mediaServerItem, String streamKey);
    Boolean resumeRtpCheck(MediaServer mediaServer, String streamKey);
    String getFfmpegCmd(MediaServer mediaServer, String cmdKey);
    WVPResult<String> addFFmpegSource(MediaServer mediaServer, String srcUrl, String dstUrl, int timeoutMs, boolean enableAudio, boolean enableMp4, String ffmpegCmdKey);
    WVPResult<String> addStreamProxy(MediaServer mediaServer, String app, String stream, String url, boolean enableAudio, boolean enableMp4, String rtpType);
    Boolean delFFmpegSource(MediaServer mediaServer, String streamKey);
    Boolean delStreamProxy(MediaServer mediaServer, String streamKey);
    Map<String, String> getFFmpegCMDs(MediaServer mediaServer);
}
src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java
@@ -6,8 +6,10 @@
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import java.util.List;
import java.util.Map;
/**
 * 媒体服务节点
@@ -87,4 +89,19 @@
    Boolean pauseRtpCheck(MediaServer mediaServerItem, String streamKey);
    boolean resumeRtpCheck(MediaServer mediaServerItem, String streamKey);
    String getFfmpegCmd(MediaServer mediaServer, String cmdKey);
    void closeStreams(MediaServer mediaServerItem, String app, String stream);
    WVPResult<String> addFFmpegSource(MediaServer mediaServerItem, String srcUrl, String dstUrl, int timeoutMs, boolean enableAudio, boolean enableMp4, String ffmpegCmdKey);
    WVPResult<String> addStreamProxy(MediaServer mediaServerItem, String app, String stream, String url, boolean enableAudio, boolean enableMp4, String rtpType);
    Boolean delFFmpegSource(MediaServer mediaServerItem, String streamKey);
    Boolean delStreamProxy(MediaServer mediaServerItem, String streamKey);
    Map<String, String> getFFmpegCMDs(MediaServer mediaServer);
}
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
@@ -22,6 +22,7 @@
import com.genersoft.iot.vmp.utils.JsonUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
@@ -74,31 +75,31 @@
     * 初始化
     */
    @Override
    public void updateVmServer(List<MediaServer> mediaServerItemList) {
    public void updateVmServer(List<MediaServer> mediaServerList) {
        logger.info("[媒体服务节点] 缓存初始化 ");
        for (MediaServer mediaServerItem : mediaServerItemList) {
            if (ObjectUtils.isEmpty(mediaServerItem.getId())) {
        for (MediaServer mediaServer : mediaServerList) {
            if (ObjectUtils.isEmpty(mediaServer.getId())) {
                continue;
            }
            // 更新
            if (!ssrcFactory.hasMediaServerSSRC(mediaServerItem.getId())) {
                ssrcFactory.initMediaServerSSRC(mediaServerItem.getId(), null);
            if (!ssrcFactory.hasMediaServerSSRC(mediaServer.getId())) {
                ssrcFactory.initMediaServerSSRC(mediaServer.getId(), null);
            }
            // 查询redis是否存在此mediaServer
            String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId();
            String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServer.getId();
            Boolean hasKey = redisTemplate.hasKey(key);
            if (hasKey != null && ! hasKey) {
                redisTemplate.opsForValue().set(key, mediaServerItem);
                redisTemplate.opsForValue().set(key, mediaServer);
            }
        }
    }
    @Override
    public SSRCInfo openRTPServer(MediaServer mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck,
    public SSRCInfo openRTPServer(MediaServer mediaServer, String streamId, String presetSsrc, boolean ssrcCheck,
                                  boolean isPlayback, Integer port, Boolean onlyAuto, Boolean reUsePort, Integer tcpMode) {
        if (mediaServerItem == null || mediaServerItem.getId() == null) {
            logger.info("[openRTPServer] 失败, mediaServerItem == null || mediaServerItem.getId() == null");
        if (mediaServer == null || mediaServer.getId() == null) {
            logger.info("[openRTPServer] 失败, mediaServer == null || mediaServer.getId() == null");
            return null;
        }
        // 获取mediaServer可用的ssrc
@@ -107,9 +108,9 @@
            ssrc = presetSsrc;
        }else {
            if (isPlayback) {
                ssrc = ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
                ssrc = ssrcFactory.getPlayBackSsrc(mediaServer.getId());
            }else {
                ssrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
                ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId());
            }
        }
@@ -121,97 +122,97 @@
            logger.warn("[openRTPServer] 平台对接时下级可能自定义ssrc,但是tcp模式zlm收流目前无法更新ssrc,可能收流超时,此时请使用udp收流或者关闭ssrc校验");
        }
        int rtpServerPort;
        if (mediaServerItem.isRtpEnable()) {
            IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        if (mediaServer.isRtpEnable()) {
            IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
            if (mediaNodeServerService == null) {
                logger.info("[openRTPServer] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
                logger.info("[openRTPServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
                return null;
            }
            rtpServerPort = mediaNodeServerService.createRTPServer(mediaServerItem, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, reUsePort, tcpMode);
            rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, reUsePort, tcpMode);
        } else {
            rtpServerPort = mediaServerItem.getRtpProxyPort();
            rtpServerPort = mediaServer.getRtpProxyPort();
        }
        return new SSRCInfo(rtpServerPort, ssrc, streamId);
    }
    @Override
    public SSRCInfo openRTPServer(MediaServer mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback, Integer port, Boolean onlyAuto) {
        return openRTPServer(mediaServerItem, streamId, ssrc, ssrcCheck, isPlayback, port, onlyAuto, null, 0);
    public SSRCInfo openRTPServer(MediaServer mediaServer, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback, Integer port, Boolean onlyAuto) {
        return openRTPServer(mediaServer, streamId, ssrc, ssrcCheck, isPlayback, port, onlyAuto, null, 0);
    }
    @Override
    public void closeRTPServer(MediaServer mediaServerItem, String streamId) {
        if (mediaServerItem == null) {
    public void closeRTPServer(MediaServer mediaServer, String streamId) {
        if (mediaServer == null) {
            return;
        }
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[closeRTPServer] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            logger.info("[closeRTPServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return;
        }
        mediaNodeServerService.closeRtpServer(mediaServerItem, streamId);
        mediaNodeServerService.closeRtpServer(mediaServer, streamId);
    }
    @Override
    public void closeRTPServer(MediaServer mediaServerItem, String streamId, CommonCallback<Boolean> callback) {
        if (mediaServerItem == null) {
    public void closeRTPServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
        if (mediaServer == null) {
            callback.run(false);
            return;
        }
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[closeRTPServer] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            logger.info("[closeRTPServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return;
        }
        mediaNodeServerService.closeRtpServer(mediaServerItem, streamId, callback);
        mediaNodeServerService.closeRtpServer(mediaServer, streamId, callback);
    }
    @Override
    public void closeRTPServer(String mediaServerId, String streamId) {
        MediaServer mediaServerItem = this.getOne(mediaServerId);
        if (mediaServerItem == null) {
        MediaServer mediaServer = this.getOne(mediaServerId);
        if (mediaServer == null) {
            return;
        }
        if (mediaServerItem.isRtpEnable()) {
            closeRTPServer(mediaServerItem, streamId);
        if (mediaServer.isRtpEnable()) {
            closeRTPServer(mediaServer, streamId);
        }
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[closeRTPServer] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            logger.info("[closeRTPServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return;
        }
        mediaNodeServerService.closeStreams(mediaServerItem, "rtp", streamId);
        mediaNodeServerService.closeStreams(mediaServer, "rtp", streamId);
    }
    @Override
    public Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String streamId, String ssrc) {
        if (mediaServerItem == null) {
    public Boolean updateRtpServerSSRC(MediaServer mediaServer, String streamId, String ssrc) {
        if (mediaServer == null) {
            return false;
        }
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[updateRtpServerSSRC] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            logger.info("[updateRtpServerSSRC] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return false;
        }
        return mediaNodeServerService.updateRtpServerSSRC(mediaServerItem, streamId, ssrc);
        return mediaNodeServerService.updateRtpServerSSRC(mediaServer, streamId, ssrc);
    }
    @Override
    public void releaseSsrc(String mediaServerItemId, String ssrc) {
        MediaServer mediaServerItem = getOne(mediaServerItemId);
        if (mediaServerItem == null || ssrc == null) {
    public void releaseSsrc(String mediaServerId, String ssrc) {
        MediaServer mediaServer = getOne(mediaServerId);
        if (mediaServer == null || ssrc == null) {
            return;
        }
        ssrcFactory.releaseSsrc(mediaServerItemId, ssrc);
        ssrcFactory.releaseSsrc(mediaServerId, ssrc);
    }
    /**
     * 媒体服务节点 重启后重置他的推流信息, TODO 给正在使用的设备发送停止命令
     */
    @Override
    public void clearRTPServer(MediaServer mediaServerItem) {
        ssrcFactory.reset(mediaServerItem.getId());
    public void clearRTPServer(MediaServer mediaServer) {
        ssrcFactory.reset(mediaServer.getId());
    }
@@ -219,22 +220,22 @@
    public void update(MediaServer mediaSerItem) {
        mediaServerMapper.update(mediaSerItem);
        MediaServer mediaServerInRedis = getOne(mediaSerItem.getId());
        MediaServer mediaServerItemInDataBase = mediaServerMapper.queryOne(mediaSerItem.getId());
        if (mediaServerItemInDataBase == null) {
        MediaServer mediaServerInDataBase = mediaServerMapper.queryOne(mediaSerItem.getId());
        if (mediaServerInDataBase == null) {
            return;
        }
        mediaServerItemInDataBase.setStatus(mediaSerItem.isStatus());
        if (mediaServerInRedis == null || !ssrcFactory.hasMediaServerSSRC(mediaServerItemInDataBase.getId())) {
            ssrcFactory.initMediaServerSSRC(mediaServerItemInDataBase.getId(),null);
        mediaServerInDataBase.setStatus(mediaSerItem.isStatus());
        if (mediaServerInRedis == null || !ssrcFactory.hasMediaServerSSRC(mediaServerInDataBase.getId())) {
            ssrcFactory.initMediaServerSSRC(mediaServerInDataBase.getId(),null);
        }
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItemInDataBase.getId();
        redisTemplate.opsForValue().set(key, mediaServerItemInDataBase);
        if (mediaServerItemInDataBase.isStatus()) {
            resetOnlineServerItem(mediaServerItemInDataBase);
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerInDataBase.getId();
        redisTemplate.opsForValue().set(key, mediaServerInDataBase);
        if (mediaServerInDataBase.isStatus()) {
            resetOnlineServerItem(mediaServerInDataBase);
        }else {
            // 发送事件
            MediaServerChangeEvent event = new MediaServerChangeEvent(this);
            event.setMediaServerItemList(mediaServerItemInDataBase);
            event.setMediaServerItemList(mediaServerInDataBase);
            applicationEventPublisher.publishEvent(event);
        }
    }
@@ -247,16 +248,16 @@
        String onlineKey = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        for (Object mediaServerKey : mediaServerKeys) {
            String key = (String) mediaServerKey;
            MediaServer mediaServerItem = JsonUtil.redisJsonToObject(redisTemplate, key, MediaServer.class);
            if (Objects.isNull(mediaServerItem)) {
            MediaServer mediaServer = JsonUtil.redisJsonToObject(redisTemplate, key, MediaServer.class);
            if (Objects.isNull(mediaServer)) {
                continue;
            }
            // 检查状态
            Double aDouble = redisTemplate.opsForZSet().score(onlineKey, mediaServerItem.getId());
            Double aDouble = redisTemplate.opsForZSet().score(onlineKey, mediaServer.getId());
            if (aDouble != null) {
                mediaServerItem.setStatus(true);
                mediaServer.setStatus(true);
            }
            result.add(mediaServerItem);
            result.add(mediaServer);
        }
        result.sort((serverItem1, serverItem2)->{
            int sortResult = 0;
@@ -275,10 +276,10 @@
        if (mediaServerList.isEmpty()) {
            return new ArrayList<>();
        }
        for (MediaServer mediaServerItem : mediaServerList) {
            MediaServer mediaServerItemInRedis = getOne(mediaServerItem.getId());
            if (mediaServerItemInRedis != null) {
                mediaServerItem.setStatus(mediaServerItemInRedis.isStatus());
        for (MediaServer mediaServer : mediaServerList) {
            MediaServer mediaServerInRedis = getOne(mediaServer.getId());
            if (mediaServerInRedis != null) {
                mediaServer.setStatus(mediaServerInRedis.isStatus());
            }
        }
        return mediaServerList;
@@ -310,7 +311,7 @@
    /**
     * 获取单个媒体服务节点服务器
     * @param mediaServerId 服务id
     * @return MediaServerItem
     * @return mediaServer
     */
    @Override
    public MediaServer getOne(String mediaServerId) {
@@ -334,32 +335,32 @@
    }
    @Override
    public void add(MediaServer mediaServerItem) {
        mediaServerItem.setCreateTime(DateUtil.getNow());
        mediaServerItem.setUpdateTime(DateUtil.getNow());
        if (mediaServerItem.getHookAliveInterval() == null || mediaServerItem.getHookAliveInterval() == 0F) {
            mediaServerItem.setHookAliveInterval(10F);
    public void add(MediaServer mediaServer) {
        mediaServer.setCreateTime(DateUtil.getNow());
        mediaServer.setUpdateTime(DateUtil.getNow());
        if (mediaServer.getHookAliveInterval() == null || mediaServer.getHookAliveInterval() == 0F) {
            mediaServer.setHookAliveInterval(10F);
        }
        if (mediaServerItem.getType() == null) {
            logger.info("[添加媒体节点] 失败, mediaServerItem的类型:为空");
        if (mediaServer.getType() == null) {
            logger.info("[添加媒体节点] 失败, mediaServer的类型:为空");
            return;
        }
        if (mediaServerMapper.queryOne(mediaServerItem.getId()) != null) {
            logger.info("[添加媒体节点] 失败, 媒体服务ID已存在,请修改媒体服务器配置, {}", mediaServerItem.getId());
            throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败,媒体服务ID [ " + mediaServerItem.getId() + " ] 已存在,请修改媒体服务器配置");
        if (mediaServerMapper.queryOne(mediaServer.getId()) != null) {
            logger.info("[添加媒体节点] 失败, 媒体服务ID已存在,请修改媒体服务器配置, {}", mediaServer.getId());
            throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败,媒体服务ID [ " + mediaServer.getId() + " ] 已存在,请修改媒体服务器配置");
        }
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[添加媒体节点] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            logger.info("[添加媒体节点] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return;
        }
        mediaServerMapper.add(mediaServerItem);
        if (mediaServerItem.isStatus()) {
            mediaNodeServerService.online(mediaServerItem);
        mediaServerMapper.add(mediaServer);
        if (mediaServer.isStatus()) {
            mediaNodeServerService.online(mediaServer);
        }else {
            // 发送事件
            MediaServerChangeEvent event = new MediaServerChangeEvent(this);
            event.setMediaServerItemList(mediaServerItem);
            event.setMediaServerItemList(mediaServer);
            applicationEventPublisher.publishEvent(event);
        }
    }
@@ -403,7 +404,7 @@
    /**
     * 获取负载最低的节点
     * @return MediaServerItem
     * @return mediaServer
     */
    @Override
    public MediaServer getMediaServerForMinimumLoad(Boolean hasAssist) {
@@ -417,16 +418,16 @@
        // 获取分数最低的,及并发最低的
        Set<Object> objects = redisTemplate.opsForZSet().range(key, 0, -1);
        ArrayList<Object> mediaServerObjectS = new ArrayList<>(objects);
        MediaServer mediaServerItem = null;
        MediaServer mediaServer = null;
        if (hasAssist == null) {
            String mediaServerId = (String)mediaServerObjectS.get(0);
            mediaServerItem = getOne(mediaServerId);
            mediaServer = getOne(mediaServerId);
        }else if (hasAssist) {
            for (Object mediaServerObject : mediaServerObjectS) {
                String mediaServerId = (String)mediaServerObject;
                MediaServer serverItem = getOne(mediaServerId);
                if (serverItem.getRecordAssistPort() > 0) {
                    mediaServerItem = serverItem;
                    mediaServer = serverItem;
                    break;
                }
            }
@@ -435,13 +436,13 @@
                String mediaServerId = (String)mediaServerObject;
                MediaServer serverItem = getOne(mediaServerId);
                if (serverItem.getRecordAssistPort() == 0) {
                    mediaServerItem = serverItem;
                    mediaServer = serverItem;
                    break;
                }
            }
        }
        return mediaServerItem;
        return mediaServer;
    }
    @Override
@@ -452,16 +453,16 @@
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(type);
        if (mediaNodeServerService == null) {
            logger.info("[closeRTPServer] 失败, mediaServerItem的类型: {},未找到对应的实现类", type);
            logger.info("[closeRTPServer] 失败, mediaServer的类型: {},未找到对应的实现类", type);
            return null;
        }
        MediaServer mediaServerItem = mediaNodeServerService.checkMediaServer(ip, port, secret);
        if (mediaServerItem != null) {
            if (mediaServerMapper.queryOne(mediaServerItem.getId()) != null) {
                throw new ControllerException(ErrorCode.ERROR100.getCode(), "媒体服务ID [" + mediaServerItem.getId() + " ] 已存在,请修改媒体服务器配置");
        MediaServer mediaServer = mediaNodeServerService.checkMediaServer(ip, port, secret);
        if (mediaServer != null) {
            if (mediaServerMapper.queryOne(mediaServer.getId()) != null) {
                throw new ControllerException(ErrorCode.ERROR100.getCode(), "媒体服务ID [" + mediaServer.getId() + " ] 已存在,请修改媒体服务器配置");
            }
        }
        return mediaServerItem;
        return mediaServer;
    }
    @Override
@@ -504,28 +505,28 @@
    public void syncCatchFromDatabase() {
        List<MediaServer> allInCatch = getAllOnlineList();
        List<MediaServer> allInDatabase = mediaServerMapper.queryAll();
        Map<String, MediaServer> mediaServerItemMap = new HashMap<>();
        Map<String, MediaServer> mediaServerMap = new HashMap<>();
        for (MediaServer mediaServerItem : allInDatabase) {
            mediaServerItemMap.put(mediaServerItem.getId(), mediaServerItem);
        for (MediaServer mediaServer : allInDatabase) {
            mediaServerMap.put(mediaServer.getId(), mediaServer);
        }
        for (MediaServer mediaServerItem : allInCatch) {
        for (MediaServer mediaServer : allInCatch) {
            // 清除数据中不存在但redis缓存数据
            if (!mediaServerItemMap.containsKey(mediaServerItem.getId())) {
                delete(mediaServerItem.getId());
            if (!mediaServerMap.containsKey(mediaServer.getId())) {
                delete(mediaServer.getId());
            }
        }
    }
    @Override
    public MediaServerLoad getLoad(MediaServer mediaServerItem) {
    public MediaServerLoad getLoad(MediaServer mediaServer) {
        MediaServerLoad result = new MediaServerLoad();
        result.setId(mediaServerItem.getId());
        result.setPush(redisCatchStorage.getPushStreamCount(mediaServerItem.getId()));
        result.setProxy(redisCatchStorage.getProxyStreamCount(mediaServerItem.getId()));
        result.setId(mediaServer.getId());
        result.setPush(redisCatchStorage.getPushStreamCount(mediaServer.getId()));
        result.setProxy(redisCatchStorage.getProxyStreamCount(mediaServer.getId()));
        result.setGbReceive(inviteStreamService.getStreamInfoCount(mediaServerItem.getId()));
        result.setGbSend(redisCatchStorage.getGbSendCount(mediaServerItem.getId()));
        result.setGbReceive(inviteStreamService.getStreamInfoCount(mediaServer.getId()));
        result.setGbSend(redisCatchStorage.getGbSendCount(mediaServer.getId()));
        return result;
    }
@@ -539,79 +540,149 @@
    public boolean stopSendRtp(MediaServer mediaInfo, String app, String stream, String ssrc) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaInfo.getType());
        if (mediaNodeServerService == null) {
            logger.info("[stopSendRtp] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaInfo.getType());
            logger.info("[stopSendRtp] 失败, mediaServer的类型: {},未找到对应的实现类", mediaInfo.getType());
            return false;
        }
        return mediaNodeServerService.stopSendRtp(mediaInfo, app, stream, ssrc);
    }
    @Override
    public boolean deleteRecordDirectory(MediaServer mediaServerItem, String app, String stream, String date, String fileName) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
    public boolean deleteRecordDirectory(MediaServer mediaServer, String app, String stream, String date, String fileName) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[stopSendRtp] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            logger.info("[stopSendRtp] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return false;
        }
        return mediaNodeServerService.deleteRecordDirectory(mediaServerItem, app, stream, date, fileName);
        return mediaNodeServerService.deleteRecordDirectory(mediaServer, app, stream, date, fileName);
    }
    @Override
    public List<StreamInfo> getMediaList(MediaServer mediaServerItem, String app, String stream, String callId) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
    public List<StreamInfo> getMediaList(MediaServer mediaServer, String app, String stream, String callId) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[getMediaList] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            logger.info("[getMediaList] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return new ArrayList<>();
        }
        return mediaNodeServerService.getMediaList(mediaServerItem, app, stream, callId);
        return mediaNodeServerService.getMediaList(mediaServer, app, stream, callId);
    }
    @Override
    public Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String stream) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
    public Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String stream) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[connectRtpServer] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            logger.info("[connectRtpServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return false;
        }
        return mediaNodeServerService.connectRtpServer(mediaServerItem, address, port, stream);
        return mediaNodeServerService.connectRtpServer(mediaServer, address, port, stream);
    }
    @Override
    public void getSnap(MediaServer mediaServerItem, String streamUrl, int timeoutSec, int expireSec, String path, String fileName) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
    public void getSnap(MediaServer mediaServer, String streamUrl, int timeoutSec, int expireSec, String path, String fileName) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[getSnap] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            logger.info("[getSnap] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return;
        }
        mediaNodeServerService.getSnap(mediaServerItem, streamUrl, timeoutSec, expireSec, path, fileName);
        mediaNodeServerService.getSnap(mediaServer, streamUrl, timeoutSec, expireSec, path, fileName);
    }
    @Override
    public MediaInfo getMediaInfo(MediaServer mediaServerItem, String app, String stream) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
    public MediaInfo getMediaInfo(MediaServer mediaServer, String app, String stream) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[getMediaInfo] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            logger.info("[getMediaInfo] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return null;
        }
        return mediaNodeServerService.getMediaInfo(mediaServerItem, app, stream);
        return mediaNodeServerService.getMediaInfo(mediaServer, app, stream);
    }
    @Override
    public Boolean pauseRtpCheck(MediaServer mediaServerItem, String streamKey) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
    public Boolean pauseRtpCheck(MediaServer mediaServer, String streamKey) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[pauseRtpCheck] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            logger.info("[pauseRtpCheck] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return false;
        }
        return mediaNodeServerService.pauseRtpCheck(mediaServerItem, streamKey);
        return mediaNodeServerService.pauseRtpCheck(mediaServer, streamKey);
    }
    @Override
    public boolean resumeRtpCheck(MediaServer mediaServerItem, String streamKey) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
    public boolean resumeRtpCheck(MediaServer mediaServer, String streamKey) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[pauseRtpCheck] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            logger.info("[pauseRtpCheck] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return false;
        }
        return mediaNodeServerService.resumeRtpCheck(mediaServerItem, streamKey);
        return mediaNodeServerService.resumeRtpCheck(mediaServer, streamKey);
    }
    @Override
    public String getFfmpegCmd(MediaServer mediaServer, String cmdKey) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[getFfmpegCmd] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return null;
        }
        return mediaNodeServerService.getFfmpegCmd(mediaServer, cmdKey);
    }
    @Override
    public void closeStreams(MediaServer mediaServer, String app, String stream) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[closeStreams] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return;
        }
        mediaNodeServerService.closeStreams(mediaServer, app, stream);
    }
    @Override
    public WVPResult<String> addFFmpegSource(MediaServer mediaServer, String srcUrl, String dstUrl, int timeoutMs, boolean enableAudio, boolean enableMp4, String ffmpegCmdKey) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[addFFmpegSource] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return WVPResult.fail(ErrorCode.ERROR400);
        }
        return mediaNodeServerService.addFFmpegSource(mediaServer, srcUrl, dstUrl, timeoutMs, enableAudio, enableMp4, ffmpegCmdKey);
    }
    @Override
    public WVPResult<String> addStreamProxy(MediaServer mediaServer, String app, String stream, String url, boolean enableAudio, boolean enableMp4, String rtpType) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[addStreamProxy] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return WVPResult.fail(ErrorCode.ERROR400);
        }
        return mediaNodeServerService.addStreamProxy(mediaServer, app, stream, url, enableAudio, enableMp4, rtpType);
    }
    @Override
    public Boolean delFFmpegSource(MediaServer mediaServer, String streamKey) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[delFFmpegSource] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return false;
        }
        return mediaNodeServerService.delFFmpegSource(mediaServer, streamKey);
    }
    @Override
    public Boolean delStreamProxy(MediaServer mediaServerItem, String streamKey) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        if (mediaNodeServerService == null) {
            logger.info("[delStreamProxy] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServerItem.getType());
            return false;
        }
        return mediaNodeServerService.delStreamProxy(mediaServerItem, streamKey);
    }
    @Override
    public Map<String, String> getFFmpegCMDs(MediaServer mediaServer) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
        if (mediaNodeServerService == null) {
            logger.info("[getFFmpegCMDs] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
            return new HashMap<>();
        }
        return mediaNodeServerService.getFFmpegCMDs(mediaServer);
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java
@@ -11,6 +11,7 @@
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -38,41 +39,41 @@
    private String sipIp;
    @Override
    public int createRTPServer(MediaServer mediaServerItem, String streamId, long ssrc, Integer port, Boolean onlyAuto, Boolean reUsePort, Integer tcpMode) {
        return zlmServerFactory.createRTPServer(mediaServerItem, streamId, ssrc, port, onlyAuto, reUsePort, tcpMode);
    public int createRTPServer(MediaServer mediaServer, String streamId, long ssrc, Integer port, Boolean onlyAuto, Boolean reUsePort, Integer tcpMode) {
        return zlmServerFactory.createRTPServer(mediaServer, streamId, ssrc, port, onlyAuto, reUsePort, tcpMode);
    }
    @Override
    public void closeRtpServer(MediaServer mediaServerItem, String streamId) {
        zlmresTfulUtils.closeStreams(mediaServerItem, "rtp", streamId);
    public void closeRtpServer(MediaServer mediaServer, String streamId) {
        zlmresTfulUtils.closeStreams(mediaServer, "rtp", streamId);
    }
    @Override
    public void closeRtpServer(MediaServer mediaServerItem, String streamId, CommonCallback<Boolean> callback) {
        zlmServerFactory.closeRtpServer(mediaServerItem, streamId, callback);
    public void closeRtpServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
        zlmServerFactory.closeRtpServer(mediaServer, streamId, callback);
    }
    @Override
    public void closeStreams(MediaServer mediaServerItem, String app, String stream) {
        zlmresTfulUtils.closeStreams(mediaServerItem, app, stream);
    public void closeStreams(MediaServer mediaServer, String app, String stream) {
        zlmresTfulUtils.closeStreams(mediaServer, app, stream);
    }
    @Override
    public Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String streamId, String ssrc) {
        return zlmServerFactory.updateRtpServerSSRC(mediaServerItem, streamId, ssrc);
    public Boolean updateRtpServerSSRC(MediaServer mediaServer, String streamId, String ssrc) {
        return zlmServerFactory.updateRtpServerSSRC(mediaServer, streamId, ssrc);
    }
    @Override
    public boolean checkNodeId(MediaServer mediaServerItem) {
        if (mediaServerItem == null) {
    public boolean checkNodeId(MediaServer mediaServer) {
        if (mediaServer == null) {
            return false;
        }
        JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
        JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServer);
        if (responseJSON != null) {
            JSONArray data = responseJSON.getJSONArray("data");
            if (data != null && !data.isEmpty()) {
                ZLMServerConfig zlmServerConfig= JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class);
                return zlmServerConfig.getGeneralMediaServerId().equals(mediaServerItem.getId());
                return zlmServerConfig.getGeneralMediaServerId().equals(mediaServer.getId());
            }else {
                return false;
            }
@@ -83,17 +84,17 @@
    }
    @Override
    public void online(MediaServer mediaServerItem) {
    public void online(MediaServer mediaServer) {
    }
    @Override
    public MediaServer checkMediaServer(String ip, int port, String secret) {
        MediaServer mediaServerItem = new MediaServer();
        mediaServerItem.setIp(ip);
        mediaServerItem.setHttpPort(port);
        mediaServerItem.setSecret(secret);
        JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
        MediaServer mediaServer = new MediaServer();
        mediaServer.setIp(ip);
        mediaServer.setHttpPort(port);
        mediaServer.setSecret(secret);
        JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServer);
        if (responseJSON == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "连接失败");
        }
@@ -105,18 +106,18 @@
        if (zlmServerConfig == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "读取配置失败");
        }
        mediaServerItem.setId(zlmServerConfig.getGeneralMediaServerId());
        mediaServerItem.setHttpSSlPort(zlmServerConfig.getHttpPort());
        mediaServerItem.setRtmpPort(zlmServerConfig.getRtmpPort());
        mediaServerItem.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort());
        mediaServerItem.setRtspPort(zlmServerConfig.getRtspPort());
        mediaServerItem.setRtspSSLPort(zlmServerConfig.getRtspSSlport());
        mediaServerItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
        mediaServerItem.setStreamIp(ip);
        mediaServerItem.setHookIp(sipIp.split(",")[0]);
        mediaServerItem.setSdpIp(ip);
        mediaServerItem.setType("zlm");
        return mediaServerItem;
        mediaServer.setId(zlmServerConfig.getGeneralMediaServerId());
        mediaServer.setHttpSSlPort(zlmServerConfig.getHttpPort());
        mediaServer.setRtmpPort(zlmServerConfig.getRtmpPort());
        mediaServer.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort());
        mediaServer.setRtspPort(zlmServerConfig.getRtspPort());
        mediaServer.setRtspSSLPort(zlmServerConfig.getRtspSSlport());
        mediaServer.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
        mediaServer.setStreamIp(ip);
        mediaServer.setHookIp(sipIp.split(",")[0]);
        mediaServer.setSdpIp(ip);
        mediaServer.setType("zlm");
        return mediaServer;
    }
    @Override
@@ -134,22 +135,22 @@
    }
    @Override
    public boolean deleteRecordDirectory(MediaServer mediaServerItem, String app, String stream, String date, String fileName) {
        logger.info("[zlm-deleteRecordDirectory] 删除磁盘文件, server: {} {}:{}->{}/{}", mediaServerItem.getId(), app, stream, date, fileName);
        JSONObject jsonObject = zlmresTfulUtils.deleteRecordDirectory(mediaServerItem, app,
    public boolean deleteRecordDirectory(MediaServer mediaServer, String app, String stream, String date, String fileName) {
        logger.info("[zlm-deleteRecordDirectory] 删除磁盘文件, server: {} {}:{}->{}/{}", mediaServer.getId(), app, stream, date, fileName);
        JSONObject jsonObject = zlmresTfulUtils.deleteRecordDirectory(mediaServer, app,
                stream, date, fileName);
        if (jsonObject.getInteger("code") == 0) {
            return true;
        }else {
            logger.info("[zlm-deleteRecordDirectory] 删除磁盘文件错误, server: {} {}:{}->{}/{}, 结果: {}", mediaServerItem.getId(), app, stream, date, fileName, jsonObject);
            logger.info("[zlm-deleteRecordDirectory] 删除磁盘文件错误, server: {} {}:{}->{}/{}, 结果: {}", mediaServer.getId(), app, stream, date, fileName, jsonObject);
            return false;
        }
    }
    @Override
    public List<StreamInfo> getMediaList(MediaServer mediaServerItem, String app, String stream, String callId) {
    public List<StreamInfo> getMediaList(MediaServer mediaServer, String app, String stream, String callId) {
        List<StreamInfo> streamInfoList = new ArrayList<>();
        JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaServerItem, app, stream);
        JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaServer, app, stream);
        if (mediaList != null) {
            if (mediaList.getInteger("code") == 0) {
                JSONArray data = mediaList.getJSONArray("data");
@@ -158,7 +159,7 @@
                }
                JSONObject mediaJSON = data.getJSONObject(0);
                MediaInfo mediaInfo = MediaInfo.getInstance(mediaJSON);
                StreamInfo streamInfo = getStreamInfoByAppAndStream(mediaServerItem, app, stream, mediaInfo, callId, true);
                StreamInfo streamInfo = getStreamInfoByAppAndStream(mediaServer, app, stream, mediaInfo, callId, true);
                if (streamInfo != null) {
                    streamInfoList.add(streamInfo);
                }
@@ -167,41 +168,42 @@
        return streamInfoList;
    }
    public StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServerItem, String app, String stream, MediaInfo mediaInfo, String callId, boolean isPlay) {
    public StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServer, String app, String stream, MediaInfo mediaInfo, String callId, boolean isPlay) {
        StreamInfo streamInfoResult = new StreamInfo();
        streamInfoResult.setStream(stream);
        streamInfoResult.setApp(app);
        String addr = mediaServerItem.getStreamIp();
        String addr = mediaServer.getStreamIp();
        streamInfoResult.setIp(addr);
        streamInfoResult.setMediaServerId(mediaServerItem.getId());
        streamInfoResult.setMediaServerId(mediaServer.getId());
        String callIdParam = ObjectUtils.isEmpty(callId)?"":"?callId=" + callId;
        streamInfoResult.setRtmp(addr, mediaServerItem.getRtmpPort(),mediaServerItem.getRtmpSSlPort(), app,  stream, callIdParam);
        streamInfoResult.setRtsp(addr, mediaServerItem.getRtspPort(),mediaServerItem.getRtspSSLPort(), app,  stream, callIdParam);
        streamInfoResult.setFlv(addr, mediaServerItem.getHttpPort(),mediaServerItem.getHttpSSlPort(), app,  stream, callIdParam);
        streamInfoResult.setFmp4(addr, mediaServerItem.getHttpPort(),mediaServerItem.getHttpSSlPort(), app,  stream, callIdParam);
        streamInfoResult.setHls(addr, mediaServerItem.getHttpPort(),mediaServerItem.getHttpSSlPort(), app,  stream, callIdParam);
        streamInfoResult.setTs(addr, mediaServerItem.getHttpPort(),mediaServerItem.getHttpSSlPort(), app,  stream, callIdParam);
        streamInfoResult.setRtc(addr, mediaServerItem.getHttpPort(),mediaServerItem.getHttpSSlPort(), app,  stream, callIdParam, isPlay);
        streamInfoResult.setRtmp(addr, mediaServer.getRtmpPort(),mediaServer.getRtmpSSlPort(), app,  stream, callIdParam);
        streamInfoResult.setRtsp(addr, mediaServer.getRtspPort(),mediaServer.getRtspSSLPort(), app,  stream, callIdParam);
        streamInfoResult.setFlv(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app,  stream, callIdParam);
        streamInfoResult.setFmp4(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app,  stream, callIdParam);
        streamInfoResult.setHls(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app,  stream, callIdParam);
        streamInfoResult.setTs(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app,  stream, callIdParam);
        streamInfoResult.setRtc(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app,  stream, callIdParam, isPlay);
        streamInfoResult.setMediaInfo(mediaInfo);
        streamInfoResult.setOriginType(mediaInfo.getOriginType());
        return streamInfoResult;
    }
    @Override
    public Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String stream) {
        JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, address, port, stream);
    public Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String stream) {
        JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServer, address, port, stream);
        logger.info("[TCP主动连接对方] 结果: {}", jsonObject);
        return jsonObject.getInteger("code") == 0;
    }
    @Override
    public void getSnap(MediaServer mediaServerItem, String streamUrl, int timeoutSec, int expireSec, String path, String fileName) {
        zlmresTfulUtils.getSnap(mediaServerItem, streamUrl, timeoutSec, expireSec, path, fileName);
    public void getSnap(MediaServer mediaServer, String streamUrl, int timeoutSec, int expireSec, String path, String fileName) {
        zlmresTfulUtils.getSnap(mediaServer, streamUrl, timeoutSec, expireSec, path, fileName);
    }
    @Override
    public MediaInfo getMediaInfo(MediaServer mediaServerItem, String app, String stream) {
        JSONObject jsonObject = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtsp", stream);
    public MediaInfo getMediaInfo(MediaServer mediaServer, String app, String stream) {
        JSONObject jsonObject = zlmresTfulUtils.getMediaInfo(mediaServer, app, "rtsp", stream);
        if (jsonObject.getInteger("code") != 0) {
            return null;
        }
@@ -209,14 +211,90 @@
    }
    @Override
    public Boolean pauseRtpCheck(MediaServer mediaServerItem, String streamKey) {
        JSONObject jsonObject = zlmresTfulUtils.pauseRtpCheck(mediaServerItem, streamKey);
    public Boolean pauseRtpCheck(MediaServer mediaServer, String streamKey) {
        JSONObject jsonObject = zlmresTfulUtils.pauseRtpCheck(mediaServer, streamKey);
        return jsonObject.getInteger("code") == 0;
    }
    @Override
    public Boolean resumeRtpCheck(MediaServer mediaServerItem, String streamKey) {
        JSONObject jsonObject = zlmresTfulUtils.resumeRtpCheck(mediaServerItem, streamKey);
    public Boolean resumeRtpCheck(MediaServer mediaServer, String streamKey) {
        JSONObject jsonObject = zlmresTfulUtils.resumeRtpCheck(mediaServer, streamKey);
        return jsonObject.getInteger("code") == 0;
    }
    @Override
    public String getFfmpegCmd(MediaServer mediaServer, String cmdKey) {
        JSONObject jsonObject = zlmresTfulUtils.getMediaServerConfig(mediaServer);
        if (jsonObject.getInteger("code") != 0) {
            logger.warn("[getFfmpegCmd] 获取流媒体配置失败");
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取流媒体配置失败");
        }
        JSONArray dataArray = jsonObject.getJSONArray("data");
        JSONObject mediaServerConfig = dataArray.getJSONObject(0);
        if (ObjectUtils.isEmpty(cmdKey)) {
            cmdKey = "ffmpeg.cmd";
        }
       return mediaServerConfig.getString(cmdKey);
    }
    @Override
    public WVPResult<String> addFFmpegSource(MediaServer mediaServer, String srcUrl, String dstUrl, int timeoutMs, boolean enableAudio, boolean enableMp4, String ffmpegCmdKey) {
        JSONObject jsonObject = zlmresTfulUtils.addFFmpegSource(mediaServer, srcUrl, dstUrl, timeoutMs, enableAudio, enableMp4, ffmpegCmdKey);
        if (jsonObject.getInteger("code") != 0) {
            logger.warn("[getFfmpegCmd] 添加FFMPEG代理失败");
            return WVPResult.fail(ErrorCode.ERROR100.getCode(), "添加FFMPEG代理失败");
        }else {
            JSONObject data = jsonObject.getJSONObject("data");
            if (data == null) {
                return WVPResult.fail(ErrorCode.ERROR100.getCode(), "代理结果异常: " + jsonObject);
            }else {
                return WVPResult.success(data.getString("key"));
            }
        }
    }
    @Override
    public WVPResult<String> addStreamProxy(MediaServer mediaServer, String app, String stream, String url, boolean enableAudio, boolean enableMp4, String rtpType) {
        JSONObject jsonObject = zlmresTfulUtils.addStreamProxy(mediaServer, app, stream, url, enableAudio, enableMp4, rtpType);
        if (jsonObject.getInteger("code") != 0) {
            logger.warn("[addStreamProxy] 添加代理失败");
            return WVPResult.fail(ErrorCode.ERROR100.getCode(), "添加代理失败");
        }else {
            JSONObject data = jsonObject.getJSONObject("data");
            if (data == null) {
                return WVPResult.fail(ErrorCode.ERROR100.getCode(), "代理结果异常: " + jsonObject);
            }else {
                return WVPResult.success("");
            }
        }
    }
    @Override
    public Boolean delFFmpegSource(MediaServer mediaServer, String streamKey) {
        JSONObject jsonObject = zlmresTfulUtils.delFFmpegSource(mediaServer, streamKey);
        return jsonObject.getInteger("code") == 0;
    }
    @Override
    public Boolean delStreamProxy(MediaServer mediaServer, String streamKey) {
        JSONObject jsonObject = zlmresTfulUtils.delStreamProxy(mediaServer, streamKey);
        return jsonObject.getInteger("code") == 0;
    }
    @Override
    public Map<String, String> getFFmpegCMDs(MediaServer mediaServer) {
        Map<String, String> result = new HashMap<>();
        JSONObject mediaServerConfigResuly = zlmresTfulUtils.getMediaServerConfig(mediaServer);
        if (mediaServerConfigResuly != null && mediaServerConfigResuly.getInteger("code") == 0
                && mediaServerConfigResuly.getJSONArray("data").size() > 0){
            JSONObject mediaServerConfig = mediaServerConfigResuly.getJSONArray("data").getJSONObject(0);
            for (String key : mediaServerConfig.keySet()) {
                if (key.startsWith("ffmpeg.cmd")){
                    result.put(key, mediaServerConfig.getString(key));
                }
            }
        }
        return result;
    }
}
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
@@ -269,7 +269,7 @@
        return sendPost(mediaServerItem, "getRtpInfo",param, null);
    }
    public JSONObject addFFmpegSource(MediaServer mediaServerItem, String src_url, String dst_url, String timeout_ms,
    public JSONObject addFFmpegSource(MediaServer mediaServerItem, String src_url, String dst_url, Integer timeout_ms,
                                      boolean enable_audio, boolean enable_mp4, String ffmpeg_cmd_key){
        logger.info(src_url);
        logger.info(dst_url);
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java
@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -150,6 +151,27 @@
                - DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(streamPushItem.getCreateTime())).intValue();
    }
    public StreamPushItem instance(StreamInfo streamInfo) {
        StreamPushItem streamPushItem = new StreamPushItem();
        streamPushItem.setApp(streamInfo.getApp());
        streamPushItem.setMediaServerId(streamInfo.getMediaServerId());
        streamPushItem.setStream(streamInfo.getStream());
        streamPushItem.setAliveSecond(streamInfo.getMediaInfo().getAliveSecond());
//        streamPushItem.setOriginSock(streamInfo.getMediaInfo().getOriginSock());
        streamPushItem.setTotalReaderCount(streamInfo.getMediaInfo().getReaderCount() + "");
        streamPushItem.setOriginType(streamInfo.getOriginType());
//        streamPushItem.setOriginTypeStr(streamInfo.getMediaInfo().getOriginTypeStr());
//        streamPushItem.setOriginUrl(streamInfo.getMediaInfo().getOriginUrl());
        streamPushItem.setCreateTime(DateUtil.getNow());
        streamPushItem.setAliveSecond(streamInfo.getMediaInfo().getAliveSecond());
        streamPushItem.setStatus(true);
        streamPushItem.setStreamType("push");
//        streamPushItem.setVhost(streamInfo.getVhost());
        streamPushItem.setServerId(streamInfo.getMediaServerId());
        return streamPushItem;
    }
    public static class MediaSchema {
        private String schema;
        private Long bytesSpeed;
src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
@@ -1,12 +1,14 @@
package com.genersoft.iot.vmp.service;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import java.util.Map;
public interface IStreamProxyService {
@@ -18,17 +20,19 @@
    /**
     * 添加视频代理到zlm
     *
     * @param param
     * @return
     */
    JSONObject addStreamProxyToZlm(StreamProxyItem param);
    WVPResult<String> addStreamProxyToZlm(StreamProxyItem param);
    /**
     * 从zlm移除视频代理
     *
     * @param param
     * @return
     */
    JSONObject removeStreamProxyFromZlm(StreamProxyItem param);
    Boolean removeStreamProxyFromZlm(StreamProxyItem param);
    /**
     * 分页查询
@@ -73,9 +77,10 @@
    /**
     * 获取ffmpeg.cmd模板
     *
     * @return
     */
    JSONObject getFFmpegCMDs(MediaServer mediaServerItem);
    Map<String, String> getFFmpegCMDs(MediaServer mediaServerItem);
    /**
     * 根据app与stream获取streamProxy
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -1,9 +1,8 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.github.pagehelper.PageInfo;
@@ -15,8 +14,6 @@
 * @author lin
 */
public interface IStreamPushService {
    List<StreamPushItem> handleJSON(String json, MediaServer mediaServerItem);
    /**
     * 将应用名和流ID加入国标关联
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.GeneralCallback;
@@ -9,7 +8,8 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
@@ -18,7 +18,6 @@
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -29,6 +28,7 @@
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,9 +62,6 @@
    @Autowired
    private IMediaService mediaService;
    @Autowired
    private ZLMRESTfulUtils zlmresTfulUtils;
    @Autowired
    private ZLMServerFactory zlmServerFactory;
@@ -108,28 +105,21 @@
    @Override
    public void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback) {
        MediaServer mediaInfo;
        MediaServer mediaServer;
        if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){
            mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null);
            mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
        }else {
            mediaInfo = mediaServerService.getOne(param.getMediaServerId());
            mediaServer = mediaServerService.getOne(param.getMediaServerId());
        }
        if (mediaInfo == null) {
        if (mediaServer == null) {
            logger.warn("保存代理未找到在线的ZLM...");
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到在线的ZLM");
        }
        String dstUrl;
        if ("ffmpeg".equalsIgnoreCase(param.getType())) {
            JSONObject jsonObject = zlmresTfulUtils.getMediaServerConfig(mediaInfo);
            if (jsonObject.getInteger("code") != 0) {
                throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取流媒体配置失败");
            }
            JSONArray dataArray = jsonObject.getJSONArray("data");
            JSONObject mediaServerConfig = dataArray.getJSONObject(0);
            if (ObjectUtils.isEmpty(param.getFfmpegCmdKey())) {
                param.setFfmpegCmdKey("ffmpeg.cmd");
            }
            String ffmpegCmd = mediaServerConfig.getString(param.getFfmpegCmdKey());
            String ffmpegCmd = mediaServerService.getFfmpegCmd(mediaServer, param.getFfmpegCmdKey());
            if (ffmpegCmd == null) {
                throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法获取ffmpeg cmd");
            }
@@ -140,25 +130,25 @@
            int port;
            String schemaForUri;
            if (schema.equalsIgnoreCase("rtsp")) {
                port = mediaInfo.getRtspPort();
                port = mediaServer.getRtspPort();
                schemaForUri = schema;
            }else if (schema.equalsIgnoreCase("flv")) {
                port = mediaInfo.getRtmpPort();
                port = mediaServer.getRtmpPort();
                schemaForUri = schema;
            }else {
                port = mediaInfo.getRtmpPort();
                port = mediaServer.getRtmpPort();
                schemaForUri = schema;
            }
            dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, param.getApp(),
                    param.getStream());
        }else {
            dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtspPort(), param.getApp(),
            dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaServer.getRtspPort(), param.getApp(),
                    param.getStream());
        }
        param.setDstUrl(dstUrl);
        logger.info("[拉流代理] 输出地址为:{}", dstUrl);
        param.setMediaServerId(mediaInfo.getId());
        param.setMediaServerId(mediaServer.getId());
        boolean saveResult;
        // 更新
        if (videoManagerStorager.queryStreamProxy(param.getApp(), param.getStream()) != null) {
@@ -170,17 +160,17 @@
            callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null);
            return;
        }
        HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
        HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaServer.getId());
        hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
            StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
                    mediaInfo, param.getApp(), param.getStream(), null, null);
                    mediaServer, param.getApp(), param.getStream(), null, null);
            callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
        });
        if (param.isEnable()) {
            String talkKey = UUID.randomUUID().toString();
            String delayTalkKey = UUID.randomUUID().toString();
            dynamicTask.startDelay(delayTalkKey, ()->{
                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaServer.getId(), false);
                if (streamInfo != null) {
                    callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
                }else {
@@ -188,12 +178,12 @@
                    callback.run(ErrorCode.ERROR100.getCode(), "超时", null);
                }
            }, 7000);
            JSONObject jsonObject = addStreamProxyToZlm(param);
            if (jsonObject != null && jsonObject.getInteger("code") == 0) {
            WVPResult<String> result = addStreamProxyToZlm(param);
            if (result != null && result.getCode() == 0) {
                hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
                dynamicTask.stop(talkKey);
                StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
                        mediaInfo, param.getApp(), param.getStream(), null, null);
                        mediaServer, param.getApp(), param.getStream(), null, null);
                callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
            }else {
                param.setEnable(false);
@@ -203,16 +193,16 @@
                }else {
                    updateStreamProxy(param);
                }
                if (jsonObject == null){
                if (result == null){
                    callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null);
                }else {
                    callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null);
                    callback.run(ErrorCode.ERROR100.getCode(), result.getMsg(), null);
                }
            }
        }
        else{
            StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
                    mediaInfo, param.getApp(), param.getStream(), null, null);
                    mediaServer, param.getApp(), param.getStream(), null, null);
            callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
        }
    }
@@ -308,40 +298,36 @@
    }
    @Override
    public JSONObject addStreamProxyToZlm(StreamProxyItem param) {
        JSONObject result = null;
        MediaServer mediaServerItem = null;
    public WVPResult<String> addStreamProxyToZlm(StreamProxyItem param) {
        WVPResult<String> result = null;
        MediaServer mediaServer = null;
        if (param.getMediaServerId() == null) {
            logger.warn("添加代理时MediaServerId 为null");
            return null;
        }else {
            mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
            mediaServer = mediaServerService.getOne(param.getMediaServerId());
        }
        if (mediaServerItem == null) {
        if (mediaServer == null) {
            return null;
        }
        if (zlmServerFactory.isStreamReady(mediaServerItem, param.getApp(), param.getStream())) {
            zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
        if (zlmServerFactory.isStreamReady(mediaServer, param.getApp(), param.getStream())) {
            mediaServerService.closeStreams(mediaServer, param.getApp(), param.getStream());
        }
        String msgResult;
        if ("ffmpeg".equalsIgnoreCase(param.getType())){
            result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl().trim(), param.getDstUrl(),
                    param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(),
            result = mediaServerService.addFFmpegSource(mediaServer, param.getSrcUrl().trim(), param.getDstUrl(),
                    param.getTimeoutMs(), param.isEnableAudio(), param.isEnableMp4(),
                    param.getFfmpegCmdKey());
        }else {
            result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
            result = mediaServerService.addStreamProxy(mediaServer, param.getApp(), param.getStream(), param.getUrl().trim(),
                    param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
        }
        System.out.println("addStreamProxyToZlm====");
        System.out.println(result);
        if (result != null && result.getInteger("code") == 0) {
            JSONObject data = result.getJSONObject("data");
            if (data == null) {
                logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result );
                return result;
            }
            String key = data.getString("key");
        if (result != null && result.getCode() == 0) {
            String key = result.getData();
            if (key == null) {
                logger.warn("[获取拉流代理的结果数据Data中的KEY] 失败: {}", result );
                logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result );
                return result;
            }
            param.setStreamKey(key);
@@ -351,16 +337,16 @@
    }
    @Override
    public JSONObject removeStreamProxyFromZlm(StreamProxyItem param) {
    public Boolean removeStreamProxyFromZlm(StreamProxyItem param) {
        if (param ==null) {
            return null;
        }
        MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
        JSONObject result = null;
        MediaServer mediaServer = mediaServerService.getOne(param.getMediaServerId());
        Boolean result = false;
        if ("ffmpeg".equalsIgnoreCase(param.getType())){
            result = zlmresTfulUtils.delFFmpegSource(mediaServerItem, param.getStreamKey());
            result = mediaServerService.delFFmpegSource(mediaServer, param.getStreamKey());
        }else {
            result = zlmresTfulUtils.delStreamProxy(mediaServerItem, param.getStreamKey());
            result = mediaServerService.delStreamProxy(mediaServer, param.getStreamKey());
        }
        return result;
    }
@@ -381,8 +367,8 @@
            gbStreamMapper.del(app, stream);
            videoManagerStorager.deleteStreamProxy(app, stream);
            redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
            JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
            if (jsonObject != null && jsonObject.getInteger("code") == 0) {
            Boolean result = removeStreamProxyFromZlm(streamProxyItem);
            if (result != null && result) {
                logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", app, stream);
            }else {
                logger.info("[移除代理]: 代理: {}/{}, 从zlm移除失败", app, stream);
@@ -395,16 +381,16 @@
        boolean result = false;
        StreamProxyItem streamProxy = videoManagerStorager.queryStreamProxy(app, stream);
        if (streamProxy != null && !streamProxy.isEnable() ) {
            JSONObject jsonObject = addStreamProxyToZlm(streamProxy);
            if (jsonObject == null) {
            WVPResult<String> wvpResult = addStreamProxyToZlm(streamProxy);
            if (wvpResult == null) {
                return false;
            }
            if (jsonObject.getInteger("code") == 0) {
            if (wvpResult.getCode() == 0) {
                result = true;
                streamProxy.setEnable(true);
                updateStreamProxy(streamProxy);
            }else {
                logger.info("启用代理失败: {}/{}->{}({})", app, stream, jsonObject.getString("msg"),
                logger.info("启用代理失败: {}/{}->{}({})", app, stream, wvpResult.getMsg(),
                        streamProxy.getSrcUrl() == null? streamProxy.getUrl():streamProxy.getSrcUrl());
            }
        } else if (streamProxy != null && streamProxy.isEnable()) {
@@ -418,8 +404,8 @@
        boolean result = false;
        StreamProxyItem streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream);
        if (streamProxyDto != null && streamProxyDto.isEnable()) {
            JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto);
            if (jsonObject != null && jsonObject.getInteger("code") == 0) {
            Boolean removed = removeStreamProxyFromZlm(streamProxyDto);
            if (removed != null && removed) {
                streamProxyDto.setEnable(false);
                result = updateStreamProxy(streamProxyDto);
            }
@@ -428,20 +414,8 @@
    }
    @Override
    public JSONObject getFFmpegCMDs(MediaServer mediaServerItem) {
        JSONObject result = new JSONObject();
        JSONObject mediaServerConfigResuly = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
        if (mediaServerConfigResuly != null && mediaServerConfigResuly.getInteger("code") == 0
                && mediaServerConfigResuly.getJSONArray("data").size() > 0){
            JSONObject mediaServerConfig = mediaServerConfigResuly.getJSONArray("data").getJSONObject(0);
            for (String key : mediaServerConfig.keySet()) {
                if (key.startsWith("ffmpeg.cmd")){
                    result.put(key, mediaServerConfig.getString(key));
                }
            }
        }
        return result;
    public Map<String, String> getFFmpegCMDs(MediaServer mediaServer) {
        return mediaServerService.getFFmpegCMDs(mediaServer);
    }
@@ -467,8 +441,8 @@
                mediaServerId, true);
        for (StreamProxyItem streamProxyDto : streamProxyListForEnable) {
            logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
            JSONObject jsonObject = addStreamProxyToZlm(streamProxyDto);
            if (jsonObject == null) {
            WVPResult<String> wvpResult = addStreamProxyToZlm(streamProxyDto);
            if (wvpResult == null) {
                // 设置为离线
                logger.info("恢复流代理失败" + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
                updateStatus(false, streamProxyDto.getApp(), streamProxyDto.getStream());
@@ -521,41 +495,27 @@
        MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
        if (mediaServer != null) {
            List<OnStreamChangedHookParam> allPullStream = redisCatchStorage.getStreams(mediaServerId, "PULL");
            if (allPullStream.size() > 0) {
                zlmresTfulUtils.getMediaList(mediaServer, jsonObject->{
                    Map<String, StreamInfo> stringStreamInfoMap = new HashMap<>();
                    if (jsonObject.getInteger("code") == 0) {
                        JSONArray data = jsonObject.getJSONArray("data");
                        if(data != null && data.size() > 0) {
                            for (int i = 0; i < data.size(); i++) {
                                JSONObject streamJSONObj = data.getJSONObject(i);
                                if ("rtsp".equals(streamJSONObj.getString("schema"))) {
                                    StreamInfo streamInfo = new StreamInfo();
                                    String app = streamJSONObj.getString("app");
                                    String stream = streamJSONObj.getString("stream");
                                    streamInfo.setApp(app);
                                    streamInfo.setStream(stream);
                                    stringStreamInfoMap.put(app+stream, streamInfo);
                                }
                            }
            if (!allPullStream.isEmpty()) {
                List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServer, null, null, null);
                Map<String, StreamInfo> stringStreamInfoMap = new HashMap<>();
                if (mediaList != null && !mediaList.isEmpty()) {
                    for (StreamInfo streamInfo : mediaList) {
                        stringStreamInfoMap.put(streamInfo.getApp() + streamInfo.getStream(), streamInfo);
                    }
                }
                if (stringStreamInfoMap.isEmpty()) {
                    redisCatchStorage.removeStream(mediaServerId, "PULL");
                }else {
                    for (String key : stringStreamInfoMap.keySet()) {
                        StreamInfo streamInfo = stringStreamInfoMap.get(key);
                        if (stringStreamInfoMap.get(streamInfo.getApp() + streamInfo.getStream()) == null) {
                            redisCatchStorage.removeStream(mediaServerId, "PULL", streamInfo.getApp(),
                                    streamInfo.getStream());
                        }
                    }
                    if (stringStreamInfoMap.size() == 0) {
                        redisCatchStorage.removeStream(mediaServerId, "PULL");
                    }else {
                        for (String key : stringStreamInfoMap.keySet()) {
                            StreamInfo streamInfo = stringStreamInfoMap.get(key);
                            if (stringStreamInfoMap.get(streamInfo.getApp() + streamInfo.getStream()) == null) {
                                redisCatchStorage.removeStream(mediaServerId, "PULL", streamInfo.getApp(),
                                        streamInfo.getStream());
                            }
                        }
                    }
                });
                }
            }
        }
    }
    @Override
@@ -589,13 +549,12 @@
            MediaServer mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId());
            // TODO 支持其他 schema
            JSONObject mediaInfo = zlmresTfulUtils.isMediaOnline(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream(), "rtsp");
            MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream());
            if (mediaInfo == null){
                streamProxyItem.setStatus(false);
            } else {
                if (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")) {
                if (mediaInfo.getOnline() != null && mediaInfo.getOnline()) {
                    streamProxyItem.setStatus(true);
                } else {
                    streamProxyItem.setStatus(false);
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -1,21 +1,22 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.TypeReference;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -67,9 +68,6 @@
    private EventPublisher eventPublisher;
    @Autowired
    private ZLMRESTfulUtils zlmresTfulUtils;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
@@ -88,32 +86,27 @@
    private MediaConfig mediaConfig;
    @Override
    public List<StreamPushItem> handleJSON(String jsonData, MediaServer mediaServerItem) {
        if (jsonData == null) {
    private List<StreamPushItem> handleJSON(List<StreamInfo> streamInfoList) {
        if (streamInfoList == null || streamInfoList.isEmpty()) {
            return null;
        }
        Map<String, StreamPushItem> result = new HashMap<>();
        List<OnStreamChangedHookParam> onStreamChangedHookParams = JSON.parseObject(jsonData, new TypeReference<List<OnStreamChangedHookParam>>() {});
        for (OnStreamChangedHookParam item : onStreamChangedHookParams) {
        for (StreamInfo streamInfo : streamInfoList) {
            // 不保存国标推理以及拉流代理的流
            if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
                    || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
                    || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
                String key = item.getApp() + "_" + item.getStream();
            if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal()
                    || streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal()
                    || streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
                String key = streamInfo.getApp() + "_" + streamInfo.getStream();
                StreamPushItem streamPushItem = result.get(key);
                if (streamPushItem == null) {
                    streamPushItem = transform(item);
                    streamPushItem = streamPushItem.instance(streamInfo);
                    result.put(key, streamPushItem);
                }
            }
        }
        return new ArrayList<>(result.values());
    }
    @Override
    public StreamPushItem transform(OnStreamChangedHookParam item) {
        StreamPushItem streamPushItem = new StreamPushItem();
@@ -165,14 +158,9 @@
        platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
        int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
        MediaServer mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
        JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
        if (mediaList != null) {
            if (mediaList.getInteger("code") == 0) {
                JSONArray data = mediaList.getJSONArray("data");
                if (data == null) {
                    streamPushMapper.del(stream.getApp(), stream.getStream());
                }
            }
        List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaInfo, stream.getApp(), stream.getStream(), null);
        if (mediaList != null && mediaList.isEmpty()) {
            streamPushMapper.del(stream.getApp(), stream.getStream());
        }
        return del > 0;
    }
@@ -196,7 +184,7 @@
        int delStream = streamPushMapper.del(app, streamId);
        if (delStream > 0) {
            MediaServer mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
            zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
            mediaServerService.closeStreams(mediaServerItem,app, streamId);
        }
        return true;
    }
@@ -232,71 +220,61 @@
        for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) {
            streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo);
        }
        zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
            if (mediaList == null) {
                return;
        List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServerItem, null, null, null);
        if (mediaList == null) {
            return;
        }
        List<StreamPushItem> streamPushItems = handleJSON(mediaList);
        if (streamPushItems != null) {
            for (StreamPushItem streamPushItem : streamPushItems) {
                pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
                streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
                streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
            }
            String dataStr = mediaList.getString("data");
            Integer code = mediaList.getInteger("code");
            List<StreamPushItem> streamPushItems = null;
            if (code == 0 ) {
                if (dataStr != null) {
                    streamPushItems = handleJSON(dataStr, mediaServerItem);
                }
            }
            if (streamPushItems != null) {
                for (StreamPushItem streamPushItem : streamPushItems) {
                    pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
                    streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
                    streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
                }
            }
            List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
            if (offlinePushItems.size() > 0) {
                String type = "PUSH";
                int runLimit = 300;
                if (offlinePushItems.size() > runLimit) {
                    for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
                        int toIndex = i + runLimit;
                        if (i + runLimit > offlinePushItems.size()) {
                            toIndex = offlinePushItems.size();
                        }
                        List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
                        streamPushMapper.delAll(streamPushItemsSub);
        }
        List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
        if (offlinePushItems.size() > 0) {
            String type = "PUSH";
            int runLimit = 300;
            if (offlinePushItems.size() > runLimit) {
                for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
                    int toIndex = i + runLimit;
                    if (i + runLimit > offlinePushItems.size()) {
                        toIndex = offlinePushItems.size();
                    }
                }else {
                    streamPushMapper.delAll(offlinePushItems);
                    List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
                    streamPushMapper.delAll(streamPushItemsSub);
                }
            }
            Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values();
            if (offlineOnStreamChangedHookParamList.size() > 0) {
                String type = "PUSH";
                for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) {
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("serverId", userSetting.getServerId());
                    jsonObject.put("app", offlineOnStreamChangedHookParam.getApp());
                    jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream());
                    jsonObject.put("register", false);
                    jsonObject.put("mediaServerId", mediaServerId);
                    redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
                    // 移除redis内流的信息
                    redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream());
                    // 冗余数据,自己系统中自用
                    redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId());
                }
            }else {
                streamPushMapper.delAll(offlinePushItems);
            }
            Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
            if (streamAuthorityInfos.size() > 0) {
                for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) {
                    // 移除redis内流的信息
                    redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream());
                }
        }
        Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values();
        if (offlineOnStreamChangedHookParamList.size() > 0) {
            String type = "PUSH";
            for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) {
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("serverId", userSetting.getServerId());
                jsonObject.put("app", offlineOnStreamChangedHookParam.getApp());
                jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream());
                jsonObject.put("register", false);
                jsonObject.put("mediaServerId", mediaServerId);
                redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
                // 移除redis内流的信息
                redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream());
                // 冗余数据,自己系统中自用
                redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId());
            }
        }));
        }
        Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
        if (streamAuthorityInfos.size() > 0) {
            for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) {
                // 移除redis内流的信息
                redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream());
            }
        }
    }
    @Override
@@ -471,7 +449,7 @@
        if (delStream > 0) {
            for (GbStream gbStream : gbStreams) {
                MediaServer mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
                zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
                mediaServerService.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
            }
        }
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -16,17 +16,14 @@
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.*;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
@@ -71,22 +68,13 @@
    private IVideoManagerStorage storager;
    @Autowired
    private IRedisCatchStorage redisCatchStorage;
    @Autowired
    private IInviteStreamService inviteStreamService;
    @Autowired
    private ZLMRESTfulUtils zlmresTfulUtils;
    @Autowired
    private DeferredResultHolder resultHolder;
    @Autowired
    private IPlayService playService;
    @Autowired
    private IMediaService mediaService;
    @Autowired
    private IMediaServerService mediaServerService;
@@ -202,50 +190,6 @@
        json.put("isSubStream", isSubStream);
        return json;
    }
    /**
     * 将不是h264的视频通过ffmpeg 转码为h264 + aac
     * @param streamId 流ID
     */
    @Operation(summary = "将不是h264的视频通过ffmpeg 转码为h264 + aac", security = @SecurityRequirement(name = JwtUtils.HEADER))
    @Parameter(name = "streamId", description = "视频流ID", required = true)
    @PostMapping("/convert/{streamId}")
    public JSONObject playConvert(@PathVariable String streamId) {
//        StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
        InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, streamId);
        if (inviteInfo == null || inviteInfo.getStreamInfo() == null) {
            logger.warn("视频转码API调用失败!, 视频流已经停止!");
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到视频流信息, 视频流可能已经停止");
        }
        MediaServer mediaInfo = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
        JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
        if (!rtpInfo.getBoolean("exist")) {
            logger.warn("视频转码API调用失败!, 视频流已停止推流!");
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到视频流信息, 视频流可能已停止推流");
        } else {
            String dstUrl = String.format("rtmp://%s:%s/convert/%s", "127.0.0.1", mediaInfo.getRtmpPort(),
                    streamId );
            String srcUrl = String.format("rtsp://%s:%s/rtp/%s", "127.0.0.1", mediaInfo.getRtspPort(), streamId);
            JSONObject jsonObject = zlmresTfulUtils.addFFmpegSource(mediaInfo, srcUrl, dstUrl, "1000000", true, false, null);
            logger.info(jsonObject.toJSONString());
            if (jsonObject != null && jsonObject.getInteger("code") == 0) {
                JSONObject data = jsonObject.getJSONObject("data");
                if (data != null) {
                    JSONObject result = new JSONObject();
                    result.put("key", data.getString("key"));
                    StreamInfo streamInfoResult = mediaService.getStreamInfoByAppAndStreamWithCheck("convert", streamId, mediaInfo.getId(), false);
                    result.put("StreamInfo", streamInfoResult);
                    return result;
                }else {
                    throw new ControllerException(ErrorCode.ERROR100.getCode(), "转码失败");
                }
            }else {
                throw new ControllerException(ErrorCode.ERROR100.getCode(), "转码失败");
            }
        }
    }
    /**
     * 结束转码
     */
@@ -261,14 +205,8 @@
        if (mediaInfo == null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "使用的流媒体已经停止运行" );
        }else {
            JSONObject jsonObject = zlmresTfulUtils.delFFmpegSource(mediaInfo, key);
            logger.info(jsonObject.toJSONString());
            if (jsonObject != null && jsonObject.getInteger("code") == 0) {
                JSONObject data = jsonObject.getJSONObject("data");
                if (data == null || data.getBoolean("flag") == null || !data.getBoolean("flag")) {
                    throw new ControllerException(ErrorCode.ERROR100 );
                }
            }else {
            Boolean deleted = mediaServerService.delFFmpegSource(mediaInfo, key);
            if (!deleted) {
                throw new ControllerException(ErrorCode.ERROR100 );
            }
        }
src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java
@@ -27,6 +27,7 @@
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Map;
import java.util.UUID;
@SuppressWarnings("rawtypes")
@@ -135,7 +136,7 @@
    @ResponseBody
    @Operation(summary = "获取ffmpeg.cmd模板", security = @SecurityRequirement(name = JwtUtils.HEADER))
    @Parameter(name = "mediaServerId", description = "流媒体ID", required = true)
    public JSONObject getFFmpegCMDs(@RequestParam String mediaServerId){
    public Map<String, String> getFFmpegCMDs(@RequestParam String mediaServerId){
        logger.debug("获取节点[ {} ]ffmpeg.cmd模板", mediaServerId );
        MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);