暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Elasticsearch Data Streams 完全指南

新智锦绣 2025-07-03
149

点击蓝字关注我们


前言

随着现代应用程序产生的时序数据量不断增长,如何高效地存储、管理和查询这些数据成为了一个重要挑战。Elasticsearch 在 7.9 版本中引入了 Data streams 功能,为时序数据管理提供了一个优雅的解决方案。本文将深入探讨 Data streams 的概念、优势、使用方法以及最佳实践。


什么是 Data Streams


Data streams 是 Elasticsearch 中用于管理时序数据的抽象层。它将多个相关的索引(称为 backing indices)组织在一起,为用户提供统一的数据写入和查询接口。从用户角度来看,Data stream 就像一个单一的索引,但在底层,它由多个按时间顺序排列的索引组成。


核心概念


Backing Indices(后备索引)

  • Data stream 的底层存储单元

  • 按时间顺序命名,如 .ds-logs-2023.12.01-000001

  • 只有最新的索引可以接收写入操作

  • 旧索引变为只读状态

Write Index(写入索引)

  • Data stream 中唯一可以接收新文档的索引

  • 当满足轮转条件时,会创建新的写入索引

Index Template(索引模板)

  • 定义 Data stream 的结构和配置

  • 必须包含 data_stream 配置块


为什么需要 Data Streams


传统索引管理的痛点


