xiangpei
3 天以前 2f68e5600f0b60d6f8d170f4536e1fc410662ea7
视频es处理通过mq异步执行
10个文件已修改
1个文件已添加
265 ■■■■■ 已修改文件
buyer-api/src/main/java/cn/lili/controller/lmk/VideoController.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
consumer/src/main/java/cn/lili/listener/VideoMessageListener.java 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
framework/src/main/java/cn/lili/modules/lmk/domain/dto/VideoEsUpdateDTO.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
framework/src/main/java/cn/lili/modules/lmk/mapper/VideoMapper.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
framework/src/main/java/cn/lili/modules/lmk/service/EsService.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
framework/src/main/java/cn/lili/modules/lmk/service/VideoService.java 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoEsServiceImpl.java 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java 104 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
framework/src/main/java/cn/lili/rocketmq/tags/VideoTagsEnum.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
framework/src/main/resources/mapper/lmk/VideoMapper.xml 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
manager-api/src/main/java/cn/lili/controller/lmk/VideoController.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
buyer-api/src/main/java/cn/lili/controller/lmk/VideoController.java
@@ -38,7 +38,7 @@
    }
    @PutMapping
    @ApiOperation(value = "修改", notes = "修改")
    @ApiOperation(value = "修改视频", notes = "修改视频")
    public Result update(@RequestBody @Validated(Update.class) WxVideoForm form) {
        return videoService.updatePublish(form);
    }
consumer/src/main/java/cn/lili/listener/VideoMessageListener.java
@@ -1,9 +1,13 @@
package cn.lili.listener;
import cn.lili.cache.Cache;
import cn.lili.elasticsearch.EsSuffix;
import cn.lili.modules.lmk.domain.dto.VideoEsUpdateDTO;
import cn.lili.modules.lmk.domain.entity.MyCollect;
import cn.lili.modules.lmk.domain.entity.ThumbsUpRecord;
import cn.lili.modules.lmk.domain.es.VideoIndex;
import cn.lili.modules.lmk.domain.form.ThumbsUpRecordForm;
import cn.lili.modules.lmk.service.EsService;
import cn.lili.modules.lmk.service.ThumbsUpRecordService;
import cn.lili.modules.lmk.service.VideoCommentService;
import cn.lili.modules.lmk.service.VideoService;
@@ -16,6 +20,7 @@
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
/**
@@ -33,20 +38,32 @@
    private VideoService videoService;
    @Autowired
    @Qualifier("videoEsServiceImpl")
    private EsService esService;
    @Autowired
    private Cache<Object> cache;
    @Override
    public void onMessage(MessageExt messageExt) {
        try {
            String msg = new String(messageExt.getBody());
            if (StringUtils.isBlank(msg)) {
                log.error("video msg is null, cant not consumer");
                return;
            }
            switch (VideoTagsEnum.valueOf(messageExt.getTags())) {
                case COLLECT:
                    this.collect(msg);
                    break;
                case ES_RECREATE:
                    this.recreateVideoIndex();
                    break;
                case ES_DOC_ADD_OR_UPDATE:
                    this.addOrUpdateEsVideo(msg);
                    break;
                case ES_DOC_UPDATE_SOME_FIELD:
                    this.updateEsVideoSomeField(msg);
                    break;
                case ES_DOC_DEL:
                    this.delEsVideo(msg);
                default:
                    log.error("video msg not match correct tag, consumer err");
                    break;
@@ -66,4 +83,41 @@
        videoService.mqCollectChange(collect);
    }
    /**
     * 重建视频索引
     *
     */
    public void recreateVideoIndex() {
        esService.recreateIndex(EsSuffix.VIDEO_INDEX_NAME, "/es/video.json");
    }
    /**
     * 新增es视频数据/更新
     *
     * @param msg
     */
    public void addOrUpdateEsVideo(String msg) {
        VideoIndex videoIndex = JSON.parseObject(msg, VideoIndex.class);
        esService.addOrUpdateDocument(videoIndex);
    }
    /**
     * 更新es视频的某些字段
     *
     * @param msg
     */
    public void updateEsVideoSomeField(String msg) {
        VideoEsUpdateDTO dto = JSON.parseObject(msg, VideoEsUpdateDTO.class);
        esService.updateSomeField(EsSuffix.VIDEO_INDEX_NAME, dto.getId(), dto.getFields());
    }
    /**
     * 根据id删除es中的视频
     *
     * @param id
     */
    public void delEsVideo(String id) {
        esService.deleteDocument(EsSuffix.VIDEO_INDEX_NAME, id);
    }
}
framework/src/main/java/cn/lili/modules/lmk/domain/dto/VideoEsUpdateDTO.java
New file
@@ -0,0 +1,24 @@
package cn.lili.modules.lmk.domain.dto;
import lombok.Data;
import java.util.Map;
/**
 * Message payload describing a partial update of one video document in ES:
 * which document to touch and which of its fields to overwrite.
 *
 * @author:xp
 * @date:2025/7/1 10:01
 */
