From 2f68e5600f0b60d6f8d170f4536e1fc410662ea7 Mon Sep 17 00:00:00 2001
From: xiangpei <xiangpei@timesnew.cn>
Date: 星期二, 01 七月 2025 11:14:39 +0800
Subject: [PATCH] 视频es处理通过mq异步执行

---
 framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java |  151 ++++++++++++++++++++++++++++++++++++++++----------
 1 files changed, 120 insertions(+), 31 deletions(-)

diff --git a/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java b/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java
index 5f7b7ad..3deed2b 100644
--- a/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java
+++ b/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java
@@ -2,9 +2,13 @@
 
 import cn.lili.cache.Cache;
 import cn.lili.cache.CachePrefix;
+import cn.lili.common.properties.RocketmqCustomProperties;
 import cn.lili.common.security.context.UserContext;
+import cn.lili.elasticsearch.EsSuffix;
 import cn.lili.modules.lmk.constant.RedisKeyExpireConstant;
+import cn.lili.modules.lmk.domain.dto.VideoEsUpdateDTO;
 import cn.lili.modules.lmk.domain.entity.*;
+import cn.lili.modules.lmk.domain.es.VideoIndex;
 import cn.lili.modules.lmk.domain.form.*;
 import cn.lili.modules.lmk.domain.query.*;
 import cn.lili.modules.lmk.domain.vo.*;
@@ -14,6 +18,9 @@
 import cn.lili.modules.member.entity.dos.Member;
 import cn.lili.modules.member.service.FootprintService;
 import cn.lili.modules.member.service.MemberService;
+import cn.lili.rocketmq.RocketmqSendCallbackBuilder;
+import cn.lili.rocketmq.tags.CommentTagsEnum;
+import cn.lili.rocketmq.tags.VideoTagsEnum;
 import cn.lili.utils.COSUtil;
 import com.alibaba.fastjson.JSON;
 import com.baomidou.mybatisplus.core.metadata.IPage;
@@ -26,6 +33,8 @@
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
 import lombok.RequiredArgsConstructor;
 import cn.lili.utils.PageUtil;
@@ -62,33 +71,9 @@
     private final KitchenTypeService kitchenTypeService;
     private final Cache cache;
 
-    /**
-     * 娣诲姞
-     * @param form
-     * @return
-     */
-    @Override
-    public Result add(WxVideoForm form) {
-        Video entity = WxVideoForm.getEntityByForm(form, null);
-        baseMapper.insert(entity);
-        return Result.ok("娣诲姞鎴愬姛");
-    }
+    private final RocketmqCustomProperties rocketmqCustomProperties;
+    private final RocketMQTemplate rocketMQTemplate;
 
-    /**
-     * 淇敼
-     * @param form
-     * @return
-     */
-    @Override
-    public Result update(WxVideoForm form) {
-        Video entity = baseMapper.selectById(form.getId());
-
-        // 涓虹┖鎶汭llegalArgumentException锛屽仛鍏ㄥ眬寮傚父澶勭悊
-        Assert.notNull(entity, "璁板綍涓嶅瓨鍦�");
-        BeanUtils.copyProperties(form, entity);
-        baseMapper.updateById(entity);
-        return Result.ok("淇敼鎴愬姛");
-    }
 
     /**
      * 鎵归噺鍒犻櫎
@@ -107,8 +92,18 @@
      * @return
      */
     @Override
+    @Transactional(rollbackFor = Exception.class)
     public Result removeById(String id) {
         baseMapper.deleteById(id);
+        new LambdaUpdateChainWrapper<>(videoGoodsService.getBaseMapper())
+                .eq(VideoGoods::getVideoId, id)
+                .remove();
+        new LambdaUpdateChainWrapper<>(videoTagRefService.getBaseMapper())
+                .eq(VideoTagRef::getVideoId, id)
+                .remove();
+        // mq寮傛鍒犻櫎es鏁版嵁
+        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_DEL.name();
+        rocketMQTemplate.asyncSend(destination, id, RocketmqSendCallbackBuilder.commonCallback());
         return Result.ok("鍒犻櫎鎴愬姛");
     }
 
