zxl
2025-06-11 05e49980cd556d24239e00d028dc5efa25e0de14
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package cn.lili.trigger;
 
import cn.hutool.json.JSONUtil;
import cn.lili.cache.Cache;
import cn.lili.common.utils.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.core.DefaultTypedTuple;
import org.springframework.util.CollectionUtils;
 
import java.util.Set;
import java.util.concurrent.TimeUnit;
 
/**
 * 延时队列工厂
 *
 * @author paulG
 * @since 2020/11/7
 **/
@Slf4j
public abstract class AbstractDelayQueueListen implements ApplicationRunner {
 
    @Autowired
    private Cache cache;
 
 
    /**
     * 延时队列机器开始运作
     */
    private void startDelayQueueMachine() {
        log.info("延时队列机器{}开始运作", setDelayQueueName());
 
        //监听redis队列
        while (true) {
            try {
                //获取当前时间的时间戳
                long now = System.currentTimeMillis() / 1000;
                //获取当前时间前需要执行的任务列表
                Set<DefaultTypedTuple> tuples = cache.zRangeByScore(setDelayQueueName(), 0, now);
 
                //如果任务不为空
                if (!CollectionUtils.isEmpty(tuples)) {
                    log.info("执行任务:{}", JSONUtil.toJsonStr(tuples));
 
                    for (DefaultTypedTuple tuple : tuples) {
                        String jobId = (String) tuple.getValue();
                        //移除缓存,如果移除成功则表示当前线程处理了延时任务,则执行延时任务
                        Long num = cache.zRemove(setDelayQueueName(), jobId);
                        //如果移除成功, 则执行
                        if (num > 0) {
                            ThreadPoolUtil.execute(() -> invoke(jobId));
                        }
                    }
                }
 
            } catch (Exception e) {
                log.error("处理延时任务发生异常,异常原因为{}", e.getMessage(), e);
            } finally {
                //间隔一秒钟搞一次
                try {
                    TimeUnit.SECONDS.sleep(5L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
 
            }
        }
 
    }
 
    /**
     * 最终执行的任务方法
     *
     * @param jobId 任务id
     */
    public abstract void invoke(String jobId);
 
 
    /**
     * 要实现延时队列的名字
     * @return 促销延时队列名称
     */
    public abstract String setDelayQueueName();
 
 
    /**
     * 监听队列
     */
    public void init() {
        ThreadPoolUtil.getPool().execute(this::startDelayQueueMachine);
    }
 
}