xiangpei
2024-06-28 d0ebd33f5e4fdbff1f58a1af137d45b405366399
rabbitmq
3个文件已修改
5个文件已添加
247 ■■■■■ 已修改文件
pom.xml 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ycl/jxkg/rabbitmq/MyCorrelationData.java 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ycl/jxkg/rabbitmq/RabbitMQConfig.java 91 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ycl/jxkg/rabbitmq/RabbitMqMsgTypeEnum.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ycl/jxkg/rabbitmq/consumer/Consumer.java 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/ycl/jxkg/rabbitmq/product/Product.java 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application-dev.yml 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application-prod.yml 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pom.xml
@@ -43,6 +43,12 @@
    <dependencies>
        <!-- rabbitmq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- websocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
src/main/java/com/ycl/jxkg/rabbitmq/MyCorrelationData.java
New file
@@ -0,0 +1,27 @@
package com.ycl.jxkg.rabbitmq;
import lombok.Data;
import org.springframework.amqp.rabbit.connection.CorrelationData;
/**
 * 扩展关联数据
 *
 * @author xp
 * @data 2023/11/11
 */
@Data
public class MyCorrelationData extends CorrelationData {
    /**
     * 消息类型,可用于业务处理
     */
    private RabbitMqMsgTypeEnum msgType;
    public MyCorrelationData(String id, RabbitMqMsgTypeEnum msgType) {
        super(id);
        this.msgType = msgType;
    }
}
src/main/java/com/ycl/jxkg/rabbitmq/RabbitMQConfig.java
New file
@@ -0,0 +1,91 @@
package com.ycl.jxkg.rabbitmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * @author:xp
 * @date:2024/6/28 13:39
 */
@Slf4j
@Configuration
public class RabbitMQConfig {
    @Autowired
    private ConnectionFactory connectionFactory;
    // 创建普通队列
    @Bean
    public Queue examQueue() {
        Map<String, Object> args = new HashMap<>();
        // 设置死信交换机
        args.put("x-dead-letter-exchange", "dlxExchange");
        return new Queue("jxkg", true, false, false, args);
    }
    // 创建死信交换机
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlxExchange");
    }
    // 创建考试使用的死信队列
    @Bean
    public Queue examDlxQueue() {
        return new Queue("examDlxQueue", true);
    }
    // 创建会议使用的死信队列
    @Bean
    public Queue meetDlxQueue() {
        return new Queue("meetDlxQueue", true);
    }
     // 绑定考试死信队列到死信交换机
     @Bean
     public Binding examDlxBinding(Queue examDlxQueue, DirectExchange dlxExchange) {
         return BindingBuilder.bind(examDlxQueue).to(dlxExchange).with("exam");
     }
    // 绑定会议死信队列到死信交换机
    @Bean
    public Binding meetDlxBinding(Queue meetDlxQueue, DirectExchange dlxExchange) {
        return BindingBuilder.bind(meetDlxQueue).to(dlxExchange).with("meet");
    }
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 设置回退回调,只能全局设置一个
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.error("消息发送失败,目标交换机:{},路由:{},错误信息:{}", exchange, routingKey, replyText);
        });
        // 设置确认回调,全局设置一个,根据关联数据(CorrelationData)进行对应处理
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                MyCorrelationData myCorrelationData = (MyCorrelationData) correlationData;
                System.out.println(myCorrelationData);
                // 成功
                log.info("消息成功发送到交换机");
            } else {
                // 失败
                // todo 根据消息不同,处理不同
                log.error("消息发送失败");
            }
        });
        return rabbitTemplate;
    }
}
src/main/java/com/ycl/jxkg/rabbitmq/RabbitMqMsgTypeEnum.java
New file
@@ -0,0 +1,19 @@
package com.ycl.jxkg.rabbitmq;
import lombok.Getter;
/**
 * mq消息类型
 *
 * @author xp
 * @data 2023/11/11
 */
