648540858
2024-03-14 d4f6ec39b7e0421757a6b9d1a68b1c4610ea2e8c
src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
old mode 100644 new mode 100755
@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSON;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
@@ -20,6 +21,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
@Service
@DS("master")
public class InviteStreamServiceImpl implements IInviteStreamService {
    private final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class);
@@ -77,23 +79,50 @@
        }
        String key = VideoManagerConstants.INVITE_PREFIX +
                "_" + inviteInfoForUpdate.getType() +
                "_" + inviteInfoForUpdate.getDeviceId() +
                "_" + inviteInfoForUpdate.getChannelId() +
                "_" + inviteInfoForUpdate.getStream();
                ":" + inviteInfoForUpdate.getType() +
                ":" + inviteInfoForUpdate.getDeviceId() +
                ":" + inviteInfoForUpdate.getChannelId() +
                ":" + inviteInfoForUpdate.getStream()+
                ":" + inviteInfoForUpdate.getSsrcInfo().getSsrc();
        redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
    }
    @Override
    public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) {
        InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
        if (inviteInfoInDb == null) {
            return null;
        }
        removeInviteInfo(inviteInfoInDb);
        String key = VideoManagerConstants.INVITE_PREFIX +
                ":" + inviteInfo.getType() +
                ":" + inviteInfo.getDeviceId() +
                ":" + inviteInfo.getChannelId() +
                ":" + stream +
                ":" + inviteInfo.getSsrcInfo().getSsrc();
        inviteInfoInDb.setStream(stream);
        if (inviteInfoInDb.getSsrcInfo() != null) {
            inviteInfoInDb.getSsrcInfo().setStream(stream);
        }
        redisTemplate.opsForValue().set(key, inviteInfoInDb);
        return inviteInfoInDb;
    }
    @Override
    public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
        String key = VideoManagerConstants.INVITE_PREFIX +
                "_" + (type != null ? type : "*") +
                "_" + (deviceId != null ? deviceId : "*") +
                "_" + (channelId != null ? channelId : "*") +
                "_" + (stream != null ? stream : "*");
                ":" + (type != null ? type : "*") +
                ":" + (deviceId != null ? deviceId : "*") +
                ":" + (channelId != null ? channelId : "*") +
                ":" + (stream != null ? stream : "*")
                + ":*";
        List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
        if (scanResult.size() != 1) {
        if (scanResult.isEmpty()) {
            return null;
        }
        if (scanResult.size() != 1) {
            logger.warn("[获取InviteInfo] 发现 key: {}存在多条", key);
        }
        return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
@@ -112,10 +141,11 @@
    @Override
    public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
        String scanKey = VideoManagerConstants.INVITE_PREFIX +
                "_" + (type != null ? type : "*") +
                "_" + (deviceId != null ? deviceId : "*") +
                "_" + (channelId != null ? channelId : "*") +
                "_" + (stream != null ? stream : "*");
                ":" + (type != null ? type : "*") +
                ":" + (deviceId != null ? deviceId : "*") +
                ":" + (channelId != null ? channelId : "*") +
                ":" + (stream != null ? stream : "*") +
                ":*";
        List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
        if (scanResult.size() > 0) {
            for (Object keyObj : scanResult) {
@@ -152,24 +182,11 @@
    }
    @Override
    public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
        String key = buildKey(type, deviceId, channelId, stream);
        List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
        if (callbacks == null) {
            return;
        }
        for (ErrorCallback<Object> callback : callbacks) {
            callback.run(code, msg, data);
        }
        inviteErrorCallbackMap.remove(key);
    }
    private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) {
        String key = type + "_" +  deviceId + "_" + channelId;
        String key = type + ":" +  deviceId + ":" + channelId;
        // 如果ssrc未null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
        if (stream != null) {
            key += ("_" + stream);
            key += (":" + stream);
        }
        return key;
    }
@@ -183,7 +200,7 @@
    @Override
    public int getStreamInfoCount(String mediaServerId) {
        int count = 0;
        String key = VideoManagerConstants.INVITE_PREFIX + "_*_*_*_*";
        String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*";
        List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
        if (scanResult.size() == 0) {
            return 0;
@@ -198,4 +215,58 @@
        }
        return count;
    }
    @Override
    public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
        String key = buildSubStreamKey(type, deviceId, channelId, stream);
        List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
        if (callbacks == null) {
            return;
        }
        for (ErrorCallback<Object> callback : callbacks) {
            callback.run(code, msg, data);
        }
        inviteErrorCallbackMap.remove(key);
    }
    private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) {
        String key = type + ":" + ":" +  deviceId + ":" + channelId;
        // 如果ssrc为null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
        if (stream != null) {
            key += (":" + stream);
        }
        return key;
    }
    @Override
    public InviteInfo getInviteInfoBySSRC(String ssrc) {
        String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:" + ssrc;
        List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
        if (scanResult.size() != 1) {
            return null;
        }
        return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
    }
    @Override
    public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) {
        InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
        if (inviteInfoInDb == null) {
            return null;
        }
        removeInviteInfo(inviteInfoInDb);
        String key = VideoManagerConstants.INVITE_PREFIX +
                ":" + inviteInfo.getType() +
                ":" + inviteInfo.getDeviceId() +
                ":" + inviteInfo.getChannelId() +
                ":" + inviteInfo.getStream() +
                ":" + ssrc;
        if (inviteInfoInDb.getSsrcInfo() != null) {
            inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
        }
        redisTemplate.opsForValue().set(key, inviteInfoInDb);
        return inviteInfoInDb;
    }
}