暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

一个Logstash采集MySQL慢日志的配置示例

80

上篇文章介绍了,利用ELK实现MySQL审计日志可视化,其核心在于logstash的pipeline的配置,本篇将介绍如何编写采集MySQL慢日志的pipeline配置文件。相对于审计日志来说,慢日志的配置要更复杂一些,因为审计日志的每一行记录都是json格式,而慢日志文件中一条慢日志记录会跨越多行,需要把多行合并成一个事件,这时候codec需要用到grok。

mysql_slowlog.conf


input {
  file {
    path => "/data/mysql/log/slow.log"   # 慢日志路径
    start_position => "beginning"
    sincedb_path => "/var/lib/logstash/sincedb_mysql_slowlog"
    codec => multiline {
      pattern => "^# Time:"               # 遇到 # Time 开始新事件
      negate => true
      what => "previous"
    }
  }
}

filter {
# 提取时间
  grok {
    match => { "message" => "# Time: %{TIMESTAMP_ISO8601:log_time}" }
    add_tag => ["mysql_slow"]
  }

# 提取用户和 host 信息
  grok {
    match => { "message" => "# User@Host: %{DATA:user}\[%{DATA:username}\] @ *\[?(%{IP:host})?\]? *Id:\s*%{NUMBER:thread_id}" }
  }

# 提取执行信息
  grok {
    match => { "message" => "# Query_time:\s*%{NUMBER:query_time:float}\s*Lock_time:\s*%{NUMBER:lock_time:float}\s*Rows_sent:\s*%{NUMBER:rows_sent:int}\s*Rows_examined:\s*%{NUMBER:rows_examined:int}" }
  }

# 去掉注释和 SET timestamp,保留 SQL
  mutate {
    gsub => [
      "message""^#.*\n""",                  # 去掉 # 开头的注释
      "message""^SET timestamp=.*;\n"""     # 去掉 SET timestamp
    ]
  }

  mutate {
    rename => { "message" => "sql" }
  }

# 时间字段映射为 @timestamp
  date {
    match => ["log_time""ISO8601"]
    target => "@timestamp"
  }
}


output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["http://xxx:9200"]
    user => "elastic"
    password => "xxx"
    index => "mysql_slowlog-%{+YYYY.MM.dd}"
    ilm_enabled => false
  }
}


MySQL慢日志的格式如下所示:


# Time: 2025-09-12T07:32:37.543801Z
# User@Host: root[root] @  [10.xxx.xxx.xxx]  Id:    94
# Query_time: 4.000326  Lock_time: 0.000000 Rows_sent: 1  Rows_examined: 1
SET timestamp=1757662353;
select sleep(4);

以下内容是对配置内容的解析。

Input部分

input {
  file {
    path => "/data/mysql/log/slow.log"   # 慢日志路径
    start_position => "beginning"
    sincedb_path => "/var/lib/logstash/sincedb_mysql_slowlog"
    codec => multiline {
      pattern => "^# Time:"               # 遇到 # Time 开始新事件
      negate => true
      what => "previous"
    }
  } 
}

  • file{...}: 告诉Logstash从文件读取数据,这里是Path配置的路径:/data/mysql/log/slow.log
  • start_position => "beginning": 第一次启动,从文件开头读取。
  • sincedb_path: 记录审计文件的读取进度,防止重复发送。
  • codec => multiline { ... }: 指定多行合并逻辑:从# Time:
    开始,到下一个# Time:
    开始前的所有行被合并为一个事件。
    • pattern => "^# Time:":匹配以开头# Time: 
      的行。
    • negate => true:表示不匹配正则表达式的行属于前一行内容的一部分。如果一行不是以# Time:
      开头,那它就属于上一行的内容。
    • what => "previous":表示匹配到的行应该拼接到 上一行

Filter部分

filter {
  # 提取时间
  grok {
    match => { "message" => "# Time: %{TIMESTAMP_ISO8601:log_time}" }
    add_tag => ["mysql_slow"]
  }

# 提取用户和 host 信息
  grok {
    match => { "message" => "# User@Host: %{DATA:user}\[%{DATA:username}\] @ *\[?(%{IP:host})?\]? *Id:\s*%{NUMBER:thread_id}" }
  }

# 提取执行信息
  grok {
    match => { "message" => "# Query_time:\s*%{NUMBER:query_time:float}\s*Lock_time:\s*%{NUMBER:lock_time:float}\s*Rows_sent:\s*%{NUMBER:rows_sent:int}\s*Rows_examined:\s*%{NUMBER:rows_examined:int}" }
  }

# 去掉注释和 SET timestamp,保留 SQL
  mutate {
    gsub => [
      "message""^#.*\n""",                  # 去掉 # 开头的注释
      "message""^SET timestamp=.*;\n"""     # 去掉 SET timestamp
    ]
  }

  mutate {
    rename => { "message" => "sql" }
  }

# 时间字段映射为 @timestamp
  date {
    match => ["log_time""ISO8601"]
    target => "@timestamp"
  }
}