@Data
public class VideoEsUpdateDTO {
    /**
     * Video id; used as the id of the ES document to update.
     */
    private String id;
    /**
     * Fields to modify: field name mapped to its new value.
     */
    private Map<String, Object> fields;
}
framework/src/main/java/cn/lili/modules/lmk/mapper/VideoMapper.java
@@ -136,8 +136,8 @@
     * es同步查询视频数据
     *
     * @param start 开始位置
     * @param end 结束位置
     * @param pageSize 每页条数
     * @return
     */
    List<VideoIndex> getEsPage(@Param("start") int start, @Param("end") int end);
    List<VideoIndex> getEsPage(@Param("start") int start, @Param("pageSize") int pageSize);
}
framework/src/main/java/cn/lili/modules/lmk/service/EsService.java
@@ -37,11 +37,9 @@
    /**
     * 添加/修改 文档,如果是修改,则是整条数据更新
     *
     * @param indexName 索引名称
     * @param id es主键,可传业务主键
     * @param data 数据对象
     */
    void addOrUpdateDocument(String indexName, String id, Object data);
    void addOrUpdateDocument(Object data);
    /**
     * 更新某些字段的值
framework/src/main/java/cn/lili/modules/lmk/service/VideoService.java
@@ -22,20 +22,6 @@
public interface VideoService extends IService<Video> {
    /**
     * 添加
     * @param form
     * @return
     */
    Result add(WxVideoForm form);
    /**
     * 修改
     * @param form
     * @return
     */
    Result update(WxVideoForm form);
    /**
     * 批量删除
     * @param ids
     * @return
@@ -265,4 +251,11 @@
     * @param collect
     */
    void mqCollectChange(MyCollect collect);
    /**
     * 重建视频es索引
     *
     * @return
     */
    Result recreateEsIndex();
}
framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoEsServiceImpl.java
@@ -108,7 +108,6 @@
        }
        // 2. 多线程查询视频数据,构建文档对象
        Long totalVideo = new LambdaQueryChainWrapper<>(videoMapper)
                .eq(Video::getStatus, VideoStatusEnum.PUBLISHED.getValue())
                .count();
        int totalThreads = (int) Math.ceil((double) totalVideo / 200); // 计算需要多少个线程
        CountDownLatch latch = new CountDownLatch(totalThreads);