@@ -170,11 +165,13 @@
         video.setStatus(VideoStatusEnum.AUDITING.getValue());
         video.setCoverUrl(form.getCover());
         video.setVideoType(VideoTypeEnum.VIDEO.getValue());
+        video.setRecommend(Boolean.FALSE);
         if (VideoContentTypeEnum.IMG.getValue().equals(form.getVideoContentType())) {
             video.setVideoImgs(JSON.toJSONString(form.getVideoImgs()));
         }
         baseMapper.insert(video);
         // 2.澶勭悊鏍囩
+        List<SimpleVideoTagVO> esTagList = new ArrayList<>(2);
         List<VideoTagRef> videoTagRefs = form.getTags().stream().map(tag -> {
             VideoTagRef videoTagRef = new VideoTagRef();
             videoTagRef.setVideoId(video.getId());
@@ -194,14 +191,19 @@
             } else {
                 videoTagRef.setVideoTagId(tag.getId());
             }
+            SimpleVideoTagVO esTag = new SimpleVideoTagVO();
+            esTag.setVideoId(video.getId());
+            esTag.setTagName(tag.getTagName());
+            esTag.setId(tag.getId());
+            esTagList.add(esTag);
             return videoTagRef;
         }).collect(Collectors.toList());
         videoTagRefService.saveBatch(videoTagRefs);
         // 3. 淇濆瓨瑙嗛鏂囦欢淇℃伅
         lmkFileService.addByForm(form.getFileInfo());
         // 4. 澶勭悊閫夋嫨鐨勫晢鍝�
+        List<VideoGoods> videoGoods = new ArrayList<>(2);
         if (CollectionUtils.isNotEmpty(form.getGoodsList())) {
-            List<VideoGoods> videoGoods = new ArrayList<>(2);
             for (int i = 0; i < form.getGoodsList().size(); i++) {
                 VideoGoods e = new VideoGoods();
                 e.setVideoId(video.getId());
@@ -209,10 +211,23 @@
                 e.setGoodsSkuId(form.getGoodsList().get(i).getGoodsSkuId());
                 e.setGoodsNum(form.getGoodsList().get(i).getGoodsNum());
                 e.setOrderNum(i);
-                videoGoods.add(e);
+                videoGoodsService.save(e);
             }
             videoGoodsService.saveBatch(videoGoods);
         }
+        // 5. 鏋勫缓es涓暟鎹紝mq寮傛澶勭悊
+        VideoIndex videoIndex = new VideoIndex();
+        BeanUtils.copyProperties(video, videoIndex);
+        videoIndex.setCoverFileKey(video.getCoverUrl());
+        List<VideoGoodsDetailVO> esGoodsList = videoGoods.stream().map(goods -> {
+            VideoGoodsDetailVO vo = new VideoGoodsDetailVO();
+            BeanUtils.copyProperties(goods, vo);
+            return vo;
+        }).collect(Collectors.toList());
+        videoIndex.setGoodsList(esGoodsList);
+        videoIndex.setTagList(esTagList);
+        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_ADD_OR_UPDATE.name();
+        rocketMQTemplate.asyncSend(destination, JSON.toJSONString(videoIndex), RocketmqSendCallbackBuilder.commonCallback());
         return Result.ok("鍙戝竷鎴愬姛锛岃棰戝鏍镐腑~");
     }
 
@@ -236,6 +251,7 @@
         new LambdaUpdateChainWrapper<>(videoTagRefService.getBaseMapper())
                 .eq(VideoTagRef::getVideoId, video.getId())
                 .remove();
