欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java High Level REST Client 使用教程

程序员文章站 2023-01-28 10:37:58
说明 之前写过一个用jest操作es的文章,不过感觉写的有点乱。而且jest操作es的时候其实很多东西还是使用的es的一些东西。感觉还不如直接用官方的java客户端了。 所以就研究了下high-level-client,感觉挺好用的,而且官方文档很详细。推荐使用。 下面进入正题,写的东西大部分都是基 ......

说明

  之前写过一个用jest操作es的文章,不过感觉写的有点乱。而且jest操作es的时候其实很多东西还是使用的es的一些东西。感觉还不如直接用官方的java客户端了。

所以就研究了下high-level-client,感觉挺好用的,而且官方文档很详细。推荐使用。

  下面进入正题,写的东西大部分都是基于官方文档的。自己封装了个查询用的criteria类,方便查询。

elasticsearch官方文档:  

java high level rest client 官方文档  下面的实现大部分基于这个文档。

es客户端谷歌插件,方便没法*的同学。 elasticsearch-head_v0.1.4.crx 

maven引用

        <dependency>
            <groupid>org.elasticsearch.client</groupid>
            <artifactid>elasticsearch-rest-high-level-client</artifactid>
            <version>7.6.2</version>
        </dependency>
      <!-- 以下非必须引用,是一些辅助工具包 -->
        <dependency>
            <groupid>cn.hutool</groupid>
            <artifactid>hutool-all</artifactid>
            <version>5.3.2</version>
        </dependency>
        <dependency>
            <groupid>org.projectlombok</groupid>
            <artifactid>lombok</artifactid>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupid>junit</groupid>
            <artifactid>junit</artifactid>
            <version>4.12</version>
            <scope>compile</scope>
        </dependency>

操作

1、主要有 模版、index的一些操作。

2、数据操作方面有 插入、批量插入、删除、更新、根据条件更新。

3、数据查询 按数据库查询来说有 = (termquery) 、<= >= (rangequery)、in (termsquery)、not in (mustnot+termsquery) 都封装到了criteria类里。

import cn.hutool.core.collection.collutil;
import cn.hutool.core.date.datepattern;
import cn.hutool.core.date.dateutil;
import cn.hutool.core.lang.console;
import cn.hutool.core.util.randomutil;
import cn.hutool.json.jsonconfig;
import cn.hutool.json.jsonobject;
import cn.hutool.json.jsonutil;
import lombok.data;
import org.apache.http.httphost;
import org.elasticsearch.action.admin.indices.alias.alias;
import org.elasticsearch.action.admin.indices.delete.deleteindexrequest;
import org.elasticsearch.action.bulk.bulkrequest;
import org.elasticsearch.action.bulk.bulkresponse;
import org.elasticsearch.action.delete.deleterequest;
import org.elasticsearch.action.delete.deleteresponse;
import org.elasticsearch.action.get.*;
import org.elasticsearch.action.index.indexrequest;
import org.elasticsearch.action.index.indexresponse;
import org.elasticsearch.action.search.searchrequest;
import org.elasticsearch.action.search.searchresponse;
import org.elasticsearch.action.support.master.acknowledgedresponse;
import org.elasticsearch.client.requestoptions;
import org.elasticsearch.client.restclient;
import org.elasticsearch.client.resthighlevelclient;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.common.strings;
import org.elasticsearch.common.settings.settings;
import org.elasticsearch.common.xcontent.xcontentbuilder;
import org.elasticsearch.common.xcontent.xcontentfactory;
import org.elasticsearch.common.xcontent.xcontenttype;
import org.elasticsearch.index.query.querybuilders;
import org.elasticsearch.index.reindex.bulkbyscrollresponse;
import org.elasticsearch.index.reindex.updatebyqueryrequest;
import org.elasticsearch.script.script;
import org.elasticsearch.script.scripttype;
import org.elasticsearch.search.searchhit;
import org.elasticsearch.search.searchhits;
import org.elasticsearch.search.builder.searchsourcebuilder;
import org.elasticsearch.search.fetch.subphase.fetchsourcecontext;
import org.elasticsearch.search.sort.sortorder;
import org.junit.after;
import org.junit.before;
import org.junit.test;

