From aac2321d1cf5536f7ea03f30d55a4aba30fbf710 Mon Sep 17 00:00:00 2001 From: peng <peng.com> Date: 星期三, 02 七月 2025 10:39:39 +0800 Subject: [PATCH] Merge remote-tracking branch 'origin/dev' into dev --- consumer/src/main/java/cn/lili/listener/VideoMessageListener.java | 70 +++++++++++++++++++++++++++++++++-- 1 files changed, 66 insertions(+), 4 deletions(-) diff --git a/consumer/src/main/java/cn/lili/listener/VideoMessageListener.java b/consumer/src/main/java/cn/lili/listener/VideoMessageListener.java index 613f9af..e970292 100644 --- a/consumer/src/main/java/cn/lili/listener/VideoMessageListener.java +++ b/consumer/src/main/java/cn/lili/listener/VideoMessageListener.java @@ -1,9 +1,13 @@ package cn.lili.listener; import cn.lili.cache.Cache; +import cn.lili.elasticsearch.EsSuffix; +import cn.lili.modules.lmk.domain.dto.VideoEsUpdateDTO; import cn.lili.modules.lmk.domain.entity.MyCollect; import cn.lili.modules.lmk.domain.entity.ThumbsUpRecord; +import cn.lili.modules.lmk.domain.es.VideoIndex; import cn.lili.modules.lmk.domain.form.ThumbsUpRecordForm; +import cn.lili.modules.lmk.service.EsService; import cn.lili.modules.lmk.service.ThumbsUpRecordService; import cn.lili.modules.lmk.service.VideoCommentService; import cn.lili.modules.lmk.service.VideoService; @@ -16,6 +20,7 @@ import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; /** @@ -33,20 +38,35 @@ private VideoService videoService; @Autowired + @Qualifier("videoEsServiceImpl") + private EsService esService; + + @Autowired private Cache<Object> cache; @Override public void onMessage(MessageExt messageExt) { try { String msg = new String(messageExt.getBody()); - if (StringUtils.isBlank(msg)) { - log.error("video msg is null, cant not consumer"); - return; - } + switch (VideoTagsEnum.valueOf(messageExt.getTags())) { case COLLECT: this.collect(msg); break; + case THUMBS_UP: + this.changeThumbsUp(msg); + break; + case ES_RECREATE: + this.recreateVideoIndex(); + break; + case ES_DOC_ADD_OR_UPDATE: + this.addOrUpdateEsVideo(msg); + break; + case ES_DOC_UPDATE_SOME_FIELD: + this.updateEsVideoSomeField(msg); + break; + case ES_DOC_DEL: + this.delEsVideo(msg); default: log.error("video msg not match correct tag, consumer err"); break; @@ -66,4 +86,46 @@ videoService.mqCollectChange(collect); } + /** + * 閲嶅缓瑙嗛绱㈠紩 + * + */ + public void recreateVideoIndex() { + esService.recreateIndex(EsSuffix.VIDEO_INDEX_NAME, "/es/video.json"); + } + + /** + * 鏂板es瑙嗛鏁版嵁/鏇存柊 + * + * @param msg + */ + public void addOrUpdateEsVideo(String msg) { + VideoIndex videoIndex = JSON.parseObject(msg, VideoIndex.class); + esService.addOrUpdateDocument(videoIndex); + } + + /** + * 鏇存柊es瑙嗛鐨勬煇浜涘瓧娈� + * + * @param msg + */ + public void updateEsVideoSomeField(String msg) { + VideoEsUpdateDTO dto = JSON.parseObject(msg, VideoEsUpdateDTO.class); + esService.updateSomeField(EsSuffix.VIDEO_INDEX_NAME, dto.getId(), dto.getFields()); + } + + /** + * 鏍规嵁id鍒犻櫎es涓殑瑙嗛 + * + * @param id + */ + public void delEsVideo(String id) { + esService.deleteDocument(EsSuffix.VIDEO_INDEX_NAME, id); + } + + public void changeThumbsUp(String msg) { + ThumbsUpRecord thumbsUpRecord = JSON.parseObject(msg, ThumbsUpRecord.class); + videoService.mqChangeThumbsUp(thumbsUpRecord); + } + } -- Gitblit v1.8.0