+        List<SimpleVideoTagVO> esTagList = new ArrayList<>(2);
         List<VideoTagRef> videoTagRefs = form.getTags().stream().map(tag -> {
             VideoTagRef videoTagRef = new VideoTagRef();
             videoTagRef.setVideoId(video.getId());
@@ -255,6 +271,11 @@
             } else {
                 videoTagRef.setVideoTagId(tag.getId());
             }
+            SimpleVideoTagVO esTag = new SimpleVideoTagVO();
+            esTag.setVideoId(video.getId());
+            esTag.setTagName(tag.getTagName());
+            esTag.setId(tag.getId());
+            esTagList.add(esTag);
             return videoTagRef;
         }).collect(Collectors.toList());
         videoTagRefService.saveBatch(videoTagRefs);
@@ -264,8 +285,8 @@
         new LambdaUpdateChainWrapper<>(videoGoodsService.getBaseMapper())
                 .eq(VideoGoods::getVideoId, video.getId())
                 .remove();
+        List<VideoGoods> videoGoods = new ArrayList<>(2);
         if (CollectionUtils.isNotEmpty(form.getGoodsList())) {
-            List<VideoGoods> videoGoods = new ArrayList<>(2);
             for (int i = 0; i < form.getGoodsList().size(); i++) {
                 VideoGoods e = new VideoGoods();
                 e.setVideoId(video.getId());
@@ -277,6 +298,19 @@
             }
             videoGoodsService.saveBatch(videoGoods);
         }
+        // 5. 鏇存柊es涓殑鏁版嵁锛宮q寮傛澶勭悊
+        VideoIndex videoIndex = new VideoIndex();
+        BeanUtils.copyProperties(video, videoIndex);
+        videoIndex.setCoverFileKey(video.getCoverUrl());
+        List<VideoGoodsDetailVO> esGoodsList = videoGoods.stream().map(goods -> {
+            VideoGoodsDetailVO vo = new VideoGoodsDetailVO();
+            BeanUtils.copyProperties(goods, vo);
+            return vo;
+        }).collect(Collectors.toList());
+        videoIndex.setGoodsList(esGoodsList);
+        videoIndex.setTagList(esTagList);
+        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_ADD_OR_UPDATE.name();
+        rocketMQTemplate.asyncSend(destination, JSON.toJSONString(videoIndex), RocketmqSendCallbackBuilder.commonCallback());
         return Result.ok("鍙戝竷鎴愬姛锛岃棰戝鏍镐腑~");
     }
 
@@ -305,6 +339,15 @@
                 .eq(Video::getId, form.getId())
                 .set(Video::getRecommend, form.getRecommend())
                 .update();
+
+        // mq寮傛鏇存柊es
+        Map<String, Object> fields = new HashMap<>(2);
+        fields.put("recommend", form.getRecommend());
+        VideoEsUpdateDTO dto = new VideoEsUpdateDTO();
+        dto.setId(form.getId());
+        dto.setFields(fields);
+        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name();
+        rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback());
         return Result.ok("璁剧疆鎴愬姛");
     }
 
@@ -324,34 +367,64 @@
         }
         videoAuditRecordService.save(auditRecord);
         // 2. 淇敼瑙嗛鐘舵��
+        Map<String, Object> fields = new HashMap<>(2);
         if (form.getResult()) {
             video.setStatus(VideoStatusEnum.PUBLISHED.getValue());
             video.setAuditPassTime(new Date());
+
+            fields.put("status", VideoStatusEnum.PUBLISHED.getValue());
         } else {
             video.setStatus(VideoStatusEnum.REJECT.getValue());
+            fields.put("status", VideoStatusEnum.REJECT.getValue());
         }
         baseMapper.updateById(video);
