Implementing Real-Time Slow SQL Collection for DorisDB

雷雷DBA 2021-08-16

1. Introduction

We have been running DorisDB for a while and now operate about 15 clusters. Many services have moved from initial testing to heavy production use.

As business volume grows, however, performance problems have started to surface. For example: how efficient are our SQL queries?

With much of the basic automation already in place, we have recently turned to automating slow SQL handling.

The goal is to collect and display slow SQL so that DBAs and developers can review and optimize it regularly.


2. Approach

2.1 Design considerations

How should we collect, analyze, and display slow SQL?

Log format:

fe/log/fe.audit.log

It contains two kinds of entries: [query] and [slow_query].

A slow SQL entry looks like this:

2021-08-15 09:05:49,223 [slow_query] |Client=10.1.1.2:42141|User=default_cluster:xxx|Db=default_cluster:xxx|State=EOF|Time=10|ScanBytes=1339|ScanRows=1|ReturnRows=1|StmtId=43542666|QueryId=edbdd4e3-fd64-11eb-b176-0ab2114db0b3|IsQuery=true|feIp=10.1.1.1|Stmt=SELECT 1
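Every field in the entry is written as Key=Value between | separators, so an entry can also be parsed by key name rather than by position. The sketch below is a hypothetical helper (parseSlowQueryLine is our own name, not a DorisDB or filebeat API). It assumes Stmt is always the last field, as in the sample above, and therefore keeps everything after Stmt= intact even when the SQL text itself contains |. It is written in ES5 syntax because filebeat's script processor implements ECMAScript 5.1.

```javascript
// Hypothetical helper (not a DorisDB or filebeat API): parse one [slow_query] line of fe.audit.log.
// ES5 syntax only, so the same logic could be pasted into filebeat's script processor.
function parseSlowQueryLine(line) {
  var marker = "[slow_query] |";
  var pos = line.indexOf(marker);
  if (pos < 0) {
    return null; // not a slow_query entry
  }
  // "2021-08-15 09:05:49,223" -> "2021-08-15 09:05:49" (second precision for a DATETIME column)
  var record = { slow_time: line.substring(0, 19) };
  var rest = line.substring(pos + marker.length);

  // Stmt is assumed to be the last field; the SQL may contain '|', so split it off first.
  var stmtPos = rest.indexOf("|Stmt=");
  if (stmtPos >= 0) {
    record.Stmt = rest.substring(stmtPos + "|Stmt=".length);
    rest = rest.substring(0, stmtPos);
  }

  // The remaining fields are Key=Value pairs; split each on its first '=' only,
  // because values such as "default_cluster:xxx" are kept whole.
  var pairs = rest.split("|");
  for (var i = 0; i < pairs.length; i++) {
    var eq = pairs[i].indexOf("=");
    if (eq > 0) {
      record[pairs[i].substring(0, eq)] = pairs[i].substring(eq + 1);
    }
  }
  return record;
}

var sample = "2021-08-15 09:05:49,223 [slow_query] |Client=10.1.1.2:42141|User=default_cluster:xxx" +
  "|Db=default_cluster:xxx|State=EOF|Time=10|ScanBytes=1339|ScanRows=1|ReturnRows=1" +
  "|StmtId=43542666|QueryId=edbdd4e3-fd64-11eb-b176-0ab2114db0b3|IsQuery=true" +
  "|feIp=10.1.1.1|Stmt=SELECT 1";
console.log(JSON.stringify(parseSlowQueryLine(sample)));
```

Inside filebeat, process(event) could call it as parseSlowQueryLine(event.Get("message")) and event.Put each returned key, instead of relying on fixed array indexes.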

Requirements:

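  • Collect slow SQL in real time

  • Store it in a database for analysis, e.g. the number of slow queries and their average execution time within a given time window

  • Good write performance and scalability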

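Solution:

  • Use filebeat, a general-purpose log shipper, for collection

  • Use DorisDB as the underlying storage, which makes analysis easy

  • Ingest through Kafka, which is fast and efficient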

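  • SQL fingerprinting has no easy solution yet:

    • Normalize the SQL with regular expressions inside filebeat at collection time? This may cost some performance, and complex SQL is hard to handle with regexes.

    • Consume Kafka ourselves, parse the SQL with the TiDB Parser, and then write it into DorisDB? But how would we write it: Flink, or via Kafka again? The pipeline gets rather long.

    • Analyze the SQL by other means after it is loaded, e.g. with the TiDB Parser? This needs further thought and testing.

    • Native support from the vendor? That would be the most convenient option; we will ask the DorisDB team whether it is supported.

    • Run a daily slow SQL analysis with scripts? This works, but is somewhat less efficient.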

In summary, we will implement real-time collection for DorisDB first, and work out SQL fingerprinting and aggregate analysis later.
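To make the fingerprinting idea concrete: a fingerprint replaces literals with placeholders so that queries differing only in their constants fall into the same group. The sketch below is a deliberately naive, regex-based illustration of the first option above, not the TiDB Parser approach; fingerprint is a hypothetical helper, and it will mishandle edge cases such as comments or unusual quoting that a real SQL parser gets right.

```javascript
// Naive, regex-based SQL fingerprint (hypothetical helper, illustration only).
// ES5 syntax, so it would also run in filebeat's script processor.
function fingerprint(sql) {
  return sql
    .replace(/'(?:[^'\\]|\\.|'')*'/g, "?")          // single-quoted string literals -> ?
    .replace(/"(?:[^"\\]|\\.|"")*"/g, "?")          // double-quoted string literals -> ?
    .replace(/\b\d+(?:\.\d+)?\b/g, "?")             // standalone numeric literals -> ? (t1 is kept)
    .replace(/\(\s*\?(?:\s*,\s*\?)*\s*\)/g, "(?+)") // placeholder lists such as IN (?, ?, ?) -> (?+)
    .replace(/\s+/g, " ")                           // collapse runs of whitespace
    .trim()
    .toLowerCase();
}

console.log(fingerprint("SELECT * FROM t WHERE id = 42 AND name = 'bob'"));
console.log(fingerprint("select * from t where id in (1,2, 3)"));
```

With this normalization, SELECT * FROM t WHERE id = 1 and select *   from t where id = 99 produce the same fingerprint, which is what grouping slow queries by pattern requires.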


2.2 Architecture

[filebeat: filter and collect] --> [Kafka] --> [DorisDB]


3. Implementation

3.1 Kafka preparation

[Request a topic on the company's Kafka cluster]:

topic: dorisdb_slow_log

client_id: dorisdb_slow_log-123

hosts: ["10.1.1.1:666","10.1.1.2:666","10.1.1.3:666","10.1.1.4:666","10.1.1.5:666"]


3.2 Preparing the filebeat environment

Download filebeat, edit the configuration file, and start filebeat to begin collection.


3.3 filebeat configuration


The tricky parts here are:

  • Collecting only [slow_query] entries

  • Splitting each entry into columns

[Configuration]:

path.home: opt/soft/filebeat/filebeat_999-1
path.config: opt/soft/filebeat/filebeat_999-1/conf
path.data: opt/soft/filebeat/filebeat_999-1/data
path.logs: opt/soft/filebeat/filebeat_999-1/log
#=============================================================================
filebeat.inputs:
- type: log
  enabled: true
  ignore_older: 5m
  include_lines: ['\[slow_query\]']   # regex: brackets must be escaped, otherwise [...] is a character class
  paths:
    - opt/soft/dorisdb999/fe/log/fe.audit.log
  fields:
    log_topics: dorisdb_slow_log
processors:
  - script:
      lang: javascript
      id: my_filter
      tag: enable
      source: >
        function process(event) {
            var str= event.Get("message");
            var slow_time =str.substr(0, 19);
            var detail_query = str.substr(38);
            var js_arr = detail_query.split("|");
            var Client_tmp = js_arr[0];
            var Client=Client_tmp.replace('Client=','');
            var User_tmp = js_arr[1];
            var User=User_tmp.replace('User=','');
            var Db_tmp =js_arr[2];
            var Db = Db_tmp.replace('Db=','');
            var State_tmp = js_arr[3];
            var State = State_tmp.replace('State=','');
            var Time_tmp=js_arr[4];
            var Time = Time_tmp.replace('Time=','');
            var ScanBytes_tmp = js_arr[5];
            var ScanBytes = ScanBytes_tmp.replace('ScanBytes=','');
            var ScanRows_tmp = js_arr[6];
            var ScanRows = ScanRows_tmp.replace('ScanRows=','');
            var ReturnRows_tmp = js_arr[7];
            var ReturnRows = ReturnRows_tmp.replace('ReturnRows=','');
            var StmtId_tmp = js_arr[8];
            var StmtId = StmtId_tmp.replace('StmtId=','');
            var QueryId_tmp = js_arr[9];
            var QueryId = QueryId_tmp.replace('QueryId=','');
            var IsQuery_tmp = js_arr[10];
            var IsQuery = IsQuery_tmp.replace('IsQuery=','');
            var feIp_tmp = js_arr[11];
            var feIp = feIp_tmp.replace('feIp=','');
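            /* Stmt is the last field and the SQL itself may contain '|', so rejoin the remaining pieces */
            var Stmt_tmp = js_arr.slice(12).join("|");
            var Stmt = Stmt_tmp.replace('Stmt=','');
            /* truncate to fit varchar(65533); substring counts characters, not UTF-8 bytes */
            Stmt = Stmt.substring(0,65530);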
            event.Put("slow_time",slow_time);
            event.Put("Client",Client);
            event.Put("User",User);
            event.Put("Db",Db);
            event.Put("State",State);
            event.Put("Time",Time);
            event.Put("ScanBytes",ScanBytes);
            event.Put("ScanRows",ScanRows);
            event.Put("ReturnRows",ReturnRows);
            event.Put("StmtId",StmtId);
            event.Put("QueryId",QueryId);
            event.Put("IsQuery",IsQuery);
            event.Put("feIp",feIp);
            event.Put("Stmt",Stmt);
            event.Put("igid",'999-1');
            event.Put("port",999);
            event.Put("dorisdb_fe_ip",'10.10.10.10');
        }
  - drop_fields:
      fields: ["ecs","agent","message","log","host"]

output.kafka:
  enabled: true
  hosts: ["10.1.1.1:666","10.1.1.2:666","10.1.1.3:666","10.1.1.4:666","10.1.1.5:666"]
  topic: '%{[fields][log_topics]}'
  worker: 1
  timeout: 30s
  broker_timeout: 10s
  keep_alive: 0
  compression: gzip
  required_acks: 1
  client_id: dorisdb_slow_log-123
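The include_lines option takes a list of regular expressions, so the brackets in [slow_query] have to be escaped. Unescaped, [slow_query] is a character class that matches any line containing one of the letters s, l, o, w, _, q, u, e, r, y, which includes ordinary [query] entries. The quick check below uses JavaScript's RegExp, which behaves the same as Go's regular expressions (used by filebeat) for these two simple patterns; the sample lines are shortened versions of the audit log format from section 2.1.

```javascript
// Shortened audit-log lines in the format shown in section 2.1.
const queryLine = "2021-08-15 09:05:49,223 [query] |Client=10.1.1.2:42141|Time=3|Stmt=SELECT 1";
const slowLine = "2021-08-15 09:05:49,223 [slow_query] |Client=10.1.1.2:42141|Time=10|Stmt=SELECT 1";

const unescaped = new RegExp("[slow_query]");   // character class: any ONE of s,l,o,w,_,q,u,e,r,y
const escaped = new RegExp("\\[slow_query\\]"); // the literal text "[slow_query]"

console.log("unescaped vs [query] line:", unescaped.test(queryLine)); // true  (wrongly included)
console.log("unescaped vs [slow_query] line:", unescaped.test(slowLine)); // true
console.log("escaped vs [query] line:", escaped.test(queryLine)); // false (correctly dropped)
console.log("escaped vs [slow_query] line:", escaped.test(slowLine)); // true
```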


3.4 Starting filebeat

/usr/bin/nohup filebeat -c filebeat_999-1.yml -e >>  filebeat_999-1.log 2>&1 &


4. DorisDB environment

4.1 Creating the cluster


[Deploy the cluster]:

dorisdb_manage cluster deploy 999-1 --db_name=dorisdb_slow --user_name=xxx --user_password=xxx --auth_ip=10.1.1.1 --domain_name=xxx 

Note: this is deployment automation built in-house by our DBA team.


4.2 Creating the slow SQL table

CREATE TABLE `dorisdb_slow` (
  `slow_time` datetime  COMMENT "slow time",
  `igid` varchar(40)  COMMENT "",
  `db_name` varchar(300) COMMENT "db name",
  `fe_ip` varchar(30)  COMMENT "fe ip",
  `query_id` varchar(200)  COMMENT "QueryId",
  `time` bigint(20)  COMMENT "SQL run time",
  `client` varchar(30)  COMMENT "client ip",
  `user` varchar(200) COMMENT "user name",
  `state` varchar(200) COMMENT "State",
  `scan_bytes` bigint(20)  COMMENT "ScanBytes",
  `scan_rows` bigint(20) COMMENT "ScanRows",
  `return_rows` bigint(20) COMMENT "ReturnRows",
  `stmt_id` bigint(20)  COMMENT "StmtId",
  `is_query` varchar(50)  COMMENT "IsQuery",
  `stmt` varchar(65533)  COMMENT "Stmt,Query Detail"
) ENGINE=OLAP 
DUPLICATE KEY(`slow_time`, `igid`, `db_name`, `fe_ip`, `query_id`)
COMMENT "DorisDB slow SQL table"
PARTITION BY RANGE(`slow_time`)
(PARTITION p20210813 VALUES [('2021-08-13 00:00:00'), ('2021-08-14 00:00:00')),
PARTITION p20210814 VALUES [('2021-08-14 00:00:00'), ('2021-08-15 00:00:00')),
.......
PARTITION p20211230 VALUES [('2021-12-30 00:00:00'), ('2021-12-31 00:00:00')),
PARTITION p20211231 VALUES [('2021-12-31 00:00:00'), ('2022-01-01 00:00:00')))
DISTRIBUTED BY HASH(`igid`) BUCKETS 48 
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
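The daily partitions elided above are repetitive enough to generate rather than type by hand. A minimal sketch, assuming one partition per day with the pYYYYMMDD naming and half-open [day, next day) ranges used in the statement; dailyPartitions is a hypothetical helper. Dates are handled in UTC so that no local time-zone or daylight-saving offset shifts the day boundaries.

```javascript
// Hypothetical helper: build PARTITION clauses for every day from `start` to `end` (inclusive).
function dailyPartitions(start, end) {
  const pad = (n) => String(n).padStart(2, "0");
  const ymd = (d) => `${d.getUTCFullYear()}-${pad(d.getUTCMonth() + 1)}-${pad(d.getUTCDate())}`;
  const clauses = [];
  const day = new Date(`${start}T00:00:00Z`);
  const last = new Date(`${end}T00:00:00Z`);
  while (day <= last) {
    const next = new Date(day.getTime() + 24 * 60 * 60 * 1000); // a UTC day is always 24 hours
    const name = "p" + ymd(day).replace(/-/g, "");
    clauses.push(`PARTITION ${name} VALUES [('${ymd(day)} 00:00:00'), ('${ymd(next)} 00:00:00'))`);
    day.setTime(next.getTime());
  }
  return clauses;
}

const parts = dailyPartitions("2021-08-13", "2021-12-31");
// Wrap in the outer parentheses of PARTITION BY RANGE(`slow_time`) ( ... )
console.log("(" + parts.join(",\n") + ")");
```

For the range in the statement above this yields 141 clauses, from p20210813 through p20211231.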


4.3 Creating the Kafka load job (Routine Load)

CREATE ROUTINE LOAD dorisdb_slow_load_20210814 ON dorisdb_slow
columns (slow_time,igid,db_name,fe_ip,query_id,time,client,user,state,scan_bytes,scan_rows,return_rows,stmt_id,is_query,stmt)
PROPERTIES (
"format"="json",
"jsonpaths"="[\"$.slow_time\",\"$.igid\",\"$.Db\",\"$.feIp\",\"$.QueryId\",\"$.Time\",\"$.Client\",\"$.User\",\"$.State\",\"$.ScanBytes\",\"$.ScanRows\",\"$.ReturnRows\",\"$.StmtId\",\"$.IsQuery\",\"$.Stmt\"]",
"desired_concurrent_number"="8",
"max_error_number" = "9999999999",
"max_batch_rows"="200000",
"max_batch_size" = "104857600",
"strict_mode" = "false"
 )
FROM KAFKA
(
 "kafka_broker_list"= "10.1.1.1:666,10.1.1.2:666,10.1.1.3:666,10.1.1.4:666,10.1.1.5:666",
 "kafka_topic" = "dorisdb_slow_log",
 "property.kafka_default_offsets" = "OFFSET_END",
 "property.client.id" = "dorisdb_slow_log-123",
 "property.group.id" = "dorisdb_slow_load_20210814"
);
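Routine Load maps the entries of jsonpaths to the listed columns by position, so the two lists must stay in the same order. One way to keep them in sync is to generate both from a single list of (column, JSON key) pairs. In the sketch below, mapping only restates the pairs already used in the statement above, and buildLoadLists is a hypothetical helper; its output is meant to be pasted into the columns (...) clause and the "jsonpaths" property.

```javascript
// (table column, top-level key in the JSON event that filebeat writes to Kafka)
const mapping = [
  ["slow_time", "slow_time"], ["igid", "igid"], ["db_name", "Db"], ["fe_ip", "feIp"],
  ["query_id", "QueryId"], ["time", "Time"], ["client", "Client"], ["user", "User"],
  ["state", "State"], ["scan_bytes", "ScanBytes"], ["scan_rows", "ScanRows"],
  ["return_rows", "ReturnRows"], ["stmt_id", "StmtId"], ["is_query", "IsQuery"], ["stmt", "Stmt"],
];

// Hypothetical helper: derive both lists from one mapping so their order cannot drift apart.
function buildLoadLists(pairs) {
  const columns = `columns (${pairs.map(([col]) => col).join(",")})`;
  // JSON array of "$.<key>" paths, with double quotes escaped for a SQL "..." string literal.
  const jsonArray = JSON.stringify(pairs.map(([, key]) => `$.${key}`));
  const jsonpaths = `"jsonpaths"="${jsonArray.replace(/"/g, '\\"')}"`;
  return { columns, jsonpaths };
}

const lists = buildLoadLists(mapping);
console.log(lists.columns);
console.log(lists.jsonpaths + ",");
```

For this mapping the output reproduces the columns clause and the jsonpaths property of the statement above character for character.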

4.4 Checking the job status


4.5 Viewing slow SQL


This article is reposted from 雷雷DBA.