648540858
2023-08-16 985082d33930868c3cc1723f28fd9aaae9013a8f
src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
old mode 100644 new mode 100755
@@ -77,20 +77,44 @@
        }
        String key = VideoManagerConstants.INVITE_PREFIX +
                "_" + inviteInfoForUpdate.getType() +
                "_" + inviteInfoForUpdate.getDeviceId() +
                "_" + inviteInfoForUpdate.getChannelId() +
                "_" + inviteInfoForUpdate.getStream();
                ":" + inviteInfoForUpdate.getType() +
                ":" + inviteInfoForUpdate.getDeviceId() +
                ":" + inviteInfoForUpdate.getChannelId() +
                ":" + inviteInfoForUpdate.getStream()+
                ":" + inviteInfoForUpdate.getSsrcInfo().getSsrc();
        redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
    }
    @Override
    public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) {
        InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
        if (inviteInfoInDb == null) {
            return null;
        }
        removeInviteInfo(inviteInfoInDb);
        String key = VideoManagerConstants.INVITE_PREFIX +
                ":" + inviteInfo.getType() +
                ":" + inviteInfo.getDeviceId() +
                ":" + inviteInfo.getChannelId() +
                ":" + stream +
                ":" + inviteInfo.getSsrcInfo().getSsrc();
        inviteInfoInDb.setStream(stream);
        if (inviteInfoInDb.getSsrcInfo() != null) {
            inviteInfoInDb.getSsrcInfo().setStream(stream);
        }
        redisTemplate.opsForValue().set(key, inviteInfoInDb);
        return inviteInfoInDb;
    }
    @Override
    public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
        String key = VideoManagerConstants.INVITE_PREFIX +
                "_" + (type != null ? type : "*") +
                "_" + (deviceId != null ? deviceId : "*") +
                "_" + (channelId != null ? channelId : "*") +
                "_" + (stream != null ? stream : "*");
                ":" + (type != null ? type : "*") +
                ":" + (deviceId != null ? deviceId : "*") +
                ":" + (channelId != null ? channelId : "*") +
                ":" + (stream != null ? stream : "*")
                + ":*";
        List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
        if (scanResult.size() != 1) {
            return null;
@@ -112,10 +136,11 @@
    @Override
    public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
        String scanKey = VideoManagerConstants.INVITE_PREFIX +
                "_" + (type != null ? type : "*") +
                "_" + (deviceId != null ? deviceId : "*") +
                "_" + (channelId != null ? channelId : "*") +
                "_" + (stream != null ? stream : "*");
                ":" + (type != null ? type : "*") +
                ":" + (deviceId != null ? deviceId : "*") +
                ":" + (channelId != null ? channelId : "*") +
                ":" + (stream != null ? stream : "*") +
                ":*";
        List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
        if (scanResult.size() > 0) {
            for (Object keyObj : scanResult) {
@@ -152,24 +177,11 @@
    }
    @Override
    public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
        String key = buildKey(type, deviceId, channelId, stream);
        List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
        if (callbacks == null) {
            return;
        }
        for (ErrorCallback<Object> callback : callbacks) {
            callback.run(code, msg, data);
        }
        inviteErrorCallbackMap.remove(key);
    }
    private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) {
        String key = type + "_" +  deviceId + "_" + channelId;
        String key = type + ":" +  deviceId + ":" + channelId;
        // 如果ssrc未null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
        if (stream != null) {
            key += ("_" + stream);
            key += (":" + stream);
        }
        return key;
    }
