From 2f68e5600f0b60d6f8d170f4536e1fc410662ea7 Mon Sep 17 00:00:00 2001 From: xiangpei <xiangpei@timesnew.cn> Date: 星期二, 01 七月 2025 11:14:39 +0800 Subject: [PATCH] 视频es处理通过mq异步执行 --- consumer/src/main/java/cn/lili/listener/VideoMessageListener.java | 62 +++++++++++++++++++++++++++++-- 1 files changed, 58 insertions(+), 4 deletions(-) diff --git a/consumer/src/main/java/cn/lili/listener/VideoMessageListener.java b/consumer/src/main/java/cn/lili/listener/VideoMessageListener.java index 613f9af..1bedb96 100644 --- a/consumer/src/main/java/cn/lili/listener/VideoMessageListener.java +++ b/consumer/src/main/java/cn/lili/listener/VideoMessageListener.java @@ -1,9 +1,13 @@ package cn.lili.listener; import cn.lili.cache.Cache; +import cn.lili.elasticsearch.EsSuffix; +import cn.lili.modules.lmk.domain.dto.VideoEsUpdateDTO; import cn.lili.modules.lmk.domain.entity.MyCollect; import cn.lili.modules.lmk.domain.entity.ThumbsUpRecord; +import cn.lili.modules.lmk.domain.es.VideoIndex; import cn.lili.modules.lmk.domain.form.ThumbsUpRecordForm; +import cn.lili.modules.lmk.service.EsService; import cn.lili.modules.lmk.service.ThumbsUpRecordService; import cn.lili.modules.lmk.service.VideoCommentService; import cn.lili.modules.lmk.service.VideoService; @@ -16,6 +20,7 @@ import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; /** @@ -33,20 +38,32 @@ private VideoService videoService; @Autowired + @Qualifier("videoEsServiceImpl") + private EsService esService; + + @Autowired private Cache<Object> cache; @Override public void onMessage(MessageExt messageExt) { try { String msg = new String(messageExt.getBody()); - if (StringUtils.isBlank(msg)) { - log.error("video msg is null, cant not consumer"); - return; - } + switch (VideoTagsEnum.valueOf(messageExt.getTags())) { case COLLECT: this.collect(msg); break; + case ES_RECREATE: + this.recreateVideoIndex(); + break; + case ES_DOC_ADD_OR_UPDATE: + this.addOrUpdateEsVideo(msg); + break; + case ES_DOC_UPDATE_SOME_FIELD: + this.updateEsVideoSomeField(msg); + break; + case ES_DOC_DEL: + this.delEsVideo(msg); default: log.error("video msg not match correct tag, consumer err"); break; @@ -66,4 +83,41 @@ videoService.mqCollectChange(collect); } + /** + * 閲嶅缓瑙嗛绱㈠紩 + * + */ + public void recreateVideoIndex() { + esService.recreateIndex(EsSuffix.VIDEO_INDEX_NAME, "/es/video.json"); + } + + /** + * 鏂板es瑙嗛鏁版嵁/鏇存柊 + * + * @param msg + */ + public void addOrUpdateEsVideo(String msg) { + VideoIndex videoIndex = JSON.parseObject(msg, VideoIndex.class); + esService.addOrUpdateDocument(videoIndex); + } + + /** + * 鏇存柊es瑙嗛鐨勬煇浜涘瓧娈� + * + * @param msg + */ + public void updateEsVideoSomeField(String msg) { + VideoEsUpdateDTO dto = JSON.parseObject(msg, VideoEsUpdateDTO.class); + esService.updateSomeField(EsSuffix.VIDEO_INDEX_NAME, dto.getId(), dto.getFields()); + } + + /** + * 鏍规嵁id鍒犻櫎es涓殑瑙嗛 + * + * @param id + */ + public void delEsVideo(String id) { + esService.deleteDocument(EsSuffix.VIDEO_INDEX_NAME, id); + } + } -- Gitblit v1.8.0