暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何对 Elasticsearch、Filebeat、Logstash、Kibana 深度巡检?

在运维和开发中,确保 Elasticsearch、Filebeat、Logstash、Kibana(简称 EFLK)集群的稳定运行至关重要。

本文将详细介绍一套深度巡检方案,包括各组件的监控方法、健康状态检查、性能指标监控,以及一些关键的 DSL 查询示例,帮助大家全面掌握集群状态,及时发现潜在问题,优化 EFLK 的运行。

一、Elasticsearch 深度巡检方案

1. 集群健康检查

使用 _cluster/health
API 获取集群的整体健康状况:

GET _cluster/health

返回的关键字段:

  • status
    :集群状态(green
    yellow
    red
  • number_of_nodes
    :节点数量
  • active_primary_shards
    :活跃的主分片数
  • active_shards
    :活跃的总分片数
  • unassigned_shards
    :未分配的分片数量

2. 节点性能监控

使用 _nodes/stats
API 获取各节点的性能指标:

GET _nodes/stats

重点关注:

  • indices.docs.count
    :文档数量
  • indices.store.size_in_bytes
    :索引大小
  • jvm.mem.heap_used_percent
    :JVM 堆内存使用率
  • os.cpu.percent
    :CPU 使用率
  • fs.total.available_in_bytes
    :磁盘可用空间

3. 分片状态监控

使用 _cat/shards
API 查看所有分片的详细信息:

GET _cat/shards?v&h=index,shard,prirep,state,unassigned.reason

重点检查 unassigned.reason
字段,确保没有未分配的分片。如果发现未分配的分片,可以使用以下命令重新分配:

POST /_cluster/reroute
{
  "commands": [
    {
      "allocate_stale_primary": {
        "index""your-index",
        "shard"0,
        "node""node-name",
        "accept_data_loss"true
      }
    }
  ]
}

4. 索引状态巡检

定期检查索引的健康状态,特别是大型或活跃的索引:

GET _cat/indices?v&h=index,health,status,pri,rep,docs.count,store.size


查看以下字段:

  • health
    :索引健康状态
  • status
    :索引状态
  • pri
    :主分片数量
  • rep
    :副本分片数量
  • docs.count
    :文档数量
  • store.size
    :索引大小

5. 集群性能分析(使用 Profile 查询)

为了深入分析查询性能,可以在查询时使用 profile
参数,返回每个查询阶段的执行时间,帮助定位性能瓶颈:

GET /your-index/_search?pretty
{
  "profile"true,
  "query": {
    "match": {
      "field""value"
    }
  }
}

返回结果中包含每个查询的执行时间,识别最耗时的部分(如分片级别操作)。

kibana 的 Search Profile 工具要用好!


二、Filebeat 巡检方案

1. Filebeat 配置检查

确保 Filebeat 正确安装并运行:

systemctl status filebeat

检查配置文件 /etc/filebeat/filebeat.yml
,确保以下关键部分:

  • 输入源(如日志文件、系统日志)配置正确
  • 输出目标(Elasticsearch 或 Logstash)配置正确

2. Filebeat 日志检查

查看 Filebeat 日志文件(通常位于 /var/log/filebeat/filebeat
)以确保没有错误信息:

tail -f /var/log/filebeat/filebeat

常见错误:

  • 无法连接到 Elasticsearch 或 Logstash
  • 由于权限问题无法读取日志文件

3. Filebeat 配置测试

测试 Filebeat 配置文件的正确性:

filebeat test config

使用 -e
参数启动 Filebeat,输出调试日志:

filebeat -e


三、Logstash 巡检方案

1. Logstash 进程检查

检查 Logstash 是否正常运行:

systemctl status logstash

2. Logstash 管道配置检查

检查 Logstash 的管道配置,通常位于 /etc/logstash/conf.d/
下:

cat /etc/logstash/conf.d/*.conf

确保以下部分配置正确:

  • 输入(如 Filebeat、Kafka)
  • 过滤(如 Grok 解析、日期处理)
  • 输出(如 Elasticsearch、文件)

3. Logstash 日志检查

查看 Logstash 的日志文件(通常位于 /var/log/logstash/
):

tail -f /var/log/logstash/logstash-plain.log

重点检查:

  • 连接失败(如无法连接到 Elasticsearch)
  • 解析错误(如 Grok 模式不匹配)

四、Kibana 巡检方案

1. Kibana 进程状态检查

检查 Kibana 服务是否正常运行:

systemctl status kibana

2. Kibana 配置检查

检查 Kibana 配置文件(通常位于 /config/kibana.yml
):

cat /config/kibana.yml

确保以下配置正确:

  • elasticsearch.hosts
    : ["http://localhost:9200"]
    (或集群地址)
  • server.host
    : 0.0.0.0
    (确保 Kibana 可被外部访问)

3. Kibana 日志检查

查看 Kibana 的日志文件(通常位于 /logs/kibana.log
):

tail -f logs/kibana.log

重点检查:

  • 无法连接 Elasticsearch
  • Kibana 启动失败

4. Kibana UI 巡检

登录 Kibana 界面,检查以下功能是否正常:

  • Discover:能否正常搜索和查看日志数据
  • Dashboards:仪表板是否正常显示
  • Visualizations:可视化图表是否加载正常

五、DSL 查询示例

为了更深入的巡检,可以使用一些常见的 DSL 查询来检查系统的日志和性能。

1. 查询慢查询日志

查找慢查询,定位性能瓶颈:

GET /_search
{
  "query": {
    "range": {
      "@timestamp": {
        "gte""now-1d/d",
        "lt""now/d"
      }
    }
  },
  "sort": [
    {
      "took": {
        "order""desc"
      }
    }
  ],
  "size"10
}

Elasticsearch高级调优方法论之——根治慢查询!为什么Elasticsearch查询变得这么慢了?
2. 查询错误日志

如果怀疑系统有错误,可以查询错误日志:

Elasticsearch 日志能否把全部请求打印出来?

GET /filebeat-*/_search
{
  "query": {
    "match": {
      "message""error"
    }
  }
}

3. 查询特定节点性能问题

根据节点的 ID 或名称查询该节点上的所有日志:

GET /_search
{
  "query": {
    "term": {
      "host.name": {
        "value""node-1"
      }
    }
  }
}


六、企业级参考:自动化监控 Elasticsearch 集群

为了持续监控 Elasticsearch 集群的健康状态、CPU、内存、负载、磁盘使用率等指标,可以使用 Python 脚本结合定时任务实现自动化采集。我早期项目是借助 shell 脚本实现,原理一致

1. Elasticsearch 指标采集 Python 脚本

1.1 准备工作

确保安装了 requests
库:

pip install requests

1.2 脚本代码(可运行版本)
import json
from datetime import datetime
import configparser
import warnings

from elasticsearch import Elasticsearch

warnings.filterwarnings("ignore")

# 初始化 Elasticsearch 客户端
def init_es_client(config_path='./conf/config.ini'):
    """初始化并返回Elasticsearch客户端"""
    config = configparser.ConfigParser()
    config.read(config_path)
    es_host = config.get('elasticsearch''ES_HOST')
    es_user = config.get('elasticsearch''ES_USER')
    es_password = config.get('elasticsearch''ES_PASSWORD')

    es = Elasticsearch(
        hosts=[es_host],
        basic_auth=(es_user, es_password),
        verify_certs=False,
        ca_certs='conf/http_ca.crt'
    )
    return es

# 日志文件配置
LOG_FILE = 'elasticsearch_metrics.log'

# 使用初始化的客户端
es = init_es_client()

# 获取集群健康状况
def get_cluster_health():
    return es.cluster.health().body  # 获取字典格式的数据

# 获取节点统计信息
def get_node_stats():
    return es.nodes.stats().body  # 获取字典格式的数据

# 获取集群指标
def get_cluster_metrics():
    metrics = {}
    cluster_health = get_cluster_health()
    metrics['cluster_health'] = cluster_health

    node_stats = get_node_stats()
    nodes = node_stats.get('nodes', {})

    metrics['nodes'] = {}
    for node_id, node_info in nodes.items():
        node_name = node_info.get('name')
        metrics['nodes'][node_name] = {
            'cpu_usage': node_info['os']['cpu']['percent'],
            'load_average': node_info['os']['cpu'].get('load_average', {}).get('1m'),
            'memory_used': node_info['os']['mem']['used_percent'],
            'heap_used': node_info['jvm']['mem']['heap_used_percent'],
            'disk_available': node_info['fs']['total']['available_in_bytes'] / (1024 ** 3),
            'disk_total': node_info['fs']['total']['total_in_bytes'] / (1024 ** 3),
            'disk_usage_percent': 100 - (
                node_info['fs']['total']['available_in_bytes'] * 100 / node_info['fs']['total']['total_in_bytes']
            )
        }

    return metrics

# 记录集群状态到日志文件
def log_metrics():
    metrics = get_cluster_metrics()
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    with open(LOG_FILE, 'a') as f:
        f.write(f"Timestamp: {timestamp}\n")
        f.write(json.dumps(metrics, indent=4))
        f.write('\n\n')

# 主函数
if __name__ == "__main__":
    log_metrics()
    print("Elasticsearch cluster metrics logged successfully.")

1.3 关键指标说明

  • cluster_health
    :集群健康状态(green
    yellow
    red
  • cpu_usage
    :每个节点的 CPU 使用率
  • load_average
    :系统负载平均值
  • memory_used
    :每个节点的内存使用率
  • heap_used
    :JVM 堆内存使用率
  • disk_available
    :磁盘可用空间(GB)
  • disk_total
    :磁盘总空间(GB)
  • disk_usage_percent
    :磁盘使用率

日志将以 JSON 格式保存,记录每次获取到的集群和节点状态。(如下图所示)

2. 定时执行任务脚本

为了每天早上6点自动执行脚本,可以使用 cron
设置定时任务。

2.1 编辑定时任务

假设脚本位于 /home/user/scripts/es_metrics.py
,使用以下命令编辑 crontab

crontab -e

添加以下行:

0 6 * * * /usr/bin/python3 /home/user/scripts/es_metrics.py >> /home/user/scripts/es_metrics_cron.log 2>&1

解释:

  • 0 6 * * *
    :每天早上6点执行
  • /usr/bin/python3
    :Python 3 的完整路径,需根据实际情况调整
  • >> /home/user/scripts/es_metrics_cron.log 2>&1
    :将输出和错误信息重定向到日志文件

七、结论

通过以上深度巡检方案,能够全面掌握 EFLK 各组件的健康状态和性能指标。定期巡检和自动化监控有助于及时发现潜在问题,保障集群的稳定运行。

  • Elasticsearch:重点关注集群健康、节点性能、分片状态和索引状况
  • Filebeat、Logstash、Kibana:检查服务状态、配置文件和日志,确保数据采集和展示正常

持续的监控和优化,才能让我们的 EFLK 集群始终保持最佳状态。

当然,我们推荐优先借助监控指标工具 Prometheus、Zabbix、Grafana 等巡查。


希望这篇文章对大家有所帮助,欢迎交流讨论。


短时间快习得多干货!

和全球2000+ Elastic 爱好者一起精进!

elastic6.cn——ElasticStack进阶助手


抢先一步学习进阶干货

文章转载自铭毅天下Elasticsearch,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论