@@ -183,7 +195,7 @@
    @Override
    public int getStreamInfoCount(String mediaServerId) {
        int count = 0;
        String key = VideoManagerConstants.INVITE_PREFIX + "_*_*_*_*";
        String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*";
        List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
        if (scanResult.size() == 0) {
            return 0;
@@ -199,69 +211,9 @@
        return count;
    }
    /*======================设备主子码流逻辑START=========================*/
    @Override
    public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId, boolean isSubStream) {
        return getInviteInfo(type, deviceId, channelId,isSubStream, null);
    }
    @Override
    public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId, boolean isSubStream) {
        removeInviteInfo(inviteSessionType, deviceId, channelId,isSubStream, null);
    }
    @Override
    public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId,boolean isSubStream, String stream) {
        String key = VideoManagerConstants.INVITE_PREFIX +
                "_" + (type != null ? type : "*") +
                "_" + (isSubStream ? "sub" : "main") +
                "_" + (deviceId != null ? deviceId : "*") +
                "_" + (channelId != null ? channelId : "*") +
                "_" + (stream != null ? stream : "*");
        List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
        if (scanResult.size() != 1) {
            return null;
        }
        return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
    }
    @Override
    public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream) {
        String scanKey = VideoManagerConstants.INVITE_PREFIX +
                "_" + (type != null ? type : "*") +
                "_" + (isSubStream ? "sub" : "main") +
                "_" + (deviceId != null ? deviceId : "*") +
                "_" + (channelId != null ? channelId : "*") +
                "_" + (stream != null ? stream : "*");
        List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
        if (scanResult.size() > 0) {
            for (Object keyObj : scanResult) {
                String key = (String) keyObj;
                InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key);
                if (inviteInfo == null) {
                    continue;
                }
                redisTemplate.delete(key);
                inviteErrorCallbackMap.remove(buildKey(type, deviceId, channelId, inviteInfo.getStream()));
            }
        }
    }
    @Override
    public void once(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream, ErrorCallback<Object> callback) {
        String key = buildSubStreamKey(type, deviceId, channelId,isSubStream, stream);
        List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
        if (callbacks == null) {
            callbacks = new CopyOnWriteArrayList<>();
            inviteErrorCallbackMap.put(key, callbacks);
        }
        callbacks.add(callback);
    }
    @Override
    public void call(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream, int code, String msg, Object data) {
        String key = buildSubStreamKey(type, deviceId, channelId,isSubStream, stream);
    public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
        String key = buildSubStreamKey(type, deviceId, channelId, stream);
        List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
        if (callbacks == null) {
            return;
@@ -273,89 +225,43 @@
    }
    private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream) {
        String key = type + "_" + (isSubStream ? "sub":"main") + "_" +  deviceId + "_" + channelId;
    private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) {
        String key = type + ":" + ":" +  deviceId + ":" + channelId;
        // 如果ssrc为null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
        if (stream != null) {
            key += ("_" + stream);
            key += (":" + stream);
        }
        return key;
    }
    @Override
    public void updateInviteInfoSub(InviteInfo inviteInfo) {
        if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {
            logger.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo));
            return;
        }
        InviteInfo inviteInfoForUpdate = null;
        if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
            if (inviteInfo.getDeviceId() == null
                    || inviteInfo.getChannelId() == null
                    || inviteInfo.getType() == null
                    || inviteInfo.getStream() == null
            ) {
                return;
            }
            inviteInfoForUpdate = inviteInfo;
        } else {
            InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
                    inviteInfo.getChannelId(),inviteInfo.isSubStream(), inviteInfo.getStream());
            if (inviteInfoInRedis == null) {
                logger.warn("[更新Invite信息],未从缓存中读取到Invite信息: deviceId: {}, channel: {}, stream: {}",
                        inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
                return;
            }
            if (inviteInfo.getStreamInfo() != null) {
                inviteInfoInRedis.setStreamInfo(inviteInfo.getStreamInfo());
            }
            if (inviteInfo.getSsrcInfo() != null) {
                inviteInfoInRedis.setSsrcInfo(inviteInfo.getSsrcInfo());
            }
            if (inviteInfo.getStreamMode() != null) {
                inviteInfoInRedis.setStreamMode(inviteInfo.getStreamMode());
            }
            if (inviteInfo.getReceiveIp() != null) {
                inviteInfoInRedis.setReceiveIp(inviteInfo.getReceiveIp());
            }
            if (inviteInfo.getReceivePort() != null) {
                inviteInfoInRedis.setReceivePort(inviteInfo.getReceivePort());
            }
            if (inviteInfo.getStatus() != null) {
                inviteInfoInRedis.setStatus(inviteInfo.getStatus());
            }
            inviteInfoForUpdate = inviteInfoInRedis;
        }
        String key = VideoManagerConstants.INVITE_PREFIX +
                "_" + inviteInfoForUpdate.getType() +
                "_" + (inviteInfoForUpdate.isSubStream() ? "sub":"main") +
                "_" + inviteInfoForUpdate.getDeviceId() +
                "_" + inviteInfoForUpdate.getChannelId() +
                "_" + inviteInfoForUpdate.getStream();
        redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
    }
    @Override
    public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream, boolean isSubStream) {
        return getInviteInfo(type, null, null,isSubStream, stream);
    }
    @Override
    public List<Object> getInviteInfos(InviteSessionType type, String deviceId, String channelId, String stream) {
        String key = VideoManagerConstants.INVITE_PREFIX +
                "_" + (type != null ? type : "*") +
                "_" + (deviceId != null ? deviceId : "*") +
                "_" + (channelId != null ? channelId : "*") +
                "_" + (stream != null ? stream : "*");
    public InviteInfo getInviteInfoBySSRC(String ssrc) {
        String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:" + ssrc;
        List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
        return scanResult;
        if (scanResult.size() != 1) {
            return null;
        }
        return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
    }
    /*======================设备主子码流逻辑END=========================*/
    @Override
    public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) {
        InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
        if (inviteInfoInDb == null) {
            return null;
        }
        removeInviteInfo(inviteInfoInDb);
        String key = VideoManagerConstants.INVITE_PREFIX +
                ":" + inviteInfo.getType() +
                ":" + inviteInfo.getDeviceId() +
                ":" + inviteInfo.getChannelId() +
                ":" + inviteInfo.getStream() +
                ":" + inviteInfo.getSsrcInfo().getSsrc();
        if (inviteInfoInDb.getSsrcInfo() != null) {
            inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
        }
        redisTemplate.opsForValue().set(key, inviteInfoInDb);
        return inviteInfoInDb;
    }
}