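Preface
The previous 33 articles issued every request to ES as a RESTful call from Kibana, and no Java or Python client code has been shown so far. RESTful requests are the most direct way to use and understand the core features of ES, but real projects almost always send requests and process results through Java or Python. With the core features already understood, the Java API should be easy to pick up; for APIs not covered here, consult the official documentation.
Overview
This article walks through examples of Elasticsearch client API development, mainly in Java, covering the most commonly used core APIs.
Code Examples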
Adding dependencies
Taking a Maven project as an example, add the following dependencies:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.3.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.3.1</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.12.1</version>
</dependency>
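Connecting to ES
Create a Settings object that specifies the cluster name, then create a TransportClient and give it the node's IP and port explicitly:
Settings settings = Settings.builder()
        .put("cluster.name", "elasticsearch")
        .build();
// In 6.x the address class is TransportAddress (InetSocketTransportAddress was removed in 6.0)
TransportClient client = new PreBuiltTransportClient(settings)
        .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));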
If the cluster has many nodes, listing the IP and port of every node is impractical. In that case, enable automatic node discovery (sniffing):
// Setting client.transport.sniff to true enables automatic discovery of cluster nodes
Settings settings = Settings.builder()
        .put("client.transport.sniff", true)
        .put("cluster.name", "elasticsearch")
        .build();
// Specifying a single node is enough
TransportClient client = new PreBuiltTransportClient(settings);
client.addTransportAddress(new TransportAddress(InetAddress.getByName("192.168.17.137"), 9300));
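Even with sniffing enabled, it can be convenient to keep a few seed nodes in configuration rather than hard-coding one address. The sketch below is a hypothetical helper (the class name `SeedNodeParser` and the comma-separated format are assumptions, not part of the Elasticsearch API) that parses a `host:port` list using only the JDK. It does not handle IPv6 literals.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: turns "host1:9300,host2" into host/port pairs. */
final class SeedNodeParser {

    // 9300 is the transport port used throughout this article
    static final int DEFAULT_TRANSPORT_PORT = 9300;

    static List<InetSocketAddress> parse(String csv) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : csv.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate blank entries
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            int port = colon < 0
                    ? DEFAULT_TRANSPORT_PORT
                    : Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup at parse time
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress node : parse("192.168.17.137:9300, 192.168.17.138")) {
            System.out.println(node.getHostString() + " -> " + node.getPort());
        }
    }
}
```

Each parsed pair could then be handed to the client in the same way as above, for example with `new TransportAddress(InetAddress.getByName(host), port)`.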
Basic CRUD
The most basic CRUD code, which works well as a first demo:
/**
 * Create an employee record (index one document).
 * @param client
 */
private static void createEmployee(TransportClient client) throws Exception {
    IndexResponse response = client.prepareIndex("company", "employee", "1")
            .setSource(XContentFactory.jsonBuilder()
                    .startObject()
                    .field("name", "jack")
                    .field("age", 27)
                    .field("position", "technique")
                    .field("country", "china")
                    .field("join_date", "2017-01-01")
                    .field("salary", 10000)
                    .endObject())
            .get();
    System.out.println(response.getResult());
}

/**
 * Fetch an employee record.
 * @param client
 * @throws Exception
 */
private static void getEmployee(TransportClient client) throws Exception {
    GetResponse response = client.prepareGet("company", "employee", "1").get();
    System.out.println(response.getSourceAsString());
}

/**
 * Update an employee record (partial update).
 * @param client
 * @throws Exception
 */
private static void updateEmployee(TransportClient client) throws Exception {
    UpdateResponse response = client.prepareUpdate("company", "employee", "1")
            .setDoc(XContentFactory.jsonBuilder()
                    .startObject()
                    .field("position", "technique manager")
                    .endObject())
            .get();
    System.out.println(response.getResult());
}

/**
 * Delete an employee record.
 * @param client
 * @throws Exception
 */
private static void deleteEmployee(TransportClient client) throws Exception {
    DeleteResponse response = client.prepareDelete("company", "employee", "1").get();
    System.out.println(response.getResult());
}
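Search
Earlier articles ran searches through RESTful requests; here the same search is implemented in Java. The original RESTful example:
GET /company/employee/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "position": "technique" } }
      ],
      "filter": {
        "range": { "age": { "gte": 30, "lte": 40 } }
      }
    }
  },
  "from": 0,
  "size": 1
}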
This is equivalent to the following Java code:
SearchResponse response = client.prepareSearch("company")
        .setTypes("employee")
        .setQuery(QueryBuilders.boolQuery()
                .must(QueryBuilders.matchQuery("position", "technique"))   // query
                .filter(QueryBuilders.rangeQuery("age").gte(30).lte(40)))  // filter
        .setFrom(0)
        .setSize(1)
        .get();
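The `from` and `size` values are usually derived from a page number rather than written by hand. A minimal sketch of that arithmetic, assuming 1-based page numbers; the class `PageWindow` is hypothetical, and the 10000 limit reflects the default of the `index.max_result_window` setting, which a given index may override:

```java
/** Hypothetical helper that converts a 1-based page number into from/size. */
final class PageWindow {

    // Default value of index.max_result_window; an index may configure another value
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    final int from;
    final int size;

    private PageWindow(int from, int size) {
        this.from = from;
        this.size = size;
    }

    static PageWindow of(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        // multiplyExact fails loudly instead of silently overflowing
        return new PageWindow(Math.multiplyExact(page - 1, size), size);
    }

    /** True when from + size goes past the default result window. */
    boolean exceedsDefaultWindow() {
        return (long) from + size > DEFAULT_MAX_RESULT_WINDOW;
    }

    public static void main(String[] args) {
        PageWindow window = PageWindow.of(3, 20);
        System.out.println("from=" + window.from + ", size=" + window.size);
    }
}
```

The resulting values would be passed to `setFrom(window.from)` and `setSize(window.size)`. For windows beyond the limit, the scroll API shown later in this article is the usual alternative.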
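Aggregation queries
Aggregation queries take a little more work: both building the request and parsing the response depend on the structure that actually comes back. Take the following query as an example.
Requirements:
1. Group by country.
2. Within each country group, group again by join year.
3. Finally, compute the average salary of each group.
The RESTful request:
GET /company/employee/_search
{
  "size": 0,
  "aggs": {
    "group_by_country": {
      "terms": { "field": "country" },
      "aggs": {
        "group_by_join_date": {
          "date_histogram": { "field": "join_date", "interval": "year" },
          "aggs": {
            "avg_salary": {
              "avg": { "field": "salary" }
            }
          }
        }
      }
    }
  }
}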
The same request written in Java (aggregation names and fields match the RESTful request above):
SearchResponse searchResponse = client.prepareSearch("company")
        .setTypes("employee")
        .setSize(0)
        .addAggregation(AggregationBuilders.terms("group_by_country").field("country")
                .subAggregation(AggregationBuilders.dateHistogram("group_by_join_date")
                        .field("join_date")
                        .dateHistogramInterval(DateHistogramInterval.YEAR)
                        .subAggregation(AggregationBuilders.avg("avg_salary").field("salary"))))
        .get();
The response has to be unpacked one level at a time:
// Level 1: one bucket per country
Terms groupByCountry = searchResponse.getAggregations().get("group_by_country");
for (Terms.Bucket countryBucket : groupByCountry.getBuckets()) {
    System.out.println(countryBucket.getKey() + "\t" + countryBucket.getDocCount());
    // Level 2: one bucket per join year within the country
    Histogram groupByJoinDate = countryBucket.getAggregations().get("group_by_join_date");
    for (Histogram.Bucket joinDateBucket : groupByJoinDate.getBuckets()) {
        System.out.println(joinDateBucket.getKey() + "\t" + joinDateBucket.getDocCount());
        // Level 3: the average salary inside that bucket
        Avg avgSalary = joinDateBucket.getAggregations().get("avg_salary");
        System.out.println(avgSalary.getValue());
    }
}
client.close();
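To make the shape of the result easier to picture, the sketch below computes the same three-level grouping (country, then join year, then average salary) over an in-memory list with plain Java streams. It only illustrates what the aggregation returns and does not use the Elasticsearch API; the `Employee` class and the sample values are made up for the example.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** In-memory illustration of terms -> date_histogram(year) -> avg. */
final class SalaryByCountryAndYear {

    /** Hypothetical stand-in for an employee document. */
    static final class Employee {
        private final String country;
        private final LocalDate joinDate;
        private final double salary;

        Employee(String country, LocalDate joinDate, double salary) {
            this.country = country;
            this.joinDate = joinDate;
            this.salary = salary;
        }

        String getCountry() { return country; }
        int getJoinYear() { return joinDate.getYear(); }
        double getSalary() { return salary; }
    }

    /** Outer key: country. Inner key: join year. Value: average salary. */
    static Map<String, Map<Integer, Double>> average(List<Employee> employees) {
        return employees.stream().collect(
                Collectors.groupingBy(Employee::getCountry,
                        Collectors.groupingBy(Employee::getJoinYear,
                                Collectors.averagingDouble(Employee::getSalary))));
    }

    public static void main(String[] args) {
        List<Employee> employees = Arrays.asList(
                new Employee("china", LocalDate.parse("2017-01-01"), 10000),
                new Employee("china", LocalDate.parse("2017-06-01"), 12000),
                new Employee("us", LocalDate.parse("2016-03-01"), 30000));
        // Walk the nested maps level by level, like the bucket loops above
        average(employees).forEach((country, byYear) ->
                byYear.forEach((year, avg) ->
                        System.out.println(country + "\t" + year + "\t" + avg)));
    }
}
```

The nested `Map` mirrors the nested buckets: each outer entry corresponds to a country bucket, each inner entry to a year bucket holding its `avg_salary` value.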
Upsert requests
private static void upsert(TransportClient transport) {
    try {
        // Full document, indexed only when document 2 does not exist yet
        // (the name is a joke title: "MySQL, from getting started to dropping the database and running away")
        IndexRequest index = new IndexRequest("book_shop", "books", "2")
                .source(XContentFactory.jsonBuilder().startObject()
                        .field("name", "mysql从入门到删库跑路")
                        .field("tags", "mysql")
                        .field("price", 32.8)
                        .endObject());
        // Partial update, applied when document 2 already exists
        UpdateRequest update = new UpdateRequest("book_shop", "books", "2")
                .doc(XContentFactory.jsonBuilder().startObject()
                        .field("price", 31.8)
                        .endObject())
                .upsert(index);
        UpdateResponse response = transport.update(update).get();
        System.out.println(response.getVersion());
    } catch (IOException | InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
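The upsert behaviour is easy to misread: when the document is missing, the upsert document is stored as-is and the partial document is not applied; when it already exists, only the partial document is merged in. The sketch below simulates that rule on a plain `Map`. It is an illustration rather than Elasticsearch code, the class name is hypothetical, and the merge is shallow for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory simulation of update-with-upsert semantics (shallow merge only). */
final class UpsertSemantics {

    /**
     * @return true if the document was created from upsertDoc,
     *         false if an existing document was updated with partialDoc
     */
    static boolean upsert(Map<String, Map<String, Object>> index, String id,
                          Map<String, Object> partialDoc, Map<String, Object> upsertDoc) {
        Map<String, Object> existing = index.get(id);
        if (existing == null) {
            // Missing document: store the upsert document, ignore partialDoc
            index.put(id, new HashMap<>(upsertDoc));
            return true;
        }
        // Existing document: overwrite only the fields present in partialDoc
        existing.putAll(partialDoc);
        return false;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> index = new HashMap<>();
        Map<String, Object> full = new HashMap<>();
        full.put("tags", "mysql");
        full.put("price", 32.8);
        Map<String, Object> partial = new HashMap<>();
        partial.put("price", 31.8);

        System.out.println(upsert(index, "2", partial, full) + " " + index.get("2"));
        System.out.println(upsert(index, "2", partial, full) + " " + index.get("2"));
    }
}
```

Applied to the request above, this means the first run stores the book with price 32.8, and only a second run lowers it to 31.8.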
mget requests
public static void mget(TransportClient transport) {
    // Fetch several documents by ID in one round trip
    MultiGetResponse res = transport.prepareMultiGet()
            .add("book_shop", "books", "1")
            .add("book_shop", "books", "2")
            .get();
    for (MultiGetItemResponse item : res.getResponses()) {
        System.out.println(item.getResponse());
    }
}
Bulk requests
public static void bulk(TransportClient transport) {
    try {
        BulkRequestBuilder bulk = transport.prepareBulk();
        // Two index operations and one partial update, sent as a single request.
        // The book names and tags are sample data (joke titles such as
        // "Design patterns, from getting started to copying code").
        bulk.add(transport.prepareIndex("book_shop", "books", "3")
                .setSource(XContentFactory.jsonBuilder().startObject()
                        .field("name", "设计模式从入门到拷贝代码")
                        .field("tags", "设计模式")
                        .field("price", 55.9)
                        .endObject()));
        bulk.add(transport.prepareIndex("book_shop", "books", "4")
                .setSource(XContentFactory.jsonBuilder().startObject()
                        .field("name", "架构设计从入门到google搜索")
                        .field("tags", "架构设计")
                        .field("price", 68.9)
                        .endObject()));
        bulk.add(transport.prepareUpdate("book_shop", "books", "1")
                .setDoc(XContentFactory.jsonBuilder().startObject()
                        .field("price", 32.8)
                        .endObject()));
        BulkResponse bulkRes = bulk.get();
        if (bulkRes.hasFailures()) {
            // Individual items can fail even though the request itself succeeded
            System.out.println("Error: " + bulkRes.buildFailureMessage());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
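For comparison with the RESTful style used in earlier articles, a `_bulk` request body is newline-delimited JSON: one action line, followed (for index and update) by a source line, each terminated by a newline. The sketch below assembles such a body with the JDK only. `BulkBodyBuilder` is a hypothetical helper, and it assumes the caller passes single-line JSON and identifiers that need no escaping.

```java
/** Hypothetical builder for the newline-delimited body of a _bulk request. */
final class BulkBodyBuilder {

    private final StringBuilder body = new StringBuilder();

    /** Adds an index action followed by the full document source. */
    BulkBodyBuilder index(String index, String type, String id, String sourceJson) {
        actionLine("index", index, type, id);
        body.append(sourceJson).append('\n');
        return this;
    }

    /** Adds an update action followed by a partial document wrapped in "doc". */
    BulkBodyBuilder update(String index, String type, String id, String partialDocJson) {
        actionLine("update", index, type, id);
        body.append("{\"doc\":").append(partialDocJson).append("}\n");
        return this;
    }

    // Assumes index, type and id contain no characters that need JSON escaping
    private void actionLine(String action, String index, String type, String id) {
        body.append("{\"").append(action).append("\":{\"_index\":\"").append(index)
                .append("\",\"_type\":\"").append(type)
                .append("\",\"_id\":\"").append(id).append("\"}}\n");
    }

    /** The body must end with a newline, which every appended line guarantees. */
    String build() {
        return body.toString();
    }

    public static void main(String[] args) {
        System.out.print(new BulkBodyBuilder()
                .index("book_shop", "books", "3", "{\"price\":55.9}")
                .update("book_shop", "books", "1", "{\"price\":32.8}")
                .build());
    }
}
```

Each `bulk.add(...)` call in the Java client corresponds to one such action/source pair.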
Scroll requests
public static void scroll(TransportClient client) {
    SearchResponse bookShop = client.prepareSearch("book_shop")
            .setScroll(new TimeValue(60000))
            .setSize(1)
            .get();
    int batchCnt = 0;
    // Keep fetching with the latest scroll ID until a batch comes back empty
    while (bookShop.getHits().getHits().length != 0) {
        for (SearchHit hit : bookShop.getHits().getHits()) {
            System.out.println("batchCnt:" + ++batchCnt);
            System.out.println(hit.getSourceAsString());
        }
        bookShop = client.prepareSearchScroll(bookShop.getScrollId())
                .setScroll(new TimeValue(60000))
                .get();
    }
    // Release the scroll context instead of waiting for it to time out
    client.prepareClearScroll().addScrollId(bookShop.getScrollId()).get();
}
Search templates
public static void searchTemplates(TransportClient client) {
    // Parameters substituted into the stored template
    Map<String, Object> params = new HashMap<>(10);
    params.put("from", 0);
    params.put("size", 10);
    params.put("tags", "java");
    // page_query_by_tags must already be stored in the cluster
    SearchTemplateResponse str = new SearchTemplateRequestBuilder(client)
            .setScript("page_query_by_tags")
            .setScriptType(ScriptType.STORED)
            .setScriptParams(params)
            .setRequest(new SearchRequest())
            .get();
    for (SearchHit hit : str.getResponse().getHits().getHits()) {
        System.out.println(hit.getSourceAsString());
    }
}
Combining multiple query conditions
public static void otherSearch(TransportClient client) {
    // Query strings such as "入门" and "跑路" match the Chinese sample book names indexed earlier
    SearchResponse response1 = client.prepareSearch("book_shop")
            .setQuery(QueryBuilders.termQuery("tags", "java")).get();
    SearchResponse response2 = client.prepareSearch("book_shop")
            .setQuery(QueryBuilders.multiMatchQuery("32.8", "price", "tags")).get();
    SearchResponse response3 = client.prepareSearch("book_shop")
            .setQuery(QueryBuilders.commonTermsQuery("name", "入门")).get();
    SearchResponse response4 = client.prepareSearch("book_shop")
            .setQuery(QueryBuilders.prefixQuery("name", "java")).get();
    printFirstHit(response1);
    printFirstHit(response2);
    printFirstHit(response3);
    printFirstHit(response4);

    // Several conditions combined in one bool query
    SearchResponse response5 = client.prepareSearch("book_shop")
            .setQuery(QueryBuilders.boolQuery()
                    .must(QueryBuilders.termQuery("tags", "java"))
                    .mustNot(QueryBuilders.matchQuery("name", "跑路"))
                    .should(QueryBuilders.matchQuery("name", "入门"))
                    .filter(QueryBuilders.rangeQuery("price").gte(23).lte(55)))
            .get();
    printFirstHit(response5);
}

// Prints the first hit, guarding against an empty result (indexing [0] directly would throw)
private static void printFirstHit(SearchResponse response) {
    SearchHit[] hits = response.getHits().getHits();
    System.out.println(hits.length > 0 ? hits[0].getSourceAsString() : "(no hits)");
}
Geo queries
public static void geo(TransportClient client) {
    // Bounding box: corners given as top, left, bottom, right
    GeoBoundingBoxQueryBuilder query1 = QueryBuilders.geoBoundingBoxQuery("location")
            .setCorners(23, 112, 21, 114);

    // Polygon: points given as (lat, lon)
    List<GeoPoint> points = new ArrayList<>();
    points.add(new GeoPoint(23, 115));
    points.add(new GeoPoint(25, 113));
    points.add(new GeoPoint(21, 112));
    GeoPolygonQueryBuilder query2 = QueryBuilders.geoPolygonQuery("location", points);

    // Distance: everything within 500 m of the given point
    GeoDistanceQueryBuilder query3 = QueryBuilders.geoDistanceQuery("location")
            .point(22.523375, 113.911231)
            .distance(500, DistanceUnit.METERS);

    // Only query3 is executed here; query1 and query2 can be passed to setQuery the same way
    SearchResponse response = client.prepareSearch("location").setQuery(query3).get();
    for (SearchHit hit : response.getHits().getHits()) {
        System.out.println(hit.getSourceAsString());
    }
}
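To get a feel for what a 500 m distance filter accepts, the sketch below computes great-circle distance with the haversine formula in plain Java. It is an approximation for illustration only: the class name and the mean Earth radius of 6,371,000 m are assumptions, and Elasticsearch's own distance computation may differ slightly.

```java
/** Haversine great-circle distance; an approximation for illustration. */
final class GeoDistance {

    // Mean Earth radius in meters (assumed value for this sketch)
    static final double EARTH_RADIUS_METERS = 6_371_000.0;

    static double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_METERS * Math.asin(Math.sqrt(a));
    }

    /** True when the two points are no more than radiusMeters apart. */
    static boolean within(double lat1, double lon1, double lat2, double lon2,
                          double radiusMeters) {
        return haversineMeters(lat1, lon1, lat2, lon2) <= radiusMeters;
    }

    public static void main(String[] args) {
        // Center point taken from the geoDistanceQuery above; second point is made up
        double meters = haversineMeters(22.523375, 113.911231, 22.524375, 113.911231);
        System.out.println(meters + " m, within 500 m: " + (meters <= 500));
    }
}
```

A shift of 0.001 degrees of latitude is roughly 111 m, so such a point falls inside the 500 m radius, while a shift of 0.01 degrees (about 1.1 km) does not.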
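Summary
The demos above are meant to be skimmed. If you are already developing an ES-based project, rely on the official API documentation, which has detailed API descriptions and usage demos: https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.3/index.html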
Reprinted from Java架构社区.




