xiangpei
3 天以前 07a8e96750473fbdf676993191c41155ddf8146e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
package cn.lili.modules.lmk.service.impl;
 
import cn.lili.elasticsearch.BaseElasticsearchService;
import cn.lili.elasticsearch.EsSuffix;
import cn.lili.elasticsearch.config.ElasticsearchProperties;
import cn.lili.modules.lmk.domain.entity.Video;
import cn.lili.modules.lmk.domain.es.VideoIndex;
import cn.lili.modules.lmk.enums.general.VideoStatusEnum;
import cn.lili.modules.lmk.mapper.VideoMapper;
import cn.lili.modules.lmk.service.EsService;
import cn.lili.modules.search.repository.EsVideoIndexRepository;
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.stereotype.Service;
import org.springframework.util.FileCopyUtils;
 
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
 
/**
 * 视频es
 *
 * @author:xp
 * @date:2025/6/30 15:54
 */
@Slf4j
@RequiredArgsConstructor
@Service("videoEsServiceImpl")
public class VideoEsServiceImpl extends BaseElasticsearchService implements EsService {
 
    private final ElasticsearchProperties elasticsearchProperties;
    private final VideoMapper videoMapper;
    private final EsVideoIndexRepository esVideoIndexRepository;
 
    @Override
    public String getIndexFullName(String indexName) {
        return elasticsearchProperties.getIndexPrefix() + "_" + indexName;
    }
 
    @Override
    public void createIndex(String indexName, String mappingJsonPath) {
        if (! indexName.startsWith(elasticsearchProperties.getIndexPrefix())) {
            indexName = this.getIndexFullName(indexName);
        }
        if (this.indexExist(indexName)) {
            throw new RuntimeException(String.format("索引:%s已经存在,无法创建", indexName));
        }
        CreateIndexRequest request = new CreateIndexRequest(indexName);
 
        // 1. 配置索引
        request.settings(Settings.builder()
                .put("index.number_of_shards", elasticsearchProperties.getIndex().getNumberOfShards())
                .put("index.number_of_replicas", elasticsearchProperties.getIndex().getNumberOfReplicas())
                .put("index.max_result_window", 100000) //最大查询结果数
                .put("index.mapping.total_fields.limit", 2000));
        // 2. 配置mapping
        String mapping;
        try (InputStream inputStream = this.getClass().getResourceAsStream(mappingJsonPath)) {
            byte[] bytes = FileCopyUtils.copyToByteArray(inputStream);
            mapping = new String(bytes, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new RuntimeException(String.format("读取es映射json文件:%s异常", mappingJsonPath), e);
        }
        request.mapping(mapping, XContentType.JSON);
        // 3. 创建索引
        try {
            CreateIndexResponse createIndexResponse = client.indices().create(request, COMMON_OPTIONS);
        } catch (IOException e) {
            throw new RuntimeException(String.format("es创建索引失败:%s", indexName), e);
        }
    }
 
    @Override
    public void recreateIndex(String indexName, String mappingJsonPath) {
        indexName = this.getIndexFullName(indexName);
        // 1. 如果索引存在,先删除索引,再创建索引
        if (this.indexExist(indexName)) {
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
            try {
                AcknowledgedResponse deleteRes = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
                this.createIndex(indexName, mappingJsonPath);
            } catch (IOException e) {
                log.error("删除索引失败", e);
                throw new RuntimeException("删除索引失败");
            }
        } else {
            this.createIndex(indexName, mappingJsonPath);
        }
        // 2. 多线程查询视频数据,构建文档对象
        Long totalVideo = new LambdaQueryChainWrapper<>(videoMapper)
                .count();
        int totalThreads = (int) Math.ceil((double) totalVideo / 200); // 计算需要多少个线程
        CountDownLatch latch = new CountDownLatch(totalThreads);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                4,
                10,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        BlockingQueue<VideoIndex> dataList = new LinkedBlockingQueue<>();
        for (int page = 0; page < totalThreads; page++) {
            final int currentPage = page;
            threadPoolExecutor.execute(() -> {
                try {
                    List<VideoIndex> pageData = videoMapper.getEsPage(currentPage * 200, 200);
                    dataList.addAll(pageData);
                } catch (Exception e) {
                    log.error("第{}页数据查询失败", currentPage, e);
                } finally {
                    latch.countDown(); // 线程执行完成 -1
                }
            });
        }
        try {
            latch.await(); // 等待所有线程执行完成
            // 3. 添加es数据
//            BulkRequest bulkRequest = new BulkRequest();
//            String finalIndexName = indexName;
//            dataList.forEach(data -> {
//                IndexRequest indexRequest = new IndexRequest(finalIndexName)
//                        .id(data.getId())
//                        .source(data);
//                bulkRequest.add(indexRequest);
//            });
//            client.bulk(bulkRequest, RequestOptions.DEFAULT);
            esVideoIndexRepository.saveAll(dataList);
        } catch (InterruptedException e) {
            log.error("多线程读取视频数据异常", e);
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
 
    @Override
    public void addOrUpdateDocument(Object data) {
        VideoIndex videoIndex = (VideoIndex) data;
        esVideoIndexRepository.save(videoIndex);
//        indexName = this.getIndexFullName(indexName);
//        IndexRequest request = new IndexRequest(indexName);
//        request.id(id).source(data);
//        try {
//            client.index(request, RequestOptions.DEFAULT);
//        } catch (IOException e) {
//            throw new RuntimeException("es文档添加/修改失败", e);
//        }
    }
 
    @Override
    public void updateSomeField(String indexName, String id, Map<String, Object> updateList) {
        indexName = this.getIndexFullName(indexName);
        // 构建更新请求
        UpdateRequest request = new UpdateRequest(indexName, id);
 
        try {
            // 构建更新内容
            XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
            for (Map.Entry<String, Object> entry : updateList.entrySet()) {
                builder.field(entry.getKey(), entry.getValue());
            }
            builder.endObject();
 
            request.doc(builder); // 设置部分更新内容
 
            // 可选配置
            request.retryOnConflict(2); // 冲突重试次数
//            request.fetchSource(true);   // 返回更新后的文档
 
            client.update(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    @Override
    public void deleteDocument(String indexName, String id) {
        indexName = this.getIndexFullName(indexName);
        DeleteRequest request = new DeleteRequest(indexName, id);
        try {
            client.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException("删除es文档失败:" + id, e);
        }
    }
 
    @Override
    public boolean indexExist(String indexName) {
        if (!indexName.startsWith(elasticsearchProperties.getIndexPrefix())) {
            indexName = this.getIndexFullName(indexName);
        }
        return super.indexExist(indexName);
    }
}