import java.io.ioexception;
import java.util.collections;
import java.util.date;
import java.util.list;

/**
 * @author: sun
 * @create: 2020-04-27
 **/
public class main {
    resthighlevelclient client = null;

    @before
    public void before() {
        client = new resthighlevelclient(
                restclient.builder(
                        new httphost("127.0.0.1", 9200, "http")));
    }

    @test
    public void puttemplate() throws ioexception {
        putindextemplaterequest request = new putindextemplaterequest("test_template");
        //别名,所有根据该模版创建的index 都会添加这个别名。查询时可查询别名,就可以同时查询多个名称不同的index,根据此方法可实现index每天或每月生成等逻辑。
        request.alias(new alias("test_index"));
        request.order(10);
        //匹配哪些index。在创建index时会生效。
        request.patterns(collutil.newarraylist("test_index*"));
        request.settings(settings.builder()
                //数据插入后多久能查到,实时性要求高可以调低
                .put("index.refresh_interval", "10s")
                //传输日志,对数据安全性要求高的 设置 request,默认值:request
                .put("index.translog.durability", "async")
                .put("index.translog.sync_interval", "120s")
                //分片数量
                .put("index.number_of_shards", "5")
                //副本数量
                .put("index.number_of_replicas", "0")
                //单次最大查询数据的数量。默认10000。不要设置太高,如果有导出需求可以根据查询条件分批次查询。
                .put("index.max_result_window", "100000"));
        //使用官方提供的工具构建json。可以直接拼接一个json字符串,也可以使用map嵌套。
        xcontentbuilder jsonmapping = xcontentfactory.jsonbuilder();
        //所有数据类型 看官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/7.4/mapping-types.html#_core_datatypes
        jsonmapping.startobject().startobject("properties")
                .startobject("testid").field("type", "long").endobject()
                .startobject("price").field("type", "double").endobject()
                //keyword类型不会分词存储
                .startobject("name").field("type", "keyword").endobject()
                //指定分词器
                .startobject("content").field("type", "text").field("analyzer", "ik_max_word").endobject()
                .startobject("createtime").field("type", "date").field("format", "yyyy-mm-dd hh:mm:ss").endobject()
                .endobject().endobject();
        request.mapping(jsonmapping);
        //设置为true只强制创建,而不是更新索引模板。如果它已经存在,它将失败
        request.create(false);
        acknowledgedresponse response = client.indices().puttemplate(request, requestoptions.default);
        if (response.isacknowledged()) {
            console.log("创建模版成功!");
        } else {
            console.log("创建模版失败!");
        }
    }

    @test
    public void gettemplate() throws ioexception {
        getindextemplatesrequest getindextemplatesrequest = new getindextemplatesrequest("test*");
        getindextemplatesresponse getindextemplatesresponse = client.indices().getindextemplate(getindextemplatesrequest, requestoptions.default);
        list<indextemplatemetadata> indextemplates = getindextemplatesresponse.getindextemplates();
        indextemplates.foreach(t -> {
            console.log(t.name());
        });
    }

    @test
    public void createindex() throws ioexception {
        createindexrequest request = new createindexrequest("test_index_tmp");
        //这里也可以针对index单独设置。不过因为我们已经设置过模版了,所以这里就不进行设置了。
        //index其实也不用单独创建,在插入数据时,如果所有不存在,会自动创建索引。
        //request.settings();
        //request.mapping();
        //request.alias()
        createindexresponse createindexresponse = client.indices().create(request, requestoptions.default);
        if (createindexresponse.isacknowledged()) {
            console.log("创建index成功!");
        } else {
            console.log("创建index失败!");
        }
    }

    @test
    public void getindex() throws ioexception {
        getindexrequest request = new getindexrequest("test_index*");
        getindexresponse response = client.indices().get(request, requestoptions.default);
        string[] indices = response.getindices();
        for (string indexname : indices) {
            console.log("index name:{}", indexname);
        }
    }

