欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ElasticSearch相关配置及使用(详细说明)

程序员文章站 2022-07-09 19:15:10
...

ElasticSearch相关配置及使用(详细说明)

1、相关配置(仅供参考)

@Configuration
public class ElasticSearchConfig { 
    @Autowired
    private Environment env;
    
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        // 拆分ip地址
        List<HttpHost> hostLists = new ArrayList<>();
        String[] hostList = address.split(",");
        for (String addr : hostList) {
            String host = addr.split(":")[0];
            String port = addr.split(":")[1];
            //参数1:IP,参数2:端口号,参数三:协议
            hostLists.add(new HttpHost(host, Integer.parseInt(port), schema));
        }
        // 转换成 HttpHost 数组
        HttpHost[] httpHost = hostLists.toArray(new HttpHost[]{});
        // 构建连接对象
        RestClientBuilder builder = RestClient.builder(httpHost);
        
        // 异步连接延时配置
        //配置请求超时超时,分为 连接超时(默认1s) 和 套接字超时(默认30s)
        builder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(connectTimeout);//配置连接超时时间
            requestConfigBuilder.setSocketTimeout(socketTimeout);//配置套接字超时时间
            requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout);//获取连接的超时时间
            return requestConfigBuilder;
        });
        
        // 异步连接数配置
        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(maxConnectNum);//配置最大连接数量
            httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);//配置最大的路由连接数
            return httpClientBuilder;
        });
        RestHighLevelClient client = new RestHighLevelClient(builder);
        
        //得到索引名数组
        String[] indexNames = getIndexNames();
        List<String> no_created_indexs = indexExists(client, indexNames);//得到索引没有创建的集合
        if(no_created_indexs.size()!=0) {
        	//创建index
        	for(String indexName:no_created_indexs) {
        		createIndex(new RestHighLevelClient(builder),indexName);//创建该索引
        	}
        }
        
        return new RestHighLevelClient(builder);
    }
    
    /**
     * 创建index,因为CreateIndexRequest属于ddl操作,需要在传入时传一个new的client,在执行完会close client
     * @param client
     * @param indexName
     * @return
     */
    private boolean createIndex(RestHighLevelClient client,String indexName) {
    	CreateIndexRequest request = new CreateIndexRequest(indexName);
    	request.settings(Settings.builder()
    	        .put("index.number_of_shards",3)   //分片数 //TODO 最好能配置化
    	        .put("index.number_of_replicas", 1));//副本数 //TODO 最好能配置化
    	request.alias(new Alias(indexName+"alias"));//设置别名
//    	request.setTimeout(TimeValue.timeValueMinutes(2));//设置创建索引超时2分钟
    	// 同步请求
    	try {
    	    CreateIndexResponse createIndexResponse = client.indices().create(request,RequestOptions.DEFAULT);
    	    // 处理响应
    	    boolean acknowledged = createIndexResponse.isAcknowledged();
    	    boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
    	    System.out.println(acknowledged+","+shardsAcknowledged);
    	   client.close();
    	   return true;
    	} catch (IOException e) {
    	    return false;
    	}
	}

	private String[] getIndexNames() {
		String[] ins = env.getProperty("elasticsearchinfo.index_list").split(",");
		return ins;
	}

	private Logger log = LoggerFactory.getLogger(ElasticSearchConfig.class);
	/**
	 * 判断索引是否存在
	 * @param client
	 * @param indexNames
	 * @return 未创建的索引list
	 */
	public List<String> indexExists(RestHighLevelClient client,String[] indexNames) {
        GetIndexRequest request = new GetIndexRequest();
        List<String> no_created_indexs = new ArrayList<String>();
        for(String indexName:indexNames) {
        	request.indices(indexName);//设置索引
            try {
                if(client.indices().exists(request, RequestOptions.DEFAULT)) {
                	
                }else {//如果不存在,添加到list中
                	no_created_indexs.add(indexName);
                }
            } catch(ConnectException e) {
            	log.error("es服务连接失败或未启动");
            } catch (IOException e) {
                e.printStackTrace();
                no_created_indexs.add(indexName);
            } 
        }
        return no_created_indexs;
    }
}

2、CURD

说明:在进行CURD之前上面的配置必不可少

(1)增加

