qirong
2023-12-22 ac2873bd37eeb496b0e9dd62d66e9fc4b38ef39b
ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/utils/QueueUtils.java
@@ -1,232 +1,232 @@
package org.dromara.common.redis.utils;
import org.dromara.common.core.utils.SpringUtils;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.redisson.api.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
 * 分布式队列工具
 * 轻量级队列 重量级数据量 请使用 MQ
 * 要求 redis 5.X 以上
 *
 * @author Lion Li
 * @version 3.6.0 新增
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class QueueUtils {
    private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
    /**
     * 获取客户端实例
     */
    public static RedissonClient getClient() {
        return CLIENT;
    }
    /**
     * 添加普通队列数据
     *
     * @param queueName 队列名
     * @param data      数据
     */
    public static <T> boolean addQueueObject(String queueName, T data) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        return queue.offer(data);
    }
    /**
     * 通用获取一个队列数据 没有数据返回 null(不支持延迟队列)
     *
     * @param queueName 队列名
     */
    public static <T> T getQueueObject(String queueName) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        return queue.poll();
    }
    /**
     * 通用删除队列数据(不支持延迟队列)
     */
    public static <T> boolean removeQueueObject(String queueName, T data) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        return queue.remove(data);
    }
    /**
     * 通用销毁队列 所有阻塞监听 报错(不支持延迟队列)
     */
    public static <T> boolean destroyQueue(String queueName) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        return queue.delete();
    }
    /**
     * 添加延迟队列数据 默认毫秒
     *
     * @param queueName 队列名
     * @param data      数据
     * @param time      延迟时间
     */
    public static <T> void addDelayedQueueObject(String queueName, T data, long time) {
        addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);
    }
    /**
     * 添加延迟队列数据
     *
     * @param queueName 队列名
     * @param data      数据
     * @param time      延迟时间
     * @param timeUnit  单位
     */
    public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
        delayedQueue.offer(data, time, timeUnit);
    }
    /**
     * 获取一个延迟队列数据 没有数据返回 null
     *
     * @param queueName 队列名
     */
    public static <T> T getDelayedQueueObject(String queueName) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
        return delayedQueue.poll();
    }
    /**
     * 删除延迟队列数据
     */
    public static <T> boolean removeDelayedQueueObject(String queueName, T data) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
        return delayedQueue.remove(data);
    }
    /**
     * 销毁延迟队列 所有阻塞监听 报错
     */
    public static <T> void destroyDelayedQueue(String queueName) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
        delayedQueue.destroy();
    }
    /**
     * 添加优先队列数据
     *
     * @param queueName 队列名
     * @param data      数据
     */
    public static <T> boolean addPriorityQueueObject(String queueName, T data) {
        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
        return priorityBlockingQueue.offer(data);
    }
    /**
     * 优先队列获取一个队列数据 没有数据返回 null(不支持延迟队列)
     *
     * @param queueName 队列名
     */
    public static <T> T getPriorityQueueObject(String queueName) {
        RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
        return queue.poll();
    }
    /**
     * 优先队列删除队列数据(不支持延迟队列)
     */
    public static <T> boolean removePriorityQueueObject(String queueName, T data) {
        RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
        return queue.remove(data);
    }
    /**
     * 优先队列销毁队列 所有阻塞监听 报错(不支持延迟队列)
     */
    public static <T> boolean destroyPriorityQueue(String queueName) {
        RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
        return queue.delete();
    }
    /**
     * 尝试设置 有界队列 容量 用于限制数量
     *
     * @param queueName 队列名
     * @param capacity  容量
     */
    public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) {
        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        return boundedBlockingQueue.trySetCapacity(capacity);
    }
    /**
     * 尝试设置 有界队列 容量 用于限制数量
     *
     * @param queueName 队列名
     * @param capacity  容量
     * @param destroy   已存在是否销毁
     */
    public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {
        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        if (boundedBlockingQueue.isExists() && destroy) {
            destroyQueue(queueName);
        }
        return boundedBlockingQueue.trySetCapacity(capacity);
    }
    /**
     * 添加有界队列数据
     *
     * @param queueName 队列名
     * @param data      数据
     * @return 添加成功 true 已达到界限 false
     */
    public static <T> boolean addBoundedQueueObject(String queueName, T data) {
        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        return boundedBlockingQueue.offer(data);
    }
    /**
     * 有界队列获取一个队列数据 没有数据返回 null(不支持延迟队列)
     *
     * @param queueName 队列名
     */
    public static <T> T getBoundedQueueObject(String queueName) {
        RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
        return queue.poll();
    }
    /**
     * 有界队列删除队列数据(不支持延迟队列)
     */
    public static <T> boolean removeBoundedQueueObject(String queueName, T data) {
        RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
        return queue.remove(data);
    }
    /**
     * 有界队列销毁队列 所有阻塞监听 报错(不支持延迟队列)
     */
    public static <T> boolean destroyBoundedQueue(String queueName) {
        RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
        return queue.delete();
    }
    /**
     * 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等)
     */
    public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        queue.subscribeOnElements(consumer);
    }
}
//package org.dromara.common.redis.utils;
//
//import org.dromara.common.core.utils.SpringUtils;
//import lombok.AccessLevel;
//import lombok.NoArgsConstructor;
//import org.redisson.api.*;
//
//import java.util.concurrent.TimeUnit;
//import java.util.function.Consumer;
//
///**
// * 分布式队列工具
// * 轻量级队列 重量级数据量 请使用 MQ
// * 要求 redis 5.X 以上
// *
// * @author Lion Li
// * @version 3.6.0 新增
// */
//@NoArgsConstructor(access = AccessLevel.PRIVATE)
//public class QueueUtils {
//
//    private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
//
//
//    /**
//     * 获取客户端实例
//     */
//    public static RedissonClient getClient() {
//        return CLIENT;
//    }
//
//    /**
//     * 添加普通队列数据
//     *
//     * @param queueName 队列名
//     * @param data      数据
//     */
//    public static <T> boolean addQueueObject(String queueName, T data) {
//        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
//        return queue.offer(data);
//    }
//
//    /**
//     * 通用获取一个队列数据 没有数据返回 null(不支持延迟队列)
//     *
//     * @param queueName 队列名
//     */
//    public static <T> T getQueueObject(String queueName) {
//        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
//        return queue.poll();
//    }
//
//    /**
//     * 通用删除队列数据(不支持延迟队列)
//     */
//    public static <T> boolean removeQueueObject(String queueName, T data) {
//        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
//        return queue.remove(data);
//    }
//
//    /**
//     * 通用销毁队列 所有阻塞监听 报错(不支持延迟队列)
//     */
//    public static <T> boolean destroyQueue(String queueName) {
//        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
//        return queue.delete();
//    }
//
//    /**
//     * 添加延迟队列数据 默认毫秒
//     *
//     * @param queueName 队列名
//     * @param data      数据
//     * @param time      延迟时间
//     */
//    public static <T> void addDelayedQueueObject(String queueName, T data, long time) {
//        addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);
//    }
//
//    /**
//     * 添加延迟队列数据
//     *
//     * @param queueName 队列名
//     * @param data      数据
//     * @param time      延迟时间
//     * @param timeUnit  单位
//     */
//    public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {
//        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
//        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
//        delayedQueue.offer(data, time, timeUnit);
//    }
//
//    /**
//     * 获取一个延迟队列数据 没有数据返回 null
//     *
//     * @param queueName 队列名
//     */
//    public static <T> T getDelayedQueueObject(String queueName) {
//        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
//        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
//        return delayedQueue.poll();
//    }
//
//    /**
//     * 删除延迟队列数据
//     */
//    public static <T> boolean removeDelayedQueueObject(String queueName, T data) {
//        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
//        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
//        return delayedQueue.remove(data);
//    }
//
//    /**
//     * 销毁延迟队列 所有阻塞监听 报错
//     */
//    public static <T> void destroyDelayedQueue(String queueName) {
//        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
//        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
//        delayedQueue.destroy();
//    }
//
//    /**
//     * 添加优先队列数据
//     *
//     * @param queueName 队列名
//     * @param data      数据
//     */
//    public static <T> boolean addPriorityQueueObject(String queueName, T data) {
//        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
//        return priorityBlockingQueue.offer(data);
//    }
//
//    /**
//     * 优先队列获取一个队列数据 没有数据返回 null(不支持延迟队列)
//     *
//     * @param queueName 队列名
//     */
//    public static <T> T getPriorityQueueObject(String queueName) {
//        RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
//        return queue.poll();
//    }
//
//    /**
//     * 优先队列删除队列数据(不支持延迟队列)
//     */
//    public static <T> boolean removePriorityQueueObject(String queueName, T data) {
//        RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
//        return queue.remove(data);
//    }
//
//    /**
//     * 优先队列销毁队列 所有阻塞监听 报错(不支持延迟队列)
//     */
//    public static <T> boolean destroyPriorityQueue(String queueName) {
//        RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
//        return queue.delete();
//    }
//
//    /**
//     * 尝试设置 有界队列 容量 用于限制数量
//     *
//     * @param queueName 队列名
//     * @param capacity  容量
//     */
//    public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) {
//        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
//        return boundedBlockingQueue.trySetCapacity(capacity);
//    }
//
//    /**
//     * 尝试设置 有界队列 容量 用于限制数量
//     *
//     * @param queueName 队列名
//     * @param capacity  容量
//     * @param destroy   已存在是否销毁
//     */
//    public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {
//        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
//        if (boundedBlockingQueue.isExists() && destroy) {
//            destroyQueue(queueName);
//        }
//        return boundedBlockingQueue.trySetCapacity(capacity);
//    }
//
//    /**
//     * 添加有界队列数据
//     *
//     * @param queueName 队列名
//     * @param data      数据
//     * @return 添加成功 true 已达到界限 false
//     */
//    public static <T> boolean addBoundedQueueObject(String queueName, T data) {
//        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
//        return boundedBlockingQueue.offer(data);
//    }
//
//    /**
//     * 有界队列获取一个队列数据 没有数据返回 null(不支持延迟队列)
//     *
//     * @param queueName 队列名
//     */
//    public static <T> T getBoundedQueueObject(String queueName) {
//        RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
//        return queue.poll();
//    }
//
//    /**
//     * 有界队列删除队列数据(不支持延迟队列)
//     */
//    public static <T> boolean removeBoundedQueueObject(String queueName, T data) {
//        RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
//        return queue.remove(data);
//    }
//
//    /**
//     * 有界队列销毁队列 所有阻塞监听 报错(不支持延迟队列)
//     */
//    public static <T> boolean destroyBoundedQueue(String queueName) {
//        RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
//        return queue.delete();
//    }
//
//    /**
//     * 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等)
//     */
//    public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer) {
//        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
//        queue.subscribeOnElements(consumer);
//    }
//
//}