
Elasticsearch Setup and Hands-On Practice

绘空事J 2021-07-19

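    Last week I spent some time studying Elasticsearch on my own, hoping it would bring some fresh optimizations to our project. It turned out that, given the project's current business requirements, it would only add complexity, so I quietly (and tearfully) reverted the code I had just finished refactoring.

    No matter: for a programmer, no skill is ever wasted, so here is a walkthrough of how to set up and use ELK.

    Discussion helps learning; criticism and questions are all welcome.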


Elasticsearch Setup


1. Download the package from the official site, choosing the LINUX X86_64 build, and extract it on the Linux server

    Download link: https://www.elastic.co/cn/downloads/elasticsearch
    tar -zxvf elasticsearch-7.13.2-linux-x86_64.tar.gz


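2. Edit the configuration

    vim config/elasticsearch.yml
    cluster.name: es-cluster  # cluster name
    node.name: master  # node name
    network.host: 10.13.9.231  # server IP
    http.port: 9200  # HTTP port
    discovery.zen.ping.unicast.hosts: ["10.13.9.231"]  # points at this node itself; deprecated in 7.x in favor of discovery.seed_hosts
    cluster.initial_master_nodes: ["master"]  # node.name of the node that bootstraps the cluster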


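3. Create an es account

    By default Elasticsearch refuses to run as root, so create a dedicated es account.

    groupadd es
    useradd es -g es
    passwd es  # useradd -p expects an already-hashed password, so set it interactively instead
    chown -R es:es /opt/elasticsearch-7.13.2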


4. Raise the server's open-file and memory-map limits

    vim /etc/security/limits.conf
    * soft nofile 65536
    * hard nofile 65536
    vi /etc/sysctl.conf
    vm.max_map_count=262144
    /sbin/sysctl -p


5. Start Elasticsearch

    su es
    ./bin/elasticsearch -d


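6. Set up the slave nodes

    Repeat steps 1-5 on each additional server, using the following configuration:

    cluster.name: es-cluster  # cluster name, must match the master
    node.name: slave1  # node name
    network.host: 10.13.9.232  # this node's IP
    http.port: 9200  # HTTP port
    discovery.zen.ping.unicast.hosts: ["10.13.9.231"]  # points at the master

    Check the cluster status:

    http://10.13.9.231:9200/_cluster/health

    20 active shards in total, 10 primaries and 10 replicas, none unassigned. Looks good~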

    {
      "cluster_name": "es-cluster",
      "status": "green",
      "timed_out": false,
      "number_of_nodes": 3,
      "number_of_data_nodes": 3,
      "active_primary_shards": 10,
      "active_shards": 20,
      "relocating_shards": 0,
      "initializing_shards": 0,
      "unassigned_shards": 0,
      "delayed_unassigned_shards": 0,
      "number_of_pending_tasks": 0,
      "number_of_in_flight_fetch": 0,
      "task_max_waiting_in_queue_millis": 0,
      "active_shards_percent_as_number": 100.0
    }
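
If you would rather check these numbers in code than by eye, here is a minimal sketch in plain Java. The class `ClusterHealthSummary` and its methods are my own hypothetical names, not part of Elasticsearch or of the service shown later, and the regex-based field extraction is only an assumption that works for the flat JSON returned by `_cluster/health`; a real project would more likely use a JSON library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of the original post) for reading _cluster/health output. */
final class ClusterHealthSummary {

    /** Extracts a top-level integer field such as "active_shards" from the health JSON. */
    static int intField(String json, String name) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*(-?\\d+)").matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("field not found: " + name);
        }
        return Integer.parseInt(m.group(1));
    }

    /** Extracts the "status" string: green, yellow, or red. */
    static String status(String json) {
        Matcher m = Pattern.compile("\"status\"\\s*:\\s*\"(\\w+)\"").matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("status not found");
        }
        return m.group(1);
    }

    /** Replica shards are the active shards that are not primaries. */
    static int replicaShards(String json) {
        return intField(json, "active_shards") - intField(json, "active_primary_shards");
    }

    /** True when the cluster is green and no shard is left unassigned. */
    static boolean isHealthy(String json) {
        return "green".equals(status(json)) && intField(json, "unassigned_shards") == 0;
    }

    public static void main(String[] args) {
        // A trimmed-down copy of the health response shown above.
        String json = "{\"status\":\"green\",\"active_primary_shards\":10,"
                + "\"active_shards\":20,\"unassigned_shards\":0}";
        System.out.println("replicas=" + replicaShards(json) + ", healthy=" + isHealthy(json));
    }
}
```

For the response above this gives 20 - 10 = 10 replica shards and a healthy cluster.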

Kibana Setup


1. Download and install Kibana

    Official download link: https://www.elastic.co/cn/downloads/kibana
    tar -zxvf kibana-7.13.2-linux-x86_64.tar.gz

2. Edit the configuration file

    vi config/kibana.yml
    server.host: "10.13.9.231"
    elasticsearch.hosts: ["http://10.13.9.231:9200"]
    kibana.index: ".kibana"
    elasticsearch.username: "kibana"  # username and password only matter when Elasticsearch security is enabled
    elasticsearch.password: "123456"
    i18n.locale: "zh-CN"

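3. Give the es user ownership of the Kibana directory

    chown -R es:es /opt/kibana

4. Run

    ./bin/kibana          # run in the foreground
    nohup ./bin/kibana &  # or keep it running in the background
    Start page: http://10.13.9.231:5601/app/kibana_overview#/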

Logstash


1. Download and install

    Official download link: https://www.elastic.co/cn/downloads/logstash
    Download and extract: tar -zxvf logstash-7.13.2-linux-x86_64.tar.gz

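2. Use Logstash to sync MySQL data into Elasticsearch

    Edit the configuration file as shown in the figure.

    Logstash reads from the database on a schedule and syncs the rows to ES once a minute, using the time of the previous run to pick up only what has changed.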
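
As a rough sketch of what such a pipeline might look like, assuming the JDBC input plugin and the Elasticsearch output plugin: the driver path, database name, credentials, table name, the `update_time` and `id` columns, and the index name below are all placeholders of mine, not values from the original setup.

```
input {
  jdbc {
    # Placeholder path to the MySQL Connector/J jar
    jdbc_driver_library => "/opt/logstash/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # Placeholder database and credentials
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/my_db"
    jdbc_user => "my_user"
    jdbc_password => "my_password"
    # Cron syntax: run once a minute
    schedule => "* * * * *"
    # :sql_last_value defaults to the time of the previous run
    statement => "SELECT * FROM my_table WHERE update_time > :sql_last_value"
  }
}

output {
  elasticsearch {
    hosts => ["http://10.13.9.231:9200"]
    # Placeholder index name
    index => "my_index"
    # Reuse the primary key so that updated rows overwrite their documents
    document_id => "%{id}"
  }
}
```

Setting `document_id` from the row's primary key is what turns repeated runs into updates rather than duplicate documents.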

    From the bin directory, run ./logstash -f ../config/logstash-sample.conf to start Logstash.

    The synchronized data can then be viewed in Kibana.


Using Elasticsearch from Java


    package com.acadsoc.es.service;

    import com.alibaba.fastjson.JSON;
    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.get.GetRequest;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.action.update.UpdateResponse;
    import org.elasticsearch.client.*;
    import org.elasticsearch.client.indices.GetIndexRequest;
    import org.elasticsearch.common.xcontent.XContentType;
    import org.elasticsearch.index.query.*;
    import org.elasticsearch.index.reindex.BulkByScrollResponse;
    import org.elasticsearch.index.reindex.UpdateByQueryRequest;
    import org.elasticsearch.script.Script;
    import org.elasticsearch.script.ScriptType;
    import org.elasticsearch.script.mustache.SearchTemplateRequest;
    import org.elasticsearch.script.mustache.SearchTemplateResponse;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    import java.io.IOException;
    import java.util.*;


    /**
     * Description: thin wrapper around RestHighLevelClient for common document and query operations
     * Author: Amadeus Huang
     * Date: Created in 2021/7/4 22:05
     */
    @Service("esService")
    public class EsService implements EsServiceInterface {

        private static final Logger logger = LoggerFactory.getLogger(EsService.class);

        @Autowired
        private RestHighLevelClient restHighLevelClient;

        /**
         * Add a document; Elasticsearch generates the document id.
         */
        public void addDocument(String index, Object obj) {
            try {
                // Build the index request
                IndexRequest indexRequest = new IndexRequest(index);
                // Serialize the object to JSON and use it as the document source
                byte[] json = JSON.toJSONBytes(obj);
                indexRequest.source(json, XContentType.JSON);
                // Execute the request
                IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
                logger.info("ES index status: {}", response.status());
            } catch (Exception e) {
                logger.error("ES index failed", e);
            }
        }


        /**
         * Fetch a document by id; returns its source as a JSON string, or null if it does not exist.
         */
        public String getDocument(String index, String id) {
            try {
                // Build the get request
                GetRequest getRequest = new GetRequest(index, id);
                // Fetch the document
                GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
                // Return the raw JSON source when the document exists
                if (getResponse.isExists()) {
                    String sourceAsString = getResponse.getSourceAsString();
                    logger.info("ES document: {}", sourceAsString);
                    return sourceAsString;
                }
            } catch (IOException e) {
                logger.error("ES get failed", e);
            }
            return null;
        }


        /**
         * Update a document. This is a partial update: the fields of obj are merged
         * into the existing document rather than replacing it wholesale.
         */
        public void updateDocument(String index, String id, Object obj) {
            try {
                // Build the update request
                UpdateRequest updateRequest = new UpdateRequest(index, id);
                // Serialize the object to a byte array
                byte[] json = JSON.toJSONBytes(obj);
                // Set the fields to update
                updateRequest.doc(json, XContentType.JSON);
                // Execute the update
                UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
                logger.info("ES update status: {}", response.status());
            } catch (Exception e) {
                logger.error("ES update failed", e);
            }
        }


        /**
         * Update the document if it exists, otherwise insert it.
         */
        public void upsertDocument(String index, String id, Object obj) {
            try {
                // Serialize the object to a byte array
                byte[] json = JSON.toJSONBytes(obj);
                // With docAsUpsert(true) the doc content is merged into an existing document,
                // or indexed as a new document when none exists, so a separate upsert body is not needed
                UpdateRequest updateRequest = new UpdateRequest(index, id)
                        .doc(json, XContentType.JSON)
                        .docAsUpsert(true);
                // Execute the update
                UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
                logger.info("ES upsert status: {}", response.status());
            } catch (IOException e) {
                logger.error("ES upsert failed", e);
            }
        }


        /**
         * Delete a document by id.
         */
        public void deleteDocument(String index, String id) {
            try {
                // Build the delete request
                DeleteRequest deleteRequest = new DeleteRequest(index, id);
                // Execute the delete
                DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
                logger.info("ES delete status: {}", response.status());
            } catch (IOException e) {
                logger.error("ES delete failed", e);
            }
        }

        /**
         * Description: returns the documents whose column matches at least one element of list
         * Author: Amadeus Huang
         * Date: Created in 2021/7/10 18:37
         */
        // https://www.elastic.co/guide/en/elasticsearch/client/java-api/7.9/java-term-level-queries.html#java-query-dsl-terms-query
        public <T> String termsQuery(String index, String column, List<T> list, int from, int size) {
            // Build the search source with paging
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.from(from);
            sourceBuilder.size(size);
            // Terms condition
            TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery(column, list);
            // Bool query; it supports must, filter, should, and must_not clauses
            BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
            boolBuilder.must(termsQueryBuilder);

            sourceBuilder.query(boolBuilder);
            SearchRequest searchRequest = new SearchRequest(index);
            searchRequest.source(sourceBuilder);
            try {
                SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
                logger.info("ES terms query response: {}", response);
                return response.toString();
            } catch (IOException e) {
                logger.error("ES terms query failed", e);
            }
            return null;
        }




        /**
         * Run a search whose request body is passed in as a JSON string.
         * The body is sent as an inline search template with no parameters.
         */
        public String dslQuery(String index, String dsl) {
            SearchTemplateRequest request = new SearchTemplateRequest();
            // Target index
            request.setRequest(new SearchRequest(index));
            // Use an inline template rather than a stored one
            request.setScriptType(ScriptType.INLINE);
            // No template parameters
            Map<String, Object> scriptParams = new HashMap<>();
            request.setScriptParams(scriptParams);
            // The search body itself
            request.setScript(dsl);

            try {
                SearchTemplateResponse response = restHighLevelClient.searchTemplate(request, RequestOptions.DEFAULT);
                // Log each hit
                response.getResponse().getHits().forEach(hit -> logger.info("ES hit: {}", hit));
                return response.toString();
            } catch (IOException e) {
                logger.error("ES DSL query failed", e);
            }
            return null;
        }


        /**
         * Same as dslQuery, but returns only the total number of hits (0 on failure).
         */
        public long dslQueryCount(String index, String dsl) {
            SearchTemplateRequest request = new SearchTemplateRequest();
            // Target index
            request.setRequest(new SearchRequest(index));
            // Use an inline template rather than a stored one
            request.setScriptType(ScriptType.INLINE);
            // No template parameters
            Map<String, Object> scriptParams = new HashMap<>();
            request.setScriptParams(scriptParams);
            // The search body itself
            request.setScript(dsl);

            try {
                SearchTemplateResponse response = restHighLevelClient.searchTemplate(request, RequestOptions.DEFAULT);
                // Log each hit
                response.getResponse().getHits().forEach(hit -> logger.info("ES hit: {}", hit));
                return response.getResponse().getHits().getTotalHits().value;
            } catch (IOException e) {
                logger.error("ES DSL count failed", e);
            }
            return 0;
        }

        /**
         * Description: update by query. Use with caution: 75 times / 5 min.
         * The dsl argument is the source of a Painless script. No query is set on the
         * request, so the script runs against every document in the index.
         * Author: Amadeus Huang
         * Date: Created in 2021/7/13 12:26
         */
        public String dslUpdate(String index, String dsl) throws IOException {
            UpdateByQueryRequest request = new UpdateByQueryRequest(index);
            Script script = new Script(ScriptType.INLINE, "painless", dsl, Collections.emptyMap());
            request.setScript(script);
            BulkByScrollResponse bulkResponse = restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
            logger.info("ES update-by-query response: {}", bulkResponse);
            return bulkResponse.toString();
        }


        /**
         * Run a search request and return the source of every hit as a map.
         * Returns an empty list when the search fails or matches nothing.
         */
        public List<Map<String, Object>> getResult(SearchRequest searchRequest) {
            SearchResponse response = null;
            List<Map<String, Object>> result = new ArrayList<>();
            try {
                response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            } catch (IOException e) {
                logger.error("ES search failed", e);
            }
            if (response != null && response.getHits().getTotalHits().value > 0) {
                for (SearchHit hit : response.getHits()) {
                    result.add(hit.getSourceAsMap());
                }
            }
            return result;
        }


        /**
         * Check whether an index exists; returns false when the check itself fails.
         */
        public boolean indexExists(String index) {
            try {
                GetIndexRequest request = new GetIndexRequest(index);
                return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
            } catch (IOException e) {
                logger.error("ES index-exists check failed", e);
            }
            return false;
        }
    }
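
Since `dslQuery` and `dslQueryCount` take the search body as a raw string, it can help to build that string in one place instead of concatenating it at every call site. Below is a minimal sketch in plain Java. `TermsDslBuilder` is a hypothetical helper of my own, not part of the service above or of the Elasticsearch client, and its escaping covers only backslashes and double quotes, so treat it as an illustration rather than a general JSON writer.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper (not in the original post) that builds a search body for dslQuery. */
final class TermsDslBuilder {

    /** Wraps a value in double quotes, escaping backslashes and double quotes only. */
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Builds {"from":..,"size":..,"query":{"terms":{field:[values...]}}}. */
    static String termsQuery(String field, List<String> values, int from, int size) {
        String array = values.stream()
                .map(TermsDslBuilder::quote)
                .collect(Collectors.joining(","));
        return "{\"from\":" + from + ",\"size\":" + size
                + ",\"query\":{\"terms\":{" + quote(field) + ":[" + array + "]}}}";
    }

    public static void main(String[] args) {
        // "status", "open", and "closed" are made-up example values.
        System.out.println(termsQuery("status", List.of("open", "closed"), 0, 10));
    }
}
```

The resulting string could then be passed along as `esService.dslQuery("my_index", body)`, where `my_index` is a placeholder. Because the body is sent as an inline Mustache template, a body that contains `{{` would be read as a template tag, so it is worth keeping that sequence out of the generated JSON.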



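This article is reposted from 绘空事J. If you believe it infringes your rights, please email contact@modb.pro to report it and provide supporting evidence; once verified, 墨天轮 will remove the content immediately.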
