暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

ElasticSearch入门之彼行我释(四)

我是攻城师 2015-03-17
193



散仙在上篇文章中,介绍了关于ElasticSearch基本的增删改查的基本粒子,本篇呢,我们来学下稍微高级一点的知识:


(1)如何在ElasticSearch中批量提交索引 ?
(2)如何使用高级查询(包括,检索,排序,过滤,分页) ?
(3)如何组合多个查询 ?
(4)如何使用翻页深度查询 ?
(5)如何使用基本的聚合查询 ?



(一)首先,我们思考下,为什么要使用批量添加,这个毫无疑问,因为效率问题,举个在生活中的例子,假如我们有50个人,要去美国旅游,不使用批处理的方式是,给每一个人派一架飞机送到美国,那么这就需要50次飞机的来回往来,假如使用了批处理,现在的情况就是一个飞机坐50个人,只需一次即可把所有人都送到美国,效率可想而知,生活也有很多实际的例子,大家可以自己想想。

在原生的lucene中,以及solr中,这个批处理方式,实质是控制commit的时机,比如多少个提交一次,或者超过ranbuffersize的大小后自动提交,es封装了lucene的api提供bulk的方式来批量添加,原理也是,聚集一定的数量doc,然后发送一次添加请求。


(二)只要我们使用了全文检索,我们的业务就会有各种各样的api操作,包括,任意维度的字段查询,过滤掉某些无效的信息,然后根据某个字段排序,再取topN的结果集返回,使用数据库的小伙伴们,相信大家都不陌生,在es中,这些操作都是支持的,而且还非常高效,它能满足我们大部分的需求


(三)在es中,我们可以查询多个index,以及多个type,这一点是非常灵活地,我们,我们可以一次组装两个毫无关系的查询,发送到es服务端进行检索,然后获取结果。


(四)es中,通过了scorll的方式,支持深度分页查询,在数据库里,我们使用的是一个cursor游标来记录读取的偏移量,同样的在es中也支持,这样的查询方式,它通过一个scrollid记录了上一次查询的状态,能轻而易举的实现深度翻页,本质上是对了Lucene的SearchAfter的封装。

(五)es中,也提供了对聚合函数的支持,比如一些max,min,avg,count,sum等支持,除此之外还支持group,facet等操作,这些功能,在电商中应用非常广泛,基于lucene的solr和es都有很好的支持。

下面截图看下散仙的测试数据值:



源码demo如下:

Java代码

  1. package com.dongliang.es;

  2. import java.util.Date;

  3. import java.util.Map;

  4. import java.util.Map.Entry;

  5. import org.apache.lucene.index.Terms;

  6. import org.elasticsearch.action.bulk.BulkRequestBuilder;

  7. import org.elasticsearch.action.bulk.BulkResponse;

  8. import org.elasticsearch.action.search.MultiSearchResponse;

  9. import org.elasticsearch.action.search.SearchRequestBuilder;

  10. import org.elasticsearch.action.search.SearchResponse;

  11. import org.elasticsearch.action.search.SearchType;

  12. import org.elasticsearch.client.Client;

  13. import org.elasticsearch.client.transport.TransportClient;

  14. import org.elasticsearch.common.transport.InetSocketTransportAddress;

  15. import org.elasticsearch.common.unit.TimeValue;

  16. import org.elasticsearch.common.xcontent.XContentBuilder;

  17. import org.elasticsearch.common.xcontent.XContentFactory;

  18. import org.elasticsearch.index.query.FilterBuilders;

  19. import org.elasticsearch.index.query.QueryBuilders;

  20. import org.elasticsearch.index.query.QueryStringQueryBuilder;

  21. import org.elasticsearch.search.SearchHit;

  22. import org.elasticsearch.search.aggregations.AggregationBuilders;

  23. import org.elasticsearch.search.aggregations.bucket.filters.InternalFilters.Bucket;

  24. import org.elasticsearch.search.sort.SortOrder;

  25. /**

  26. * @author 三劫散仙

  27. * 搜索技术交流群:324714439

  28. * 一个关于elasticsearch批量提交

  29. * 和search query的的例子

  30. * **/

  31. public class ElasticSearchDao {

  32. //es的客户端实例

  33. Client client=null;

  34. {

  35. //连接单台机器,注意ip和端口号,不能写错

  36. client=new TransportClient().

  37. addTransportAddress(new InetSocketTransportAddress("192.168.46.16", 9300));

  38. }

  39. public static void main(String[] args)throws Exception {

  40. ElasticSearchDao es=new ElasticSearchDao();

  41. //es.indexdata();//索引数据

  42. //es.queryComplex();

  43. es.querySimple();

  44. //es.scorllQuery();

  45. //es.mutilCombineQuery();

  46. //es.aggregationQuery();

  47. }

  48. /**组合分组查询*/

  49. public void aggregationQuery()throws Exception{

  50. SearchResponse sr = client.prepareSearch()

  51. .setQuery(QueryBuilders.matchAllQuery())

  52. .addAggregation(

  53. AggregationBuilders.terms("1").field("type")

  54. )

  55. // .addAggregation(

  56. // AggregationBuilders.dateHistogram("agg2")

  57. // .field("birth")

  58. // .interval(DateHistogram.Interval.YEAR)

  59. // )

  60. .execute().actionGet();

  61. // Get your facet results

  62. org.elasticsearch.search.aggregations.bucket.terms.Terms a = sr.getAggregations().get("1");

  63. for(org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket bk:a.getBuckets()){

  64. System.out.println("类型: "+bk.getKey()+" 分组统计数量 "+bk.getDocCount()+" ");

  65. }

  66. System.out.println("聚合数量:"+a.getBuckets().size());

  67. //DateHistogram agg2 = sr.getAggregations().get("agg2");

  68. //结果:

  69. // 类型: 1 分组数量 2

  70. // 类型: 2 分组数量 1

  71. // 类型: 3 分组数量 1

  72. // 聚合数量:3

  73. }

  74. /**多个不一样的请求组装*/

  75. public void mutilCombineQuery(){

  76. //查询请求1

  77. SearchRequestBuilder srb1 =client.prepareSearch().setQuery(QueryBuilders.queryString("eng").field("address")).setSize(1);

  78. //查询请求2//matchQuery

  79. SearchRequestBuilder srb2 = client.prepareSearch().setQuery(QueryBuilders.matchQuery("title", "标题")).setSize(1);

  80. //组装查询

  81. MultiSearchResponse sr = client.prepareMultiSearch().add(srb1).add(srb2).execute().actionGet();

  82. // You will get all individual responses from MultiSearchResponse#getResponses()

  83. long nbHits = 0;

  84. for (MultiSearchResponse.Item item : sr.getResponses()) {

  85. SearchResponse response = item.getResponse();

  86. for(SearchHit hits:response.getHits().getHits()){

  87. String sourceAsString = hits.sourceAsString();//以字符串方式打印

  88. System.out.println(sourceAsString);

  89. }

  90. nbHits += response.getHits().getTotalHits();

  91. }

  92. System.out.println("命中数据量:"+nbHits);

  93. //输出:

  94. // {"title":"我是标题","price":25.65,"type":1,"status":true,"address":"血落星域风阳星","createDate":"2015-03-16T09:56:20.440Z"}

  95. // 命中数据量:2

  96. client.close();

  97. }

  98. /**

  99. * 翻页查询

  100. * */

  101. public void scorllQuery()throws Exception{

  102. QueryStringQueryBuilder queryString = QueryBuilders.queryString("标题").field("title");

  103. //TermQueryBuilder qb=QueryBuilders.termQuery("title", "我是标题");

  104. SearchResponse scrollResp = client.prepareSearch("collection1")

  105. .setSearchType(SearchType.SCAN)

  106. .setScroll(new TimeValue(60000))

  107. .setQuery(queryString)

  108. .setSize(100).execute().actionGet(); //100 hits per shard will be returned for each scroll

  109. while (true) {

  110. for (SearchHit hit : scrollResp.getHits().getHits()) {

  111. //Handle the hit...

  112. String sourceAsString = hit.sourceAsString();//以字符串方式打印

  113. System.out.println(sourceAsString);

  114. }

  115. //通过scrollid来实现深度翻页

  116. scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();

  117. //Break condition: No hits are returned

  118. if (scrollResp.getHits().getHits().length == 0) {

  119. break;

  120. }

  121. }

  122. //输出

  123. // {"title":"我是标题","price":25.65,"type":1,"status":true,"address":"血落星域风阳星","createDate":"2015-03-16T09:56:20.440Z"}

  124. // {"title":"标题","price":251.65,"type":1,"status":true,"address":"美国东部","createDate":"2015-03-16T10:33:58.743Z"}

  125. client.close();

  126. }

  127. /**简单查询*/

  128. public void querySimple()throws Exception{

  129. SearchResponse sp = client.prepareSearch("collection1").execute().actionGet();

  130. for(SearchHit hits:sp.getHits().getHits()){

  131. String sourceAsString = hits.sourceAsString();//以字符串方式打印

  132. System.out.println(sourceAsString);

  133. }

  134. //结果

  135. // {"title":"我是标题","price":25.65,"type":1,"status":true,"address":"血落星域风阳星","createDate":"2015-03-16T09:56:20.440Z"}

  136. // {"title":"中国","price":205.65,"type":2,"status":true,"address":"河南洛阳","createDate":"2015-03-16T10:33:58.740Z"}

  137. // {"title":"标题","price":251.65,"type":1,"status":true,"address":"美国东部","createDate":"2015-03-16T10:33:58.743Z"}

  138. // {"title":"elasticsearch是一个搜索引擎","price":25.65,"type":3,"status":true,"address":"china","createDate":"2015-03-16T10:33:58.743Z"}

  139. }

  140. /**组合查询**/

  141. public void queryComplex()throws Exception{

  142. SearchResponse sp=client.prepareSearch("collection1")//检索的目录

  143. .setTypes("core1")//检索的索引

  144. .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)//Query type

  145. .setQuery(QueryBuilders.termQuery("type", "1"))//查询--Query

  146. .setPostFilter(FilterBuilders.rangeFilter("price").from(10).to(550.23))//过滤 --Filter

  147. .addSort("price",SortOrder.DESC) //排序 -- sort

  148. .setFrom(0).setSize(20).setExplain(true)//topN方式

  149. .execute().actionGet();//执行

  150. System.out.println("本次查询命中条数: "+sp.getHits().getTotalHits());

  151. for(SearchHit hits:sp.getHits().getHits()){

  152. //String sourceAsString = hits.sourceAsString();//以字符串方式打印

  153. //System.out.println(sourceAsString);

  154. Map<String, Object> sourceAsMap = hits.sourceAsMap();

  155. for(Entry<String, Object> k:sourceAsMap.entrySet()){

  156. System.out.println("name: "+k.getKey()+" value: "+k.getValue());

  157. }

  158. System.out.println("=============================================");

  159. }

  160. //结果

  161. // 本次查询命中条数: 2

  162. // name: title value: 标题

  163. // name: price value: 251.65

  164. // name: address value: 美国东部

  165. // name: status value: true

  166. // name: createDate value: 2015-03-16T10:33:58.743Z

  167. // name: type value: 1

  168. // =============================================

  169. // name: title value: 我是标题

  170. // name: price value: 25.65

  171. // name: address value: 血落星域风阳星

  172. // name: status value: true

  173. // name: createDate value: 2015-03-16T09:56:20.440Z

  174. // name: type value: 1

  175. // =============================================

  176. client.close();

  177. }

  178. /**索引数据*/

  179. public void indexdata()throws Exception{

  180. BulkRequestBuilder bulk=client.prepareBulk();

  181. XContentBuilder doc=XContentFactory.jsonBuilder()

  182. .startObject()

  183. .field("title","中国")

  184. .field("price",205.65)

  185. .field("type",2)

  186. .field("status",true)

  187. .field("address", "河南洛阳")

  188. .field("createDate", new Date()).endObject();

  189. //collection为索引库名,类似一个数据库,索引名为core,类似一个表

  190. // client.prepareIndex("collection1", "core1").setSource(doc).execute().actionGet();

  191. //批处理添加

  192. bulk.add(client.prepareIndex("collection1", "core1").setSource(doc));

  193. doc=XContentFactory.jsonBuilder()

  194. .startObject()

  195. .field("title","标题")

  196. .field("price",251.65)

  197. .field("type",1)

  198. .field("status",true)

  199. .field("address", "美国东部")

  200. .field("createDate", new Date()).endObject();

  201. //collection为索引库名,类似一个数据库,索引名为core,类似一个表

  202. // client.prepareIndex("collection1", "core1").setSource(doc).execute().actionGet();

  203. //批处理添加

  204. bulk.add(client.prepareIndex("collection1", "core1").setSource(doc));

  205. doc=XContentFactory.jsonBuilder()

  206. .startObject()

  207. .field("title","elasticsearch是一个搜索引擎")

  208. .field("price",25.65)

  209. .field("type",3)

  210. .field("status",true)

  211. .field("address", "china")

  212. .field("createDate", new Date()).endObject();

  213. //collection为索引库名,类似一个数据库,索引名为core,类似一个表

  214. //client.prepareIndex("collection1", "core1").setSource(doc).execute().actionGet();

  215. //批处理添加

  216. bulk.add(client.prepareIndex("collection1", "core1").setSource(doc));

  217. //发一次请求,提交所有数据

  218. BulkResponse bulkResponse = bulk.execute().actionGet();

  219. if (!bulkResponse.hasFailures()) {

  220. System.out.println("创建索引success!");

  221. } else {

  222. System.out.println("创建索引异常:"+bulkResponse.buildFailureMessage());

  223. }

  224. client.close();//释放资源

  225. // System.out.println("索引成功!");

  226. }

  227. }





想了解更多有关电商互联网公司的搜索技术和大数据技术的使用,请欢迎扫码关注微信公众号:我是攻城师(woshigcs)
本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园,有什么问题随时都可以留言,欢迎大家来访!




文章转载自我是攻城师,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论