From 2f68e5600f0b60d6f8d170f4536e1fc410662ea7 Mon Sep 17 00:00:00 2001 From: xiangpei <xiangpei@timesnew.cn> Date: 星期二, 01 七月 2025 11:14:39 +0800 Subject: [PATCH] 视频es处理通过mq异步执行 --- framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java | 104 +++++++++++++++++++++++++++++++--------------------- 1 files changed, 62 insertions(+), 42 deletions(-) diff --git a/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java b/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java index 980f329..3deed2b 100644 --- a/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java @@ -2,9 +2,11 @@ import cn.lili.cache.Cache; import cn.lili.cache.CachePrefix; +import cn.lili.common.properties.RocketmqCustomProperties; import cn.lili.common.security.context.UserContext; import cn.lili.elasticsearch.EsSuffix; import cn.lili.modules.lmk.constant.RedisKeyExpireConstant; +import cn.lili.modules.lmk.domain.dto.VideoEsUpdateDTO; import cn.lili.modules.lmk.domain.entity.*; import cn.lili.modules.lmk.domain.es.VideoIndex; import cn.lili.modules.lmk.domain.form.*; @@ -16,6 +18,9 @@ import cn.lili.modules.member.entity.dos.Member; import cn.lili.modules.member.service.FootprintService; import cn.lili.modules.member.service.MemberService; +import cn.lili.rocketmq.RocketmqSendCallbackBuilder; +import cn.lili.rocketmq.tags.CommentTagsEnum; +import cn.lili.rocketmq.tags.VideoTagsEnum; import cn.lili.utils.COSUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.metadata.IPage; @@ -28,6 +33,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import lombok.RequiredArgsConstructor; @@ -65,37 +71,9 @@ private final KitchenTypeService kitchenTypeService; private final Cache cache; - @Qualifier("videoEsServiceImpl") - private final EsService videoEsService; + private final RocketmqCustomProperties rocketmqCustomProperties; + private final RocketMQTemplate rocketMQTemplate; - - /** - * 娣诲姞 - * @param form - * @return - */ - @Override - public Result add(WxVideoForm form) { - Video entity = WxVideoForm.getEntityByForm(form, null); - baseMapper.insert(entity); - return Result.ok("娣诲姞鎴愬姛"); - } - - /** - * 淇敼 - * @param form - * @return - */ - @Override - public Result update(WxVideoForm form) { - Video entity = baseMapper.selectById(form.getId()); - - // 涓虹┖鎶汭llegalArgumentException锛屽仛鍏ㄥ眬寮傚父澶勭悊 - Assert.notNull(entity, "璁板綍涓嶅瓨鍦�"); - BeanUtils.copyProperties(form, entity); - baseMapper.updateById(entity); - return Result.ok("淇敼鎴愬姛"); - } /** * 鎵归噺鍒犻櫎 @@ -123,7 +101,9 @@ new LambdaUpdateChainWrapper<>(videoTagRefService.getBaseMapper()) .eq(VideoTagRef::getVideoId, id) .remove(); - videoEsService.deleteDocument(EsSuffix.VIDEO_INDEX_NAME, id); + // mq寮傛鍒犻櫎es鏁版嵁 + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_DEL.name(); + rocketMQTemplate.asyncSend(destination, id, RocketmqSendCallbackBuilder.commonCallback()); return Result.ok("鍒犻櫎鎴愬姛"); } @@ -185,6 +165,7 @@ video.setStatus(VideoStatusEnum.AUDITING.getValue()); video.setCoverUrl(form.getCover()); video.setVideoType(VideoTypeEnum.VIDEO.getValue()); + video.setRecommend(Boolean.FALSE); if (VideoContentTypeEnum.IMG.getValue().equals(form.getVideoContentType())) { video.setVideoImgs(JSON.toJSONString(form.getVideoImgs())); } @@ -234,7 +215,7 @@ } videoGoodsService.saveBatch(videoGoods); } - // 5. 鏋勫缓es涓暟鎹� + // 5. 鏋勫缓es涓暟鎹紝mq寮傛澶勭悊 VideoIndex videoIndex = new VideoIndex(); BeanUtils.copyProperties(video, videoIndex); videoIndex.setCoverFileKey(video.getCoverUrl()); @@ -245,7 +226,8 @@ }).collect(Collectors.toList()); videoIndex.setGoodsList(esGoodsList); videoIndex.setTagList(esTagList); - videoEsService.addOrUpdateDocument(EsSuffix.VIDEO_INDEX_NAME, video.getId(), videoIndex); + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_ADD_OR_UPDATE.name(); + rocketMQTemplate.asyncSend(destination, JSON.toJSONString(videoIndex), RocketmqSendCallbackBuilder.commonCallback()); return Result.ok("鍙戝竷鎴愬姛锛岃棰戝鏍镐腑~"); } @@ -316,7 +298,7 @@ } videoGoodsService.saveBatch(videoGoods); } - // 5. 鏇存柊es涓殑鏁版嵁 + // 5. 鏇存柊es涓殑鏁版嵁锛宮q寮傛澶勭悊 VideoIndex videoIndex = new VideoIndex(); BeanUtils.copyProperties(video, videoIndex); videoIndex.setCoverFileKey(video.getCoverUrl()); @@ -327,7 +309,8 @@ }).collect(Collectors.toList()); videoIndex.setGoodsList(esGoodsList); videoIndex.setTagList(esTagList); - videoEsService.addOrUpdateDocument(EsSuffix.VIDEO_INDEX_NAME, video.getId(), videoIndex); + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_ADD_OR_UPDATE.name(); + rocketMQTemplate.asyncSend(destination, JSON.toJSONString(videoIndex), RocketmqSendCallbackBuilder.commonCallback()); return Result.ok("鍙戝竷鎴愬姛锛岃棰戝鏍镐腑~"); } @@ -356,9 +339,15 @@ .eq(Video::getId, form.getId()) .set(Video::getRecommend, form.getRecommend()) .update(); + + // mq寮傛鏇存柊es Map<String, Object> fields = new HashMap<>(2); fields.put("recommend", form.getRecommend()); - videoEsService.updateSomeField(EsSuffix.VIDEO_INDEX_NAME, form.getId(), fields); + VideoEsUpdateDTO dto = new VideoEsUpdateDTO(); + dto.setId(form.getId()); + dto.setFields(fields); + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name(); + rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback()); return Result.ok("璁剧疆鎴愬姛"); } @@ -378,17 +367,24 @@ } videoAuditRecordService.save(auditRecord); // 2. 淇敼瑙嗛鐘舵�� + Map<String, Object> fields = new HashMap<>(2); if (form.getResult()) { video.setStatus(VideoStatusEnum.PUBLISHED.getValue()); video.setAuditPassTime(new Date()); - Map<String, Object> fields = new HashMap<>(2); fields.put("status", VideoStatusEnum.PUBLISHED.getValue()); - videoEsService.updateSomeField(EsSuffix.VIDEO_INDEX_NAME, video.getId(), fields); } else { video.setStatus(VideoStatusEnum.REJECT.getValue()); + fields.put("status", VideoStatusEnum.REJECT.getValue()); } baseMapper.updateById(video); + + // 3. mq寮傛鏇存柊es + VideoEsUpdateDTO dto = new VideoEsUpdateDTO(); + dto.setId(video.getId()); + dto.setFields(fields); + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name(); + rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback()); return Result.ok(); } @@ -400,10 +396,14 @@ .eq(Video::getId, id) .set(Video::getStatus, VideoStatusEnum.PUBLISHED.getValue()) .update(); - // 2. 鏇存柊es + // 2. mq寮傛鏇存柊es Map<String, Object> fields = new HashMap<>(2); fields.put("status", VideoStatusEnum.PUBLISHED.getValue()); - videoEsService.updateSomeField(EsSuffix.VIDEO_INDEX_NAME, id, fields); + VideoEsUpdateDTO dto = new VideoEsUpdateDTO(); + dto.setId(id); + dto.setFields(fields); + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name(); + rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback()); return Result.ok("涓婃灦鎴愬姛"); } @@ -415,10 +415,14 @@ .eq(Video::getId, form.getId()) .set(Video::getStatus, VideoStatusEnum.DISABLE.getValue()) .update(); - // 2. 鏇存柊es + // 2. mq寮傛鏇存柊es Map<String, Object> fields = new HashMap<>(2); fields.put("status", VideoStatusEnum.DISABLE.getValue()); - videoEsService.updateSomeField(EsSuffix.VIDEO_INDEX_NAME, form.getId(), fields); + VideoEsUpdateDTO dto = new VideoEsUpdateDTO(); + dto.setId(form.getId()); + dto.setFields(fields); + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name(); + rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback()); // TODO 灏嗕笅鏋跺師鍥犱互閫氱煡鐨勬柟寮忓憡鐭ョ敤鎴� return Result.ok("涓嬫灦鎴愬姛"); @@ -430,6 +434,14 @@ .eq(Video::getId, id) .set(Video::getStatus, VideoStatusEnum.DISABLE.getValue()) .update(); + // 2. mq寮傛鏇存柊es + Map<String, Object> fields = new HashMap<>(2); + fields.put("status", VideoStatusEnum.DISABLE.getValue()); + VideoEsUpdateDTO dto = new VideoEsUpdateDTO(); + dto.setId(id); + dto.setFields(fields); + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name(); + rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback()); return Result.ok("涓嬫灦鎴愬姛"); } @@ -883,4 +895,12 @@ .update(); } } + + @Override + public Result recreateEsIndex() { + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_RECREATE.name(); + // 娑堟伅浣撲笉鑳戒负绌猴紝闅忎究浼犱竴涓�1 + rocketMQTemplate.asyncSend(destination, "1", RocketmqSendCallbackBuilder.commonCallback()); + return Result.ok("宸叉垚鍔熷彂璧锋瀯寤鸿姹傦紝绋嶄綔绛夊緟鍚庝究浼氳嚜鍔ㄥ畬鎴�"); + } } -- Gitblit v1.8.0