在 Data streams 出现之前,管理时序数据通常需要:

    # 手动创建带时间戳的索引
    PUT logs-2023.12.01
    PUT logs-2023.12.02
    PUT logs-2023.12.03


    # 使用别名管理多个索引
    POST _aliases
    {
      "actions": [
        {"add": {"index""logs-2023.12.*""alias""logs"}}
      ]
    }

    这种方式存在以下问题:

    • 需要手动管理索引的创建和轮转

    • 别名管理复杂,容易出错

    • 难以实现自动化的生命周期管理

    • 查询时需要考虑跨多个索引的性能优化


    Data Streams 的优势


    1. 自动化管理

    • 自动创建和轮转索引

    • 无需手动管理索引生命周期

    • 内置与 ILM(Index Lifecycle Management)的集成

    2. 简化操作

    • 统一的写入和查询接口

    • 透明的多索引管理

    • 减少运维复杂度

    3. 性能优化

    • 针对时序数据优化的存储结构

    • 更高效的查询性能

    • 支持并行处理

    4. 数据完整性

    • 只允许追加操作,保证数据不被意外修改

    • 支持文档级别的版本控制


    Data Streams 的工作原理


    数据写入流程


    graph TD

        A[客户端写入请求] --> B[Data Stream]

        B --> C[路由到写入索引]

        C --> D[检查轮转条件]

        D -->|满足条件| E[创建新的写入索引]

        D -->|不满足| F[写入当前索引]

        E --> G[更新写入索引指针]

        G --> F


    索引轮转机制


    索引轮转可以基于多种条件触发:

    • 时间条件:如每天、每周轮转

    • 大小条件:索引达到指定大小

    • 文档数量:索引包含的文档数量

    • 自定义条件:通过 ILM 策略定义


    Data streams的脉络图:



    实战:创建和使用 Data Streams


    步骤1:创建索引模板


      PUT _index_template/logs-template
      {
        "index_patterns": ["logs-*"],
        "data_stream": {},
        "template": {
          "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 1,
            "index.lifecycle.name""logs-lifecycle-policy"
          },
          "mappings": {
            "properties": {
              "@timestamp": {
                "type""date"
              },
              "service": {
                "type""keyword"
              },
              "level": {
                "type""keyword"
              },
              "message": {
                "type""text",
                "analyzer""standard"
              },
              "host": {
                "type""keyword"
              },
              "user_id": {
                "type""keyword"
              }
            }
          }
        },
        "priority": 200,
        "composed_of": ["logs-mappings""logs-settings"],
        "_meta": {
          "description""Template for application logs data stream"
        }
      }


      步骤2:配置生命周期策略


        PUT _ilm/policy/logs-lifecycle-policy
        {
          "policy": {
            "phases": {
              "hot": {
                "actions": {
                  "rollover": {
                    "max_size""10GB",
                    "max_age""7d",
                    "max_docs": 10000000
                  },
                  "set_priority": {
                    "priority": 100
                  }
                }
              },
              "warm": {
                "min_age""7d",
                "actions": {
                  "set_priority": {
                    "priority": 50
                  },
                  "allocate": {
                    "number_of_replicas": 0
                  },
                  "forcemerge": {
                    "max_num_segments": 1
                  }
                }
              },
              "cold": {
                "min_age""30d",
                "actions": {
                  "set_priority": {
                    "priority": 0
                  },
                  "allocate": {
                    "number_of_replicas": 0
                  }
                }
              },
              "delete": {
                "min_age""90d",
                "actions": {
                  "delete": {}
                }
              }
            }
          }
        }


        步骤3:写入数据


          # 第一次写入会自动创建 data stream
          POST logs-application/_doc
          {
            "@timestamp""2023-12-01T10:00:00Z",
            "service""user-service",
            "level""INFO",
            "message""User authentication successful",
            "host""server-01",
            "user_id""user123"
          }


          # 批量写入
          POST logs-application/_bulk
          {"index":{}}
          {"@timestamp":"2023-12-01T10:01:00Z","service":"order-service","level":"ERROR","message":"Database connection failed","host":"server-02"}
          {"index":{}}
          {"@timestamp":"2023-12-01T10:02:00Z","service":"payment-service","level":"WARN","message":"Payment processing delayed","host":"server-03"}


          步骤4:查询数据


            # 简单查询
            GET logs-application/_search
            {
              "query": {
                "range": {
                  "@timestamp": {
                    "gte""2023-12-01T00:00:00Z",
                    "lte""2023-12-01T23:59:59Z"
                  }
                }
              }
            }


            # 聚合查询
            GET logs-application/_search
            {
              "size": 0,
              "aggs": {
                "services": {
                  "terms": {
                    "field""service",
                    "size": 10
                  },
                  "aggs": {
                    "levels": {
                      "terms": {
                        "field""level"
                      }
                    }
                  }
                },
                "error_timeline": {
                  "filter": {
                    "term": {"level""ERROR"}
                  },
                  "aggs": {
                    "timeline": {
                      "date_histogram": {
                        "field""@timestamp",
                        "calendar_interval""1h"
                      }
                    }
                  }
                }
              }
            }


            高级特性和配置


            1. 自定义索引命名


              PUT _index_template/custom-logs-template
              {
                "index_patterns": ["custom-logs-*"],
                "data_stream": {},
                "template": {
                  "settings": {
                    "index.lifecycle.name""custom-logs-policy",
                    "index.lifecycle.rollover_alias""custom-logs"
                  }
                }
              }


              2. 多租户 Data Streams


                # 为不同租户创建独立的 data streams
                POST logs-tenant-a/_doc
                {"@timestamp""2023-12-01T10:00:00Z""tenant""tenant-a""message""Tenant A log"}


                POST logs-tenant-b/_doc
                {"@timestamp""2023-12-01T10:00:00Z""tenant""tenant-b""message""Tenant B log"}


                3. 跨集群复制


                  # 在从集群上配置跨集群复制
                  PUT _ccr/follow/logs-replica
                  {
                    "remote_cluster""primary-cluster",
                    "leader_index""logs-application",
                    "follow_index""logs-application-replica"
                  }


                  监控和管理


                  查看 Data Stream 信息


                  1.列出所有 data streams

                    GET _data_stream

                    2.查看特定 data stream 的详细信息

                      GET _data_stream/logs-application

                      3.查看 backing indices

                        GET logs-application/_ilm/explain


                        性能监控


                          # 监控索引统计信息
                          GET logs-application/_stats


                          # 查看分片分配情况
                          GET _cat/shards/logs-application?v


                          # 监控 ILM 策略执行状态
                          GET logs-application/_ilm/explain


                          故障排查


                            # 检查模板匹配情况
                            GET _index_template/logs-template


                            # 查看索引健康状态
                            GET _cluster/health/logs-application


                            # 检查写入索引状态
                            GET _cat/indices/.ds-logs-application-*?v&s=index


                            最佳实践


                            1. 合理设计索引模板


                              PUT _index_template/optimized-logs-template
                              {
                                "index_patterns": ["logs-*"],
                                "data_stream": {},
                                "template": {
                                  "settings": {
                                    "number_of_shards": 1,
                                    "number_of_replicas": 1,
                                    "refresh_interval""30s",
                                    "index.lifecycle.name""logs-policy",
                                    "index.codec""best_compression",
                                    "index.sort.field""@timestamp",
                                    "index.sort.order""desc"
                                  }
                                }
                              }


                              2. 优化 ILM 策略


                              • Hot 阶段:设置合理的轮转条件,避免索引过大或过小

                              • Warm 阶段:减少副本数,执行段合并

                              • Cold 阶段:进一步降低存储成本

                              • Delete 阶段:根据数据保留策略及时删除过期数据


                              3. 监控关键指标


                              • 写入性能和延迟

                              • 索引轮转频率

                              • 存储空间使用情况

                              • 查询性能


                              4. 安全考虑


                                # 配置索引级别的安全策略
                                PUT _security/role/logs-reader
                                {
                                  "indices": [
                                    {
                                      "names": ["logs-*"],
                                      "privileges": ["read""view_index_metadata"]
                                    }
                                  ]
                                }


                                性能优化技巧


                                1. 分片策略


                                • 根据数据量和查询模式确定分片数量

                                • 避免过多小分片,影响集群性能

                                • 考虑使用索引排序提高查询效率


                                2. 映射优化


                                  {
                                    "mappings": {
                                      "properties": {
                                        "@timestamp": {"type": "date"},
                                        "message": {
                                          "type": "text",
                                          "index": false,  // 如果不需要全文搜索
                                          "doc_values": false
                                        },
                                        "status_code": {"type": "integer"},
                                        "ip_address": {
                                          "type": "ip",
                                          "ignore_malformed": true
                                        }
                                      }
                                    }
                                  }


                                  3. 查询优化


                                  • 使用时间范围过滤器

                                  • 利用索引排序特性

                                  • 合理使用聚合和分析


                                  常见问题和解决方案


                                  1. Data Stream 无法写入


                                  问题:新建的 data stream 无法接收数据

                                  解决方案:

                                  • 检查索引模板是否正确配置

                                  • 验证文档是否包含 @timestamp 字段

                                  • 确认集群状态正常


                                  2. 索引轮转不按预期执行


                                  问题:ILM 策略没有按预期轮转索引

                                  解决方案:

                                  # 手动执行 ILM 策略

                                  POST logs-application/_ilm/retry


                                  # 检查 ILM 状态

                                  GET _ilm/status


                                  3. 查询性能问题


                                  问题:跨多个 backing indices 的查询性能较差

                                  解决方案:

                                  • 使用时间范围过滤器限制查询范围

                                  • 优化索引设置和映射

                                  • 考虑使用索引排序


                                  总结


                                  Data streams 是 Elasticsearch 为时序数据管理提供的强大功能,它简化了索引管理、提高了运维效率,并提供了更好的性能表现。通过合理的配置和最佳实践,Data streams 可以显著改善时序数据的存储和查询体验。

                                  在实际应用中,建议从小规模开始尝试,逐步优化配置参数,并建立完善的监控体系。随着对 Data streams 理解的深入,你将能够充分发挥其在大规模时序数据处理中的优势。

                                  关于公司

                                  感谢您关注新智锦绣科技(北京)有限公司!作为 Elastic 的 Elite 合作伙伴及 EnterpriseDB 在国内的唯一代理和服务合作伙伴,我们始终致力于技术创新和优质服务,帮助企业客户实现数据平台的高效构建与智能化管理。无论您是关注 Elastic 生态系统,还是需要 EnterpriseDB 的支持,我们都将为您提供专业的技术支持和量身定制的解决方案。


                                  欢迎关注我们,获取更多技术资讯和数字化转型方案,共创美好未来!

                                  Elastic 微信群

                                  EDB 微信群


                                  发现“分享”“赞”了吗,戳我看看吧


                                  文章转载自新智锦绣,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                  评论