peng
4 天以前 aac2321d1cf5536f7ea03f30d55a4aba30fbf710
framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoEsServiceImpl.java
New file
@@ -0,0 +1,214 @@
package cn.lili.modules.lmk.service.impl;
import cn.lili.elasticsearch.BaseElasticsearchService;
import cn.lili.elasticsearch.EsSuffix;
import cn.lili.elasticsearch.config.ElasticsearchProperties;
import cn.lili.modules.lmk.domain.entity.Video;
import cn.lili.modules.lmk.domain.es.VideoIndex;
import cn.lili.modules.lmk.enums.general.VideoStatusEnum;
import cn.lili.modules.lmk.mapper.VideoMapper;
import cn.lili.modules.lmk.service.EsService;
import cn.lili.modules.search.repository.EsVideoIndexRepository;
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.stereotype.Service;
import org.springframework.util.FileCopyUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
 * 视频es
 *
 * @author:xp
 * @date:2025/6/30 15:54
 */
@Slf4j
@RequiredArgsConstructor
@Service("videoEsServiceImpl")
public class VideoEsServiceImpl extends BaseElasticsearchService implements EsService {
    private final ElasticsearchProperties elasticsearchProperties;
    private final VideoMapper videoMapper;
    private final EsVideoIndexRepository esVideoIndexRepository;
    @Override
    public String getIndexFullName(String indexName) {
        return elasticsearchProperties.getIndexPrefix() + "_" + indexName;
    }
    @Override
    public void createIndex(String indexName, String mappingJsonPath) {
        if (! indexName.startsWith(elasticsearchProperties.getIndexPrefix())) {
            indexName = this.getIndexFullName(indexName);
        }
        if (this.indexExist(indexName)) {
            throw new RuntimeException(String.format("索引:%s已经存在,无法创建", indexName));
        }
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        // 1. 配置索引
        request.settings(Settings.builder()
                .put("index.number_of_shards", elasticsearchProperties.getIndex().getNumberOfShards())
                .put("index.number_of_replicas", elasticsearchProperties.getIndex().getNumberOfReplicas())
                .put("index.max_result_window", 100000) //最大查询结果数
                .put("index.mapping.total_fields.limit", 2000));
        // 2. 配置mapping
        String mapping;
        try (InputStream inputStream = this.getClass().getResourceAsStream(mappingJsonPath)) {
            byte[] bytes = FileCopyUtils.copyToByteArray(inputStream);
            mapping = new String(bytes, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new RuntimeException(String.format("读取es映射json文件:%s异常", mappingJsonPath), e);
        }
        request.mapping(mapping, XContentType.JSON);
        // 3. 创建索引
        try {
            CreateIndexResponse createIndexResponse = client.indices().create(request, COMMON_OPTIONS);
        } catch (IOException e) {
            throw new RuntimeException(String.format("es创建索引失败:%s", indexName), e);
        }
    }
    @Override
    public void recreateIndex(String indexName, String mappingJsonPath) {
        indexName = this.getIndexFullName(indexName);
        // 1. 如果索引存在,先删除索引,再创建索引
        if (this.indexExist(indexName)) {
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
            try {
                AcknowledgedResponse deleteRes = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
                this.createIndex(indexName, mappingJsonPath);
            } catch (IOException e) {
                log.error("删除索引失败", e);
                throw new RuntimeException("删除索引失败");
            }
        } else {
            this.createIndex(indexName, mappingJsonPath);
        }
        // 2. 多线程查询视频数据,构建文档对象
        Long totalVideo = new LambdaQueryChainWrapper<>(videoMapper)
                .count();
        int totalThreads = (int) Math.ceil((double) totalVideo / 200); // 计算需要多少个线程
        CountDownLatch latch = new CountDownLatch(totalThreads);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                4,
                10,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        BlockingQueue<VideoIndex> dataList = new LinkedBlockingQueue<>();
        for (int page = 0; page < totalThreads; page++) {
            final int currentPage = page;
            threadPoolExecutor.execute(() -> {
                try {
                    List<VideoIndex> pageData = videoMapper.getEsPage(currentPage * 200, 200);
                    dataList.addAll(pageData);
                } catch (Exception e) {
                    log.error("第{}页数据查询失败", currentPage, e);
                } finally {
                    latch.countDown(); // 线程执行完成 -1
                }
            });
        }
        try {
            latch.await(); // 等待所有线程执行完成
            // 3. 添加es数据
//            BulkRequest bulkRequest = new BulkRequest();
//            String finalIndexName = indexName;
//            dataList.forEach(data -> {
//                IndexRequest indexRequest = new IndexRequest(finalIndexName)
//                        .id(data.getId())
//                        .source(data);
//                bulkRequest.add(indexRequest);
//            });
//            client.bulk(bulkRequest, RequestOptions.DEFAULT);
            esVideoIndexRepository.saveAll(dataList);
        } catch (InterruptedException e) {
            log.error("多线程读取视频数据异常", e);
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
    @Override
    public void addOrUpdateDocument(Object data) {
        VideoIndex videoIndex = (VideoIndex) data;
        esVideoIndexRepository.save(videoIndex);
//        indexName = this.getIndexFullName(indexName);
//        IndexRequest request = new IndexRequest(indexName);
//        request.id(id).source(data);
//        try {
//            client.index(request, RequestOptions.DEFAULT);
//        } catch (IOException e) {
//            throw new RuntimeException("es文档添加/修改失败", e);
//        }
    }
    @Override
    public void updateSomeField(String indexName, String id, Map<String, Object> updateList) {
        indexName = this.getIndexFullName(indexName);
        // 构建更新请求
        UpdateRequest request = new UpdateRequest(indexName, id);
        try {
            // 构建更新内容
            XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
            for (Map.Entry<String, Object> entry : updateList.entrySet()) {
                builder.field(entry.getKey(), entry.getValue());
            }
            builder.endObject();
            request.doc(builder); // 设置部分更新内容
            // 可选配置
            request.retryOnConflict(2); // 冲突重试次数
//            request.fetchSource(true);   // 返回更新后的文档
            client.update(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void deleteDocument(String indexName, String id) {
        indexName = this.getIndexFullName(indexName);
        DeleteRequest request = new DeleteRequest(indexName, id);
        try {
            client.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException("删除es文档失败:" + id, e);
        }
    }
    @Override
    public boolean indexExist(String indexName) {
        if (!indexName.startsWith(elasticsearchProperties.getIndexPrefix())) {
            indexName = this.getIndexFullName(indexName);
        }
        return super.indexExist(indexName);
    }
}