src/main/java/com/ycl/jxkg/config/mybatisplus/MybatisPlusConfig.java
@@ -2,6 +2,7 @@ import com.baomidou.mybatisplus.annotation.DbType; import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.OptimisticLockerInnerInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -20,6 +21,8 @@ // 配置分页插件 interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL)); // 配置乐观锁 interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor()); return interceptor; } } src/main/java/com/ycl/jxkg/domain/entity/Exam.java
@@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.annotation.TableName; import java.util.Date; import com.baomidou.mybatisplus.annotation.Version; import com.ycl.jxkg.domain.base.AbsEntity; import com.ycl.jxkg.enums.general.ExamStatusEnum; import lombok.Data; @@ -59,4 +60,7 @@ @TableField(value = "create_time", fill = FieldFill.INSERT) private Date createTime; @Version private Integer updateVersion; } src/main/java/com/ycl/jxkg/job/ExamJob.java
@@ -23,7 +23,7 @@ private final ExamMapper examMapper; @Scheduled(fixedRate = 120000) // 两分钟执行一次,定时任务作为mq消费失败的保底 @Scheduled(fixedRate = 1200000) // Runs every 20 minutes (1200000 ms), as the fallback when MQ consumption fails. NOTE(review): the previous value 120000 was 2 minutes - confirm the 10x increase is intended. private void updateExamStatus() { List<Exam> notFinishedExams = new LambdaQueryChainWrapper<>(examMapper) .select(Exam::getId, Exam::getStatus, Exam::getStartTime, Exam::getEndTime) src/main/java/com/ycl/jxkg/rabbitmq/RabbitMQConfig.java
@@ -25,6 +25,11 @@ @Autowired private ConnectionFactory connectionFactory; @Bean public DirectExchange examExchange() { return new DirectExchange("examExchange"); } // 创建普通队列 @Bean public Queue examQueue() { @@ -34,6 +39,12 @@ return new Queue("jxkg", true, false, false, args); } // 普通信队列到交换机 @Bean public Binding binding(Queue examQueue, DirectExchange examExchange) { return BindingBuilder.bind(examQueue).to(examExchange).with("exam"); } // 创建死信交换机 @Bean public DirectExchange dlxExchange() { src/main/java/com/ycl/jxkg/rabbitmq/consumer/Consumer.java
@@ -1,14 +1,27 @@ package com.ycl.jxkg.rabbitmq.consumer; import com.alibaba.fastjson2.JSON; import com.baomidou.mybatisplus.extension.conditions.update.LambdaUpdateChainWrapper; import com.ycl.jxkg.domain.entity.Exam; import com.ycl.jxkg.enums.general.ExamStatusEnum; import com.ycl.jxkg.mapper.ExamMapper; import com.ycl.jxkg.rabbitmq.msg.ExamStatusMsg; import lombok.RequiredArgsConstructor; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; import java.util.Objects; /** * @author:xp * @date:2024/6/28 11:51 */ @Component @RequiredArgsConstructor public class Consumer { private final ExamMapper examMapper; /** * 考试状态更新的消费者 @@ -16,8 +29,17 @@ * @param message */ @RabbitListener(queues = "examDlxQueue") public void examStatusAdjustConsumer(String message){ System.out.println(message); public void examStatusAdjustConsumer(Message message){ ExamStatusMsg examStatusMsg = JSON.parseObject(message.getBody(), ExamStatusMsg.class); Exam exam = examMapper.selectById(examStatusMsg.getExamId()); if (Objects.nonNull(exam) && examStatusMsg.getVersion().equals(exam.getUpdateVersion())) { // 不使用updateById这种方式,避免乐观锁加一。 // 因为考试的状态修改一共需要发送两条消息。不能因为第一条消息改了之后,第二条消息因为乐观锁加一了而无法修改 new LambdaUpdateChainWrapper<>(examMapper) .eq(Exam::getId, exam.getId()) .set(Exam::getStatus, examStatusMsg.getTargetStatus()) .update(); } } @@ -27,7 +49,7 @@ * @param message */ @RabbitListener(queues = "meetDlxQueue") public void meetConsumer(String message){ public void meetConsumer(Message message){ System.out.println(message); } } src/main/java/com/ycl/jxkg/rabbitmq/msg/ExamStatusMsg.java
New file @@ -0,0 +1,25 @@
package com.ycl.jxkg.rabbitmq.msg;

import com.ycl.jxkg.enums.general.ExamStatusEnum;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
 * MQ message asking for an exam to be moved to a target status.
 *
 * {@code callSuper = true} makes the generated {@code equals}, {@code hashCode} and
 * {@code toString} include the fields inherited from {@link MqMsg}; without it, two messages
 * that differ only in their inherited version would compare equal.
 *
 * @author:xp
 * @date:2024/7/1 14:15
 */
@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class ExamStatusMsg extends MqMsg {

    /**
     * Exam ID
     */
    private Integer examId;

    /**
     * The status this message wants the exam to be changed to
     */
    private ExamStatusEnum targetStatus;

}
src/main/java/com/ycl/jxkg/rabbitmq/msg/MqMsg.java
New file @@ -0,0 +1,18 @@
package com.ycl.jxkg.rabbitmq.msg;

import lombok.Data;

/**
 * Abstract base class for MQ message payloads; it holds the optimistic-lock
 * version shared by all message types.
 *
 * @author:xp
 * @date:2024/7/1 13:53
 */
@Data
public abstract class MqMsg {

    /**
     * Optimistic-lock version number
     */
    private Integer version;

}
src/main/java/com/ycl/jxkg/rabbitmq/product/Producer.java
New file @@ -0,0 +1,44 @@
package com.ycl.jxkg.rabbitmq.product;

import com.ycl.jxkg.rabbitmq.MyCorrelationData;
import com.ycl.jxkg.rabbitmq.RabbitMqMsgTypeEnum;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * Publishes messages to RabbitMQ through the shared {@link RabbitTemplate}.
 *
 * @author:xp
 * @date:2024/6/28 11:18
 */
@Component
@RequiredArgsConstructor
public class Producer {

    private static final String EXAM_EXCHANGE = "examExchange";
    private static final String EXAM_ROUTING_KEY = "exam";
    private static final String MEET_ROUTING_KEY = "meet";

    private final RabbitTemplate rabbitTemplate;

    /**
     * Sends an exam message whose per-message expiration is set to the given delay.
     *
     * @param examId    exam ID; used as the id of the correlation data
     * @param jsonMsg   message body as JSON; encoded as UTF-8
     * @param delayTime delay in milliseconds; negative values are sent as 0
     * @throws NullPointerException if {@code jsonMsg} or {@code delayTime} is null
     */
    public void examMsg(Integer examId, String jsonMsg, Long delayTime) {
        Objects.requireNonNull(jsonMsg, "jsonMsg");
        Objects.requireNonNull(delayTime, "delayTime");

        // The expiration property must be a non-negative integer string. A target time that
        // has already passed gives a negative difference, so it is clamped to 0 and the message
        // expires immediately rather than being rejected by the broker.
        long ttlMillis = Math.max(0L, delayTime);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration(Long.toString(ttlMillis));

        // Explicit charset: the no-arg getBytes() depends on the JVM's platform default.
        Message message = MessageBuilder
                .withBody(jsonMsg.getBytes(StandardCharsets.UTF_8))
                .andProperties(messageProperties)
                .build();

        // Correlation data for the publisher-confirm callback and for handling unroutable messages
        MyCorrelationData msgCorrelationData = new MyCorrelationData(examId + "", RabbitMqMsgTypeEnum.EXAM);
        rabbitTemplate.convertAndSend(EXAM_EXCHANGE, EXAM_ROUTING_KEY, message, msgCorrelationData);
    }

    /**
     * Sends a fixed placeholder text with routing key "meet".
     *
     * All three parameters are currently ignored.
     * NOTE(review): no binding for routing key "meet" on examExchange is visible in this
     * change - confirm the message is routable.
     *
     * @param examId    unused
     * @param jsonMsg   unused
     * @param delayTime unused
     */
    public void meetMsg(Integer examId, String jsonMsg, Long delayTime) {
        rabbitTemplate.convertAndSend(EXAM_EXCHANGE, MEET_ROUTING_KEY, "你好,RabbitMQ");
    }
}
src/main/java/com/ycl/jxkg/rabbitmq/product/Product.java
File was deleted src/main/java/com/ycl/jxkg/service/impl/ExamServiceImpl.java
@@ -29,10 +29,12 @@ import com.ycl.jxkg.enums.general.ExamStatusEnum; import com.ycl.jxkg.enums.general.ExamSubmitTempStatusEnum; import com.ycl.jxkg.mapper.*; import com.ycl.jxkg.rabbitmq.msg.ExamStatusMsg; import com.ycl.jxkg.rabbitmq.product.Producer; import com.ycl.jxkg.server.WebsocketServer; import com.ycl.jxkg.service.ExamPaperScoreService; import com.ycl.jxkg.service.ExamPaperService; import com.ycl.jxkg.service.ExamService; import com.ycl.jxkg.utils.DateTimeUtil; import com.ycl.jxkg.utils.PageUtil; import lombok.RequiredArgsConstructor; import org.springframework.beans.BeanUtils; @@ -56,6 +58,8 @@ @RequiredArgsConstructor public class ExamServiceImpl extends ServiceImpl<ExamMapper, Exam> implements ExamService { private static final String ANSWER_SPLIT = ","; private final ExamMapper examMapper; private final WebContext webContext; private final QuestionMapper questionMapper; @@ -69,6 +73,8 @@ private final ExamPaperScoreService examPaperScoreService; private static final String ANSWER_SPLIT = ","; private final Producer producer; /** * 添加 * @@ -80,7 +86,11 @@ Exam entity = ExamForm.getEntityByForm(form, null); entity.setStatus(ExamStatusEnum.getStatusByTime(form.getStartTime(), form.getEndTime(), null)); entity.setTeacherId(webContext.getCurrentUser().getId()); baseMapper.insert(entity); // 设置乐观锁版本 entity.setUpdateVersion(0); if (baseMapper.insert(entity) > 0) { this.sendMQ(entity, 0); } return Result.ok("添加成功"); } @@ -101,8 +111,56 @@ } BeanUtils.copyProperties(form, entity); entity.setStatus(ExamStatusEnum.getStatusByTime(form.getStartTime(), form.getEndTime(), null)); baseMapper.updateById(entity); // 如果修改成功发送mq消息 if (baseMapper.updateById(entity) > 0) { this.sendMQ(entity, entity.getUpdateVersion() + 1); } return Result.ok("修改成功"); } /** * 发送mq消息 * * @param entity 考试实体类 * @param version 乐观锁版本 */ public void sendMQ(Exam entity, Integer version) { // 如果当前状态为未开始,则发送两条mq消息,一条设置状态为进行中,一条设置状态为已结束 if (ExamStatusEnum.NOT_START.equals(entity.getStatus())) { // 
进行状态消息 ExamStatusMsg ingMsg = new ExamStatusMsg(); ingMsg.setVersion(version); ingMsg.setExamId(entity.getId()); ingMsg.setTargetStatus(ExamStatusEnum.ING); producer.examMsg(entity.getId(), JSON.toJSONString(ingMsg), DateTimeUtil.getTwoTimeDiffMS(entity.getStartTime(), new Date())); // 结束状态消息 ExamStatusMsg finishedMsg = new ExamStatusMsg(); finishedMsg.setVersion(version); finishedMsg.setExamId(entity.getId()); finishedMsg.setTargetStatus(ExamStatusEnum.FINISHED); producer.examMsg(entity.getId(), JSON.toJSONString(finishedMsg), DateTimeUtil.getTwoTimeDiffMS(entity.getEndTime(), new Date())); } else if (ExamStatusEnum.ING.equals(entity.getStatus())) { // 当前是进行中状态则只需发送结束消息 // 结束状态消息 ExamStatusMsg finishedMsg = new ExamStatusMsg(); finishedMsg.setVersion(0); finishedMsg.setExamId(entity.getId()); finishedMsg.setTargetStatus(ExamStatusEnum.FINISHED); producer.examMsg(entity.getId(), JSON.toJSONString(finishedMsg), DateTimeUtil.getTwoTimeDiffMS(entity.getEndTime(), new Date())); } } /** * 根据考试的当前状态,得到下一个状态 * * @param currentStatus * @return */ public ExamStatusEnum getNextStatus(ExamStatusEnum currentStatus) { if (ExamStatusEnum.NOT_START.equals(currentStatus)) { return ExamStatusEnum.ING; } else { return ExamStatusEnum.FINISHED; } } /** @@ -411,8 +469,10 @@ // 阅卷后才往exam_paper_answer保存考试成绩、以及保存到exam_paper_customer_answer // 现在只需要保存到一张临时表 // 该接口是主动提交,所以状态都设置为完成,以便后续老师阅卷 //TODO:暂时改为temp方便测试 saveTempExam(submitData, ExamSubmitTempStatusEnum.temp); saveTempExam(submitData, ExamSubmitTempStatusEnum.finish); //TODO:考试状态设定为结束 return Result.ok(); } src/main/java/com/ycl/jxkg/utils/DateTimeUtil.java
@@ -153,4 +153,15 @@
        }
        return list;
    }

    /**
     * Returns the number of milliseconds between two dates, computed as
     * {@code bigDate.getTime() - smallDate.getTime()}.
     *
     * @param bigDate   the later date
     * @param smallDate the earlier date
     * @return the difference in milliseconds; negative when {@code bigDate} is before {@code smallDate}
     */
    public static Long getTwoTimeDiffMS(Date bigDate, Date smallDate) {
        return bigDate.getTime() - smallDate.getTime();
    }
}
src/main/resources/application-dev.yml
@@ -19,14 +19,14 @@ port: 5672 publisher-confirms: true # 开启生产者发送确认 publisher-returns: true # 开启生产者发送失败退回 listener: simple: default-requeue-rejected: false # 关闭默认拒绝消费时的重新入队,我们使用本地重试消费 # 确认模式:手动,开启了就必须在代码中手动确认,否则消息会一直重复消费。 # 开启了重试就应该设置为自动确认,因为手动确认需要捕获异常,而重试就是发生异常才会重试 acknowledge-mode: manual retry: enabled: true # 消费时出现异常进行重试消费,注意不能被捕获,否则无法重试 max-attempts: 3 # 最大重试次数 initial-interval: 3000 # 初次重试等待间隔 multiplier: 2 # 重试失败后,下次等待时间增加多少倍。 # listener: # simple: # default-requeue-rejected: false # 关闭默认拒绝消费时的重新入队,我们使用本地重试消费 # # 确认模式:手动,开启了就必须在代码中手动确认,否则消息会一直重复消费。 # # 开启了重试就应该设置为自动确认,因为手动确认需要捕获异常,而重试就是发生异常才会重试 # acknowledge-mode: auto # retry: # enabled: true # 消费时出现异常进行重试消费,注意不能被捕获,否则无法重试 # max-attempts: 3 # 最大重试次数 # initial-interval: 3000 # 初次重试等待间隔 # multiplier: 2 # 重试失败后,下次等待时间增加多少倍。