暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Elasticsearch 入门(二)

剽悍的派森先生 2021-06-24
500

这一部分介绍一下 Elasticsearch 的使用以及部分原理。


CRUD

关于 Elasticsearch 的使用主要介绍对 Index 以及 Document 的 CRUD 操作。

先看关于 Index 的操作。

    import time
    import ujson
    from elasticsearch import Elasticsearch


    # Shared client used by the code below, configured with the host localhost:9200.
    es = Elasticsearch(['localhost:9200'])


    # index
    class Index(object):
        """Small helper for index-level operations on one named index.

        Every method goes through the module-level ``es`` client. Arguments are
        passed by keyword because positional API arguments are deprecated in
        newer elasticsearch-py releases and rejected by 8.x.
        """

        def __init__(self, index):
            """Store the name of the index the other methods operate on."""
            self.index = index

        def indices(self):
            """Print the name of each index returned for the ``*`` pattern."""
            for name in es.indices.get(index="*"):
                print(name)

        def create(self):
            """Create the index and pretty-print the response."""
            obj = es.indices.create(index=self.index)
            print(ujson.dumps(obj, indent=2))

        def delete(self):
            """Delete the index and pretty-print the response."""
            obj = es.indices.delete(index=self.index)
            print(ujson.dumps(obj, indent=2))

        def mapping(self):
            """Fetch the index mapping and pretty-print it."""
            obj = es.indices.get_mapping(index=self.index)
            print(ujson.dumps(obj, indent=2))

        def settings(self):
            """Fetch the index settings and pretty-print them."""
            obj = es.indices.get_settings(index=self.index)
            print(ujson.dumps(obj, indent=2))




    def index_test():
        """Walk through the Index helper: list, create, inspect, then delete.

        Settings and mapping are read while the index still exists; requesting
        them after ``delete()`` would fail because the index is gone.
        """
        index = Index("index_test")
        index.indices()
        index.create()

        # Inspect the freshly created index before removing it.
        index.settings()
        index.mapping()

        index.delete()

    在创建 Index 的时候,我们可以不指定 schema,Elasticsearch 会根据插入的数据动态推断 Index 的 Mapping,我们称这个过程为 Dynamic mapping。输入数据与 Elasticsearch 数据类型的对应关系如下图。

    再看下关于 Document 的操作。

      import time
      import ujson
      from elasticsearch import Elasticsearch


      # Shared client used by the Document class below, configured with the host localhost:9200.
      es = Elasticsearch(['localhost:9200'])


      # document
      class Document(object):
          """Helper demonstrating document CRUD and search calls on one index.

          Every method goes through the module-level ``es`` client and
          pretty-prints the raw response. Client arguments are always passed by
          keyword so the calls do not depend on the positional parameter order,
          which differs between elasticsearch-py major versions.
          """

          def __init__(self, index):
              """Store the target index name and the fixed id used by default."""
              self.index = index
              self._id = "00-0"

          def count(self):
              """Print the count response for a match_all query on the index."""
              body = {"query": {"match_all": {}}}
              obj = es.count(index=self.index, body=body)
              print(ujson.dumps(obj, indent=2))

          def create(self):
              """Index a document without passing an id and return its ``_id``.

              With the index API the id is optional; when an id is supplied
              and already exists, the existing document is replaced.
              """
              body = {
                  "status": 1,
                  "user_id": 1,
                  "deleted": 1,
                  "created_at": 1601813059,
                  "pid": 0,
                  "app_id": 22,
                  "updated_at": 1601813059,
                  "content": "I like switch",
                  "title": "swimming",
                  "bid": 1,
                  "id": 1,
                  "rid": 0,
              }
              obj = es.index(index=self.index, body=body)
              print(ujson.dumps(obj, indent=2))
              return obj["_id"]

          def create2(self):
              """Create a document under ``self._id``.

              With the create API the id is required, and the call fails if a
              document with that id already exists.
              """
              now = int(time.time())
              body = {
                  "status": 1,
                  "user_id": 2,
                  "deleted": 1,
                  "created_at": now,
                  "pid": 0,
                  "app_id": 22,
                  "updated_at": now,
                  "content": "I like swimming and riding",
                  "title": "likes",
                  "bid": 2,
                  "id": 2,
                  "rid": 0,
              }
              obj = es.create(index=self.index, id=self._id, body=body)
              print(ujson.dumps(obj, indent=2))

          def delete(self, _id=None):
              """Delete the document with ``_id``, falling back to ``self._id``."""
              obj = es.delete(index=self.index, id=_id or self._id)
              print(ujson.dumps(obj, indent=2))

          def update(self):
              """Apply a partial-document update that sets ``tag_ids`` to ``[1]``."""
              body = {"doc": {"tag_ids": [1]}}
              obj = es.update(index=self.index, id=self._id, body=body)
              print(ujson.dumps(obj, indent=2))

          def update2(self):
              """Update by script: add ``params.tag_id`` to ``tag_ids``.

              https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html

              Alternative script sources:
              remove: "ctx._source.tag_ids.remove(ctx._source.tag_ids.indexOf(params.tag_id))"
              add: "ctx._source.tag_ids.add(params.tag_id)"
              append: `
              if(ctx._source.tag_ids != null) {
              ctx._source.tag_ids.add(params.tag_id)
              } else {
              ctx._source.tag_ids = [params.tag_id]
              }
              `
              """
              body = {
                  "script": {
                      "source": "ctx._source.tag_ids.add(params.tag_id)",
                      "params": {"tag_id": 2},
                  }
              }
              obj = es.update(index=self.index, id=self._id, body=body)
              print(ujson.dumps(obj, indent=2))

          def update3(self):
              """Replace the whole document stored under ``self._id``."""
              body = {
                  "old": {
                      "status": 1,
                      "user_id": 5874837928,
                      "deleted": 1,
                      "created_at": 1601813059,
                      "pid": 0,
                      "app_id": 22,
                      "updated_at": 1601813059,
                      "content": "This is a test",
                      "bid": 16550,
                      "id": 10229,
                      "rid": 0,
                  }
              }
              obj = es.index(index=self.index, id=self._id, body=body)
              print(ujson.dumps(obj, indent=2))

          def sort(self):
              """Print the two documents with the largest ``created_at``."""
              body = {
                  "size": 2,
                  "query": {"match_all": {}},
                  "sort": [{"created_at": {"order": "desc"}}],
              }
              obj = es.search(index=self.index, body=body)
              print(ujson.dumps(obj, indent=2))

          def search(self, _id=None):
              """Search by document id, falling back to ``self._id``."""
              _id = _id or self._id
              body = {"query": {"term": {"_id": _id}}}
              obj = es.search(index=self.index, body=body)
              print(ujson.dumps(obj, indent=2))

          # ====== search ======
          def search2(self):
              """Term-based match query on ``content``.

              The operator is ``and``; ``"operator": "or"`` or
              ``"minimum_should_match": 1`` are the alternatives to try here.
              """
              body = {
                  "query": {
                      "match": {
                          "content": {
                              "query": "riding test",
                              "operator": "and",
                          }
                      }
                  }
              }
              obj = es.search(index=self.index, body=body)
              print(ujson.dumps(obj, indent=2))

          def search3(self):
              """Phrase match query on ``content`` with a slop of 2."""
              body = {
                  "query": {
                      "match_phrase": {
                          "content": {
                              "query": "I like riding",
                              "slop": 2,
                          }
                      }
                  }
              }
              obj = es.search(index=self.index, body=body)
              print(ujson.dumps(obj, indent=2))

          def search4(self):
              """Phrase prefix query on ``content``."""
              body = {
                  "query": {
                      "match_phrase_prefix": {
                          "content": {
                              "query": "I like sw",
                          },
                      }
                  }
              }
              obj = es.search(index=self.index, body=body)
              print(ujson.dumps(obj, indent=2))

          def search5(self):
              """Multi-field match query over ``title`` and ``content``."""
              body = {
                  "query": {
                      "multi_match": {
                          "query": "swimming",
                          "type": "best_fields",
                          "fields": ["title", "content"],
                      }
                  }
              }
              obj = es.search(index=self.index, body=body)
              print(ujson.dumps(obj, indent=2))

          def search6(self):
              """Compound bool query: a multi_match clause plus a term filter."""
              body = {
                  "query": {
                      "bool": {
                          "must": [
                              {
                                  "multi_match": {
                                      "query": "swimming",
                                      "type": "best_fields",
                                      "fields": ["title", "content"],
                                  }
                              },
                          ],
                          "filter": [
                              # Filter on "bid", the field the documents built
                              # by create()/create2() actually carry.
                              {"term": {"bid": 2}}
                          ],
                      }
                  }
              }
              obj = es.search(index=self.index, body=body)
              print(ujson.dumps(obj, indent=2))




      def doc_test():
          """Run the Document helper through count, create, update, search and delete."""
          doc = Document("doreader_chapter_comment_bid")
          doc.count()

          # Index a document without an id, then look it up by the returned id.
          generated_id = doc.create()
          doc.search(generated_id)

          # Each change to the fixed-id document is followed by a lookup of it.
          for mutate in (doc.create2, doc.update, doc.update2):
              mutate()
              doc.search()

          # search
          queries = (
              doc.search2,
              doc.search3,
              doc.search4,
              doc.search5,
              doc.search6,
          )
          for run_query in queries:
              run_query()

          # Remove both documents created above.
          doc.delete(generated_id)
          doc.delete()

      在通过 search 方法查询的时候,我们可以得到文档的元数据。所谓元数据就是指用于标注文档的相关信息,包括:

      • _index - 文档所属的索引名

      • _type - 文档所属的类型名

      • _id - 文档唯一 ID

      • _source - 文档的原始 JSON 数据

      • _version - 文档的版本信息

      • _score - 相关性打分

      原理

      1. 写流程




        1. 客户端随机选择一个 node 发送请求,这个 node 就是 coordinating node(协调节点)

        2. coordinating node 对 document 进行路由,将请求转发到该索引的 primary shard 的 node 上

        3. primary shard 所在的 node 处理请求,存储数据,然后把数据同步到它的 replica shard 上

        4. coordinating node 发现 primary shard 和 所有 replica shard 都同步了数据,就返回响应结果给客户端

      2. 搜索流程


      查询阶段



      拉取数据阶段

        1. 客户端随机选择一个 node 发送请求,这个 node 就是 coordinating node(协调节点)

        2. coordinating node 将搜索请求发送到所有 shard(从 primary shard 或 replica shard 负载均衡搜索)

        3. 查询阶段:每个 shard 将结果(document 的 id)返回给协调节点,由协调节点进行合并、排序和分页等操作,产生最终结果

        4. 拉取数据阶段:协调节点根据 id 去各个节点拉取实际的 document 数据,最终返回给客户端

      3. 倒排索引

          参考链接:

      http://www.nosqlnotes.com/technotes/searchengine/lucene-invertedindex/

      文章转载自剽悍的派森先生,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

      评论