src/main/java/com/ycl/jxkg/config/mybatisplus/MybatisPlusConfig.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ycl/jxkg/domain/entity/Exam.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ycl/jxkg/job/ExamJob.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ycl/jxkg/rabbitmq/RabbitMQConfig.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ycl/jxkg/rabbitmq/consumer/Consumer.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ycl/jxkg/rabbitmq/msg/ExamStatusMsg.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ycl/jxkg/rabbitmq/msg/MqMsg.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ycl/jxkg/rabbitmq/product/Product.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ycl/jxkg/service/impl/ExamServiceImpl.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/ycl/jxkg/utils/DateTimeUtil.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/resources/application-dev.yml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/main/java/com/ycl/jxkg/config/mybatisplus/MybatisPlusConfig.java
@@ -2,6 +2,7 @@ import com.baomidou.mybatisplus.annotation.DbType; import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.OptimisticLockerInnerInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -20,6 +21,8 @@ // 配置分页插件 interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL)); // 配置乐观锁 interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor()); return interceptor; } } src/main/java/com/ycl/jxkg/domain/entity/Exam.java
@@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.annotation.TableName; import java.util.Date; import com.baomidou.mybatisplus.annotation.Version; import com.ycl.jxkg.domain.base.AbsEntity; import com.ycl.jxkg.enums.general.ExamStatusEnum; import lombok.Data; @@ -59,4 +60,7 @@ @TableField(value = "create_time", fill = FieldFill.INSERT) private Date createTime; @Version private Integer updateVersion; } src/main/java/com/ycl/jxkg/job/ExamJob.java
@@ -23,7 +23,7 @@ private final ExamMapper examMapper; @Scheduled(fixedRate = 120000) // 两分钟执行一次,定时任务作为mq消费失败的保底 @Scheduled(fixedRate = 1200000) // Runs every 20 minutes (1,200,000 ms); this scheduled job is the fallback when MQ consumption fails private void updateExamStatus() { List<Exam> notFinishedExams = new LambdaQueryChainWrapper<>(examMapper) .select(Exam::getId, Exam::getStatus, Exam::getStartTime, Exam::getEndTime) src/main/java/com/ycl/jxkg/rabbitmq/RabbitMQConfig.java
@@ -25,6 +25,11 @@ @Autowired private ConnectionFactory connectionFactory; @Bean public DirectExchange examExchange() { return new DirectExchange("examExchange"); } // 创建普通队列 @Bean public Queue examQueue() { @@ -34,6 +39,12 @@ return new Queue("jxkg", true, false, false, args); } // 普通信队列到交换机 @Bean public Binding binding(Queue examQueue, DirectExchange examExchange) { return BindingBuilder.bind(examQueue).to(examExchange).with("exam"); } // 创建死信交换机 @Bean public DirectExchange dlxExchange() { src/main/java/com/ycl/jxkg/rabbitmq/consumer/Consumer.java
@@ -1,14 +1,27 @@ package com.ycl.jxkg.rabbitmq.consumer; import com.alibaba.fastjson2.JSON; import com.baomidou.mybatisplus.extension.conditions.update.LambdaUpdateChainWrapper; import com.ycl.jxkg.domain.entity.Exam; import com.ycl.jxkg.enums.general.ExamStatusEnum; import com.ycl.jxkg.mapper.ExamMapper; import com.ycl.jxkg.rabbitmq.msg.ExamStatusMsg; import lombok.RequiredArgsConstructor; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; import java.util.Objects; /** * @author:xp * @date:2024/6/28 11:51 */ @Component @RequiredArgsConstructor public class Consumer { private final ExamMapper examMapper; /** * 考试状态更新的消费者 @@ -16,8 +29,17 @@ * @param message */ @RabbitListener(queues = "examDlxQueue") public void examStatusAdjustConsumer(String message){ System.out.println(message); public void examStatusAdjustConsumer(Message message){ ExamStatusMsg examStatusMsg = JSON.parseObject(message.getBody(), ExamStatusMsg.class); Exam exam = examMapper.selectById(examStatusMsg.getExamId()); if (Objects.nonNull(exam) && examStatusMsg.getVersion().equals(exam.getUpdateVersion())) { // 不使用updateById这种方式,避免乐观锁加一。 // 因为考试的状态修改一共需要发送两条消息。不能因为第一条消息改了之后,第二条消息因为乐观锁加一了而无法修改 new LambdaUpdateChainWrapper<>(examMapper) .eq(Exam::getId, exam.getId()) .set(Exam::getStatus, examStatusMsg.getTargetStatus()) .update(); } } @@ -27,7 +49,7 @@ * @param message */ @RabbitListener(queues = "meetDlxQueue") public void meetConsumer(String message){ public void meetConsumer(Message message){ System.out.println(message); } } src/main/java/com/ycl/jxkg/rabbitmq/msg/ExamStatusMsg.java
New file @@ -0,0 +1,25 @@
package com.ycl.jxkg.rabbitmq.msg;

import com.ycl.jxkg.enums.general.ExamStatusEnum;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
 * MQ payload asking for an exam to be moved to a target status.
 * The optimistic-lock version is inherited from {@link MqMsg}.
 *
 * @author:xp
 * @date:2024/7/1 14:15
 */
@Data
// callSuper = true: without it the generated equals/hashCode/toString skip the
// inherited version field, so two messages differing only in version compare equal.
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class ExamStatusMsg extends MqMsg {

    /**
     * Exam ID.
     */
    private Integer examId;

    /**
     * The status this message wants the exam to be changed to.
     */
    private ExamStatusEnum targetStatus;

}
src/main/java/com/ycl/jxkg/rabbitmq/msg/MqMsg.java
New file @@ -0,0 +1,18 @@
package com.ycl.jxkg.rabbitmq.msg;

import lombok.Data;

/**
 * Abstract base class for MQ message payloads; it only carries the
 * optimistic-lock version number.
 *
 * @author:xp
 * @date:2024/7/1 13:53
 */
@Data
public abstract class MqMsg {

    /**
     * Optimistic-lock version number.
     */
    private Integer version;

}
src/main/java/com/ycl/jxkg/rabbitmq/product/Product.java
@@ -4,6 +4,7 @@ import com.ycl.jxkg.rabbitmq.RabbitMqMsgTypeEnum; import lombok.RequiredArgsConstructor; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @@ -20,10 +21,20 @@ private final RabbitTemplate rabbitTemplate; public void examMsg() { /** * 发送考试消息 * * @param examId 考试ID * @param jsonMsg 消息json * @param delayTime 延迟时间,毫秒 */ public void examMsg(Integer examId, String jsonMsg, Long delayTime) { MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration(delayTime + ""); Message message = MessageBuilder.withBody(jsonMsg.getBytes()).andProperties(messageProperties).build(); // 设置消息的关联数据,以便发送确认回调、未路由成功消息的处理 MyCorrelationData msgCorrelationData = new MyCorrelationData("ddddd", RabbitMqMsgTypeEnum.EXAM); rabbitTemplate.convertAndSend("examDlxQueue", (Object) "你好,RabbitMQ", msgCorrelationData); MyCorrelationData msgCorrelationData = new MyCorrelationData(examId + "", RabbitMqMsgTypeEnum.EXAM); rabbitTemplate.convertAndSend("examExchange", "exam", message, msgCorrelationData); } public void meetMsg() { src/main/java/com/ycl/jxkg/service/impl/ExamServiceImpl.java
@@ -27,9 +27,12 @@ import com.ycl.jxkg.enums.general.ExamStatusEnum; import com.ycl.jxkg.enums.general.ExamSubmitTempStatusEnum; import com.ycl.jxkg.mapper.*; import com.ycl.jxkg.rabbitmq.msg.ExamStatusMsg; import com.ycl.jxkg.rabbitmq.product.Product; import com.ycl.jxkg.server.WebsocketServer; import com.ycl.jxkg.service.ExamPaperService; import com.ycl.jxkg.service.ExamService; import com.ycl.jxkg.utils.DateTimeUtil; import com.ycl.jxkg.utils.PageUtil; import lombok.RequiredArgsConstructor; import org.springframework.beans.BeanUtils; @@ -63,7 +66,7 @@ private final WebsocketServer websocketServer; private final UserMapper userMapper; private final ExamPaperScoreMapper examPaperScoreMapper; private final ExamPaperScoreDetailMapper examPaperScoreDetailMapper; private final Product product; /** * 添加 @@ -76,7 +79,11 @@ Exam entity = ExamForm.getEntityByForm(form, null); entity.setStatus(ExamStatusEnum.getStatusByTime(form.getStartTime(), form.getEndTime(), null)); entity.setTeacherId(webContext.getCurrentUser().getId()); baseMapper.insert(entity); // 设置乐观锁版本 entity.setUpdateVersion(0); if (baseMapper.insert(entity) > 0) { this.sendMQ(entity, 0); } return Result.ok("添加成功"); } @@ -97,11 +104,59 @@ } BeanUtils.copyProperties(form, entity); entity.setStatus(ExamStatusEnum.getStatusByTime(form.getStartTime(), form.getEndTime(), null)); baseMapper.updateById(entity); // 如果修改成功发送mq消息 if (baseMapper.updateById(entity) > 0) { this.sendMQ(entity, entity.getUpdateVersion() + 1); } return Result.ok("修改成功"); } /** * 发送mq消息 * * @param entity 考试实体类 * @param version 乐观锁版本 */ public void sendMQ(Exam entity, Integer version) { // 如果当前状态为未开始,则发送两条mq消息,一条设置状态为进行中,一条设置状态为已结束 if (ExamStatusEnum.NOT_START.equals(entity.getStatus())) { // 进行状态消息 ExamStatusMsg ingMsg = new ExamStatusMsg(); ingMsg.setVersion(version); ingMsg.setExamId(entity.getId()); ingMsg.setTargetStatus(ExamStatusEnum.ING); product.examMsg(entity.getId(), JSON.toJSONString(ingMsg), 
DateTimeUtil.getTwoTimeDiffMS(entity.getStartTime(), new Date())); // 结束状态消息 ExamStatusMsg finishedMsg = new ExamStatusMsg(); finishedMsg.setVersion(version); finishedMsg.setExamId(entity.getId()); finishedMsg.setTargetStatus(ExamStatusEnum.FINISHED); product.examMsg(entity.getId(), JSON.toJSONString(finishedMsg), DateTimeUtil.getTwoTimeDiffMS(entity.getEndTime(), new Date())); } else if (ExamStatusEnum.ING.equals(entity.getStatus())) { // 当前是进行中状态则只需发送结束消息 // 结束状态消息 ExamStatusMsg finishedMsg = new ExamStatusMsg(); finishedMsg.setVersion(0); finishedMsg.setExamId(entity.getId()); finishedMsg.setTargetStatus(ExamStatusEnum.FINISHED); product.examMsg(entity.getId(), JSON.toJSONString(finishedMsg), DateTimeUtil.getTwoTimeDiffMS(entity.getEndTime(), new Date())); } } /** * 根据考试的当前状态,得到下一个状态 * * @param currentStatus * @return */ public ExamStatusEnum getNextStatus(ExamStatusEnum currentStatus) { if (ExamStatusEnum.NOT_START.equals(currentStatus)) { return ExamStatusEnum.ING; } else { return ExamStatusEnum.FINISHED; } } /** * 批量删除 * * @param ids src/main/java/com/ycl/jxkg/utils/DateTimeUtil.java
@@ -153,4 +153,15 @@ } return list; }
    /**
     * Computes the number of milliseconds between two dates.
     *
     * @param bigDate   the later date
     * @param smallDate the earlier date
     * @return {@code bigDate} minus {@code smallDate} in milliseconds
     *         (negative when {@code bigDate} is before {@code smallDate})
     */
    public static Long getTwoTimeDiffMS(Date bigDate, Date smallDate) {
        long laterMillis = bigDate.getTime();
        long earlierMillis = smallDate.getTime();
        return laterMillis - earlierMillis;
    }
} src/main/resources/application-dev.yml
@@ -19,14 +19,14 @@ port: 5672 publisher-confirms: true # 开启生产者发送确认 publisher-returns: true # 开启生产者发送失败退回 listener: simple: default-requeue-rejected: false # 关闭默认拒绝消费时的重新入队,我们使用本地重试消费 # 确认模式:手动,开启了就必须在代码中手动确认,否则消息会一直重复消费。 # 开启了重试就应该设置为自动确认,因为手动确认需要捕获异常,而重试就是发生异常才会重试 acknowledge-mode: manual retry: enabled: true # 消费时出现异常进行重试消费,注意不能被捕获,否则无法重试 max-attempts: 3 # 最大重试次数 initial-interval: 3000 # 初次重试等待间隔 multiplier: 2 # 重试失败后,下次等待时间增加多少倍。 # listener: # simple: # default-requeue-rejected: false # 关闭默认拒绝消费时的重新入队,我们使用本地重试消费 # # 确认模式:手动,开启了就必须在代码中手动确认,否则消息会一直重复消费。 # # 开启了重试就应该设置为自动确认,因为手动确认需要捕获异常,而重试就是发生异常才会重试 # acknowledge-mode: auto # retry: # enabled: true # 消费时出现异常进行重试消费,注意不能被捕获,否则无法重试 # max-attempts: 3 # 最大重试次数 # initial-interval: 3000 # 初次重试等待间隔 # multiplier: 2 # 重试失败后,下次等待时间增加多少倍。