public ResponseData doInsertPomenzhi(PoMenzhi po) throws IOException {
    //构建新增请求体
    if (po.getId() == null || "".equals(po.getId())) {
        return ResponseDataUtil.buildError("es索引id缺失");
    }
    //1、添加新文档需要调用IndexRequest请求
    IndexRequest indexRequest = new IndexRequest(index_shanghai, type_shanghai, po.getId().toString());
    //source方法;将文档源设置为索引
    indexRequest.source(JSON.toJSONString(po), XContentType.JSON);
    //发送请求
    //同步执行 当以下列方式执行IndexRequest时,客户端在继续执行代码之前,会等待返回索引响应:
    IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
    /**indexResponse 示例:
         * {
         *     "_shards" : {
         *         "total" : 2,
         *         "failed" : 0,
         *         "successful" : 1
         *     },
         *     "_index" : "twitter",
         *     "_type" : "_doc",
         *     "_id" : "1",
         *     "_version" : 1,
         *     "_seq_no" : 0,
         *     "_primary_term" : 1,
         *     "result" : "created"
         * }
         */
    //获取结果,进行比较
    if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
        return ResponseDataUtil.buildSuccess("创建成功");
    } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
        return ResponseDataUtil.buildSuccess("更新成功");
    }
    //获取分片信息
    ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
        return ResponseDataUtil.buildSuccess("集群部分创建成功");
    }
    //创建失败
    if (shardInfo.getFailed() > 0) {
        ArrayList<Object> reason = new ArrayList<>();
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            reason.add(failure.reason());
        }
        return ResponseDataUtil.buildError("500", "创建失败", reason);
    }
    return ResponseDataUtil.buildError("500", "创建失败", "原因未知");
}

(2)更新

public ResponseData doUpdate(PoMenzhi po) throws IOException {
    if (po.getId() == null || "".equals(po.getId())) {
        return ResponseDataUtil.buildError("es索引id缺失");
    }
    //构建更新请求体
    UpdateRequest request = new UpdateRequest(index_shanghai, type_shanghai, po.getId().toString());
    request.doc(JSON.toJSONString(po), XContentType.JSON);
    //发送请求
    UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
    //处理请求结果
    if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
        return ResponseDataUtil.buildSuccess("创建成功");
    } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
        return ResponseDataUtil.buildSuccess("更新成功");
    } else {
        return ResponseDataUtil.buildError("更新失败");
    }
}

(3)删除

public ResponseData doDelete(Long id) throws IOException {
    DeleteRequest request = new DeleteRequest(index_shanghai, type_shanghai, id.toString());
    DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
    //删除响应体
    ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
        return ResponseDataUtil.buildSuccess("删除成功");
    }
    if (shardInfo.getFailed() > 0) {
        ArrayList<Object> reason = new ArrayList<>();
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            reason.add(failure.reason());
        }
        return ResponseDataUtil.buildError("500", "删除失败", reason);
    } else {
        return ResponseDataUtil.buildError("500", "删除失败", "原因未知");
    }
}

(4)查询

  1. 创建搜索请求对象SearchRequest,设置查询的指定索引和类型
  2. 创建搜索内容对象SearchSourceBuilder
  3. 创建查询对象MatchQueryBuilder,以及MatchQueryBuilder对象的配置
  4. 将查询对象MatchQueryBuilder添加到搜索内容对象SearchSourceBuilder中,以及SearchSourceBuilder对象的配置
  5. 将搜索内容对象SearchSourceBuilder添加到搜索请求对象SearchRequest中
public ResponseData getByHitwords(String hitword) throws IOException {
//1、创建搜索请求对象SearchRequest,设置查询的指定索引和类型
    SearchRequest searchRequest = new SearchRequest(index_shanghai);
    searchRequest.types(type_shanghai);

//2、创建搜索内容对象SearchSourceBuilder
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    
//3、创建查询对象MatchQueryBuilder,以及MatchQueryBuilder对象的配置
    MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("fulltext", hitword);
    //启动模糊查询
    matchQueryBuilder.fuzziness(Fuzziness.AUTO);
    //设置最大扩展选项以控制查询的模糊过程
    matchQueryBuilder.maxExpansions(10);
    
//4、将查询对象MatchQueryBuilder添加到搜索内容对象SearchSourceBuilder中,以及SearchSourceBuilder对象的配置
    searchSourceBuilder.query(matchQueryBuilder);
    //设置查询的起始索引位置
    searchSourceBuilder.from(0);
    //设置查询的数量
    searchSourceBuilder.size(5);
    //设置超时时间
    searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

//5、将搜索内容对象SearchSourceBuilder添加到搜索请求对象SearchRequest中
    searchRequest.source(searchSourceBuilder);
    
    //发送搜索请求
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    
    SearchHits hits = searchResponse.getHits();
    SearchHit[] searchHits = hits.getHits();
    //解析响应结果
    ArrayList list = new ArrayList<>();
    for (SearchHit hit : searchHits) {
        Map<String, Object> map = hit.getSourceAsMap();
        list.add(JSONObject.parseObject(JSON.toJSONString(map)).toJavaObject(PoMenzhiShanghai.class));
    }
    return ResponseDataUtil.buildSuccess(list);
}
相关标签: java elasticsearch