    @test
    public void delindex() throws ioexception {
        deleteindexrequest request = new deleteindexrequest("test_index*");
//        deleteindexrequest request = new deleteindexrequest("test_index_tmp");
        acknowledgedresponse response = client.indices().delete(request, requestoptions.default);
        if (response.isacknowledged()) {
            console.log("删除index成功!");
        } else {
            console.log("删除index失败!");
        }
    }

    @test
    public void insertdata() throws ioexception {
        //插入数据,index不存在则自动根据匹配到的template创建。index没必要每天创建一个,如果是为了灵活管理,最低建议每月一个 yyyymm。
        indexrequest request = new indexrequest("test_index_" + dateutil.format(new date(), "yyyymm"));
        //最好不要自定义id 会影响插入速度。
//        request.id("id");
        testdata testdata = new testdata();
        testdata.settestid(randomutil.randomlong(9999999999l));
        testdata.setprice(10.0d);
        testdata.setname(randomutil.randomstring(8));
        testdata.setcontent("");
        testdata.setcreatetime(new date());
        request.source(new jsonobject(testdata, new jsonconfig().setdateformat(datepattern.norm_datetime_pattern)).tostring()
                , xcontenttype.json);
        indexresponse response = client.index(request, requestoptions.default);
        console.log(response);
    }
    @test
    public void getbyid() throws ioexception {
        //注意 这里查询使用的是别名。
        getrequest request = new getrequest("test_index", "c-54v3eb_nn045d7vgjz");
        string[] includes = {"name","price"};
        string[] excludes = strings.empty_array;
        fetchsourcecontext fetchsourcecontext = new fetchsourcecontext(true,includes,excludes);
        //只查询特定字段。如果需要查询所有字段则不设置该项。
        request.fetchsourcecontext(fetchsourcecontext);
        getresponse response = client.get(request, requestoptions.default);
        console.log(response);
        console.log(jsonutil.tobean(response.getsourceasstring(),testdata.class));
    }
    
    @test
    public void delbyid() throws ioexception {
        deleterequest request = new deleterequest("test_index", "7e5gv3eb_nn045d7pgda");
        deleteresponse response = client.delete(request, requestoptions.default);
        console.log(response);
    }
    @test
    public void multigetbyid() throws ioexception {
        //多个根据id查询
        multigetrequest request = new multigetrequest();
        request.add("test_index","d-56v3eb_nn045d7vmjh");
        //两种写法
        request.add(new multigetrequest.item(
                "test_index",
                "mo57v3eb_nn045d7aggu"));
        multigetresponse response = client.mget(request, requestoptions.default);
        for (multigetitemresponse itemresponse : response) {
            console.log(itemresponse.getresponse().getsourceasstring());
        }
    }
    
    @test
    public void batchinsertdata() throws ioexception {
        //批量插入数据,更新和删除同理
        bulkrequest request = new bulkrequest("test_index_" + dateutil.format(new date(), "yyyymm"));
        for (int i = 0; i < 1000; i++) {
            testdata testdata = new testdata();
            testdata.settestid(randomutil.randomlong(9999999999l));
            testdata.setprice(100.0d);
            testdata.setname(randomutil.randomstring(8));
            testdata.setcontent("");
            testdata.setcreatetime(new date());
            request.add(new indexrequest().source(new jsonobject(testdata, new jsonconfig().setdateformat(datepattern.norm_datetime_pattern)).tostring()
                    , xcontenttype.json));
        }
        bulkresponse response = client.bulk(request, requestoptions.default);
        console.log("插入状态:{} 数量:{} ",response.status(),response.getitems().length);
    }
    @test
    public void updatebyquery() throws ioexception {
        updatebyqueryrequest request = new updatebyqueryrequest("test_index");
        //默认情况下,版本冲突会中止 updatebyqueryrequest 进程,但是你可以用以下命令来代替
        //设置版本冲突继续
        request.setconflicts("proceed");
        //设置更新条件
        request.setquery(querybuilders.rangequery("createtime").gte("2020-04-28 11:30:24").lte("2020-04-28 15:30:24"));
        //限制更新条数
        request.setmaxdocs(10);
        request.setscript(new script(scripttype.inline,"painless","ctx._source.testid=999999;", collections.emptymap()));
        bulkbyscrollresponse response = client.updatebyquery(request, requestoptions.default);
        console.log(response);
    }
    @test
    public void query() throws ioexception {
        searchrequest request = new searchrequest("test_index");
        searchsourcebuilder builder = criteria.create().addrangequery("createtime", "2020-04-28 11:30:24", "2020-04-28 15:30:24").builder();
        builder.from(0);
        builder.size(11);
        builder.sort("createtime", sortorder.asc);
        //不返回源数据。只有条数之类的数据。
//        builder.fetchsource(false);
        request.source(builder);
        searchresponse response = client.search(request, requestoptions.default);
        searchhits hits = response.gethits();
        for (searchhit hit : hits) {
            console.log(hit.getsourceasstring());
        }
        console.log("总数:{}",response.gethits().gettotalhits().value);
    }