@@ -155,15 +154,17 @@
    }
    @Override
    public void addOrUpdateDocument(String indexName, String id, Object data) {
        indexName = this.getIndexFullName(indexName);
        IndexRequest request = new IndexRequest(indexName);
        request.id(id).source(data);
        try {
            client.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException("es文档添加/修改失败", e);
        }
    public void addOrUpdateDocument(Object data) {
        VideoIndex videoIndex = (VideoIndex) data;
        esVideoIndexRepository.save(videoIndex);
//        indexName = this.getIndexFullName(indexName);
//        IndexRequest request = new IndexRequest(indexName);
//        request.id(id).source(data);
//        try {
//            client.index(request, RequestOptions.DEFAULT);
//        } catch (IOException e) {
//            throw new RuntimeException("es文档添加/修改失败", e);
//        }
    }
    @Override
framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoServiceImpl.java
@@ -2,9 +2,11 @@
import cn.lili.cache.Cache;
import cn.lili.cache.CachePrefix;
import cn.lili.common.properties.RocketmqCustomProperties;
import cn.lili.common.security.context.UserContext;
import cn.lili.elasticsearch.EsSuffix;
import cn.lili.modules.lmk.constant.RedisKeyExpireConstant;
import cn.lili.modules.lmk.domain.dto.VideoEsUpdateDTO;
import cn.lili.modules.lmk.domain.entity.*;
import cn.lili.modules.lmk.domain.es.VideoIndex;
import cn.lili.modules.lmk.domain.form.*;
@@ -16,6 +18,9 @@
import cn.lili.modules.member.entity.dos.Member;
import cn.lili.modules.member.service.FootprintService;
import cn.lili.modules.member.service.MemberService;
import cn.lili.rocketmq.RocketmqSendCallbackBuilder;
import cn.lili.rocketmq.tags.CommentTagsEnum;
import cn.lili.rocketmq.tags.VideoTagsEnum;
import cn.lili.utils.COSUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.metadata.IPage;
@@ -28,6 +33,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
@@ -65,37 +71,9 @@
    private final KitchenTypeService kitchenTypeService;
    private final Cache cache;
    @Qualifier("videoEsServiceImpl")
    private final EsService videoEsService;
    private final RocketmqCustomProperties rocketmqCustomProperties;
    private final RocketMQTemplate rocketMQTemplate;
    /**
     * 添加
     * @param form
     * @return
     */
    @Override
    public Result add(WxVideoForm form) {
        Video entity = WxVideoForm.getEntityByForm(form, null);
        baseMapper.insert(entity);
        return Result.ok("添加成功");
    }
    /**
     * 修改
     * @param form
     * @return
     */
    @Override
    public Result update(WxVideoForm form) {
        Video entity = baseMapper.selectById(form.getId());
        // 为空抛IllegalArgumentException,做全局异常处理
        Assert.notNull(entity, "记录不存在");
        BeanUtils.copyProperties(form, entity);
        baseMapper.updateById(entity);
        return Result.ok("修改成功");
    }
    /**
     * 批量删除
@@ -123,7 +101,9 @@
        new LambdaUpdateChainWrapper<>(videoTagRefService.getBaseMapper())
                .eq(VideoTagRef::getVideoId, id)
                .remove();
        videoEsService.deleteDocument(EsSuffix.VIDEO_INDEX_NAME, id);
        // mq异步删除es数据
        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_DEL.name();
        rocketMQTemplate.asyncSend(destination, id, RocketmqSendCallbackBuilder.commonCallback());
        return Result.ok("删除成功");
    }
@@ -185,6 +165,7 @@
        video.setStatus(VideoStatusEnum.AUDITING.getValue());
        video.setCoverUrl(form.getCover());
        video.setVideoType(VideoTypeEnum.VIDEO.getValue());
        video.setRecommend(Boolean.FALSE);
        if (VideoContentTypeEnum.IMG.getValue().equals(form.getVideoContentType())) {
            video.setVideoImgs(JSON.toJSONString(form.getVideoImgs()));
        }
@@ -234,7 +215,7 @@
            }
            videoGoodsService.saveBatch(videoGoods);
        }
        // 5. 构建es中数据
        // 5. 构建es中数据,mq异步处理
        VideoIndex videoIndex = new VideoIndex();
        BeanUtils.copyProperties(video, videoIndex);
        videoIndex.setCoverFileKey(video.getCoverUrl());
@@ -245,7 +226,8 @@
        }).collect(Collectors.toList());
        videoIndex.setGoodsList(esGoodsList);
        videoIndex.setTagList(esTagList);
        videoEsService.addOrUpdateDocument(EsSuffix.VIDEO_INDEX_NAME, video.getId(), videoIndex);
        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_ADD_OR_UPDATE.name();
        rocketMQTemplate.asyncSend(destination, JSON.toJSONString(videoIndex), RocketmqSendCallbackBuilder.commonCallback());
        return Result.ok("发布成功,视频审核中~");
    }
@@ -316,7 +298,7 @@
            }
            videoGoodsService.saveBatch(videoGoods);
        }
        // 5. 更新es中的数据
        // 5. 更新es中的数据,mq异步处理
        VideoIndex videoIndex = new VideoIndex();
        BeanUtils.copyProperties(video, videoIndex);
        videoIndex.setCoverFileKey(video.getCoverUrl());
@@ -327,7 +309,8 @@
        }).collect(Collectors.toList());
        videoIndex.setGoodsList(esGoodsList);
        videoIndex.setTagList(esTagList);
        videoEsService.addOrUpdateDocument(EsSuffix.VIDEO_INDEX_NAME, video.getId(), videoIndex);
        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_ADD_OR_UPDATE.name();
        rocketMQTemplate.asyncSend(destination, JSON.toJSONString(videoIndex), RocketmqSendCallbackBuilder.commonCallback());
        return Result.ok("发布成功,视频审核中~");
    }
@@ -356,9 +339,15 @@
                .eq(Video::getId, form.getId())
                .set(Video::getRecommend, form.getRecommend())
                .update();
        // mq异步更新es
        Map<String, Object> fields = new HashMap<>(2);
        fields.put("recommend", form.getRecommend());
        videoEsService.updateSomeField(EsSuffix.VIDEO_INDEX_NAME, form.getId(), fields);
        VideoEsUpdateDTO dto = new VideoEsUpdateDTO();
        dto.setId(form.getId());
        dto.setFields(fields);
        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name();
        rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback());
        return Result.ok("设置成功");
    }
@@ -378,17 +367,24 @@
        }
        videoAuditRecordService.save(auditRecord);
        // 2. 修改视频状态
        Map<String, Object> fields = new HashMap<>(2);
        if (form.getResult()) {
            video.setStatus(VideoStatusEnum.PUBLISHED.getValue());
            video.setAuditPassTime(new Date());
            Map<String, Object> fields = new HashMap<>(2);
            fields.put("status", VideoStatusEnum.PUBLISHED.getValue());
            videoEsService.updateSomeField(EsSuffix.VIDEO_INDEX_NAME, video.getId(), fields);
        } else {
            video.setStatus(VideoStatusEnum.REJECT.getValue());
            fields.put("status", VideoStatusEnum.REJECT.getValue());
        }
        baseMapper.updateById(video);
        // 3. mq异步更新es
        VideoEsUpdateDTO dto = new VideoEsUpdateDTO();
        dto.setId(video.getId());
        dto.setFields(fields);
        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name();
        rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback());
        return Result.ok();
    }
@@ -400,10 +396,14 @@
                .eq(Video::getId, id)
                .set(Video::getStatus, VideoStatusEnum.PUBLISHED.getValue())
                .update();
        // 2. 更新es
        // 2. mq异步更新es
        Map<String, Object> fields = new HashMap<>(2);
        fields.put("status", VideoStatusEnum.PUBLISHED.getValue());
        videoEsService.updateSomeField(EsSuffix.VIDEO_INDEX_NAME, id, fields);
        VideoEsUpdateDTO dto = new VideoEsUpdateDTO();
        dto.setId(id);
        dto.setFields(fields);
        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name();
        rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback());
        return Result.ok("上架成功");
    }
@@ -415,10 +415,14 @@
                .eq(Video::getId, form.getId())
                .set(Video::getStatus, VideoStatusEnum.DISABLE.getValue())
                .update();
        // 2. 更新es
        // 2. mq异步更新es
        Map<String, Object> fields = new HashMap<>(2);
        fields.put("status", VideoStatusEnum.DISABLE.getValue());
        videoEsService.updateSomeField(EsSuffix.VIDEO_INDEX_NAME, form.getId(), fields);
        VideoEsUpdateDTO dto = new VideoEsUpdateDTO();
        dto.setId(form.getId());
        dto.setFields(fields);
        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name();
        rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback());
        // TODO 将下架原因以通知的方式告知用户
        return Result.ok("下架成功");
@@ -430,6 +434,14 @@
                .eq(Video::getId, id)
                .set(Video::getStatus, VideoStatusEnum.DISABLE.getValue())
                .update();
        // 2. mq异步更新es
        Map<String, Object> fields = new HashMap<>(2);
        fields.put("status", VideoStatusEnum.DISABLE.getValue());
        VideoEsUpdateDTO dto = new VideoEsUpdateDTO();
        dto.setId(id);
        dto.setFields(fields);
        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_DOC_UPDATE_SOME_FIELD.name();
        rocketMQTemplate.asyncSend(destination, JSON.toJSONString(dto), RocketmqSendCallbackBuilder.commonCallback());
        return Result.ok("下架成功");
    }
@@ -883,4 +895,12 @@
                    .update();
        }
    }
    @Override
    public Result recreateEsIndex() {
        String destination = rocketmqCustomProperties.getVideoTopic() + ":" + VideoTagsEnum.ES_RECREATE.name();
        // 消息体不能为空,随便传一个1
        rocketMQTemplate.asyncSend(destination, "1", RocketmqSendCallbackBuilder.commonCallback());
        return Result.ok("已成功发起构建请求,稍作等待后便会自动完成");
    }
}
framework/src/main/java/cn/lili/rocketmq/tags/VideoTagsEnum.java
@@ -12,6 +12,10 @@
     * 收藏
     */
    COLLECT("收藏"),
    ES_RECREATE("重建视频索引"),
    ES_DOC_ADD_OR_UPDATE("新增或全量修改视频"),
    ES_DOC_UPDATE_SOME_FIELD("修改视频某些字段"),
    ES_DOC_DEL("删除视频"),
    ;
framework/src/main/resources/mapper/lmk/VideoMapper.xml
@@ -605,8 +605,8 @@
            lmk_video LV
                LEFT JOIN li_member LM ON LV.author_id = LM.id
        WHERE
            LV.delete_flag = 0 AND LV.status = '1'
        LIMIT #{start}, #{end}
            LV.delete_flag = 0
        LIMIT #{start}, #{pageSize}
    </select>
</mapper>
manager-api/src/main/java/cn/lili/controller/lmk/VideoController.java
@@ -38,18 +38,6 @@
    @Qualifier("videoEsServiceImpl")
    private final EsService esService;
    @PostMapping
    @ApiOperation(value = "添加", notes = "添加")
    public Result add(@RequestBody @Validated(Add.class) WxVideoForm form) {
        return videoService.add(form);
    }
    @PutMapping
    @ApiOperation(value = "修改", notes = "修改")
    public Result update(@RequestBody @Validated(Update.class) WxVideoForm form) {
        return videoService.update(form);
    }
    @DeleteMapping("/{id}")
    @ApiOperation(value = "ID删除", notes = "ID删除")
    public Result removeById(@PathVariable("id") String id) {
@@ -107,7 +95,6 @@
    @PostMapping("/recreate/es/index")
    @ApiOperation(value = "重建es索引", notes = "重建es索引")
    public Result recreateEsIndex() {
        esService.recreateIndex(EsSuffix.VIDEO_INDEX_NAME, "/es/video.json");
        return Result.ok();
        return videoService.recreateEsIndex();
    }
}