648540858
2024-03-21 b90dc789b429c31674c26bb3ff309b987afaa77a
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
@@ -7,11 +7,12 @@
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent;
import com.genersoft.iot.vmp.media.event.MediaServerDeleteEvent;
import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -73,9 +74,9 @@
     * 初始化
     */
    @Override
    public void updateVmServer(List<MediaServerItem> mediaServerItemList) {
    public void updateVmServer(List<MediaServer> mediaServerItemList) {
        logger.info("[媒体服务节点] 缓存初始化 ");
        for (MediaServerItem mediaServerItem : mediaServerItemList) {
        for (MediaServer mediaServerItem : mediaServerItemList) {
            if (ObjectUtils.isEmpty(mediaServerItem.getId())) {
                continue;
            }
@@ -94,7 +95,7 @@
    @Override
    public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck,
    public SSRCInfo openRTPServer(MediaServer mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck,
                                  boolean isPlayback, Integer port, Boolean onlyAuto, Boolean reUsePort, Integer tcpMode) {
        if (mediaServerItem == null || mediaServerItem.getId() == null) {
            logger.info("[openRTPServer] 失败, mediaServerItem == null || mediaServerItem.getId() == null");
@@ -134,13 +135,13 @@
    }
    @Override
    public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback, Integer port, Boolean onlyAuto) {
    public SSRCInfo openRTPServer(MediaServer mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback, Integer port, Boolean onlyAuto) {
        return openRTPServer(mediaServerItem, streamId, ssrc, ssrcCheck, isPlayback, port, onlyAuto, null, 0);
    }
    @Override
    public void closeRTPServer(MediaServerItem mediaServerItem, String streamId) {
    public void closeRTPServer(MediaServer mediaServerItem, String streamId) {
        if (mediaServerItem == null) {
            return;
        }
@@ -153,7 +154,7 @@
    }
    @Override
    public void closeRTPServer(MediaServerItem mediaServerItem, String streamId, CommonCallback<Boolean> callback) {
    public void closeRTPServer(MediaServer mediaServerItem, String streamId, CommonCallback<Boolean> callback) {
        if (mediaServerItem == null) {
            callback.run(false);
            return;
@@ -168,7 +169,7 @@
    @Override
    public void closeRTPServer(String mediaServerId, String streamId) {
        MediaServerItem mediaServerItem = this.getOne(mediaServerId);
        MediaServer mediaServerItem = this.getOne(mediaServerId);
        if (mediaServerItem == null) {
            return;
        }
@@ -184,7 +185,7 @@
    }
    @Override
    public Boolean updateRtpServerSSRC(MediaServerItem mediaServerItem, String streamId, String ssrc) {
    public Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String streamId, String ssrc) {
        if (mediaServerItem == null) {
            return false;
        }
@@ -198,7 +199,7 @@
    @Override
    public void releaseSsrc(String mediaServerItemId, String ssrc) {
        MediaServerItem mediaServerItem = getOne(mediaServerItemId);
        MediaServer mediaServerItem = getOne(mediaServerItemId);
        if (mediaServerItem == null || ssrc == null) {
            return;
        }
@@ -209,21 +210,21 @@
     * 媒体服务节点 重启后重置他的推流信息, TODO 给正在使用的设备发送停止命令
     */
    @Override
    public void clearRTPServer(MediaServerItem mediaServerItem) {
    public void clearRTPServer(MediaServer mediaServerItem) {
        ssrcFactory.reset(mediaServerItem.getId());
    }
    @Override
    public void update(MediaServerItem mediaSerItem) {
    public void update(MediaServer mediaSerItem) {
        mediaServerMapper.update(mediaSerItem);
        MediaServerItem mediaServerItemInRedis = getOne(mediaSerItem.getId());
        MediaServerItem mediaServerItemInDataBase = mediaServerMapper.queryOne(mediaSerItem.getId());
        MediaServer mediaServerInRedis = getOne(mediaSerItem.getId());
        MediaServer mediaServerItemInDataBase = mediaServerMapper.queryOne(mediaSerItem.getId());
        if (mediaServerItemInDataBase == null) {
            return;
        }
        mediaServerItemInDataBase.setStatus(mediaSerItem.isStatus());
        if (mediaServerItemInRedis == null || !ssrcFactory.hasMediaServerSSRC(mediaServerItemInDataBase.getId())) {
        if (mediaServerInRedis == null || !ssrcFactory.hasMediaServerSSRC(mediaServerItemInDataBase.getId())) {
            ssrcFactory.initMediaServerSSRC(mediaServerItemInDataBase.getId(),null);
        }
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItemInDataBase.getId();
@@ -240,13 +241,13 @@
    @Override
    public List<MediaServerItem> getAllOnlineList() {
        List<MediaServerItem> result = new ArrayList<>();
    public List<MediaServer> getAllOnlineList() {
        List<MediaServer> result = new ArrayList<>();
        List<Object> mediaServerKeys = RedisUtil.scan(redisTemplate, String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX+ userSetting.getServerId() + "_" ));
        String onlineKey = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        for (Object mediaServerKey : mediaServerKeys) {
            String key = (String) mediaServerKey;
            MediaServerItem mediaServerItem = JsonUtil.redisJsonToObject(redisTemplate, key, MediaServerItem.class);
            MediaServer mediaServerItem = JsonUtil.redisJsonToObject(redisTemplate, key, MediaServer.class);
            if (Objects.isNull(mediaServerItem)) {
                continue;
            }
@@ -269,13 +270,13 @@
    }
    @Override
    public List<MediaServerItem> getAll() {
        List<MediaServerItem> mediaServerList = mediaServerMapper.queryAll();
    public List<MediaServer> getAll() {
        List<MediaServer> mediaServerList = mediaServerMapper.queryAll();
        if (mediaServerList.isEmpty()) {
            return new ArrayList<>();
        }
        for (MediaServerItem mediaServerItem : mediaServerList) {
            MediaServerItem mediaServerItemInRedis = getOne(mediaServerItem.getId());
        for (MediaServer mediaServerItem : mediaServerList) {
            MediaServer mediaServerItemInRedis = getOne(mediaServerItem.getId());
            if (mediaServerItemInRedis != null) {
                mediaServerItem.setStatus(mediaServerItemInRedis.isStatus());
            }
@@ -285,21 +286,21 @@
    @Override
    public List<MediaServerItem> getAllFromDatabase() {
    public List<MediaServer> getAllFromDatabase() {
        return mediaServerMapper.queryAll();
    }
    @Override
    public List<MediaServerItem> getAllOnline() {
    public List<MediaServer> getAllOnline() {
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        Set<Object> mediaServerIdSet = redisTemplate.opsForZSet().reverseRange(key, 0, -1);
        List<MediaServerItem> result = new ArrayList<>();
        List<MediaServer> result = new ArrayList<>();
        if (mediaServerIdSet != null && mediaServerIdSet.size() > 0) {
            for (Object mediaServerId : mediaServerIdSet) {
                String mediaServerIdStr = (String) mediaServerId;
                String serverKey = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerIdStr;
                result.add((MediaServerItem) redisTemplate.opsForValue().get(serverKey));
                result.add((MediaServer) redisTemplate.opsForValue().get(serverKey));
            }
        }
        Collections.reverse(result);
@@ -312,17 +313,17 @@
     * @return MediaServerItem
     */
    @Override
    public MediaServerItem getOne(String mediaServerId) {
    public MediaServer getOne(String mediaServerId) {
        if (mediaServerId == null) {
            return null;
        }
        String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
        return JsonUtil.redisJsonToObject(redisTemplate, key, MediaServerItem.class);
        return JsonUtil.redisJsonToObject(redisTemplate, key, MediaServer.class);
    }
    @Override
    public MediaServerItem getDefaultMediaServer() {
    public MediaServer getDefaultMediaServer() {
        return mediaServerMapper.queryDefault();
    }
@@ -333,7 +334,7 @@
    }
    @Override
    public void add(MediaServerItem mediaServerItem) {
    public void add(MediaServer mediaServerItem) {
        mediaServerItem.setCreateTime(DateUtil.getNow());
        mediaServerItem.setUpdateTime(DateUtil.getNow());
        if (mediaServerItem.getHookAliveInterval() == null || mediaServerItem.getHookAliveInterval() == 0F) {
@@ -364,7 +365,7 @@
    }
    @Override
    public void resetOnlineServerItem(MediaServerItem serverItem) {
    public void resetOnlineServerItem(MediaServer serverItem) {
        // 更新缓存
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        // 使用zset的分数作为当前并发量, 默认值设置为0
@@ -378,7 +379,7 @@
        }
    }
    private int getMediaList(MediaServerItem serverItem) {
    private int getMediaList(MediaServer serverItem) {
        return 0;
    }
@@ -405,7 +406,7 @@
     * @return MediaServerItem
     */
    @Override
    public MediaServerItem getMediaServerForMinimumLoad(Boolean hasAssist) {
    public MediaServer getMediaServerForMinimumLoad(Boolean hasAssist) {
        String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
        Long size = redisTemplate.opsForZSet().zCard(key);
        if (size  == null || size == 0) {
@@ -416,14 +417,14 @@
        // 获取分数最低的,及并发最低的
        Set<Object> objects = redisTemplate.opsForZSet().range(key, 0, -1);
        ArrayList<Object> mediaServerObjectS = new ArrayList<>(objects);
        MediaServerItem mediaServerItem = null;
        MediaServer mediaServerItem = null;
        if (hasAssist == null) {
            String mediaServerId = (String)mediaServerObjectS.get(0);
            mediaServerItem = getOne(mediaServerId);
        }else if (hasAssist) {
            for (Object mediaServerObject : mediaServerObjectS) {
                String mediaServerId = (String)mediaServerObject;
                MediaServerItem serverItem = getOne(mediaServerId);
                MediaServer serverItem = getOne(mediaServerId);
                if (serverItem.getRecordAssistPort() > 0) {
                    mediaServerItem = serverItem;
                    break;
@@ -432,7 +433,7 @@
        }else if (!hasAssist) {
            for (Object mediaServerObject : mediaServerObjectS) {
                String mediaServerId = (String)mediaServerObject;
                MediaServerItem serverItem = getOne(mediaServerId);
                MediaServer serverItem = getOne(mediaServerId);
                if (serverItem.getRecordAssistPort() == 0) {
                    mediaServerItem = serverItem;
                    break;
@@ -444,7 +445,7 @@
    }
    @Override
    public MediaServerItem checkMediaServer(String ip, int port, String secret, String type) {
    public MediaServer checkMediaServer(String ip, int port, String secret, String type) {
        if (mediaServerMapper.queryOneByHostAndPort(ip, port) != null) {
            throw new ControllerException(ErrorCode.ERROR100.getCode(), "此连接已存在");
        }
@@ -454,7 +455,7 @@
            logger.info("[closeRTPServer] 失败, mediaServerItem的类型: {},未找到对应的实现类", type);
            return null;
        }
        MediaServerItem mediaServerItem = mediaNodeServerService.checkMediaServer(ip, port, secret);
        MediaServer mediaServerItem = mediaNodeServerService.checkMediaServer(ip, port, secret);
        if (mediaServerItem != null) {
            if (mediaServerMapper.queryOne(mediaServerItem.getId()) != null) {
                throw new ControllerException(ErrorCode.ERROR100.getCode(), "媒体服务ID [" + mediaServerItem.getId() + " ] 已存在,请修改媒体服务器配置");
@@ -495,20 +496,20 @@
    }
    @Override
    public MediaServerItem getOneFromDatabase(String mediaServerId) {
    public MediaServer getOneFromDatabase(String mediaServerId) {
        return mediaServerMapper.queryOne(mediaServerId);
    }
    @Override
    public void syncCatchFromDatabase() {
        List<MediaServerItem> allInCatch = getAllOnlineList();
        List<MediaServerItem> allInDatabase = mediaServerMapper.queryAll();
        Map<String, MediaServerItem> mediaServerItemMap = new HashMap<>();
        List<MediaServer> allInCatch = getAllOnlineList();
        List<MediaServer> allInDatabase = mediaServerMapper.queryAll();
        Map<String, MediaServer> mediaServerItemMap = new HashMap<>();
        for (MediaServerItem mediaServerItem : allInDatabase) {
        for (MediaServer mediaServerItem : allInDatabase) {
            mediaServerItemMap.put(mediaServerItem.getId(), mediaServerItem);
        }
        for (MediaServerItem mediaServerItem : allInCatch) {
        for (MediaServer mediaServerItem : allInCatch) {
            // 清除数据中不存在但redis缓存数据
            if (!mediaServerItemMap.containsKey(mediaServerItem.getId())) {
                delete(mediaServerItem.getId());
@@ -517,7 +518,7 @@
    }
    @Override
    public MediaServerLoad getLoad(MediaServerItem mediaServerItem) {
    public MediaServerLoad getLoad(MediaServer mediaServerItem) {
        MediaServerLoad result = new MediaServerLoad();
        result.setId(mediaServerItem.getId());
        result.setPush(redisCatchStorage.getPushStreamCount(mediaServerItem.getId()));
@@ -529,13 +530,13 @@
    }
    @Override
    public List<MediaServerItem> getAllWithAssistPort() {
    public List<MediaServer> getAllWithAssistPort() {
        return mediaServerMapper.queryAllWithAssistPort();
    }
    @Override
    public boolean stopSendRtp(MediaServerItem mediaInfo, String app, String stream, String ssrc) {
    public boolean stopSendRtp(MediaServer mediaInfo, String app, String stream, String ssrc) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaInfo.getType());
        if (mediaNodeServerService == null) {
            logger.info("[stopSendRtp] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaInfo.getType());
@@ -545,7 +546,7 @@
    }
    @Override
    public boolean deleteRecordDirectory(MediaServerItem mediaServerItem, String app, String stream, String date, String fileName) {
    public boolean deleteRecordDirectory(MediaServer mediaServerItem, String app, String stream, String date, String fileName) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        if (mediaNodeServerService == null) {
            logger.info("[stopSendRtp] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
@@ -555,7 +556,7 @@
    }
    @Override
    public List<StreamInfo> getMediaList(MediaServerItem mediaServerItem, String app, String stream, String callId) {
    public List<StreamInfo> getMediaList(MediaServer mediaServerItem, String app, String stream, String callId) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        if (mediaNodeServerService == null) {
            logger.info("[getMediaList] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
@@ -563,4 +564,54 @@
        }
        return mediaNodeServerService.getMediaList(mediaServerItem, app, stream, callId);
    }
    @Override
    public Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String stream) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        if (mediaNodeServerService == null) {
            logger.info("[connectRtpServer] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            return false;
        }
        return mediaNodeServerService.connectRtpServer(mediaServerItem, address, port, stream);
    }
    @Override
    public void getSnap(MediaServer mediaServerItem, String streamUrl, int timeoutSec, int expireSec, String path, String fileName) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        if (mediaNodeServerService == null) {
            logger.info("[getSnap] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            return;
        }
        mediaNodeServerService.getSnap(mediaServerItem, streamUrl, timeoutSec, expireSec, path, fileName);
    }
    @Override
    public MediaInfo getMediaInfo(MediaServer mediaServerItem, String app, String stream) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        if (mediaNodeServerService == null) {
            logger.info("[getMediaInfo] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            return null;
        }
        return mediaNodeServerService.getMediaInfo(mediaServerItem, app, stream);
    }
    @Override
    public Boolean pauseRtpCheck(MediaServer mediaServerItem, String streamKey) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        if (mediaNodeServerService == null) {
            logger.info("[pauseRtpCheck] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            return false;
        }
        return mediaNodeServerService.pauseRtpCheck(mediaServerItem, streamKey);
    }
    @Override
    public boolean resumeRtpCheck(MediaServer mediaServerItem, String streamKey) {
        IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServerItem.getType());
        if (mediaNodeServerService == null) {
            logger.info("[pauseRtpCheck] 失败, mediaServerItem的类型: {},未找到对应的实现类", mediaServerItem.getType());
            return false;
        }
        return mediaNodeServerService.resumeRtpCheck(mediaServerItem, streamKey);
    }
}