From aac2321d1cf5536f7ea03f30d55a4aba30fbf710 Mon Sep 17 00:00:00 2001 From: peng <peng.com> Date: 星期三, 02 七月 2025 10:39:39 +0800 Subject: [PATCH] Merge remote-tracking branch 'origin/dev' into dev --- framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoEsServiceImpl.java | 214 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 214 insertions(+), 0 deletions(-) diff --git a/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoEsServiceImpl.java b/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoEsServiceImpl.java new file mode 100644 index 0000000..3823ce3 --- /dev/null +++ b/framework/src/main/java/cn/lili/modules/lmk/service/impl/VideoEsServiceImpl.java @@ -0,0 +1,214 @@ +package cn.lili.modules.lmk.service.impl; + +import cn.lili.elasticsearch.BaseElasticsearchService; +import cn.lili.elasticsearch.EsSuffix; +import cn.lili.elasticsearch.config.ElasticsearchProperties; +import cn.lili.modules.lmk.domain.entity.Video; +import cn.lili.modules.lmk.domain.es.VideoIndex; +import cn.lili.modules.lmk.enums.general.VideoStatusEnum; +import cn.lili.modules.lmk.mapper.VideoMapper; +import cn.lili.modules.lmk.service.EsService; +import cn.lili.modules.search.repository.EsVideoIndexRepository; +import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.CreateIndexResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentType; +import org.springframework.data.elasticsearch.core.document.Document; +import org.springframework.data.elasticsearch.core.query.UpdateQuery; +import org.springframework.stereotype.Service; +import org.springframework.util.FileCopyUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + +/** + * 瑙嗛es + * + * @author锛歺p + * @date锛�2025/6/30 15:54 + */ +@Slf4j +@RequiredArgsConstructor +@Service("videoEsServiceImpl") +public class VideoEsServiceImpl extends BaseElasticsearchService implements EsService { + + private final ElasticsearchProperties elasticsearchProperties; + private final VideoMapper videoMapper; + private final EsVideoIndexRepository esVideoIndexRepository; + + @Override + public String getIndexFullName(String indexName) { + return elasticsearchProperties.getIndexPrefix() + "_" + indexName; + } + + @Override + public void createIndex(String indexName, String mappingJsonPath) { + if (! indexName.startsWith(elasticsearchProperties.getIndexPrefix())) { + indexName = this.getIndexFullName(indexName); + } + if (this.indexExist(indexName)) { + throw new RuntimeException(String.format("绱㈠紩锛�%s宸茬粡瀛樺湪锛屾棤娉曞垱寤�", indexName)); + } + CreateIndexRequest request = new CreateIndexRequest(indexName); + + // 1. 閰嶇疆绱㈠紩 + request.settings(Settings.builder() + .put("index.number_of_shards", elasticsearchProperties.getIndex().getNumberOfShards()) + .put("index.number_of_replicas", elasticsearchProperties.getIndex().getNumberOfReplicas()) + .put("index.max_result_window", 100000) //鏈�澶ф煡璇㈢粨鏋滄暟 + .put("index.mapping.total_fields.limit", 2000)); + // 2. 閰嶇疆mapping + String mapping; + try (InputStream inputStream = this.getClass().getResourceAsStream(mappingJsonPath)) { + byte[] bytes = FileCopyUtils.copyToByteArray(inputStream); + mapping = new String(bytes, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException(String.format("璇诲彇es鏄犲皠json鏂囦欢锛�%s寮傚父", mappingJsonPath), e); + } + request.mapping(mapping, XContentType.JSON); + // 3. 鍒涘缓绱㈠紩 + try { + CreateIndexResponse createIndexResponse = client.indices().create(request, COMMON_OPTIONS); + } catch (IOException e) { + throw new RuntimeException(String.format("es鍒涘缓绱㈠紩澶辫触锛�%s", indexName), e); + } + } + + @Override + public void recreateIndex(String indexName, String mappingJsonPath) { + indexName = this.getIndexFullName(indexName); + // 1. 濡傛灉绱㈠紩瀛樺湪锛屽厛鍒犻櫎绱㈠紩锛屽啀鍒涘缓绱㈠紩 + if (this.indexExist(indexName)) { + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); + try { + AcknowledgedResponse deleteRes = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT); + this.createIndex(indexName, mappingJsonPath); + } catch (IOException e) { + log.error("鍒犻櫎绱㈠紩澶辫触", e); + throw new RuntimeException("鍒犻櫎绱㈠紩澶辫触"); + } + } else { + this.createIndex(indexName, mappingJsonPath); + } + // 2. 澶氱嚎绋嬫煡璇㈣棰戞暟鎹紝鏋勫缓鏂囨。瀵硅薄 + Long totalVideo = new LambdaQueryChainWrapper<>(videoMapper) + .count(); + int totalThreads = (int) Math.ceil((double) totalVideo / 200); // 璁$畻闇�瑕佸灏戜釜绾跨▼ + CountDownLatch latch = new CountDownLatch(totalThreads); + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( + 4, + 10, + 10, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(4), + Executors.defaultThreadFactory(), + new ThreadPoolExecutor.AbortPolicy()); + BlockingQueue<VideoIndex> dataList = new LinkedBlockingQueue<>(); + for (int page = 0; page < totalThreads; page++) { + final int currentPage = page; + threadPoolExecutor.execute(() -> { + try { + List<VideoIndex> pageData = videoMapper.getEsPage(currentPage * 200, 200); + dataList.addAll(pageData); + } catch (Exception e) { + log.error("绗瑊}椤垫暟鎹煡璇㈠け璐�", currentPage, e); + } finally { + latch.countDown(); // 绾跨▼鎵ц瀹屾垚 -1 + } + }); + } + try { + latch.await(); // 绛夊緟鎵�鏈夌嚎绋嬫墽琛屽畬鎴� + // 3. 娣诲姞es鏁版嵁 +// BulkRequest bulkRequest = new BulkRequest(); +// String finalIndexName = indexName; +// dataList.forEach(data -> { +// IndexRequest indexRequest = new IndexRequest(finalIndexName) +// .id(data.getId()) +// .source(data); +// bulkRequest.add(indexRequest); +// }); +// client.bulk(bulkRequest, RequestOptions.DEFAULT); + esVideoIndexRepository.saveAll(dataList); + } catch (InterruptedException e) { + log.error("澶氱嚎绋嬭鍙栬棰戞暟鎹紓甯�", e); + } finally { + threadPoolExecutor.shutdown(); + } + } + + @Override + public void addOrUpdateDocument(Object data) { + VideoIndex videoIndex = (VideoIndex) data; + esVideoIndexRepository.save(videoIndex); +// indexName = this.getIndexFullName(indexName); +// IndexRequest request = new IndexRequest(indexName); +// request.id(id).source(data); +// try { +// client.index(request, RequestOptions.DEFAULT); +// } catch (IOException e) { +// throw new RuntimeException("es鏂囨。娣诲姞/淇敼澶辫触", e); +// } + } + + @Override + public void updateSomeField(String indexName, String id, Map<String, Object> updateList) { + indexName = this.getIndexFullName(indexName); + // 鏋勫缓鏇存柊璇锋眰 + UpdateRequest request = new UpdateRequest(indexName, id); + + try { + // 鏋勫缓鏇存柊鍐呭 + XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); + for (Map.Entry<String, Object> entry : updateList.entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); + + request.doc(builder); // 璁剧疆閮ㄥ垎鏇存柊鍐呭 + + // 鍙�夐厤缃� + request.retryOnConflict(2); // 鍐茬獊閲嶈瘯娆℃暟 +// request.fetchSource(true); // 杩斿洖鏇存柊鍚庣殑鏂囨。 + + client.update(request, RequestOptions.DEFAULT); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void deleteDocument(String indexName, String id) { + indexName = this.getIndexFullName(indexName); + DeleteRequest request = new DeleteRequest(indexName, id); + try { + client.delete(request, RequestOptions.DEFAULT); + } catch (IOException e) { + throw new RuntimeException("鍒犻櫎es鏂囨。澶辫触锛�" + id, e); + } + } + + @Override + public boolean indexExist(String indexName) { + if (!indexName.startsWith(elasticsearchProperties.getIndexPrefix())) { + indexName = this.getIndexFullName(indexName); + } + return super.indexExist(indexName); + } +} -- Gitblit v1.8.0