    @after
    public void after() throws ioexception {
        client.close();
    }
}

@data
class testdata {
    private long testid;
    private double price;
    private string name;
    private string content;
    private date createtime;
}

criteria类

import cn.hutool.core.util.strutil;
import org.elasticsearch.index.query.boolquerybuilder;
import org.elasticsearch.index.query.querybuilders;
import org.elasticsearch.search.builder.searchsourcebuilder;

import java.util.list;

/**
 * 复合查询封装
 */
public class criteria {
    private final boolquerybuilder boolquerybuilder = querybuilders.boolquery();
    
    public static criteria create(){
        return new criteria();
    }
    
    /**
     * 条件增加完成后,获取需要操作的对象
     * 
     * @return
     */
    public searchsourcebuilder builder() {
        searchsourcebuilder searchsourcebuilder = new searchsourcebuilder();
        return searchsourcebuilder.query(boolquerybuilder);
    }

    /**
     * 增加条件查询
     * 
     * @param fieldname
     * @param fieldvalue
     */
    public criteria addtermquery(string fieldname, string fieldvalue) {
        if (strutil.isnotblank(fieldname) && strutil.isnotblank(fieldvalue)) {
            boolquerybuilder.filter(querybuilders.termquery(fieldname, fieldvalue));
        }
        return this;
    }
    /**
     * 增加条件查询
     * 主要针对 内容分词后 精确匹配 fieldvalue 不分词
     * @param fieldname
     * @param fieldvalue
     */
    public criteria addmatchphrasequery(string fieldname, string fieldvalue) {
        if (strutil.isnotblank(fieldname) && strutil.isnotblank(fieldvalue)) {
            boolquerybuilder.filter(querybuilders.matchphrasequery(fieldname, fieldvalue));
        }
        return this;
    }

    /**
     * 增加区间查询
     * 
     * @param fieldname
     * @param gtevalue
     * @param ltevalue
     */
    public criteria addrangequery(string fieldname, object gtevalue, object ltevalue) {
        if (strutil.isnotblank(fieldname)) {
            boolquerybuilder.filter(querybuilders.rangequery(fieldname).gte(gtevalue).lte(ltevalue).includelower(true)
                    .includeupper(true));
        }
        return this;
    }

    /**
     * 增加包含查询,相当于sql中的in
     * 
     * @param fieldname
     * @param values
     */
    public criteria addtermsquery(string fieldname, list<?> values) {
        if (strutil.isnotblank(fieldname) && values != null && values.size() > 0) {
            boolquerybuilder.filter(querybuilders.termsquery(fieldname, values));
        }
        return this;
    }
    
    /**
     * 增加不包含查询,相当于sql中的 not in
     * 
     * @param fieldname
     * @param values
     */
    public criteria addnottermsquery(string fieldname, list<?> values) {
        if (strutil.isnotblank(fieldname) && values != null && values.size() > 0) {
            boolquerybuilder.mustnot(querybuilders.termsquery(fieldname, values));
        }
        return this;
    }
    
}