package cn.lili.trigger; import cn.hutool.json.JSONUtil; import cn.lili.cache.Cache; import cn.lili.common.utils.ThreadPoolUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationRunner; import org.springframework.data.redis.core.DefaultTypedTuple; import org.springframework.util.CollectionUtils; import java.util.Set; import java.util.concurrent.TimeUnit; /** * 延时队列工厂 * * @author paulG * @since 2020/11/7 **/ @Slf4j public abstract class AbstractDelayQueueListen implements ApplicationRunner { @Autowired private Cache cache; /** * 延时队列机器开始运作 */ private void startDelayQueueMachine() { log.info("延时队列机器{}开始运作", setDelayQueueName()); //监听redis队列 while (true) { try { //获取当前时间的时间戳 long now = System.currentTimeMillis() / 1000; //获取当前时间前需要执行的任务列表 Set tuples = cache.zRangeByScore(setDelayQueueName(), 0, now); //如果任务不为空 if (!CollectionUtils.isEmpty(tuples)) { log.info("执行任务:{}", JSONUtil.toJsonStr(tuples)); for (DefaultTypedTuple tuple : tuples) { String jobId = (String) tuple.getValue(); //移除缓存,如果移除成功则表示当前线程处理了延时任务,则执行延时任务 Long num = cache.zRemove(setDelayQueueName(), jobId); //如果移除成功, 则执行 if (num > 0) { ThreadPoolUtil.execute(() -> invoke(jobId)); } } } } catch (Exception e) { log.error("处理延时任务发生异常,异常原因为{}", e.getMessage(), e); } finally { //间隔一秒钟搞一次 try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 最终执行的任务方法 * * @param jobId 任务id */ public abstract void invoke(String jobId); /** * 要实现延时队列的名字 * @return 促销延时队列名称 */ public abstract String setDelayQueueName(); /** * 监听队列 */ public void init() { ThreadPoolUtil.getPool().execute(this::startDelayQueueMachine); } }