From aac2321d1cf5536f7ea03f30d55a4aba30fbf710 Mon Sep 17 00:00:00 2001
From: peng <peng.com>
Date: 星期三, 02 七月 2025 10:39:39 +0800
Subject: [PATCH] Merge remote-tracking branch 'origin/dev' into dev

---
 framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoEsServiceImpl.java |  214 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 214 insertions(+), 0 deletions(-)

diff --git a/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoEsServiceImpl.java b/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoEsServiceImpl.java
new file mode 100644
index 0000000..3823ce3
--- /dev/null
+++ b/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoEsServiceImpl.java
@@ -0,0 +1,214 @@
+package cn.lili.modules.lmk.service.impl;
+
+import cn.lili.elasticsearch.BaseElasticsearchService;
+import cn.lili.elasticsearch.EsSuffix;
+import cn.lili.elasticsearch.config.ElasticsearchProperties;
+import cn.lili.modules.lmk.domain.entity.Video;
+import cn.lili.modules.lmk.domain.es.VideoIndex;
+import cn.lili.modules.lmk.enums.general.VideoStatusEnum;
+import cn.lili.modules.lmk.mapper.VideoMapper;
+import cn.lili.modules.lmk.service.EsService;
+import cn.lili.modules.search.repository.EsVideoIndexRepository;
+import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.springframework.data.elasticsearch.core.document.Document;
+import org.springframework.data.elasticsearch.core.query.UpdateQuery;
+import org.springframework.stereotype.Service;
+import org.springframework.util.FileCopyUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * 瑙嗛es
+ *
+ * @author锛歺p
+ * @date锛�2025/6/30 15:54
+ */
+@Slf4j
+@RequiredArgsConstructor
+@Service("videoEsServiceImpl")
+public class VideoEsServiceImpl extends BaseElasticsearchService implements EsService {
+
+    private final ElasticsearchProperties elasticsearchProperties;
+    private final VideoMapper videoMapper;
+    private final EsVideoIndexRepository esVideoIndexRepository;
+
+    @Override
+    public String getIndexFullName(String indexName) {
+        return elasticsearchProperties.getIndexPrefix() + "_" + indexName;
+    }
+
+    @Override
+    public void createIndex(String indexName, String mappingJsonPath) {
+        if (! indexName.startsWith(elasticsearchProperties.getIndexPrefix())) {
+            indexName = this.getIndexFullName(indexName);
+        }
+        if (this.indexExist(indexName)) {
+            throw new RuntimeException(String.format("绱㈠紩锛�%s宸茬粡瀛樺湪锛屾棤娉曞垱寤�", indexName));
+        }
+        CreateIndexRequest request = new CreateIndexRequest(indexName);
+
+        // 1. 閰嶇疆绱㈠紩
+        request.settings(Settings.builder()
+                .put("index.number_of_shards", elasticsearchProperties.getIndex().getNumberOfShards())
+                .put("index.number_of_replicas", elasticsearchProperties.getIndex().getNumberOfReplicas())
+                .put("index.max_result_window", 100000) //鏈�澶ф煡璇㈢粨鏋滄暟
+                .put("index.mapping.total_fields.limit", 2000));
+        // 2. 閰嶇疆mapping
+        String mapping;
+        try (InputStream inputStream = this.getClass().getResourceAsStream(mappingJsonPath)) {
+            byte[] bytes = FileCopyUtils.copyToByteArray(inputStream);
+            mapping = new String(bytes, StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            throw new RuntimeException(String.format("璇诲彇es鏄犲皠json鏂囦欢锛�%s寮傚父", mappingJsonPath), e);
+        }
+        request.mapping(mapping, XContentType.JSON);
+        // 3. 鍒涘缓绱㈠紩
+        try {
+            CreateIndexResponse createIndexResponse = client.indices().create(request, COMMON_OPTIONS);
+        } catch (IOException e) {
+            throw new RuntimeException(String.format("es鍒涘缓绱㈠紩澶辫触锛�%s", indexName), e);
+        }
+    }
+
+    @Override
+    public void recreateIndex(String indexName, String mappingJsonPath) {
+        indexName = this.getIndexFullName(indexName);
+        // 1. 濡傛灉绱㈠紩瀛樺湪锛屽厛鍒犻櫎绱㈠紩锛屽啀鍒涘缓绱㈠紩
+        if (this.indexExist(indexName)) {
+            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
+            try {
+                AcknowledgedResponse deleteRes = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
+                this.createIndex(indexName, mappingJsonPath);
+            } catch (IOException e) {
+                log.error("鍒犻櫎绱㈠紩澶辫触", e);
+                throw new RuntimeException("鍒犻櫎绱㈠紩澶辫触");
+            }
+        } else {
+            this.createIndex(indexName, mappingJsonPath);
+        }
+        // 2. 澶氱嚎绋嬫煡璇㈣棰戞暟鎹紝鏋勫缓鏂囨。瀵硅薄
+        Long totalVideo = new LambdaQueryChainWrapper<>(videoMapper)
+                .count();
+        int totalThreads = (int) Math.ceil((double) totalVideo / 200); // 璁$畻闇�瑕佸灏戜釜绾跨▼
+        CountDownLatch latch = new CountDownLatch(totalThreads);
+        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
+                4,
+                10,
+                10,
+                TimeUnit.SECONDS,
+                new ArrayBlockingQueue<>(4),
+                Executors.defaultThreadFactory(),
+                new ThreadPoolExecutor.AbortPolicy());
+        BlockingQueue<VideoIndex> dataList = new LinkedBlockingQueue<>();
+        for (int page = 0; page < totalThreads; page++) {
+            final int currentPage = page;
+            threadPoolExecutor.execute(() -> {
+                try {
+                    List<VideoIndex> pageData = videoMapper.getEsPage(currentPage * 200, 200);
+                    dataList.addAll(pageData);
+                } catch (Exception e) {
+                    log.error("绗瑊}椤垫暟鎹煡璇㈠け璐�", currentPage, e);
+                } finally {
+                    latch.countDown(); // 绾跨▼鎵ц瀹屾垚 -1
+                }
+            });
+        }
+        try {
+            latch.await(); // 绛夊緟鎵�鏈夌嚎绋嬫墽琛屽畬鎴�
+            // 3. 娣诲姞es鏁版嵁
+//            BulkRequest bulkRequest = new BulkRequest();
+//            String finalIndexName = indexName;
+//            dataList.forEach(data -> {
+//                IndexRequest indexRequest = new IndexRequest(finalIndexName)
+//                        .id(data.getId())
+//                        .source(data);
+//                bulkRequest.add(indexRequest);
+//            });
+//            client.bulk(bulkRequest, RequestOptions.DEFAULT);
+            esVideoIndexRepository.saveAll(dataList);
+        } catch (InterruptedException e) {
+            log.error("澶氱嚎绋嬭鍙栬棰戞暟鎹紓甯�", e);
+        } finally {
+            threadPoolExecutor.shutdown();
+        }
+    }
+
+    @Override
+    public void addOrUpdateDocument(Object data) {
+        VideoIndex videoIndex = (VideoIndex) data;
+        esVideoIndexRepository.save(videoIndex);
+//        indexName = this.getIndexFullName(indexName);
+//        IndexRequest request = new IndexRequest(indexName);
+//        request.id(id).source(data);
+//        try {
+//            client.index(request, RequestOptions.DEFAULT);
+//        } catch (IOException e) {
+//            throw new RuntimeException("es鏂囨。娣诲姞/淇敼澶辫触", e);
+//        }
+    }
+
+    @Override
+    public void updateSomeField(String indexName, String id, Map<String, Object> updateList) {
+        indexName = this.getIndexFullName(indexName);
+        // 鏋勫缓鏇存柊璇锋眰
+        UpdateRequest request = new UpdateRequest(indexName, id);
+
+        try {
+            // 鏋勫缓鏇存柊鍐呭
+            XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+            for (Map.Entry<String, Object> entry : updateList.entrySet()) {
+                builder.field(entry.getKey(), entry.getValue());
+            }
+            builder.endObject();
+
+            request.doc(builder); // 璁剧疆閮ㄥ垎鏇存柊鍐呭
+
+            // 鍙�夐厤缃�
+            request.retryOnConflict(2); // 鍐茬獊閲嶈瘯娆℃暟
+//            request.fetchSource(true);   // 杩斿洖鏇存柊鍚庣殑鏂囨。
+
+            client.update(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void deleteDocument(String indexName, String id) {
+        indexName = this.getIndexFullName(indexName);
+        DeleteRequest request = new DeleteRequest(indexName, id);
+        try {
+            client.delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            throw new RuntimeException("鍒犻櫎es鏂囨。澶辫触锛�" + id, e);
+        }
+    }
+
+    @Override
+    public boolean indexExist(String indexName) {
+        if (!indexName.startsWith(elasticsearchProperties.getIndexPrefix())) {
+            indexName = this.getIndexFullName(indexName);
+        }
+        return super.indexExist(indexName);
+    }
+}

--
Gitblit v1.8.0