1.第一条grok:提取时间

grok {
    match => { "message" => "# Time: %{TIMESTAMP_ISO8601:log_time}" }
    add_tag => ["mysql_slow"]
  }

  • match:匹配# Time: 后面的ISO8601时间,提取数据到log_time字段。
  • add_tag:给事件加上标签mysql_slow,也就是在对象中加一个tags字段。

2.第二条grok:提取用户和 host 信息

  # 提取用户和 host 信息
  grok {
    match => { "message" => "# User@Host: %{DATA:user}\[%{DATA:username}\] @ *\[?(%{IP:host})?\]? *Id:\s*%{NUMBER:thread_id}" }
  }

  • match: 正则匹配以# User@Host
    开头的整体结构,从中提取user、username、host、thread_id。
  • 日志行类似:# User@Host: root[root] @  [10.xxx.xxx.xxx]  Id:    94

3.第三条grok:提取执行信息

  # 提取执行信息
  grok {
    match => { "message" => "# Query_time:\s*%{NUMBER:query_time:float}\s*Lock_time:\s*%{NUMBER:lock_time:float}\s*Rows_sent:\s*%{NUMBER:rows_sent:int}\s*Rows_examined:\s*%{NUMBER:rows_examined:int}" }
  }

  • match: - 匹配 # Query_time: ... Lock_time: ... Rows_sent: ... Rows_examined: ...
    ,从中提取query_time、lock_time、rows_sent、rows_examined。
  • 日志行类似:# Query_time: 4.000326  Lock_time: 0.000000 Rows_sent: 1  Rows_examined: 1 SET timestamp=1757662353;

4.mutate+gsub:去除注释和SET timestamp


 # 去掉注释和 SET timestamp,保留 SQL
  mutate {
    gsub => [
      "message""^#.*\n""",                  # 去掉 # 开头的注释
      "message""^SET timestamp=.*;\n"""     # 去掉 SET timestamp
    ]
  }

5. mutate rename:重命名字段

  mutate {
    rename => { "message" => "sql" }
  }

  • rename: message
    字段改名为sql

6. date:把log_time映射到@timestamp


 # 时间字段映射为 @timestamp
  date {
    match => ["log_time""ISO8601"]
    target => "@timestamp"
  }

  • match: 匹配log_time字段
  • target: 将它映射到@timestamp字段。在Logstash中,每个事件都有一个默认的事件字段:@stimestamp字段。如果不处理,Logstash存的@timestamp就是”日志到达Logstash的时间“ ,而不是SQL执行时间。

Output部分

output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["http://xxx:9200"]
    user => "elastic"
    password => "xxx"
    index => "mysql_slowlog-%{+YYYY.MM.dd}"
    ilm_enabled => false
  }
}

  • stdout { codec => rubydebug }
    • 在终端输出每条事件的完整内容,方便调试
  • elasticsearch { ... }
    • 输出到 Elasticsearch
    • hosts
       → ES 地址
    • user
       / password
       → 认证
    • index
       → 索引名,按日期创建,例如 mysql_slowlog-2025.09.12
    • ilm_enabled => false
       → 不启用 Elasticsearch 的 Index Lifecycle Management(ILM)

多pipeline

如笔者在开篇提到的,我们之前已经添加过采集MySQL审计日志的pipeline,现在要再添加采集MySQL慢日志的pipeline,我们就需要使用多pipeline配置,否则可能出现mysql_slowlog中写入的是audit.log的内容这样错乱的日志。 在/etc/logstash/pipelines :

- pipeline.id: mysql_slow
  path.config: "/etc/logstash/conf.d/mysql_slow.conf"

- pipeline.id: mysql_audit
  path.config: "/etc/logstash/conf.d/mysql_audit.conf"



参考:
https://www.elastic.co/cn/support/matrix#matrix_compatibility https://www.elastic.co/guide/en/kibana/7.17/setup.html


推荐阅读:

利用ELK实现MySQL审计日志的可视化

 or  


👇👇👇 

文章转载自PostgreSQL运维技术,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论