| | |
| | | package org.dromara.common.redis.utils; |
| | | |
| | | import org.dromara.common.core.utils.SpringUtils; |
| | | import lombok.AccessLevel; |
| | | import lombok.NoArgsConstructor; |
| | | import org.redisson.api.*; |
| | | |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.function.Consumer; |
| | | |
| | | /** |
| | | * 分布式队列工具 |
| | | * 轻量级队列 重量级数据量 请使用 MQ |
| | | * 要求 redis 5.X 以上 |
| | | * |
| | | * @author Lion Li |
| | | * @version 3.6.0 新增 |
| | | */ |
| | | @NoArgsConstructor(access = AccessLevel.PRIVATE) |
| | | public class QueueUtils { |
| | | |
| | | private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class); |
| | | |
| | | |
| | | /** |
| | | * 获取客户端实例 |
| | | */ |
| | | public static RedissonClient getClient() { |
| | | return CLIENT; |
| | | } |
| | | |
| | | /** |
| | | * 添加普通队列数据 |
| | | * |
| | | * @param queueName 队列名 |
| | | * @param data 数据 |
| | | */ |
| | | public static <T> boolean addQueueObject(String queueName, T data) { |
| | | RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | return queue.offer(data); |
| | | } |
| | | |
| | | /** |
| | | * 通用获取一个队列数据 没有数据返回 null(不支持延迟队列) |
| | | * |
| | | * @param queueName 队列名 |
| | | */ |
| | | public static <T> T getQueueObject(String queueName) { |
| | | RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | return queue.poll(); |
| | | } |
| | | |
| | | /** |
| | | * 通用删除队列数据(不支持延迟队列) |
| | | */ |
| | | public static <T> boolean removeQueueObject(String queueName, T data) { |
| | | RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | return queue.remove(data); |
| | | } |
| | | |
| | | /** |
| | | * 通用销毁队列 所有阻塞监听 报错(不支持延迟队列) |
| | | */ |
| | | public static <T> boolean destroyQueue(String queueName) { |
| | | RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | return queue.delete(); |
| | | } |
| | | |
| | | /** |
| | | * 添加延迟队列数据 默认毫秒 |
| | | * |
| | | * @param queueName 队列名 |
| | | * @param data 数据 |
| | | * @param time 延迟时间 |
| | | */ |
| | | public static <T> void addDelayedQueueObject(String queueName, T data, long time) { |
| | | addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS); |
| | | } |
| | | |
| | | /** |
| | | * 添加延迟队列数据 |
| | | * |
| | | * @param queueName 队列名 |
| | | * @param data 数据 |
| | | * @param time 延迟时间 |
| | | * @param timeUnit 单位 |
| | | */ |
| | | public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) { |
| | | RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue); |
| | | delayedQueue.offer(data, time, timeUnit); |
| | | } |
| | | |
| | | /** |
| | | * 获取一个延迟队列数据 没有数据返回 null |
| | | * |
| | | * @param queueName 队列名 |
| | | */ |
| | | public static <T> T getDelayedQueueObject(String queueName) { |
| | | RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue); |
| | | return delayedQueue.poll(); |
| | | } |
| | | |
| | | /** |
| | | * 删除延迟队列数据 |
| | | */ |
| | | public static <T> boolean removeDelayedQueueObject(String queueName, T data) { |
| | | RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue); |
| | | return delayedQueue.remove(data); |
| | | } |
| | | |
| | | /** |
| | | * 销毁延迟队列 所有阻塞监听 报错 |
| | | */ |
| | | public static <T> void destroyDelayedQueue(String queueName) { |
| | | RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue); |
| | | delayedQueue.destroy(); |
| | | } |
| | | |
| | | /** |
| | | * 添加优先队列数据 |
| | | * |
| | | * @param queueName 队列名 |
| | | * @param data 数据 |
| | | */ |
| | | public static <T> boolean addPriorityQueueObject(String queueName, T data) { |
| | | RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName); |
| | | return priorityBlockingQueue.offer(data); |
| | | } |
| | | |
| | | /** |
| | | * 优先队列获取一个队列数据 没有数据返回 null(不支持延迟队列) |
| | | * |
| | | * @param queueName 队列名 |
| | | */ |
| | | public static <T> T getPriorityQueueObject(String queueName) { |
| | | RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName); |
| | | return queue.poll(); |
| | | } |
| | | |
| | | /** |
| | | * 优先队列删除队列数据(不支持延迟队列) |
| | | */ |
| | | public static <T> boolean removePriorityQueueObject(String queueName, T data) { |
| | | RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName); |
| | | return queue.remove(data); |
| | | } |
| | | |
| | | /** |
| | | * 优先队列销毁队列 所有阻塞监听 报错(不支持延迟队列) |
| | | */ |
| | | public static <T> boolean destroyPriorityQueue(String queueName) { |
| | | RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName); |
| | | return queue.delete(); |
| | | } |
| | | |
| | | /** |
| | | * 尝试设置 有界队列 容量 用于限制数量 |
| | | * |
| | | * @param queueName 队列名 |
| | | * @param capacity 容量 |
| | | */ |
| | | public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) { |
| | | RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); |
| | | return boundedBlockingQueue.trySetCapacity(capacity); |
| | | } |
| | | |
| | | /** |
| | | * 尝试设置 有界队列 容量 用于限制数量 |
| | | * |
| | | * @param queueName 队列名 |
| | | * @param capacity 容量 |
| | | * @param destroy 已存在是否销毁 |
| | | */ |
| | | public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) { |
| | | RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); |
| | | if (boundedBlockingQueue.isExists() && destroy) { |
| | | destroyQueue(queueName); |
| | | } |
| | | return boundedBlockingQueue.trySetCapacity(capacity); |
| | | } |
| | | |
| | | /** |
| | | * 添加有界队列数据 |
| | | * |
| | | * @param queueName 队列名 |
| | | * @param data 数据 |
| | | * @return 添加成功 true 已达到界限 false |
| | | */ |
| | | public static <T> boolean addBoundedQueueObject(String queueName, T data) { |
| | | RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); |
| | | return boundedBlockingQueue.offer(data); |
| | | } |
| | | |
| | | /** |
| | | * 有界队列获取一个队列数据 没有数据返回 null(不支持延迟队列) |
| | | * |
| | | * @param queueName 队列名 |
| | | */ |
| | | public static <T> T getBoundedQueueObject(String queueName) { |
| | | RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName); |
| | | return queue.poll(); |
| | | } |
| | | |
| | | /** |
| | | * 有界队列删除队列数据(不支持延迟队列) |
| | | */ |
| | | public static <T> boolean removeBoundedQueueObject(String queueName, T data) { |
| | | RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName); |
| | | return queue.remove(data); |
| | | } |
| | | |
| | | /** |
| | | * 有界队列销毁队列 所有阻塞监听 报错(不支持延迟队列) |
| | | */ |
| | | public static <T> boolean destroyBoundedQueue(String queueName) { |
| | | RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName); |
| | | return queue.delete(); |
| | | } |
| | | |
| | | /** |
| | | * 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等) |
| | | */ |
| | | public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer) { |
| | | RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | queue.subscribeOnElements(consumer); |
| | | } |
| | | |
| | | } |
| | | //package org.dromara.common.redis.utils; |
| | | // |
| | | //import org.dromara.common.core.utils.SpringUtils; |
| | | //import lombok.AccessLevel; |
| | | //import lombok.NoArgsConstructor; |
| | | //import org.redisson.api.*; |
| | | // |
| | | //import java.util.concurrent.TimeUnit; |
| | | //import java.util.function.Consumer; |
| | | // |
| | | ///** |
| | | // * 分布式队列工具 |
| | | // * 轻量级队列 重量级数据量 请使用 MQ |
| | | // * 要求 redis 5.X 以上 |
| | | // * |
| | | // * @author Lion Li |
| | | // * @version 3.6.0 新增 |
| | | // */ |
| | | //@NoArgsConstructor(access = AccessLevel.PRIVATE) |
| | | //public class QueueUtils { |
| | | // |
| | | // private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class); |
| | | // |
| | | // |
| | | // /** |
| | | // * 获取客户端实例 |
| | | // */ |
| | | // public static RedissonClient getClient() { |
| | | // return CLIENT; |
| | | // } |
| | | // |
| | | // /** |
| | | // * 添加普通队列数据 |
| | | // * |
| | | // * @param queueName 队列名 |
| | | // * @param data 数据 |
| | | // */ |
| | | // public static <T> boolean addQueueObject(String queueName, T data) { |
| | | // RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | // return queue.offer(data); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 通用获取一个队列数据 没有数据返回 null(不支持延迟队列) |
| | | // * |
| | | // * @param queueName 队列名 |
| | | // */ |
| | | // public static <T> T getQueueObject(String queueName) { |
| | | // RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | // return queue.poll(); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 通用删除队列数据(不支持延迟队列) |
| | | // */ |
| | | // public static <T> boolean removeQueueObject(String queueName, T data) { |
| | | // RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | // return queue.remove(data); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 通用销毁队列 所有阻塞监听 报错(不支持延迟队列) |
| | | // */ |
| | | // public static <T> boolean destroyQueue(String queueName) { |
| | | // RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | // return queue.delete(); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 添加延迟队列数据 默认毫秒 |
| | | // * |
| | | // * @param queueName 队列名 |
| | | // * @param data 数据 |
| | | // * @param time 延迟时间 |
| | | // */ |
| | | // public static <T> void addDelayedQueueObject(String queueName, T data, long time) { |
| | | // addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 添加延迟队列数据 |
| | | // * |
| | | // * @param queueName 队列名 |
| | | // * @param data 数据 |
| | | // * @param time 延迟时间 |
| | | // * @param timeUnit 单位 |
| | | // */ |
| | | // public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) { |
| | | // RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | // RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue); |
| | | // delayedQueue.offer(data, time, timeUnit); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 获取一个延迟队列数据 没有数据返回 null |
| | | // * |
| | | // * @param queueName 队列名 |
| | | // */ |
| | | // public static <T> T getDelayedQueueObject(String queueName) { |
| | | // RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | // RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue); |
| | | // return delayedQueue.poll(); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 删除延迟队列数据 |
| | | // */ |
| | | // public static <T> boolean removeDelayedQueueObject(String queueName, T data) { |
| | | // RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | // RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue); |
| | | // return delayedQueue.remove(data); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 销毁延迟队列 所有阻塞监听 报错 |
| | | // */ |
| | | // public static <T> void destroyDelayedQueue(String queueName) { |
| | | // RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | // RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue); |
| | | // delayedQueue.destroy(); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 添加优先队列数据 |
| | | // * |
| | | // * @param queueName 队列名 |
| | | // * @param data 数据 |
| | | // */ |
| | | // public static <T> boolean addPriorityQueueObject(String queueName, T data) { |
| | | // RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName); |
| | | // return priorityBlockingQueue.offer(data); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 优先队列获取一个队列数据 没有数据返回 null(不支持延迟队列) |
| | | // * |
| | | // * @param queueName 队列名 |
| | | // */ |
| | | // public static <T> T getPriorityQueueObject(String queueName) { |
| | | // RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName); |
| | | // return queue.poll(); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 优先队列删除队列数据(不支持延迟队列) |
| | | // */ |
| | | // public static <T> boolean removePriorityQueueObject(String queueName, T data) { |
| | | // RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName); |
| | | // return queue.remove(data); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 优先队列销毁队列 所有阻塞监听 报错(不支持延迟队列) |
| | | // */ |
| | | // public static <T> boolean destroyPriorityQueue(String queueName) { |
| | | // RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName); |
| | | // return queue.delete(); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 尝试设置 有界队列 容量 用于限制数量 |
| | | // * |
| | | // * @param queueName 队列名 |
| | | // * @param capacity 容量 |
| | | // */ |
| | | // public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) { |
| | | // RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); |
| | | // return boundedBlockingQueue.trySetCapacity(capacity); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 尝试设置 有界队列 容量 用于限制数量 |
| | | // * |
| | | // * @param queueName 队列名 |
| | | // * @param capacity 容量 |
| | | // * @param destroy 已存在是否销毁 |
| | | // */ |
| | | // public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) { |
| | | // RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); |
| | | // if (boundedBlockingQueue.isExists() && destroy) { |
| | | // destroyQueue(queueName); |
| | | // } |
| | | // return boundedBlockingQueue.trySetCapacity(capacity); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 添加有界队列数据 |
| | | // * |
| | | // * @param queueName 队列名 |
| | | // * @param data 数据 |
| | | // * @return 添加成功 true 已达到界限 false |
| | | // */ |
| | | // public static <T> boolean addBoundedQueueObject(String queueName, T data) { |
| | | // RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); |
| | | // return boundedBlockingQueue.offer(data); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 有界队列获取一个队列数据 没有数据返回 null(不支持延迟队列) |
| | | // * |
| | | // * @param queueName 队列名 |
| | | // */ |
| | | // public static <T> T getBoundedQueueObject(String queueName) { |
| | | // RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName); |
| | | // return queue.poll(); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 有界队列删除队列数据(不支持延迟队列) |
| | | // */ |
| | | // public static <T> boolean removeBoundedQueueObject(String queueName, T data) { |
| | | // RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName); |
| | | // return queue.remove(data); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 有界队列销毁队列 所有阻塞监听 报错(不支持延迟队列) |
| | | // */ |
| | | // public static <T> boolean destroyBoundedQueue(String queueName) { |
| | | // RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName); |
| | | // return queue.delete(); |
| | | // } |
| | | // |
| | | // /** |
| | | // * 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等) |
| | | // */ |
| | | // public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer) { |
| | | // RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | // queue.subscribeOnElements(consumer); |
| | | // } |
| | | // |
| | | //} |