xiangpei
4 天以前 07a8e96750473fbdf676993191c41155ddf8146e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package cn.lili.listener;
 
import cn.lili.cache.Cache;
import cn.lili.elasticsearch.EsSuffix;
import cn.lili.modules.lmk.domain.dto.VideoEsUpdateDTO;
import cn.lili.modules.lmk.domain.entity.MyCollect;
import cn.lili.modules.lmk.domain.entity.ThumbsUpRecord;
import cn.lili.modules.lmk.domain.es.VideoIndex;
import cn.lili.modules.lmk.domain.form.ThumbsUpRecordForm;
import cn.lili.modules.lmk.service.EsService;
import cn.lili.modules.lmk.service.ThumbsUpRecordService;
import cn.lili.modules.lmk.service.VideoCommentService;
import cn.lili.modules.lmk.service.VideoService;
import cn.lili.rocketmq.tags.CommentTagsEnum;
import cn.lili.rocketmq.tags.VideoTagsEnum;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
 
/**
 * 评论消息消费者
 *
 * @author paulG
 * @since 2020/12/9
 **/
@Component
@Slf4j
@RocketMQMessageListener(topic = "${lili.data.rocketmq.video-topic}", consumerGroup = "${lili.data.rocketmq.video-group}")
public class VideoMessageListener implements RocketMQListener<MessageExt> {
 
    @Autowired
    private VideoService videoService;
 
    @Autowired
    @Qualifier("videoEsServiceImpl")
    private EsService esService;
 
    @Autowired
    private Cache<Object> cache;
 
    @Override
    public void onMessage(MessageExt messageExt) {
        try {
            String msg = new String(messageExt.getBody());
 
            switch (VideoTagsEnum.valueOf(messageExt.getTags())) {
                case COLLECT:
                    this.collect(msg);
                    break;
                case THUMBS_UP:
                    this.changeThumbsUp(msg);
                    break;
                case ES_RECREATE:
                    this.recreateVideoIndex();
                    break;
                case ES_DOC_ADD_OR_UPDATE:
                    this.addOrUpdateEsVideo(msg);
                    break;
                case ES_DOC_UPDATE_SOME_FIELD:
                    this.updateEsVideoSomeField(msg);
                    break;
                case ES_DOC_DEL:
                    this.delEsVideo(msg);
                default:
                    log.error("video msg not match correct tag, consumer err");
                    break;
            }
        } catch (Exception e) {
            log.error("video msg consumer err", e);
        }
    }
 
    /**
     * 视频收藏/取消收藏
     *
     * @param msg
     */
    public void collect(String msg) {
        MyCollect collect = JSON.parseObject(msg, MyCollect.class);
        videoService.mqCollectChange(collect);
    }
 
    /**
     * 重建视频索引
     *
     */
    public void recreateVideoIndex() {
        esService.recreateIndex(EsSuffix.VIDEO_INDEX_NAME, "/es/video.json");
    }
 
    /**
     * 新增es视频数据/更新
     *
     * @param msg
     */
    public void addOrUpdateEsVideo(String msg) {
        VideoIndex videoIndex = JSON.parseObject(msg, VideoIndex.class);
        esService.addOrUpdateDocument(videoIndex);
    }
 
    /**
     * 更新es视频的某些字段
     *
     * @param msg
     */
    public void updateEsVideoSomeField(String msg) {
        VideoEsUpdateDTO dto = JSON.parseObject(msg, VideoEsUpdateDTO.class);
        esService.updateSomeField(EsSuffix.VIDEO_INDEX_NAME, dto.getId(), dto.getFields());
    }
 
    /**
     * 根据id删除es中的视频
     *
     * @param id
     */
    public void delEsVideo(String id) {
        esService.deleteDocument(EsSuffix.VIDEO_INDEX_NAME, id);
    }
 
    public void changeThumbsUp(String msg) {
        ThumbsUpRecord thumbsUpRecord = JSON.parseObject(msg, ThumbsUpRecord.class);
        videoService.mqChangeThumbsUp(thumbsUpRecord);
    }
 
}