fuliqi
2024-07-01 a745c3b358b97b468711a0ecac11dbe2d5d24018
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package com.ycl.jxkg.rabbitmq;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import java.util.HashMap;
import java.util.Map;
 
/**
 * @author:xp
 * @date:2024/6/28 13:39
 */
@Slf4j
@Configuration
public class RabbitMQConfig {
 
    @Autowired
    private ConnectionFactory connectionFactory;
 
    @Bean
    public DirectExchange examExchange() {
        return new DirectExchange("examExchange");
    }
 
    // 创建普通队列
    @Bean
    public Queue examQueue() {
        Map<String, Object> args = new HashMap<>();
        // 设置死信交换机
        args.put("x-dead-letter-exchange", "dlxExchange");
        return new Queue("jxkg", true, false, false, args);
    }
 
    // 普通信队列到交换机
    @Bean
    public Binding binding(Queue examQueue, DirectExchange examExchange) {
        return BindingBuilder.bind(examQueue).to(examExchange).with("exam");
    }
 
    // 创建死信交换机
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlxExchange");
    }
 
    // 创建考试使用的死信队列
    @Bean
    public Queue examDlxQueue() {
        return new Queue("examDlxQueue", true);
    }
 
    // 创建会议使用的死信队列
    @Bean
    public Queue meetDlxQueue() {
        return new Queue("meetDlxQueue", true);
    }
 
     // 绑定考试死信队列到死信交换机
     @Bean
     public Binding examDlxBinding(Queue examDlxQueue, DirectExchange dlxExchange) {
         return BindingBuilder.bind(examDlxQueue).to(dlxExchange).with("exam");
     }
 
    // 绑定会议死信队列到死信交换机
    @Bean
    public Binding meetDlxBinding(Queue meetDlxQueue, DirectExchange dlxExchange) {
        return BindingBuilder.bind(meetDlxQueue).to(dlxExchange).with("meet");
    }
 
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
 
        // 设置回退回调,只能全局设置一个
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.error("消息发送失败,目标交换机:{},路由:{},错误信息:{}", exchange, routingKey, replyText);
        });
        // 设置确认回调,全局设置一个,根据关联数据(CorrelationData)进行对应处理
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                MyCorrelationData myCorrelationData = (MyCorrelationData) correlationData;
                System.out.println(myCorrelationData);
                // 成功
                log.info("消息成功发送到交换机");
            } else {
                // 失败
                // todo 根据消息不同,处理不同
                log.error("消息发送失败");
            }
        });
        return rabbitTemplate;
    }
 
}