package cn.lili.modules.lmk.service.impl;

import cn.lili.elasticsearch.BaseElasticsearchService;
import cn.lili.elasticsearch.EsSuffix;
import cn.lili.elasticsearch.config.ElasticsearchProperties;
import cn.lili.modules.lmk.domain.entity.Video;
import cn.lili.modules.lmk.domain.es.VideoIndex;
import cn.lili.modules.lmk.enums.general.VideoStatusEnum;
import cn.lili.modules.lmk.mapper.VideoMapper;
import cn.lili.modules.lmk.service.EsService;
import cn.lili.modules.search.repository.EsVideoIndexRepository;
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.stereotype.Service;
import org.springframework.util.FileCopyUtils;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
|
* 视频es
|
*
|
* @author:xp
|
* @date:2025/6/30 15:54
|
*/
|
@Slf4j
|
@RequiredArgsConstructor
|
@Service("videoEsServiceImpl")
|
public class VideoEsServiceImpl extends BaseElasticsearchService implements EsService {
|
|
private final ElasticsearchProperties elasticsearchProperties;
|
private final VideoMapper videoMapper;
|
private final EsVideoIndexRepository esVideoIndexRepository;
|
|
@Override
|
public String getIndexFullName(String indexName) {
|
return elasticsearchProperties.getIndexPrefix() + "_" + indexName;
|
}
|
|
@Override
|
public void createIndex(String indexName, String mappingJsonPath) {
|
if (! indexName.startsWith(elasticsearchProperties.getIndexPrefix())) {
|
indexName = this.getIndexFullName(indexName);
|
}
|
if (this.indexExist(indexName)) {
|
throw new RuntimeException(String.format("索引:%s已经存在,无法创建", indexName));
|
}
|
CreateIndexRequest request = new CreateIndexRequest(indexName);
|
|
// 1. 配置索引
|
request.settings(Settings.builder()
|
.put("index.number_of_shards", elasticsearchProperties.getIndex().getNumberOfShards())
|
.put("index.number_of_replicas", elasticsearchProperties.getIndex().getNumberOfReplicas())
|
.put("index.max_result_window", 100000) //最大查询结果数
|
.put("index.mapping.total_fields.limit", 2000));
|
// 2. 配置mapping
|
String mapping;
|
try (InputStream inputStream = this.getClass().getResourceAsStream(mappingJsonPath)) {
|
byte[] bytes = FileCopyUtils.copyToByteArray(inputStream);
|
mapping = new String(bytes, StandardCharsets.UTF_8);
|
} catch (IOException e) {
|
throw new RuntimeException(String.format("读取es映射json文件:%s异常", mappingJsonPath), e);
|
}
|
request.mapping(mapping, XContentType.JSON);
|
// 3. 创建索引
|
try {
|
CreateIndexResponse createIndexResponse = client.indices().create(request, COMMON_OPTIONS);
|
} catch (IOException e) {
|
throw new RuntimeException(String.format("es创建索引失败:%s", indexName), e);
|
}
|
}
|
|
@Override
|
public void recreateIndex(String indexName, String mappingJsonPath) {
|
indexName = this.getIndexFullName(indexName);
|
// 1. 如果索引存在,先删除索引,再创建索引
|
if (this.indexExist(indexName)) {
|
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
|
try {
|
AcknowledgedResponse deleteRes = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
|
this.createIndex(indexName, mappingJsonPath);
|
} catch (IOException e) {
|
log.error("删除索引失败", e);
|
throw new RuntimeException("删除索引失败");
|
}
|
} else {
|
this.createIndex(indexName, mappingJsonPath);
|
}
|
// 2. 多线程查询视频数据,构建文档对象
|
Long totalVideo = new LambdaQueryChainWrapper<>(videoMapper)
|
.count();
|
int totalThreads = (int) Math.ceil((double) totalVideo / 200); // 计算需要多少个线程
|
CountDownLatch latch = new CountDownLatch(totalThreads);
|
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
|
4,
|
10,
|
10,
|
TimeUnit.SECONDS,
|
new ArrayBlockingQueue<>(4),
|
Executors.defaultThreadFactory(),
|
new ThreadPoolExecutor.AbortPolicy());
|
BlockingQueue<VideoIndex> dataList = new LinkedBlockingQueue<>();
|
for (int page = 0; page < totalThreads; page++) {
|
final int currentPage = page;
|
threadPoolExecutor.execute(() -> {
|
try {
|
List<VideoIndex> pageData = videoMapper.getEsPage(currentPage * 200, 200);
|
dataList.addAll(pageData);
|
} catch (Exception e) {
|
log.error("第{}页数据查询失败", currentPage, e);
|
} finally {
|
latch.countDown(); // 线程执行完成 -1
|
}
|
});
|
}
|
try {
|
latch.await(); // 等待所有线程执行完成
|
// 3. 添加es数据
|
// BulkRequest bulkRequest = new BulkRequest();
|
// String finalIndexName = indexName;
|
// dataList.forEach(data -> {
|
// IndexRequest indexRequest = new IndexRequest(finalIndexName)
|
// .id(data.getId())
|
// .source(data);
|
// bulkRequest.add(indexRequest);
|
// });
|
// client.bulk(bulkRequest, RequestOptions.DEFAULT);
|
esVideoIndexRepository.saveAll(dataList);
|
} catch (InterruptedException e) {
|
log.error("多线程读取视频数据异常", e);
|
} finally {
|
threadPoolExecutor.shutdown();
|
}
|
}
|
|
@Override
|
public void addOrUpdateDocument(Object data) {
|
VideoIndex videoIndex = (VideoIndex) data;
|
esVideoIndexRepository.save(videoIndex);
|
// indexName = this.getIndexFullName(indexName);
|
// IndexRequest request = new IndexRequest(indexName);
|
// request.id(id).source(data);
|
// try {
|
// client.index(request, RequestOptions.DEFAULT);
|
// } catch (IOException e) {
|
// throw new RuntimeException("es文档添加/修改失败", e);
|
// }
|
}
|
|
@Override
|
public void updateSomeField(String indexName, String id, Map<String, Object> updateList) {
|
indexName = this.getIndexFullName(indexName);
|
// 构建更新请求
|
UpdateRequest request = new UpdateRequest(indexName, id);
|
|
try {
|
// 构建更新内容
|
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
|
for (Map.Entry<String, Object> entry : updateList.entrySet()) {
|
builder.field(entry.getKey(), entry.getValue());
|
}
|
builder.endObject();
|
|
request.doc(builder); // 设置部分更新内容
|
|
// 可选配置
|
request.retryOnConflict(2); // 冲突重试次数
|
// request.fetchSource(true); // 返回更新后的文档
|
|
client.update(request, RequestOptions.DEFAULT);
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
}
|
|
@Override
|
public void deleteDocument(String indexName, String id) {
|
indexName = this.getIndexFullName(indexName);
|
DeleteRequest request = new DeleteRequest(indexName, id);
|
try {
|
client.delete(request, RequestOptions.DEFAULT);
|
} catch (IOException e) {
|
throw new RuntimeException("删除es文档失败:" + id, e);
|
}
|
}
|
|
@Override
|
public boolean indexExist(String indexName) {
|
if (!indexName.startsWith(elasticsearchProperties.getIndexPrefix())) {
|
indexName = this.getIndexFullName(indexName);
|
}
|
return super.indexExist(indexName);
|
}
|
}
|