package cn.lili.listener; import cn.lili.cache.Cache; import cn.lili.elasticsearch.EsSuffix; import cn.lili.modules.lmk.domain.dto.VideoEsUpdateDTO; import cn.lili.modules.lmk.domain.entity.MyCollect; import cn.lili.modules.lmk.domain.entity.ThumbsUpRecord; import cn.lili.modules.lmk.domain.es.VideoIndex; import cn.lili.modules.lmk.domain.form.ThumbsUpRecordForm; import cn.lili.modules.lmk.service.EsService; import cn.lili.modules.lmk.service.ThumbsUpRecordService; import cn.lili.modules.lmk.service.VideoCommentService; import cn.lili.modules.lmk.service.VideoService; import cn.lili.rocketmq.tags.CommentTagsEnum; import cn.lili.rocketmq.tags.VideoTagsEnum; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; /** * 评论消息消费者 * * @author paulG * @since 2020/12/9 **/ @Component @Slf4j @RocketMQMessageListener(topic = "${lili.data.rocketmq.video-topic}", consumerGroup = "${lili.data.rocketmq.video-group}") public class VideoMessageListener implements RocketMQListener { @Autowired private VideoService videoService; @Autowired @Qualifier("videoEsServiceImpl") private EsService esService; @Autowired private Cache cache; @Override public void onMessage(MessageExt messageExt) { try { String msg = new String(messageExt.getBody()); switch (VideoTagsEnum.valueOf(messageExt.getTags())) { case COLLECT: this.collect(msg); break; case THUMBS_UP: this.changeThumbsUp(msg); break; case ES_RECREATE: this.recreateVideoIndex(); break; case ES_DOC_ADD_OR_UPDATE: this.addOrUpdateEsVideo(msg); break; case ES_DOC_UPDATE_SOME_FIELD: this.updateEsVideoSomeField(msg); break; case ES_DOC_DEL: this.delEsVideo(msg); default: log.error("video msg not match correct tag, consumer err"); break; } } catch (Exception e) { log.error("video msg consumer err", e); } } /** * 视频收藏/取消收藏 * * @param msg */ public void collect(String msg) { MyCollect collect = JSON.parseObject(msg, MyCollect.class); videoService.mqCollectChange(collect); } /** * 重建视频索引 * */ public void recreateVideoIndex() { esService.recreateIndex(EsSuffix.VIDEO_INDEX_NAME, "/es/video.json"); } /** * 新增es视频数据/更新 * * @param msg */ public void addOrUpdateEsVideo(String msg) { VideoIndex videoIndex = JSON.parseObject(msg, VideoIndex.class); esService.addOrUpdateDocument(videoIndex); } /** * 更新es视频的某些字段 * * @param msg */ public void updateEsVideoSomeField(String msg) { VideoEsUpdateDTO dto = JSON.parseObject(msg, VideoEsUpdateDTO.class); esService.updateSomeField(EsSuffix.VIDEO_INDEX_NAME, dto.getId(), dto.getFields()); } /** * 根据id删除es中的视频 * * @param id */ public void delEsVideo(String id) { esService.deleteDocument(EsSuffix.VIDEO_INDEX_NAME, id); } public void changeThumbsUp(String msg) { ThumbsUpRecord thumbsUpRecord = JSON.parseObject(msg, ThumbsUpRecord.class); videoService.mqChangeThumbsUp(thumbsUpRecord); } }