From 2f68e5600f0b60d6f8d170f4536e1fc410662ea7 Mon Sep 17 00:00:00 2001 From: xiangpei <xiangpei@timesnew.cn> Date: 星期二, 01 七月 2025 11:14:39 +0800 Subject: [PATCH] 视频es处理通过mq异步执行 --- framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java | 260 +++++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 228 insertions(+), 32 deletions(-) diff --git a/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java b/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java index 0a734f9..3deed2b 100644 --- a/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java +++ b/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java @@ -1,7 +1,14 @@ package cn.lili.modules.lmk.service.impl; +import cn.lili.cache.Cache; +import cn.lili.cache.CachePrefix; +import cn.lili.common.properties.RocketmqCustomProperties; import cn.lili.common.security.context.UserContext; +import cn.lili.elasticsearch.EsSuffix; +import cn.lili.modules.lmk.constant.RedisKeyExpireConstant; +import cn.lili.modules.lmk.domain.dto.VideoEsUpdateDTO; import cn.lili.modules.lmk.domain.entity.*; +import cn.lili.modules.lmk.domain.es.VideoIndex; import cn.lili.modules.lmk.domain.form.*; import cn.lili.modules.lmk.domain.query.*; import cn.lili.modules.lmk.domain.vo.*; @@ -11,6 +18,9 @@ import cn.lili.modules.member.entity.dos.Member; import cn.lili.modules.member.service.FootprintService; import cn.lili.modules.member.service.MemberService; +import cn.lili.rocketmq.RocketmqSendCallbackBuilder; +import cn.lili.rocketmq.tags.CommentTagsEnum; +import cn.lili.rocketmq.tags.VideoTagsEnum; import cn.lili.utils.COSUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.metadata.IPage; @@ -23,6 +33,8 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import lombok.RequiredArgsConstructor; import cn.lili.utils.PageUtil; @@ -57,34 +69,11 @@ private final KitchenVideoTypeRefService kitchenVideoTypeRefService; private final VideoGoodsService videoGoodsService; private final KitchenTypeService kitchenTypeService; + private final Cache cache; - /** - * 娣诲姞 - * @param form - * @return - */ - @Override - public Result add(WxVideoForm form) { - Video entity = WxVideoForm.getEntityByForm(form, null); - baseMapper.insert(entity); - return Result.ok("娣诲姞鎴愬姛"); - } + private final RocketmqCustomProperties rocketmqCustomProperties; + private final RocketMQTemplate rocketMQTemplate; - /** - * 淇敼 - * @param form - * @return - */ - @Override - public Result update(WxVideoForm form) { - Video entity = baseMapper.selectById(form.getId()); - - // 涓虹┖鎶汭llegalArgumentException锛屽仛鍏ㄥ眬寮傚父澶勭悊 - Assert.notNull(entity, "璁板綍涓嶅瓨鍦�"); - BeanUtils.copyProperties(form, entity); - baseMapper.updateById(entity); - return Result.ok("淇敼鎴愬姛"); - } /** * 鎵归噺鍒犻櫎 @@ -103,8 +92,18 @@ * @return */ @Override + @Transactional(rollbackFor = Exception.class) public Result removeById(String id) { baseMapper.deleteById(id); + new LambdaUpdateChainWrapper<>(videoGoodsService.getBaseMapper()) + .eq(VideoGoods::getVideoId, id) + .remove(); + new LambdaUpdateChainWrapper<>(videoTagRefService.getBaseMapper()) + .eq(VideoTagRef::getVideoId, id) + .remove(); + // mq寮傛鍒犻櫎es鏁版嵁 + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_DEL.name(); + rocketMQTemplate.asyncSend(destination, id, RocketmqSendCallbackBuilder.commonCallback()); return Result.ok("鍒犻櫎鎴愬姛"); } @@ -136,6 +135,11 @@ } else if (VideoContentTypeEnum.IMG.getValue().equals(vo.getVideoContentType()) && StringUtils.isNotBlank(vo.getVideoImgs())) { vo.setImgs(JSON.parseArray(vo.getVideoImgs(), String.class).stream().map(fileKey -> cosUtil.getPreviewUrl(fileKey)).collect(Collectors.toList())); } + if (CollectionUtils.isNotEmpty(vo.getGoodsList())) { + vo.getGoodsList().stream().forEach(goods -> { + goods.setThumbnail(cosUtil.getPreviewUrl(goods.getThumbnail())); + }); + } return Result.ok().data(vo); } @@ -161,11 +165,13 @@ video.setStatus(VideoStatusEnum.AUDITING.getValue()); video.setCoverUrl(form.getCover()); video.setVideoType(VideoTypeEnum.VIDEO.getValue()); + video.setRecommend(Boolean.FALSE); if (VideoContentTypeEnum.IMG.getValue().equals(form.getVideoContentType())) { video.setVideoImgs(JSON.toJSONString(form.getVideoImgs())); } baseMapper.insert(video); // 2.澶勭悊鏍囩 + List<SimpleVideoTagVO> esTagList = new ArrayList<>(2); List<VideoTagRef> videoTagRefs = form.getTags().stream().map(tag -> { VideoTagRef videoTagRef = new VideoTagRef(); videoTagRef.setVideoId(video.getId()); @@ -185,24 +191,43 @@ } else { videoTagRef.setVideoTagId(tag.getId()); } + SimpleVideoTagVO esTag = new SimpleVideoTagVO(); + esTag.setVideoId(video.getId()); + esTag.setTagName(tag.getTagName()); + esTag.setId(tag.getId()); + esTagList.add(esTag); return videoTagRef; }).collect(Collectors.toList()); videoTagRefService.saveBatch(videoTagRefs); // 3. 淇濆瓨瑙嗛鏂囦欢淇℃伅 lmkFileService.addByForm(form.getFileInfo()); // 4. 澶勭悊閫夋嫨鐨勫晢鍝� + List<VideoGoods> videoGoods = new ArrayList<>(2); if (CollectionUtils.isNotEmpty(form.getGoodsList())) { - List<VideoGoods> videoGoods = new ArrayList<>(2); for (int i = 0; i < form.getGoodsList().size(); i++) { VideoGoods e = new VideoGoods(); e.setVideoId(video.getId()); e.setGoodsId(form.getGoodsList().get(i).getGoodsId()); + e.setGoodsSkuId(form.getGoodsList().get(i).getGoodsSkuId()); e.setGoodsNum(form.getGoodsList().get(i).getGoodsNum()); e.setOrderNum(i); - videoGoods.add(e); + videoGoodsService.save(e); } videoGoodsService.saveBatch(videoGoods); } + // 5. 鏋勫缓es涓暟鎹紝mq寮傛澶勭悊 + VideoIndex videoIndex = new VideoIndex(); + BeanUtils.copyProperties(video, videoIndex); + videoIndex.setCoverFileKey(video.getCoverUrl()); + List<VideoGoodsDetailVO> esGoodsList = videoGoods.stream().map(goods -> { + VideoGoodsDetailVO vo = new VideoGoodsDetailVO(); + BeanUtils.copyProperties(goods, vo); + return vo; + }).collect(Collectors.toList()); + videoIndex.setGoodsList(esGoodsList); + videoIndex.setTagList(esTagList); + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_ADD_OR_UPDATE.name(); + rocketMQTemplate.asyncSend(destination, JSON.toJSONString(videoIndex), RocketmqSendCallbackBuilder.commonCallback()); return Result.ok("鍙戝竷鎴愬姛锛岃棰戝鏍镐腑~"); } @@ -226,6 +251,7 @@ new LambdaUpdateChainWrapper<>(videoTagRefService.getBaseMapper()) .eq(VideoTagRef::getVideoId, video.getId()) .remove(); + List<SimpleVideoTagVO> esTagList = new ArrayList<>(2); List<VideoTagRef> videoTagRefs = form.getTags().stream().map(tag -> { VideoTagRef videoTagRef = new VideoTagRef(); videoTagRef.setVideoId(video.getId()); @@ -245,6 +271,11 @@ } else { videoTagRef.setVideoTagId(tag.getId()); } + SimpleVideoTagVO esTag = new SimpleVideoTagVO(); + esTag.setVideoId(video.getId()); + esTag.setTagName(tag.getTagName()); + esTag.setId(tag.getId()); + esTagList.add(esTag); return videoTagRef; }).collect(Collectors.toList()); videoTagRefService.saveBatch(videoTagRefs); @@ -254,18 +285,32 @@ new LambdaUpdateChainWrapper<>(videoGoodsService.getBaseMapper()) .eq(VideoGoods::getVideoId, video.getId()) .remove(); + List<VideoGoods> videoGoods = new ArrayList<>(2); if (CollectionUtils.isNotEmpty(form.getGoodsList())) { - List<VideoGoods> videoGoods = new ArrayList<>(2); for (int i = 0; i < form.getGoodsList().size(); i++) { VideoGoods e = new VideoGoods(); e.setVideoId(video.getId()); e.setGoodsId(form.getGoodsList().get(i).getGoodsId()); + e.setGoodsSkuId(form.getGoodsList().get(i).getGoodsSkuId()); e.setGoodsNum(form.getGoodsList().get(i).getGoodsNum()); e.setOrderNum(i); videoGoods.add(e); } videoGoodsService.saveBatch(videoGoods); } + // 5. 鏇存柊es涓殑鏁版嵁锛宮q寮傛澶勭悊 + VideoIndex videoIndex = new VideoIndex(); + BeanUtils.copyProperties(video, videoIndex); + videoIndex.setCoverFileKey(video.getCoverUrl()); + List<VideoGoodsDetailVO> esGoodsList = videoGoods.stream().map(goods -> { + VideoGoodsDetailVO vo = new VideoGoodsDetailVO(); + BeanUtils.copyProperties(goods, vo); + return vo; + }).collect(Collectors.toList()); + videoIndex.setGoodsList(esGoodsList); + videoIndex.setTagList(esTagList); + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_ADD_OR_UPDATE.name(); + rocketMQTemplate.asyncSend(destination, JSON.toJSONString(videoIndex), RocketmqSendCallbackBuilder.commonCallback()); return Result.ok("鍙戝竷鎴愬姛锛岃棰戝鏍镐腑~"); } @@ -294,6 +339,15 @@ .eq(Video::getId, form.getId()) .set(Video::getRecommend, form.getRecommend()) .update(); + + // mq寮傛鏇存柊es + Map<String, Object> fields = new HashMap<>(2); + fields.put("recommend", form.getRecommend()); + VideoEsUpdateDTO dto = new VideoEsUpdateDTO(); + dto.setId(form.getId()); + dto.setFields(fields); + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name(); + rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback()); return Result.ok("璁剧疆鎴愬姛"); } @@ -313,34 +367,64 @@ } videoAuditRecordService.save(auditRecord); // 2. 淇敼瑙嗛鐘舵�� + Map<String, Object> fields = new HashMap<>(2); if (form.getResult()) { video.setStatus(VideoStatusEnum.PUBLISHED.getValue()); video.setAuditPassTime(new Date()); + + fields.put("status", VideoStatusEnum.PUBLISHED.getValue()); } else { video.setStatus(VideoStatusEnum.REJECT.getValue()); + fields.put("status", VideoStatusEnum.REJECT.getValue()); } baseMapper.updateById(video); + + // 3. mq寮傛鏇存柊es + VideoEsUpdateDTO dto = new VideoEsUpdateDTO(); + dto.setId(video.getId()); + dto.setFields(fields); + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name(); + rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback()); return Result.ok(); } @Override public Result up(String id) { + // 1. 鏇存柊鏁版嵁搴� new LambdaUpdateChainWrapper<>(baseMapper) .eq(Video::getId, id) .set(Video::getStatus, VideoStatusEnum.PUBLISHED.getValue()) .update(); + // 2. mq寮傛鏇存柊es + Map<String, Object> fields = new HashMap<>(2); + fields.put("status", VideoStatusEnum.PUBLISHED.getValue()); + VideoEsUpdateDTO dto = new VideoEsUpdateDTO(); + dto.setId(id); + dto.setFields(fields); + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name(); + rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback()); return Result.ok("涓婃灦鎴愬姛"); } @Override public Result down(VideoDownForm form) { + // 1. 鏇存柊鏁版嵁搴� new LambdaUpdateChainWrapper<>(baseMapper) .eq(Video::getId, form.getId()) .set(Video::getStatus, VideoStatusEnum.DISABLE.getValue()) .update(); + // 2. mq寮傛鏇存柊es + Map<String, Object> fields = new HashMap<>(2); + fields.put("status", VideoStatusEnum.DISABLE.getValue()); + VideoEsUpdateDTO dto = new VideoEsUpdateDTO(); + dto.setId(form.getId()); + dto.setFields(fields); + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name(); + rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback()); // TODO 灏嗕笅鏋跺師鍥犱互閫氱煡鐨勬柟寮忓憡鐭ョ敤鎴� + return Result.ok("涓嬫灦鎴愬姛"); } @@ -350,6 +434,14 @@ .eq(Video::getId, id) .set(Video::getStatus, VideoStatusEnum.DISABLE.getValue()) .update(); + // 2. mq寮傛鏇存柊es + Map<String, Object> fields = new HashMap<>(2); + fields.put("status", VideoStatusEnum.DISABLE.getValue()); + VideoEsUpdateDTO dto = new VideoEsUpdateDTO(); + dto.setId(id); + dto.setFields(fields); + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name(); + rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback()); return Result.ok("涓嬫灦鎴愬姛"); } @@ -387,17 +479,57 @@ page.getRecords().forEach(v -> { v.setTagList(tagMap.get(v.getId())); v.setCollected(CollectionUtils.isNotEmpty(collectMap.get(v.getId()))); + v.setCommentNum(this.getCommentNum(v.getId(), v.getCommentNum())); + v.setCollectNum(this.getCollectNum(v.getId(), v.getCollectNum())); if (VideoContentTypeEnum.VIDEO.getValue().equals(v.getVideoContentType())) { v.setVideoUrl(cosUtil.getPreviewUrl(v.getVideoFileKey())); v.setCoverUrl(cosUtil.getPreviewUrl(v.getCoverFileKey())); } else if (VideoContentTypeEnum.IMG.getValue().equals(v.getVideoContentType()) && StringUtils.isNotBlank(v.getVideoImgs())) { v.setImgs(JSON.parseArray(v.getVideoImgs(), String.class).stream().map(fileKey -> cosUtil.getPreviewUrl(fileKey)).collect(Collectors.toList())); } - + if (CollectionUtils.isNotEmpty(v.getGoodsList())) { + v.getGoodsList().stream().forEach(goods -> { + goods.setThumbnail(cosUtil.getPreviewUrl(goods.getThumbnail())); + }); + } v.setSubscribeThisAuthor(subscribes.contains(v.getAuthorId())); }); } return Result.ok().data(page.getRecords()); + } + + /** + * 浠巖edis涓幏鍙栬瘎璁烘暟閲忥紝濡傛灉redis涓病鏈夊垯灏唌ysql涓殑鏁伴噺鍐欏叆鍒皉edis + * + * @param videoId + * @param mysqlNum + * @return + */ + private Integer getCommentNum(String videoId, Integer mysqlNum) { + Object redisNum = cache.get(CachePrefix.VIDEO_COMMENT_NUM.getPrefixWithId(videoId)); + if (Objects.isNull(redisNum)) { + // redis涓病鏈夊氨鎶婃暟鎹簱鐨勫啓鍒皉edis涓� + cache.put(CachePrefix.VIDEO_COMMENT_NUM.getPrefixWithId(videoId), mysqlNum, RedisKeyExpireConstant.COMMENT_NUM_EXPIRE, RedisKeyExpireConstant.EXPIRE_DAY); + return mysqlNum; + } + return (Integer) redisNum; + } + + /** + * 浠巖edis涓幏鍙栨敹钘忔暟閲忥紝濡傛灉redis涓病鏈夊垯灏唌ysql涓殑鏁伴噺鍐欏叆鍒皉edis + * + * @param videoId + * @param mysqlNum + * @return + */ + private Integer getCollectNum(String videoId, Integer mysqlNum) { + Object redisNum = cache.get(CachePrefix.VIDEO_COLLECT_NUM.getPrefixWithId(videoId)); + if (Objects.isNull(redisNum)) { + // redis涓病鏈夊氨鎶婃暟鎹簱鐨勫啓鍒皉edis涓� + cache.put(CachePrefix.VIDEO_COLLECT_NUM.getPrefixWithId(videoId), mysqlNum, RedisKeyExpireConstant.COLLECT_NUM_EXPIRE, RedisKeyExpireConstant.EXPIRE_DAY); + return mysqlNum; + } + return (Integer) redisNum; } @Override @@ -444,6 +576,10 @@ List<List<CollectTypeNumVO>> chunks = ListUtils.partition(numList, 500); for (List<CollectTypeNumVO> chunk : chunks) { baseMapper.updateCollectNumBatch(chunk); + new LambdaUpdateChainWrapper<>(baseMapper) + .in(Video::getId, chunk.stream().map(CollectTypeNumVO::getId).collect(Collectors.toList())) + .set(Video::getCollectNumJob, Boolean.FALSE) + .update(); } } @@ -454,6 +590,10 @@ List<List<CollectTypeNumVO>> chunks = ListUtils.partition(numList, 500); for (List<CollectTypeNumVO> chunk : chunks) { baseMapper.updateCommentNumBatch(chunk); + new LambdaUpdateChainWrapper<>(baseMapper) + .in(Video::getId, chunk.stream().map(CollectTypeNumVO::getId).collect(Collectors.toList())) + .set(Video::getCommentNumJob, Boolean.FALSE) + .update(); } } @@ -521,6 +661,9 @@ @Override public Result getGoodsDetail(String videoId) { List<VideoGoodsDetailVO> goodsList = baseMapper.getVideoGoods(videoId); + goodsList.stream().forEach(goods -> { + goods.setThumbnail(cosUtil.getPreviewUrl(goods.getThumbnail())); + }); return Result.ok().data(goodsList); } @@ -571,7 +714,7 @@ video.setAuthorId(UserContext.getCurrentUserId()); video.setVideoType(VideoTypeEnum.HEALTH.getValue()); //璁剧疆濉厖妯″紡 淇濇寔姣斾緥锛屽畬鏁存樉绀� - video.setVideoFit("contain"); + video.setVideoFit(form.getVideoFit()); video.setVideoContentType(VideoContentTypeEnum.VIDEO.getValue()); video.setStatus(VideoStatusEnum.PUBLISHED.getValue()); baseMapper.insert(video); @@ -628,7 +771,7 @@ video.setAuthorId(UserContext.getCurrentUserId()); video.setVideoType(VideoTypeEnum.COOK.getValue()); //璁剧疆濉厖妯″紡 淇濇寔姣斾緥锛屽畬鏁存樉绀� - video.setVideoFit("contain"); + video.setVideoFit(form.getVideoFit()); video.setVideoContentType(VideoContentTypeEnum.VIDEO.getValue()); video.setStatus(VideoStatusEnum.PUBLISHED.getValue()); baseMapper.insert(video); @@ -707,4 +850,57 @@ .eq(KitchenVideoTypeRef::getVideoId, id)); return Result.ok("鍒犻櫎鎴愬姛"); } + + /** + * mq鎵ц瑙嗛鐨勬敹钘�/鍙栨秷鏀惰棌 + * + * @param collect + */ + @Override + @Transactional(rollbackFor = Exception.class) + public void mqCollectChange(MyCollect collect) { + MyCollect myCollect = new LambdaQueryChainWrapper<>(myCollectService.getBaseMapper()) + .eq(MyCollect::getCollectType, collect.getCollectType()) + .eq(MyCollect::getRefId, collect.getRefId()) + .eq(MyCollect::getUserId, collect.getUserId()) + .one(); + boolean add = false; + if (Objects.nonNull(myCollect)) { + myCollectService.removeById(myCollect.getId()); + } else { + myCollectService.save(collect); + add = true; + } + // 澶勭悊缂撳瓨 + Video video = baseMapper.selectById(collect.getRefId()); + if (cache.exist(CachePrefix.VIDEO_COLLECT_NUM.getPrefixWithId(collect.getRefId()))) { + if (add) { + cache.incr(CachePrefix.VIDEO_COLLECT_NUM.getPrefixWithId(collect.getRefId())); + } else { + cache.decr(CachePrefix.VIDEO_COLLECT_NUM.getPrefixWithId(collect.getRefId())); + } + } else { + if (Objects.nonNull(video)) { + cache.put(CachePrefix.VIDEO_COLLECT_NUM.getPrefixWithId(video.getId()), + video.getCollectNum() + (add ? 1 : -1), + RedisKeyExpireConstant.COLLECT_NUM_EXPIRE, + RedisKeyExpireConstant.EXPIRE_DAY); + } + } + // 鏍囪瘑璇ヨ棰戦渶瑕侀�氳繃瀹氭椂浠诲姟缁熻鏀惰棌鏁� + if (Objects.nonNull(video) && ! video.getCollectNumJob()) { + new LambdaUpdateChainWrapper<>(baseMapper) + .eq(Video::getId, video.getId()) + .set(Video::getCollectNumJob, Boolean.TRUE) + .update(); + } + } + + @Override + public Result recreateEsIndex() { + String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_RECREATE.name(); + // 娑堟伅浣撲笉鑳戒负绌猴紝闅忎究浼犱竴涓�1 + rocketMQTemplate.asyncSend(destination, "1", RocketmqSendCallbackBuilder.commonCallback()); + return Result.ok("宸叉垚鍔熷彂璧锋瀯寤鸿姹傦紝绋嶄綔绛夊緟鍚庝究浼氳嚜鍔ㄥ畬鎴�"); + } } -- Gitblit v1.8.0