+
+        // 3. mq寮傛鏇存柊es
+        VideoEsUpdateDTO dto = new VideoEsUpdateDTO();
+        dto.setId(video.getId());
+        dto.setFields(fields);
+        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name();
+        rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback());
         return Result.ok();
     }
 
 
     @Override
     public Result up(String id) {
+        // 1. 鏇存柊鏁版嵁搴�
         new LambdaUpdateChainWrapper<>(baseMapper)
                 .eq(Video::getId, id)
                 .set(Video::getStatus, VideoStatusEnum.PUBLISHED.getValue())
                 .update();
+        // 2. mq寮傛鏇存柊es
+        Map<String, Object> fields = new HashMap<>(2);
+        fields.put("status", VideoStatusEnum.PUBLISHED.getValue());
+        VideoEsUpdateDTO dto = new VideoEsUpdateDTO();
+        dto.setId(id);
+        dto.setFields(fields);
+        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name();
+        rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback());
         return Result.ok("涓婃灦鎴愬姛");
     }
 
 
     @Override
     public Result down(VideoDownForm form) {
+        // 1. 鏇存柊鏁版嵁搴�
         new LambdaUpdateChainWrapper<>(baseMapper)
                 .eq(Video::getId, form.getId())
                 .set(Video::getStatus, VideoStatusEnum.DISABLE.getValue())
                 .update();
+        // 2. mq寮傛鏇存柊es
+        Map<String, Object> fields = new HashMap<>(2);
+        fields.put("status", VideoStatusEnum.DISABLE.getValue());
+        VideoEsUpdateDTO dto = new VideoEsUpdateDTO();
+        dto.setId(form.getId());
+        dto.setFields(fields);
+        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name();
+        rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback());
         // TODO 灏嗕笅鏋跺師鍥犱互閫氱煡鐨勬柟寮忓憡鐭ョ敤鎴�
+
         return Result.ok("涓嬫灦鎴愬姛");
     }
 
@@ -361,6 +434,14 @@
                 .eq(Video::getId, id)
                 .set(Video::getStatus, VideoStatusEnum.DISABLE.getValue())
                 .update();
+        // 2. mq寮傛鏇存柊es
+        Map<String, Object> fields = new HashMap<>(2);
+        fields.put("status", VideoStatusEnum.DISABLE.getValue());
+        VideoEsUpdateDTO dto = new VideoEsUpdateDTO();
+        dto.setId(id);
+        dto.setFields(fields);
+        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name();
+        rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback());
         return Result.ok("涓嬫灦鎴愬姛");
     }
 
@@ -633,7 +714,7 @@
         video.setAuthorId(UserContext.getCurrentUserId());
         video.setVideoType(VideoTypeEnum.HEALTH.getValue());
         //璁剧疆濉厖妯″紡 淇濇寔姣斾緥锛屽畬鏁存樉绀�
-        video.setVideoFit("contain");
+        video.setVideoFit(form.getVideoFit());
         video.setVideoContentType(VideoContentTypeEnum.VIDEO.getValue());
         video.setStatus(VideoStatusEnum.PUBLISHED.getValue());
         baseMapper.insert(video);
@@ -690,7 +771,7 @@
         video.setAuthorId(UserContext.getCurrentUserId());
         video.setVideoType(VideoTypeEnum.COOK.getValue());
         //璁剧疆濉厖妯″紡 淇濇寔姣斾緥锛屽畬鏁存樉绀�
-        video.setVideoFit("contain");
+        video.setVideoFit(form.getVideoFit());
         video.setVideoContentType(VideoContentTypeEnum.VIDEO.getValue());
         video.setStatus(VideoStatusEnum.PUBLISHED.getValue());
         baseMapper.insert(video);
@@ -814,4 +895,12 @@
                     .update();
         }
     }
+
+    @Override
+    public Result recreateEsIndex() {
+        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_RECREATE.name();
+        // 娑堟伅浣撲笉鑳戒负绌猴紝闅忎究浼犱竴涓�1
+        rocketMQTemplate.asyncSend(destination, "1", RocketmqSendCallbackBuilder.commonCallback());
+        return Result.ok("宸叉垚鍔熷彂璧锋瀯寤鸿姹傦紝绋嶄綔绛夊緟鍚庝究浼氳嚜鍔ㄥ畬鎴�");
+    }
 }

--
Gitblit v1.8.0