peng
1 天以前 aac2321d1cf5536f7ea03f30d55a4aba30fbf710
consumer/src/main/java/cn/lili/listener/VideoMessageListener.java
@@ -1,9 +1,13 @@
package cn.lili.listener;
import cn.lili.cache.Cache;
import cn.lili.elasticsearch.EsSuffix;
import cn.lili.modules.lmk.domain.dto.VideoEsUpdateDTO;
import cn.lili.modules.lmk.domain.entity.MyCollect;
import cn.lili.modules.lmk.domain.entity.ThumbsUpRecord;
import cn.lili.modules.lmk.domain.es.VideoIndex;
import cn.lili.modules.lmk.domain.form.ThumbsUpRecordForm;
import cn.lili.modules.lmk.service.EsService;
import cn.lili.modules.lmk.service.ThumbsUpRecordService;
import cn.lili.modules.lmk.service.VideoCommentService;
import cn.lili.modules.lmk.service.VideoService;
@@ -16,6 +20,7 @@
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
/**
@@ -33,20 +38,35 @@
    private VideoService videoService;
    @Autowired
    @Qualifier("videoEsServiceImpl")
    private EsService esService;
    @Autowired
    private Cache<Object> cache;
    @Override
    public void onMessage(MessageExt messageExt) {
        try {
            String msg = new String(messageExt.getBody());
            if (StringUtils.isBlank(msg)) {
                log.error("video msg is null, cant not consumer");
                return;
            }
            switch (VideoTagsEnum.valueOf(messageExt.getTags())) {
                case COLLECT:
                    this.collect(msg);
                    break;
                case THUMBS_UP:
                    this.changeThumbsUp(msg);
                    break;
                case ES_RECREATE:
                    this.recreateVideoIndex();
                    break;
                case ES_DOC_ADD_OR_UPDATE:
                    this.addOrUpdateEsVideo(msg);
                    break;
                case ES_DOC_UPDATE_SOME_FIELD:
                    this.updateEsVideoSomeField(msg);
                    break;
                case ES_DOC_DEL:
                    this.delEsVideo(msg);
                default:
                    log.error("video msg not match correct tag, consumer err");
                    break;
@@ -66,4 +86,46 @@
        videoService.mqCollectChange(collect);
    }
    /**
     * 重建视频索引
     *
     */
    public void recreateVideoIndex() {
        esService.recreateIndex(EsSuffix.VIDEO_INDEX_NAME, "/es/video.json");
    }
    /**
     * 新增es视频数据/更新
     *
     * @param msg
     */
    public void addOrUpdateEsVideo(String msg) {
        VideoIndex videoIndex = JSON.parseObject(msg, VideoIndex.class);
        esService.addOrUpdateDocument(videoIndex);
    }
    /**
     * 更新es视频的某些字段
     *
     * @param msg
     */
    public void updateEsVideoSomeField(String msg) {
        VideoEsUpdateDTO dto = JSON.parseObject(msg, VideoEsUpdateDTO.class);
        esService.updateSomeField(EsSuffix.VIDEO_INDEX_NAME, dto.getId(), dto.getFields());
    }
    /**
     * 根据id删除es中的视频
     *
     * @param id
     */
    public void delEsVideo(String id) {
        esService.deleteDocument(EsSuffix.VIDEO_INDEX_NAME, id);
    }
    public void changeThumbsUp(String msg) {
        ThumbsUpRecord thumbsUpRecord = JSON.parseObject(msg, ThumbsUpRecord.class);
        videoService.mqChangeThumbsUp(thumbsUpRecord);
    }
}