@Getter
public enum RabbitMqMsgTypeEnum {
    EXAM,
    MEET
    ;
}
src/main/java/com/ycl/jxkg/rabbitmq/consumer/Consumer.java
New file
@@ -0,0 +1,33 @@
package com.ycl.jxkg.rabbitmq.consumer;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
 * @author:xp
 * @date:2024/6/28 11:51
 */
@Component
public class Consumer {
    /**
     * 考试状态更新的消费者
     *
     * @param message
     */
    @RabbitListener(queues = "examDlxQueue")
    public void examStatusAdjustConsumer(String message){
        System.out.println(message);
    }
    /**
     * 会议消费者
     *
     * @param message
     */
    @RabbitListener(queues = "meetDlxQueue")
    public void meetConsumer(String message){
        System.out.println(message);
    }
}
src/main/java/com/ycl/jxkg/rabbitmq/product/Product.java
New file
@@ -0,0 +1,33 @@
package com.ycl.jxkg.rabbitmq.product;
import com.ycl.jxkg.rabbitmq.MyCorrelationData;
import com.ycl.jxkg.rabbitmq.RabbitMqMsgTypeEnum;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
 * @author:xp
 * @date:2024/6/28 11:18
 */
@Component
@RequiredArgsConstructor
public class Product {
    private final RabbitTemplate rabbitTemplate;
    public void examMsg() {
        // 设置消息的关联数据,以便发送确认回调、未路由成功消息的处理
        MyCorrelationData msgCorrelationData = new MyCorrelationData("ddddd", RabbitMqMsgTypeEnum.EXAM);
        rabbitTemplate.convertAndSend("examDlxQueue", (Object) "你好,RabbitMQ", msgCorrelationData);
    }
    public void meetMsg() {
        rabbitTemplate.convertAndSend("meetDlxQueue","你好,RabbitMQ");
    }
}
src/main/resources/application-dev.yml
@@ -11,3 +11,22 @@
    username: root
    password: 321$YcYl@1970!
    driver-class-name: com.mysql.cj.jdbc.Driver
  rabbitmq:
    host: 101.35.247.188
    username: ycl
    password: ycl@2024
    virtual-host: jxkg
    port: 5672
    publisher-confirms: true  # 开启生产者发送确认
    publisher-returns: true  # 开启生产者发送失败退回
    listener:
      simple:
        default-requeue-rejected: false  # 关闭默认拒绝消费时的重新入队,我们使用本地重试消费
        # 确认模式:手动,开启了就必须在代码中手动确认,否则消息会一直重复消费。
        # 开启了重试就应该设置为自动确认,因为手动确认需要捕获异常,而重试就是发生异常才会重试
        acknowledge-mode: manual
        retry:
          enabled: true  # 消费时出现异常进行重试消费,注意不能被捕获,否则无法重试
          max-attempts: 3  # 最大重试次数
          initial-interval: 3000  # 初次重试等待间隔
          multiplier: 2 # 重试失败后,下次等待时间增加多少倍。
src/main/resources/application-prod.yml
@@ -11,3 +11,22 @@
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
  rabbitmq:
    host: 101.35.247.188
    username: ycl
    password: ycl@2024
    virtual-host: jxkg
    port: 5672
    publisher-confirms: true  # 开启生产者发送确认
    publisher-returns: true  # 开启生产者发送失败退回
    listener:
      simple:
        default-requeue-rejected: false  # 关闭默认拒绝消费时的重新入队,我们使用本地重试消费
        # 确认模式:手动,开启了就必须在代码中手动确认,否则消息会一直重复消费。
        # 开启了重试就应该设置为自动确认,因为手动确认需要捕获异常,而重试就是发生异常才会重试
        acknowledge-mode: manual
        retry:
          enabled: true  # 消费时出现异常进行重试消费,注意不能被捕获,否则无法重试
          max-attempts: 3  # 最大重试次数
          initial-interval: 3000  # 初次重试等待间隔
          multiplier: 2 # 重试失败后,下次等待时间增加多少倍。