pom.xml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ycl/jxkg/rabbitmq/MyCorrelationData.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ycl/jxkg/rabbitmq/RabbitMQConfig.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ycl/jxkg/rabbitmq/RabbitMqMsgTypeEnum.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ycl/jxkg/rabbitmq/consumer/Consumer.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ycl/jxkg/rabbitmq/product/Product.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/resources/application-dev.yml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/resources/application-prod.yml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
pom.xml
@@ -43,6 +43,12 @@ <dependencies> <!-- rabbitmq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- websocket --> <dependency> <groupId>org.springframework.boot</groupId> src/main/java/com/ycl/jxkg/rabbitmq/MyCorrelationData.java
New file @@ -0,0 +1,27 @@ package com.ycl.jxkg.rabbitmq; import lombok.Data; import org.springframework.amqp.rabbit.connection.CorrelationData; /** * 扩展关联数据 * * @author xp * @data 2023/11/11 */ @Data public class MyCorrelationData extends CorrelationData { /** * 消息类型,可用于业务处理 */ private RabbitMqMsgTypeEnum msgType; public MyCorrelationData(String id, RabbitMqMsgTypeEnum msgType) { super(id); this.msgType = msgType; } } src/main/java/com/ycl/jxkg/rabbitmq/RabbitMQConfig.java
New file @@ -0,0 +1,91 @@ package com.ycl.jxkg.rabbitmq; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @author:xp * @date:2024/6/28 13:39 */ @Slf4j @Configuration public class RabbitMQConfig { @Autowired private ConnectionFactory connectionFactory; // 创建普通队列 @Bean public Queue examQueue() { Map<String, Object> args = new HashMap<>(); // 设置死信交换机 args.put("x-dead-letter-exchange", "dlxExchange"); return new Queue("jxkg", true, false, false, args); } // 创建死信交换机 @Bean public DirectExchange dlxExchange() { return new DirectExchange("dlxExchange"); } // 创建考试使用的死信队列 @Bean public Queue examDlxQueue() { return new Queue("examDlxQueue", true); } // 创建会议使用的死信队列 @Bean public Queue meetDlxQueue() { return new Queue("meetDlxQueue", true); } // 绑定考试死信队列到死信交换机 @Bean public Binding examDlxBinding(Queue examDlxQueue, DirectExchange dlxExchange) { return BindingBuilder.bind(examDlxQueue).to(dlxExchange).with("exam"); } // 绑定会议死信队列到死信交换机 @Bean public Binding meetDlxBinding(Queue meetDlxQueue, DirectExchange dlxExchange) { return BindingBuilder.bind(meetDlxQueue).to(dlxExchange).with("meet"); } @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 设置回退回调,只能全局设置一个 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.error("消息发送失败,目标交换机:{},路由:{},错误信息:{}", exchange, routingKey, replyText); }); // 设置确认回调,全局设置一个,根据关联数据(CorrelationData)进行对应处理 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { MyCorrelationData myCorrelationData = (MyCorrelationData) correlationData; System.out.println(myCorrelationData); // 成功 log.info("消息成功发送到交换机"); } else { // 失败 // todo 根据消息不同,处理不同 log.error("消息发送失败"); } }); return rabbitTemplate; } } src/main/java/com/ycl/jxkg/rabbitmq/RabbitMqMsgTypeEnum.java
New file @@ -0,0 +1,19 @@ package com.ycl.jxkg.rabbitmq; import lombok.Getter; /** * mq消息类型 * * @author xp * @data 2023/11/11 */ @Getter public enum RabbitMqMsgTypeEnum { EXAM, MEET ; } src/main/java/com/ycl/jxkg/rabbitmq/consumer/Consumer.java
New file @@ -0,0 +1,33 @@ package com.ycl.jxkg.rabbitmq.consumer; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * @author:xp * @date:2024/6/28 11:51 */ @Component public class Consumer { /** * 考试状态更新的消费者 * * @param message */ @RabbitListener(queues = "examDlxQueue") public void examStatusAdjustConsumer(String message){ System.out.println(message); } /** * 会议消费者 * * @param message */ @RabbitListener(queues = "meetDlxQueue") public void meetConsumer(String message){ System.out.println(message); } } src/main/java/com/ycl/jxkg/rabbitmq/product/Product.java
New file @@ -0,0 +1,33 @@ package com.ycl.jxkg.rabbitmq.product; import com.ycl.jxkg.rabbitmq.MyCorrelationData; import com.ycl.jxkg.rabbitmq.RabbitMqMsgTypeEnum; import lombok.RequiredArgsConstructor; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; /** * @author:xp * @date:2024/6/28 11:18 */ @Component @RequiredArgsConstructor public class Product { private final RabbitTemplate rabbitTemplate; public void examMsg() { // 设置消息的关联数据,以便发送确认回调、未路由成功消息的处理 MyCorrelationData msgCorrelationData = new MyCorrelationData("ddddd", RabbitMqMsgTypeEnum.EXAM); rabbitTemplate.convertAndSend("examDlxQueue", (Object) "你好,RabbitMQ", msgCorrelationData); } public void meetMsg() { rabbitTemplate.convertAndSend("meetDlxQueue","你好,RabbitMQ"); } } src/main/resources/application-dev.yml
@@ -11,3 +11,22 @@ username: root password: 321$YcYl@1970! driver-class-name: com.mysql.cj.jdbc.Driver rabbitmq: host: 101.35.247.188 username: ycl password: ycl@2024 virtual-host: jxkg port: 5672 publisher-confirms: true # 开启生产者发送确认 publisher-returns: true # 开启生产者发送失败退回 listener: simple: default-requeue-rejected: false # 关闭默认拒绝消费时的重新入队,我们使用本地重试消费 # 确认模式:手动,开启了就必须在代码中手动确认,否则消息会一直重复消费。 # 开启了重试就应该设置为自动确认,因为手动确认需要捕获异常,而重试就是发生异常才会重试 acknowledge-mode: manual retry: enabled: true # 消费时出现异常进行重试消费,注意不能被捕获,否则无法重试 max-attempts: 3 # 最大重试次数 initial-interval: 3000 # 初次重试等待间隔 multiplier: 2 # 重试失败后,下次等待时间增加多少倍。 src/main/resources/application-prod.yml
@@ -11,3 +11,22 @@ username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver rabbitmq: host: 101.35.247.188 username: ycl password: ycl@2024 virtual-host: jxkg port: 5672 publisher-confirms: true # 开启生产者发送确认 publisher-returns: true # 开启生产者发送失败退回 listener: simple: default-requeue-rejected: false # 关闭默认拒绝消费时的重新入队,我们使用本地重试消费 # 确认模式:手动,开启了就必须在代码中手动确认,否则消息会一直重复消费。 # 开启了重试就应该设置为自动确认,因为手动确认需要捕获异常,而重试就是发生异常才会重试 acknowledge-mode: manual retry: enabled: true # 消费时出现异常进行重试消费,注意不能被捕获,否则无法重试 max-attempts: 3 # 最大重试次数 initial-interval: 3000 # 初次重试等待间隔 multiplier: 2 # 重试失败后,下次等待时间增加多少倍。