From 84b63179040166b3ab06419c4d45e7cec64c9440 Mon Sep 17 00:00:00 2001 From: xiangpei <xiangpei@timesnew.cn> Date: 星期一, 01 七月 2024 17:14:26 +0800 Subject: [PATCH] mq消息 --- src/main/java/com/ycl/jxkg/config/mybatisplus/MybatisPlusConfig.java | 3 src/main/java/com/ycl/jxkg/utils/DateTimeUtil.java | 11 ++ src/main/java/com/ycl/jxkg/rabbitmq/msg/ExamStatusMsg.java | 25 ++++++ src/main/java/com/ycl/jxkg/rabbitmq/msg/MqMsg.java | 18 ++++ src/main/java/com/ycl/jxkg/rabbitmq/product/Product.java | 17 +++ src/main/java/com/ycl/jxkg/rabbitmq/RabbitMQConfig.java | 11 ++ src/main/java/com/ycl/jxkg/rabbitmq/consumer/Consumer.java | 28 ++++++ src/main/java/com/ycl/jxkg/service/impl/ExamServiceImpl.java | 61 ++++++++++++++ src/main/resources/application-dev.yml | 22 ++-- src/main/java/com/ycl/jxkg/domain/entity/Exam.java | 4 + src/main/java/com/ycl/jxkg/job/ExamJob.java | 2 11 files changed, 181 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/ycl/jxkg/config/mybatisplus/MybatisPlusConfig.java b/src/main/java/com/ycl/jxkg/config/mybatisplus/MybatisPlusConfig.java index 2a4adcd..fdd2d9e 100644 --- a/src/main/java/com/ycl/jxkg/config/mybatisplus/MybatisPlusConfig.java +++ b/src/main/java/com/ycl/jxkg/config/mybatisplus/MybatisPlusConfig.java @@ -2,6 +2,7 @@ import com.baomidou.mybatisplus.annotation.DbType; import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; +import com.baomidou.mybatisplus.extension.plugins.inner.OptimisticLockerInnerInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -20,6 +21,8 @@ // 閰嶇疆鍒嗛〉鎻掍欢 interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL)); + // 閰嶇疆涔愯閿� + interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor()); return interceptor; } } diff --git a/src/main/java/com/ycl/jxkg/domain/entity/Exam.java b/src/main/java/com/ycl/jxkg/domain/entity/Exam.java index 54e6def..eadd953 100644 --- a/src/main/java/com/ycl/jxkg/domain/entity/Exam.java +++ b/src/main/java/com/ycl/jxkg/domain/entity/Exam.java @@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.annotation.TableName; import java.util.Date; +import com.baomidou.mybatisplus.annotation.Version; import com.ycl.jxkg.domain.base.AbsEntity; import com.ycl.jxkg.enums.general.ExamStatusEnum; import lombok.Data; @@ -59,4 +60,7 @@ @TableField(value = "create_time", fill = FieldFill.INSERT) private Date createTime; + + @Version + private Integer updateVersion; } diff --git a/src/main/java/com/ycl/jxkg/job/ExamJob.java b/src/main/java/com/ycl/jxkg/job/ExamJob.java index 2f97502..838054a 100644 --- a/src/main/java/com/ycl/jxkg/job/ExamJob.java +++ b/src/main/java/com/ycl/jxkg/job/ExamJob.java @@ -23,7 +23,7 @@ private final ExamMapper examMapper; - @Scheduled(fixedRate = 120000) // 涓ゅ垎閽熸墽琛屼竴娆★紝瀹氭椂浠诲姟浣滀负mq娑堣垂澶辫触鐨勪繚搴� + @Scheduled(fixedRate = 1200000) // 涓ゅ垎閽熸墽琛屼竴娆★紝瀹氭椂浠诲姟浣滀负mq娑堣垂澶辫触鐨勪繚搴� private void updateExamStatus() { List<Exam> notFinishedExams = new LambdaQueryChainWrapper<>(examMapper) .select(Exam::getId, Exam::getStatus, Exam::getStartTime, Exam::getEndTime) diff --git a/src/main/java/com/ycl/jxkg/rabbitmq/RabbitMQConfig.java b/src/main/java/com/ycl/jxkg/rabbitmq/RabbitMQConfig.java index 91bf54c..47ff879 100644 --- a/src/main/java/com/ycl/jxkg/rabbitmq/RabbitMQConfig.java +++ b/src/main/java/com/ycl/jxkg/rabbitmq/RabbitMQConfig.java @@ -25,6 +25,11 @@ @Autowired private ConnectionFactory connectionFactory; + @Bean + public DirectExchange examExchange() { + return new DirectExchange("examExchange"); + } + // 鍒涘缓鏅�氶槦鍒� @Bean public Queue examQueue() { @@ -34,6 +39,12 @@ return new Queue("jxkg", true, false, false, args); } + // 鏅�氫俊闃熷垪鍒颁氦鎹㈡満 + @Bean + public Binding binding(Queue examQueue, DirectExchange examExchange) { + return BindingBuilder.bind(examQueue).to(examExchange).with("exam"); + } + // 鍒涘缓姝讳俊浜ゆ崲鏈� @Bean public DirectExchange dlxExchange() { diff --git a/src/main/java/com/ycl/jxkg/rabbitmq/consumer/Consumer.java b/src/main/java/com/ycl/jxkg/rabbitmq/consumer/Consumer.java index d99cb71..7dee25c 100644 --- a/src/main/java/com/ycl/jxkg/rabbitmq/consumer/Consumer.java +++ b/src/main/java/com/ycl/jxkg/rabbitmq/consumer/Consumer.java @@ -1,14 +1,27 @@ package com.ycl.jxkg.rabbitmq.consumer; +import com.alibaba.fastjson2.JSON; +import com.baomidou.mybatisplus.extension.conditions.update.LambdaUpdateChainWrapper; +import com.ycl.jxkg.domain.entity.Exam; +import com.ycl.jxkg.enums.general.ExamStatusEnum; +import com.ycl.jxkg.mapper.ExamMapper; +import com.ycl.jxkg.rabbitmq.msg.ExamStatusMsg; +import lombok.RequiredArgsConstructor; +import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; + +import java.util.Objects; /** * @author锛歺p * @date锛�2024/6/28 11:51 */ @Component +@RequiredArgsConstructor public class Consumer { + + private final ExamMapper examMapper; /** * 鑰冭瘯鐘舵�佹洿鏂扮殑娑堣垂鑰� @@ -16,8 +29,17 @@ * @param message */ @RabbitListener(queues = "examDlxQueue") - public void examStatusAdjustConsumer(String message){ - System.out.println(message); + public void examStatusAdjustConsumer(Message message){ + ExamStatusMsg examStatusMsg = JSON.parseObject(message.getBody(), ExamStatusMsg.class); + Exam exam = examMapper.selectById(examStatusMsg.getExamId()); + if (Objects.nonNull(exam) && examStatusMsg.getVersion().equals(exam.getUpdateVersion())) { + // 涓嶄娇鐢╱pdateById杩欑鏂瑰紡锛岄伩鍏嶄箰瑙傞攣鍔犱竴銆� + // 鍥犱负鑰冭瘯鐨勭姸鎬佷慨鏀逛竴鍏遍渶瑕佸彂閫佷袱鏉℃秷鎭�備笉鑳藉洜涓虹涓�鏉℃秷鎭敼浜嗕箣鍚庯紝绗簩鏉℃秷鎭洜涓轰箰瑙傞攣鍔犱竴浜嗚�屾棤娉曚慨鏀� + new LambdaUpdateChainWrapper<>(examMapper) + .eq(Exam::getId, exam.getId()) + .set(Exam::getStatus, examStatusMsg.getTargetStatus()) + .update(); + } } @@ -27,7 +49,7 @@ * @param message */ @RabbitListener(queues = "meetDlxQueue") - public void meetConsumer(String message){ + public void meetConsumer(Message message){ System.out.println(message); } } diff --git a/src/main/java/com/ycl/jxkg/rabbitmq/msg/ExamStatusMsg.java b/src/main/java/com/ycl/jxkg/rabbitmq/msg/ExamStatusMsg.java new file mode 100644 index 0000000..c10d0ef --- /dev/null +++ b/src/main/java/com/ycl/jxkg/rabbitmq/msg/ExamStatusMsg.java @@ -0,0 +1,25 @@ +package com.ycl.jxkg.rabbitmq.msg; + +import com.ycl.jxkg.enums.general.ExamStatusEnum; +import lombok.Data; + +/** + * @author锛歺p + * @date锛�2024/7/1 14:15 + */ +@Data +public class ExamStatusMsg extends MqMsg { + + /** + * 鑰冭瘯ID + * + */ + private Integer examId; + + /** + * 杩欐潯娑堟伅甯屾湜灏嗚�冭瘯鐘舵�佷慨鏀逛负鍝釜 + * + */ + private ExamStatusEnum targetStatus; + +} diff --git a/src/main/java/com/ycl/jxkg/rabbitmq/msg/MqMsg.java b/src/main/java/com/ycl/jxkg/rabbitmq/msg/MqMsg.java new file mode 100644 index 0000000..7821bfc --- /dev/null +++ b/src/main/java/com/ycl/jxkg/rabbitmq/msg/MqMsg.java @@ -0,0 +1,18 @@ +package com.ycl.jxkg.rabbitmq.msg; + +import lombok.Data; + +/** + * @author锛歺p + * @date锛�2024/7/1 13:53 + */ +@Data +public abstract class MqMsg { + + /** + * 涔愯閿佺増鏈彿 + * + */ + private Integer version; + +} diff --git a/src/main/java/com/ycl/jxkg/rabbitmq/product/Product.java b/src/main/java/com/ycl/jxkg/rabbitmq/product/Product.java index e3585e8..587b9a0 100644 --- a/src/main/java/com/ycl/jxkg/rabbitmq/product/Product.java +++ b/src/main/java/com/ycl/jxkg/rabbitmq/product/Product.java @@ -4,6 +4,7 @@ import com.ycl.jxkg.rabbitmq.RabbitMqMsgTypeEnum; import lombok.RequiredArgsConstructor; import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @@ -20,10 +21,20 @@ private final RabbitTemplate rabbitTemplate; - public void examMsg() { + /** + * 鍙戦�佽�冭瘯娑堟伅 + * + * @param examId 鑰冭瘯ID + * @param jsonMsg 娑堟伅json + * @param delayTime 寤惰繜鏃堕棿锛屾绉� + */ + public void examMsg(Integer examId, String jsonMsg, Long delayTime) { + MessageProperties messageProperties = new MessageProperties(); + messageProperties.setExpiration(delayTime + ""); + Message message = MessageBuilder.withBody(jsonMsg.getBytes()).andProperties(messageProperties).build(); // 璁剧疆娑堟伅鐨勫叧鑱旀暟鎹紝浠ヤ究鍙戦�佺‘璁ゅ洖璋冦�佹湭璺敱鎴愬姛娑堟伅鐨勫鐞� - MyCorrelationData msgCorrelationData = new MyCorrelationData("ddddd", RabbitMqMsgTypeEnum.EXAM); - rabbitTemplate.convertAndSend("examDlxQueue", (Object) "浣犲ソ锛孯abbitMQ", msgCorrelationData); + MyCorrelationData msgCorrelationData = new MyCorrelationData(examId + "", RabbitMqMsgTypeEnum.EXAM); + rabbitTemplate.convertAndSend("examExchange", "exam", message, msgCorrelationData); } public void meetMsg() { diff --git a/src/main/java/com/ycl/jxkg/service/impl/ExamServiceImpl.java b/src/main/java/com/ycl/jxkg/service/impl/ExamServiceImpl.java index f879d5a..a4727ff 100644 --- a/src/main/java/com/ycl/jxkg/service/impl/ExamServiceImpl.java +++ b/src/main/java/com/ycl/jxkg/service/impl/ExamServiceImpl.java @@ -27,9 +27,12 @@ import com.ycl.jxkg.enums.general.ExamStatusEnum; import com.ycl.jxkg.enums.general.ExamSubmitTempStatusEnum; import com.ycl.jxkg.mapper.*; +import com.ycl.jxkg.rabbitmq.msg.ExamStatusMsg; +import com.ycl.jxkg.rabbitmq.product.Product; import com.ycl.jxkg.server.WebsocketServer; import com.ycl.jxkg.service.ExamPaperService; import com.ycl.jxkg.service.ExamService; +import com.ycl.jxkg.utils.DateTimeUtil; import com.ycl.jxkg.utils.PageUtil; import lombok.RequiredArgsConstructor; import org.springframework.beans.BeanUtils; @@ -63,7 +66,7 @@ private final WebsocketServer websocketServer; private final UserMapper userMapper; private final ExamPaperScoreMapper examPaperScoreMapper; - private final ExamPaperScoreDetailMapper examPaperScoreDetailMapper; + private final Product product; /** * 娣诲姞 @@ -76,7 +79,11 @@ Exam entity = ExamForm.getEntityByForm(form, null); entity.setStatus(ExamStatusEnum.getStatusByTime(form.getStartTime(), form.getEndTime(), null)); entity.setTeacherId(webContext.getCurrentUser().getId()); - baseMapper.insert(entity); + // 璁剧疆涔愯閿佺増鏈� + entity.setUpdateVersion(0); + if (baseMapper.insert(entity) > 0) { + this.sendMQ(entity, 0); + } return Result.ok("娣诲姞鎴愬姛"); } @@ -97,11 +104,59 @@ } BeanUtils.copyProperties(form, entity); entity.setStatus(ExamStatusEnum.getStatusByTime(form.getStartTime(), form.getEndTime(), null)); - baseMapper.updateById(entity); + // 濡傛灉淇敼鎴愬姛鍙戦�乵q娑堟伅 + if (baseMapper.updateById(entity) > 0) { + this.sendMQ(entity, entity.getUpdateVersion() + 1); + } return Result.ok("淇敼鎴愬姛"); } /** + * 鍙戦�乵q娑堟伅 + * + * @param entity 鑰冭瘯瀹炰綋绫� + * @param version 涔愯閿佺増鏈� + */ + public void sendMQ(Exam entity, Integer version) { + // 濡傛灉褰撳墠鐘舵�佷负鏈紑濮嬶紝鍒欏彂閫佷袱鏉q娑堟伅锛屼竴鏉¤缃姸鎬佷负杩涜涓紝涓�鏉¤缃姸鎬佷负宸茬粨鏉� + if (ExamStatusEnum.NOT_START.equals(entity.getStatus())) { + // 杩涜鐘舵�佹秷鎭� + ExamStatusMsg ingMsg = new ExamStatusMsg(); + ingMsg.setVersion(version); + ingMsg.setExamId(entity.getId()); + ingMsg.setTargetStatus(ExamStatusEnum.ING); + product.examMsg(entity.getId(), JSON.toJSONString(ingMsg), DateTimeUtil.getTwoTimeDiffMS(entity.getStartTime(), new Date())); + // 缁撴潫鐘舵�佹秷鎭� + ExamStatusMsg finishedMsg = new ExamStatusMsg(); + finishedMsg.setVersion(version); + finishedMsg.setExamId(entity.getId()); + finishedMsg.setTargetStatus(ExamStatusEnum.FINISHED); + product.examMsg(entity.getId(), JSON.toJSONString(finishedMsg), DateTimeUtil.getTwoTimeDiffMS(entity.getEndTime(), new Date())); + } else if (ExamStatusEnum.ING.equals(entity.getStatus())) { // 褰撳墠鏄繘琛屼腑鐘舵�佸垯鍙渶鍙戦�佺粨鏉熸秷鎭� + // 缁撴潫鐘舵�佹秷鎭� + ExamStatusMsg finishedMsg = new ExamStatusMsg(); + finishedMsg.setVersion(0); + finishedMsg.setExamId(entity.getId()); + finishedMsg.setTargetStatus(ExamStatusEnum.FINISHED); + product.examMsg(entity.getId(), JSON.toJSONString(finishedMsg), DateTimeUtil.getTwoTimeDiffMS(entity.getEndTime(), new Date())); + } + } + + /** + * 鏍规嵁鑰冭瘯鐨勫綋鍓嶇姸鎬侊紝寰楀埌涓嬩竴涓姸鎬� + * + * @param currentStatus + * @return + */ + public ExamStatusEnum getNextStatus(ExamStatusEnum currentStatus) { + if (ExamStatusEnum.NOT_START.equals(currentStatus)) { + return ExamStatusEnum.ING; + } else { + return ExamStatusEnum.FINISHED; + } + } + + /** * 鎵归噺鍒犻櫎 * * @param ids diff --git a/src/main/java/com/ycl/jxkg/utils/DateTimeUtil.java b/src/main/java/com/ycl/jxkg/utils/DateTimeUtil.java index 9fa370e..1a45bac 100644 --- a/src/main/java/com/ycl/jxkg/utils/DateTimeUtil.java +++ b/src/main/java/com/ycl/jxkg/utils/DateTimeUtil.java @@ -153,4 +153,15 @@ } return list; } + + /** + * 璁$畻涓や釜鏃堕棿涔嬮棿鐨勬绉� + * + * @param bigDate 澶ф棩鏈� + * @param smallDate 灏忔棩鏈� + * @return + */ + public static Long getTwoTimeDiffMS(Date bigDate, Date smallDate) { + return bigDate.getTime() - smallDate.getTime(); + } } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 3820707..3967ffa 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -19,14 +19,14 @@ port: 5672 publisher-confirms: true # 寮�鍚敓浜ц�呭彂閫佺‘璁� publisher-returns: true # 寮�鍚敓浜ц�呭彂閫佸け璐ラ��鍥� - listener: - simple: - default-requeue-rejected: false # 鍏抽棴榛樿鎷掔粷娑堣垂鏃剁殑閲嶆柊鍏ラ槦锛屾垜浠娇鐢ㄦ湰鍦伴噸璇曟秷璐� - # 纭妯″紡锛氭墜鍔紝寮�鍚簡灏卞繀椤诲湪浠g爜涓墜鍔ㄧ‘璁わ紝鍚﹀垯娑堟伅浼氫竴鐩撮噸澶嶆秷璐广�� - # 寮�鍚簡閲嶈瘯灏卞簲璇ヨ缃负鑷姩纭锛屽洜涓烘墜鍔ㄧ‘璁ら渶瑕佹崟鑾峰紓甯革紝鑰岄噸璇曞氨鏄彂鐢熷紓甯告墠浼氶噸璇� - acknowledge-mode: manual - retry: - enabled: true # 娑堣垂鏃跺嚭鐜板紓甯歌繘琛岄噸璇曟秷璐癸紝娉ㄦ剰涓嶈兘琚崟鑾凤紝鍚﹀垯鏃犳硶閲嶈瘯 - max-attempts: 3 # 鏈�澶ч噸璇曟鏁� - initial-interval: 3000 # 鍒濇閲嶈瘯绛夊緟闂撮殧 - multiplier: 2 # 閲嶈瘯澶辫触鍚庯紝涓嬫绛夊緟鏃堕棿澧炲姞澶氬皯鍊嶃�� +# listener: +# simple: +# default-requeue-rejected: false # 鍏抽棴榛樿鎷掔粷娑堣垂鏃剁殑閲嶆柊鍏ラ槦锛屾垜浠娇鐢ㄦ湰鍦伴噸璇曟秷璐� +# # 纭妯″紡锛氭墜鍔紝寮�鍚簡灏卞繀椤诲湪浠g爜涓墜鍔ㄧ‘璁わ紝鍚﹀垯娑堟伅浼氫竴鐩撮噸澶嶆秷璐广�� +# # 寮�鍚簡閲嶈瘯灏卞簲璇ヨ缃负鑷姩纭锛屽洜涓烘墜鍔ㄧ‘璁ら渶瑕佹崟鑾峰紓甯革紝鑰岄噸璇曞氨鏄彂鐢熷紓甯告墠浼氶噸璇� +# acknowledge-mode: auto +# retry: +# enabled: true # 娑堣垂鏃跺嚭鐜板紓甯歌繘琛岄噸璇曟秷璐癸紝娉ㄦ剰涓嶈兘琚崟鑾凤紝鍚﹀垯鏃犳硶閲嶈瘯 +# max-attempts: 3 # 鏈�澶ч噸璇曟鏁� +# initial-interval: 3000 # 鍒濇閲嶈瘯绛夊緟闂撮殧 +# multiplier: 2 # 閲嶈瘯澶辫触鍚庯紝涓嬫绛夊緟鏃堕棿澧炲姞澶氬皯鍊嶃�� -- Gitblit v1.8.0