New file |
| | |
| | | package cn.lili.modules.lmk.service.impl; |
| | | |
| | | import cn.lili.elasticsearch.BaseElasticsearchService; |
| | | import cn.lili.elasticsearch.EsSuffix; |
| | | import cn.lili.elasticsearch.config.ElasticsearchProperties; |
| | | import cn.lili.modules.lmk.domain.entity.Video; |
| | | import cn.lili.modules.lmk.domain.es.VideoIndex; |
| | | import cn.lili.modules.lmk.enums.general.VideoStatusEnum; |
| | | import cn.lili.modules.lmk.mapper.VideoMapper; |
| | | import cn.lili.modules.lmk.service.EsService; |
| | | import cn.lili.modules.search.repository.EsVideoIndexRepository; |
| | | import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper; |
| | | import lombok.RequiredArgsConstructor; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; |
| | | import org.elasticsearch.action.bulk.BulkRequest; |
| | | import org.elasticsearch.action.delete.DeleteRequest; |
| | | import org.elasticsearch.action.index.IndexRequest; |
| | | import org.elasticsearch.action.support.master.AcknowledgedResponse; |
| | | import org.elasticsearch.action.update.UpdateRequest; |
| | | import org.elasticsearch.client.RequestOptions; |
| | | import org.elasticsearch.client.indices.CreateIndexRequest; |
| | | import org.elasticsearch.client.indices.CreateIndexResponse; |
| | | import org.elasticsearch.common.settings.Settings; |
| | | import org.elasticsearch.common.xcontent.XContentBuilder; |
| | | import org.elasticsearch.common.xcontent.XContentFactory; |
| | | import org.elasticsearch.common.xcontent.XContentType; |
| | | import org.springframework.data.elasticsearch.core.document.Document; |
| | | import org.springframework.data.elasticsearch.core.query.UpdateQuery; |
| | | import org.springframework.stereotype.Service; |
| | | import org.springframework.util.FileCopyUtils; |
| | | |
| | | import java.io.IOException; |
| | | import java.io.InputStream; |
| | | import java.nio.charset.StandardCharsets; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.*; |
| | | |
| | | /** |
| | | * 视频es |
| | | * |
| | | * @author:xp |
| | | * @date:2025/6/30 15:54 |
| | | */ |
| | | @Slf4j |
| | | @RequiredArgsConstructor |
| | | @Service("videoEsServiceImpl") |
| | | public class VideoEsServiceImpl extends BaseElasticsearchService implements EsService { |
| | | |
| | | private final ElasticsearchProperties elasticsearchProperties; |
| | | private final VideoMapper videoMapper; |
| | | private final EsVideoIndexRepository esVideoIndexRepository; |
| | | |
| | | @Override |
| | | public String getIndexFullName(String indexName) { |
| | | return elasticsearchProperties.getIndexPrefix() + "_" + indexName; |
| | | } |
| | | |
| | | @Override |
| | | public void createIndex(String indexName, String mappingJsonPath) { |
| | | if (! indexName.startsWith(elasticsearchProperties.getIndexPrefix())) { |
| | | indexName = this.getIndexFullName(indexName); |
| | | } |
| | | if (this.indexExist(indexName)) { |
| | | throw new RuntimeException(String.format("索引:%s已经存在,无法创建", indexName)); |
| | | } |
| | | CreateIndexRequest request = new CreateIndexRequest(indexName); |
| | | |
| | | // 1. 配置索引 |
| | | request.settings(Settings.builder() |
| | | .put("index.number_of_shards", elasticsearchProperties.getIndex().getNumberOfShards()) |
| | | .put("index.number_of_replicas", elasticsearchProperties.getIndex().getNumberOfReplicas()) |
| | | .put("index.max_result_window", 100000) //最大查询结果数 |
| | | .put("index.mapping.total_fields.limit", 2000)); |
| | | // 2. 配置mapping |
| | | String mapping; |
| | | try (InputStream inputStream = this.getClass().getResourceAsStream(mappingJsonPath)) { |
| | | byte[] bytes = FileCopyUtils.copyToByteArray(inputStream); |
| | | mapping = new String(bytes, StandardCharsets.UTF_8); |
| | | } catch (IOException e) { |
| | | throw new RuntimeException(String.format("读取es映射json文件:%s异常", mappingJsonPath), e); |
| | | } |
| | | request.mapping(mapping, XContentType.JSON); |
| | | // 3. 创建索引 |
| | | try { |
| | | CreateIndexResponse createIndexResponse = client.indices().create(request, COMMON_OPTIONS); |
| | | } catch (IOException e) { |
| | | throw new RuntimeException(String.format("es创建索引失败:%s", indexName), e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void recreateIndex(String indexName, String mappingJsonPath) { |
| | | indexName = this.getIndexFullName(indexName); |
| | | // 1. 如果索引存在,先删除索引,再创建索引 |
| | | if (this.indexExist(indexName)) { |
| | | DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); |
| | | try { |
| | | AcknowledgedResponse deleteRes = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT); |
| | | this.createIndex(indexName, mappingJsonPath); |
| | | } catch (IOException e) { |
| | | log.error("删除索引失败", e); |
| | | throw new RuntimeException("删除索引失败"); |
| | | } |
| | | } else { |
| | | this.createIndex(indexName, mappingJsonPath); |
| | | } |
| | | // 2. 多线程查询视频数据,构建文档对象 |
| | | Long totalVideo = new LambdaQueryChainWrapper<>(videoMapper) |
| | | .count(); |
| | | int totalThreads = (int) Math.ceil((double) totalVideo / 200); // 计算需要多少个线程 |
| | | CountDownLatch latch = new CountDownLatch(totalThreads); |
| | | ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( |
| | | 4, |
| | | 10, |
| | | 10, |
| | | TimeUnit.SECONDS, |
| | | new ArrayBlockingQueue<>(4), |
| | | Executors.defaultThreadFactory(), |
| | | new ThreadPoolExecutor.AbortPolicy()); |
| | | BlockingQueue<VideoIndex> dataList = new LinkedBlockingQueue<>(); |
| | | for (int page = 0; page < totalThreads; page++) { |
| | | final int currentPage = page; |
| | | threadPoolExecutor.execute(() -> { |
| | | try { |
| | | List<VideoIndex> pageData = videoMapper.getEsPage(currentPage * 200, 200); |
| | | dataList.addAll(pageData); |
| | | } catch (Exception e) { |
| | | log.error("第{}页数据查询失败", currentPage, e); |
| | | } finally { |
| | | latch.countDown(); // 线程执行完成 -1 |
| | | } |
| | | }); |
| | | } |
| | | try { |
| | | latch.await(); // 等待所有线程执行完成 |
| | | // 3. 添加es数据 |
| | | // BulkRequest bulkRequest = new BulkRequest(); |
| | | // String finalIndexName = indexName; |
| | | // dataList.forEach(data -> { |
| | | // IndexRequest indexRequest = new IndexRequest(finalIndexName) |
| | | // .id(data.getId()) |
| | | // .source(data); |
| | | // bulkRequest.add(indexRequest); |
| | | // }); |
| | | // client.bulk(bulkRequest, RequestOptions.DEFAULT); |
| | | esVideoIndexRepository.saveAll(dataList); |
| | | } catch (InterruptedException e) { |
| | | log.error("多线程读取视频数据异常", e); |
| | | } finally { |
| | | threadPoolExecutor.shutdown(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void addOrUpdateDocument(Object data) { |
| | | VideoIndex videoIndex = (VideoIndex) data; |
| | | esVideoIndexRepository.save(videoIndex); |
| | | // indexName = this.getIndexFullName(indexName); |
| | | // IndexRequest request = new IndexRequest(indexName); |
| | | // request.id(id).source(data); |
| | | // try { |
| | | // client.index(request, RequestOptions.DEFAULT); |
| | | // } catch (IOException e) { |
| | | // throw new RuntimeException("es文档添加/修改失败", e); |
| | | // } |
| | | } |
| | | |
| | | @Override |
| | | public void updateSomeField(String indexName, String id, Map<String, Object> updateList) { |
| | | indexName = this.getIndexFullName(indexName); |
| | | // 构建更新请求 |
| | | UpdateRequest request = new UpdateRequest(indexName, id); |
| | | |
| | | try { |
| | | // 构建更新内容 |
| | | XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); |
| | | for (Map.Entry<String, Object> entry : updateList.entrySet()) { |
| | | builder.field(entry.getKey(), entry.getValue()); |
| | | } |
| | | builder.endObject(); |
| | | |
| | | request.doc(builder); // 设置部分更新内容 |
| | | |
| | | // 可选配置 |
| | | request.retryOnConflict(2); // 冲突重试次数 |
| | | // request.fetchSource(true); // 返回更新后的文档 |
| | | |
| | | client.update(request, RequestOptions.DEFAULT); |
| | | } catch (IOException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void deleteDocument(String indexName, String id) { |
| | | indexName = this.getIndexFullName(indexName); |
| | | DeleteRequest request = new DeleteRequest(indexName, id); |
| | | try { |
| | | client.delete(request, RequestOptions.DEFAULT); |
| | | } catch (IOException e) { |
| | | throw new RuntimeException("删除es文档失败:" + id, e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public boolean indexExist(String indexName) { |
| | | if (!indexName.startsWith(elasticsearchProperties.getIndexPrefix())) { |
| | | indexName = this.getIndexFullName(indexName); |
| | | } |
| | | return super.indexExist(indexName); |
| | | } |
| | | } |