package cn.lili.elasticsearch; import cn.hutool.core.bean.BeanUtil; import cn.lili.elasticsearch.config.ElasticsearchProperties; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.HttpAsyncResponseConsumerFactory; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.client.indices.PutMappingRequest; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * @author paulG * @since 2020/10/14 **/ @Slf4j public abstract class BaseElasticsearchService { protected static final RequestOptions COMMON_OPTIONS; static { RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); //默认缓冲限制为100MB,此处修改为30MB。 builder.setHttpAsyncResponseConsumerFactory(new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024)); COMMON_OPTIONS = builder.build(); } @Autowired @Qualifier("elasticsearchClient") protected RestHighLevelClient client; @Autowired private ElasticsearchProperties elasticsearchProperties; /** * build DeleteIndexRequest * * @param index elasticsearch index name * @author fxbin */ private static DeleteIndexRequest buildDeleteIndexRequest(String index) { return new DeleteIndexRequest(index); } /** * build IndexRequest * * @param index elasticsearch index name * @param id request object id * @param object request object * @return {@link IndexRequest} * @author fxbin */ protected static IndexRequest buildIndexRequest(String index, String id, Object object) { return new IndexRequest(index).id(id).source(BeanUtil.beanToMap(object), XContentType.JSON); } /** * create elasticsearch index (asyc) * * @param index elasticsearch index * @author fxbin */ protected void createIndexRequest(String index) { try { CreateIndexRequest request = new CreateIndexRequest(index); //Settings for this index request.settings(Settings.builder() .put("index.number_of_shards", elasticsearchProperties.getIndex().getNumberOfShards()) .put("index.number_of_replicas", elasticsearchProperties.getIndex().getNumberOfReplicas()) .put("index.max_result_window", 100000) //最大查询结果数 .put("index.mapping.total_fields.limit", 2000)); //创建索引 CreateIndexResponse createIndexResponse = client.indices().create(request, COMMON_OPTIONS); createMapping(index); log.info(" whether all of the nodes have acknowledged the request : {}", createIndexResponse.isAcknowledged()); log.info(" Indicates whether the requisite number of shard copies were started for each shard in the index before timing out :{}", createIndexResponse.isShardsAcknowledged()); } catch (Exception e) { log.error("创建索引错误", e); throw new ElasticsearchException("创建索引 {" + index + "} 失败:" + e.getMessage()); } } public void createMapping(String index) throws Exception { String source = " {\n" + " \"properties\": {\n" + " \"_class\": {\n" + " \"type\": \"text\",\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"attrList\": {\n" + " \"type\": \"nested\",\n" + " \"properties\": {\n" + " \"name\": {\n" + " \"type\": \"keyword\"\n" + " },\n" + " \"type\": {\n" + " \"type\": \"integer\"\n" + " },\n" + " \"value\": {\n" + " \"type\": \"keyword\"\n" + " }\n" + " }\n" + " },\n" + " \"brandId\": {\n" + " \"type\": \"text\",\n" + " \"fielddata\": true,\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"brandName\": {\n" + " \"type\": \"text\",\n" + " \"fielddata\": true,\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"brandUrl\": {\n" + " \"type\": \"text\",\n" + " \"fielddata\": true,\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"buyCount\": {\n" + " \"type\": \"long\"\n" + " },\n" + " \"releaseTime\": {\n" + " \"type\": \"date\",\n" + " \"format\": \"yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis\"\n" + " },\n" + " \"categoryPath\": {\n" + " \"type\": \"text\",\n" + " \"fielddata\": true,\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"categoryNamePath\": {\n" + " \"type\": \"text\",\n" + " \"fielddata\": true,\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"commentNum\": {\n" + " \"type\": \"long\"\n" + " },\n" + " \"skuSource\": {\n" + " \"type\": \"long\"\n" + " },\n" + " \"goodsId\": {\n" + " \"type\": \"text\",\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"goodsName\": {\n" + " \"type\": \"text\",\n" + " \"fielddata\": true, \n" + " \"analyzer\": \"ik_max_word\",\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"grade\": {\n" + " \"type\": \"float\"\n" + " },\n" + " \"highPraiseNum\": {\n" + " \"type\": \"long\"\n" + " },\n" + " \"id\": {\n" + " \"type\": \"text\",\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"intro\": {\n" + " \"type\": \"text\",\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"authFlag\": {\n" + " \"type\": \"text\",\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"marketEnable\": {\n" + " \"type\": \"text\",\n" + " \"fielddata\": true, \n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"mobileIntro\": {\n" + " \"type\": \"text\",\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"point\": {\n" + " \"type\": \"long\"\n" + " },\n" + " \"price\": {\n" + " \"type\": \"float\"\n" + " },\n" + " \"salesModel\": {\n" + " \"type\": \"text\",\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"recommend\": {\n" + " \"type\": \"boolean\"\n" + " },\n" + " \"selfOperated\": {\n" + " \"type\": \"boolean\"\n" + " },\n" + " \"sellerId\": {\n" + " \"type\": \"text\",\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"sellerName\": {\n" + " \"type\": \"text\",\n" + " \"fielddata\": true, \n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"shopCategoryPath\": {\n" + " \"type\": \"text\",\n" + " \"fielddata\": true, \n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"sn\": {\n" + " \"type\": \"text\",\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " },\n" + " \"promotionMapJson\": {\n" + " \"type\": \"keyword\"\n" + " },\n" + " \"thumbnail\": {\n" + " \"type\": \"text\",\n" + " \"fields\": {\n" + " \"keyword\": {\n" + " \"type\": \"keyword\",\n" + " \"ignore_above\": 256\n" + " }\n" + " }\n" + " }\n" + " }\n" + " }\n"; PutMappingRequest request = new PutMappingRequest(index).source(source, XContentType.JSON); CountDownLatch latch = new CountDownLatch(1); client.indices().putMappingAsync( request, RequestOptions.DEFAULT, new ActionListener() { @Override public void onResponse(AcknowledgedResponse r) { latch.countDown(); log.info("创建索引mapping成功:{}", r); } @Override public void onFailure(Exception e) { latch.countDown(); log.error("创建索引mapping失败", e); } }); latch.await(10, TimeUnit.SECONDS); } /** * Description: 判断某个index是否存在 * * @param index index名 * @return boolean * @author fanxb * @since 2019/7/24 14:57 */ public boolean indexExist(String index) { try { GetIndexRequest request = new GetIndexRequest(index); request.local(false); request.humanReadable(true); request.includeDefaults(false); return client.indices().exists(request, RequestOptions.DEFAULT); } catch (Exception e) { throw new ElasticsearchException("获取索引 {" + index + "} 是否存在失败:" + e.getMessage()); } } /** * delete elasticsearch index * * @param index elasticsearch index name * @author fxbin */ protected void deleteIndexRequest(String index) { DeleteIndexRequest deleteIndexRequest = buildDeleteIndexRequest(index); client.indices().deleteAsync(deleteIndexRequest, COMMON_OPTIONS, new ActionListener() { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { log.info("删除索引 {} 成功", index); } @Override public void onFailure(Exception e) { log.error("删除索引 {} 失败", index, e); } }); } /** * exec updateRequest * * @param index elasticsearch index name * @param id Document id * @param object request object * @author fxbin */ protected void updateRequest(String index, String id, Object object) { try { UpdateRequest updateRequest = new UpdateRequest(index, id).doc(BeanUtil.beanToMap(object), XContentType.JSON); client.update(updateRequest, COMMON_OPTIONS); } catch (IOException e) { throw new ElasticsearchException("更新索引 {" + index + "} 数据 {" + object + "} 失败: " + e.getMessage()); } } /** * exec deleteRequest * * @param index elasticsearch index name * @param id Document id * @author fxbin */ protected void deleteRequest(String index, String id) { try { DeleteRequest deleteRequest = new DeleteRequest(index, id); client.delete(deleteRequest, COMMON_OPTIONS); } catch (IOException e) { throw new ElasticsearchException("删除索引 {" + index + "} 数据id {" + id + "} 失败: " + e.getMessage()); } } /** * search all * * @param index elasticsearch index name * @return {@link SearchResponse} * @author fxbin */ protected SearchResponse search(String index) { SearchRequest searchRequest = new SearchRequest(index); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchAllQuery()); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = null; try { searchResponse = client.search(searchRequest, COMMON_OPTIONS); } catch (IOException e) { log.error("es 搜索错误", e); } return searchResponse; } }