| | |
| | | package cn.lili.listener; |
| | | |
| | | import cn.lili.cache.Cache; |
| | | import cn.lili.elasticsearch.EsSuffix; |
| | | import cn.lili.modules.lmk.domain.dto.VideoEsUpdateDTO; |
| | | import cn.lili.modules.lmk.domain.entity.MyCollect; |
| | | import cn.lili.modules.lmk.domain.entity.ThumbsUpRecord; |
| | | import cn.lili.modules.lmk.domain.es.VideoIndex; |
| | | import cn.lili.modules.lmk.domain.form.ThumbsUpRecordForm; |
| | | import cn.lili.modules.lmk.service.EsService; |
| | | import cn.lili.modules.lmk.service.ThumbsUpRecordService; |
| | | import cn.lili.modules.lmk.service.VideoCommentService; |
| | | import cn.lili.modules.lmk.service.VideoService; |
| | |
| | | import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; |
| | | import org.apache.rocketmq.spring.core.RocketMQListener; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.beans.factory.annotation.Qualifier; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | /** |
| | |
| | | private VideoService videoService; |
| | | |
| | | @Autowired |
| | | @Qualifier("videoEsServiceImpl") |
| | | private EsService esService; |
| | | |
| | | @Autowired |
| | | private Cache<Object> cache; |
| | | |
| | | @Override |
| | | public void onMessage(MessageExt messageExt) { |
| | | try { |
| | | String msg = new String(messageExt.getBody()); |
| | | if (StringUtils.isBlank(msg)) { |
| | | log.error("video msg is null, cant not consumer"); |
| | | return; |
| | | } |
| | | |
| | | switch (VideoTagsEnum.valueOf(messageExt.getTags())) { |
| | | case COLLECT: |
| | | this.collect(msg); |
| | | break; |
| | | case THUMBS_UP: |
| | | this.changeThumbsUp(msg); |
| | | break; |
| | | case ES_RECREATE: |
| | | this.recreateVideoIndex(); |
| | | break; |
| | | case ES_DOC_ADD_OR_UPDATE: |
| | | this.addOrUpdateEsVideo(msg); |
| | | break; |
| | | case ES_DOC_UPDATE_SOME_FIELD: |
| | | this.updateEsVideoSomeField(msg); |
| | | break; |
| | | case ES_DOC_DEL: |
| | | this.delEsVideo(msg); |
| | | default: |
| | | log.error("video msg not match correct tag, consumer err"); |
| | | break; |
| | |
| | | videoService.mqCollectChange(collect); |
| | | } |
| | | |
| | | /** |
| | | * 重建视频索引 |
| | | * |
| | | */ |
| | | public void recreateVideoIndex() { |
| | | esService.recreateIndex(EsSuffix.VIDEO_INDEX_NAME, "/es/video.json"); |
| | | } |
| | | |
| | | /** |
| | | * 新增es视频数据/更新 |
| | | * |
| | | * @param msg |
| | | */ |
| | | public void addOrUpdateEsVideo(String msg) { |
| | | VideoIndex videoIndex = JSON.parseObject(msg, VideoIndex.class); |
| | | esService.addOrUpdateDocument(videoIndex); |
| | | } |
| | | |
| | | /** |
| | | * 更新es视频的某些字段 |
| | | * |
| | | * @param msg |
| | | */ |
| | | public void updateEsVideoSomeField(String msg) { |
| | | VideoEsUpdateDTO dto = JSON.parseObject(msg, VideoEsUpdateDTO.class); |
| | | esService.updateSomeField(EsSuffix.VIDEO_INDEX_NAME, dto.getId(), dto.getFields()); |
| | | } |
| | | |
| | | /** |
| | | * 根据id删除es中的视频 |
| | | * |
| | | * @param id |
| | | */ |
| | | public void delEsVideo(String id) { |
| | | esService.deleteDocument(EsSuffix.VIDEO_INDEX_NAME, id); |
| | | } |
| | | |
| | | public void changeThumbsUp(String msg) { |
| | | ThumbsUpRecord thumbsUpRecord = JSON.parseObject(msg, ThumbsUpRecord.class); |
| | | videoService.mqChangeThumbsUp(thumbsUpRecord); |
| | | } |
| | | |
| | | } |