package cn.lili.modules.lmk.service.impl; import cn.lili.elasticsearch.BaseElasticsearchService; import cn.lili.elasticsearch.EsSuffix; import cn.lili.elasticsearch.config.ElasticsearchProperties; import cn.lili.modules.lmk.domain.entity.Video; import cn.lili.modules.lmk.domain.es.VideoIndex; import cn.lili.modules.lmk.enums.general.VideoStatusEnum; import cn.lili.modules.lmk.mapper.VideoMapper; import cn.lili.modules.lmk.service.EsService; import cn.lili.modules.search.repository.EsVideoIndexRepository; import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.springframework.data.elasticsearch.core.document.Document; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.stereotype.Service; import org.springframework.util.FileCopyUtils; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.concurrent.*; /** * 视频es * * @author:xp * @date:2025/6/30 15:54 */ @Slf4j @RequiredArgsConstructor @Service("videoEsServiceImpl") public class VideoEsServiceImpl extends BaseElasticsearchService implements EsService { private final ElasticsearchProperties elasticsearchProperties; private final VideoMapper videoMapper; private final EsVideoIndexRepository esVideoIndexRepository; @Override public String getIndexFullName(String indexName) { return elasticsearchProperties.getIndexPrefix() + "_" + indexName; } @Override public void createIndex(String indexName, String mappingJsonPath) { if (! indexName.startsWith(elasticsearchProperties.getIndexPrefix())) { indexName = this.getIndexFullName(indexName); } if (this.indexExist(indexName)) { throw new RuntimeException(String.format("索引:%s已经存在,无法创建", indexName)); } CreateIndexRequest request = new CreateIndexRequest(indexName); // 1. 配置索引 request.settings(Settings.builder() .put("index.number_of_shards", elasticsearchProperties.getIndex().getNumberOfShards()) .put("index.number_of_replicas", elasticsearchProperties.getIndex().getNumberOfReplicas()) .put("index.max_result_window", 100000) //最大查询结果数 .put("index.mapping.total_fields.limit", 2000)); // 2. 配置mapping String mapping; try (InputStream inputStream = this.getClass().getResourceAsStream(mappingJsonPath)) { byte[] bytes = FileCopyUtils.copyToByteArray(inputStream); mapping = new String(bytes, StandardCharsets.UTF_8); } catch (IOException e) { throw new RuntimeException(String.format("读取es映射json文件:%s异常", mappingJsonPath), e); } request.mapping(mapping, XContentType.JSON); // 3. 创建索引 try { CreateIndexResponse createIndexResponse = client.indices().create(request, COMMON_OPTIONS); } catch (IOException e) { throw new RuntimeException(String.format("es创建索引失败:%s", indexName), e); } } @Override public void recreateIndex(String indexName, String mappingJsonPath) { indexName = this.getIndexFullName(indexName); // 1. 如果索引存在,先删除索引,再创建索引 if (this.indexExist(indexName)) { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); try { AcknowledgedResponse deleteRes = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT); this.createIndex(indexName, mappingJsonPath); } catch (IOException e) { log.error("删除索引失败", e); throw new RuntimeException("删除索引失败"); } } else { this.createIndex(indexName, mappingJsonPath); } // 2. 多线程查询视频数据,构建文档对象 Long totalVideo = new LambdaQueryChainWrapper<>(videoMapper) .count(); int totalThreads = (int) Math.ceil((double) totalVideo / 200); // 计算需要多少个线程 CountDownLatch latch = new CountDownLatch(totalThreads); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 4, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); BlockingQueue dataList = new LinkedBlockingQueue<>(); for (int page = 0; page < totalThreads; page++) { final int currentPage = page; threadPoolExecutor.execute(() -> { try { List pageData = videoMapper.getEsPage(currentPage * 200, 200); dataList.addAll(pageData); } catch (Exception e) { log.error("第{}页数据查询失败", currentPage, e); } finally { latch.countDown(); // 线程执行完成 -1 } }); } try { latch.await(); // 等待所有线程执行完成 // 3. 添加es数据 // BulkRequest bulkRequest = new BulkRequest(); // String finalIndexName = indexName; // dataList.forEach(data -> { // IndexRequest indexRequest = new IndexRequest(finalIndexName) // .id(data.getId()) // .source(data); // bulkRequest.add(indexRequest); // }); // client.bulk(bulkRequest, RequestOptions.DEFAULT); esVideoIndexRepository.saveAll(dataList); } catch (InterruptedException e) { log.error("多线程读取视频数据异常", e); } finally { threadPoolExecutor.shutdown(); } } @Override public void addOrUpdateDocument(Object data) { VideoIndex videoIndex = (VideoIndex) data; esVideoIndexRepository.save(videoIndex); // indexName = this.getIndexFullName(indexName); // IndexRequest request = new IndexRequest(indexName); // request.id(id).source(data); // try { // client.index(request, RequestOptions.DEFAULT); // } catch (IOException e) { // throw new RuntimeException("es文档添加/修改失败", e); // } } @Override public void updateSomeField(String indexName, String id, Map updateList) { indexName = this.getIndexFullName(indexName); // 构建更新请求 UpdateRequest request = new UpdateRequest(indexName, id); try { // 构建更新内容 XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); for (Map.Entry entry : updateList.entrySet()) { builder.field(entry.getKey(), entry.getValue()); } builder.endObject(); request.doc(builder); // 设置部分更新内容 // 可选配置 request.retryOnConflict(2); // 冲突重试次数 // request.fetchSource(true); // 返回更新后的文档 client.update(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } } @Override public void deleteDocument(String indexName, String id) { indexName = this.getIndexFullName(indexName); DeleteRequest request = new DeleteRequest(indexName, id); try { client.delete(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException("删除es文档失败:" + id, e); } } @Override public boolean indexExist(String indexName) { if (!indexName.startsWith(elasticsearchProperties.getIndexPrefix())) { indexName = this.getIndexFullName(indexName); } return super.indexExist(indexName); } }