
ClickHouse Data Warehouse 101: The First Billion Rows

DataFlow范式 2019-10-14

I recently came across a good ClickHouse presentation, ClickHouse Data Warehouse 101: The First Billion Rows, and will walk through it here with my own annotations. Some of the topics in the deck have already been covered in earlier posts, so I will only touch on them briefly; if you are interested, take a look at the slides.

ClickHouse Overview

1. Features

ClickHouse is a powerful data warehouse that is used in many production scenarios.

  • SQL support

  • Runs anywhere from bare metal to the cloud

  • Columnar data storage

  • Parallel and vectorized execution

  • Scales to petabytes

  • Open source under the Apache 2.0 license

  • Extremely fast

2. Table data

For query performance, a ClickHouse table is split into indexed, sorted, and compressed data shards called parts.
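If you want to see these parts for yourself, a quick illustrative query against the built-in system.parts table looks roughly like this (the nyc_taxi_rides database and tripdata table are the ones created later in this post, and exact column names can vary slightly between ClickHouse versions):

  SELECT partition, name, rows, data_compressed_bytes
  FROM system.parts
  WHERE database = 'nyc_taxi_rides' AND table = 'tripdata' AND active
  ORDER BY partition
  LIMIT 5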

3. Scale out

When a single server is no longer enough, ClickHouse is easy to scale out.

Data Loading

1. Basic ClickHouse schema design

A tabular data structure is the easiest to analyze.

  CREATE TABLE tripdata (
    `pickup_date` Date DEFAULT toDate(tpep_pickup_datetime),
    `id` UInt64,
    `vendor_id` String,
    `tpep_pickup_datetime` DateTime,
    `tpep_dropoff_datetime` DateTime,
    ...
  ) ENGINE = MergeTree
  PARTITION BY toYYYYMM(pickup_date)
  ORDER BY (pickup_location_id, dropoff_location_id, vendor_id)

The table has a time-based partition key and a sorting key, which serves as the index within each data part.

2. Loading data with clickhouse-client

Input data in CSV format:

  1. "Pickup_date","id","vendor_id","tpep_pickup_datetime"...

  2. "2016-01-02",0,"1","2016-01-02 04:03:29","2016-01-02...

  3. "2016-01-29",0,"1","2016-01-29 12:00:51","2016-01-29...

  4. "2016-01-09",0,"1","2016-01-09 17:22:05","2016-01-09...

Loading CSV data with a header into ClickHouse:

  clickhouse-client --database=nyc_taxi_rides \
    --query='INSERT INTO tripdata FORMAT CSVWithNames' < data.csv

Loading gzip-compressed CSV data with a header:

  gzip -d -c data.csv.gz | clickhouse-client --database=nyc_taxi_rides \
    --query='INSERT INTO tripdata FORMAT CSVWithNames'

3. Altinity datasets

The altinity-datasets tool (ad-cli) dumps and loads datasets as collections of data files, processing many files in parallel.

  • Dump existing schema definitions and data to files

  • Load the files back into a database

  • Dump/load commands run in parallel

For details, see: https://github.com/Altinity/altinity-datasets

4. Loading 1.3 billion rows

  $ time ad-cli dataset load nyc_taxi_rides --repo_path=/data1/sample-data
  Creating database if it does not exist: nyc_timed
  Executing DDL: /data1/sample-data/nyc_taxi_rides/ddl/taxi_zones.sql
  ...
  Loading data: table=tripdata, file=data-200901.csv.gz ...
  Operation summary: succeeded=193, failed=0

  real 11m4.827s
  user 63m32.854s
  sys 2m41.235s

Machine: Xeon(R) Platinum 8175M, 8 vCPUs, 30 GB RAM, NVMe SSD

Row count:

  :) select count() from tripdata;

  SELECT count()
  FROM tripdata

  ┌────count()─┐
  │ 1310903963 │
  └────────────┘

  1 rows in set. Elapsed: 0.324 sec. Processed 1.31 billion rows, 1.31 GB (4.05 billion rows/s., 4.05 GB/s.)

From this we can estimate the load rate: 1,310,903,963 rows / 11m4s ≈ 1,974,253 rows/sec.

Queries

Estimating the maximum performance

system.numbers is a built-in ClickHouse table: an internal number generator that serves as a handy performance baseline.

  SELECT avg(number)
  FROM
  (
      SELECT number
      FROM system.numbers
      LIMIT 1310903963
  )

  ┌─avg(number)─┐
  │   655451981 │
  └─────────────┘

  1 rows in set. Elapsed: 3.420 sec. Processed 1.31 billion rows, 10.49 GB (383.29 million rows/s., 3.07 GB/s.)

Querying the loaded data

  SELECT avg(passenger_count)
  FROM tripdata

  ┌─avg(passenger_count)─┐
  │   1.6817462943317076 │
  └──────────────────────┘

If you had been asked to guess the timing beforehand, you would probably have expected something close to the system.numbers baseline above. In fact, the query is even faster:

  1 rows in set. Elapsed: 1.084 sec. Processed 1.31 billion rows, 1.31 GB (1.21 billion rows/s., 1.21 GB/s.)

The query time here depends heavily on the column's data type and cardinality, and that matters a lot: passenger_count is a small integer, so only 1.31 GB is read for 1.31 billion rows, versus 10.49 GB for the 64-bit numbers in the baseline.

Adding a filter condition

  SELECT avg(passenger_count)
  FROM tripdata
  WHERE toYear(pickup_date) = 2016

  ┌─avg(passenger_count)─┐
  │   1.6571129913837774 │
  └──────────────────────┘

  1 rows in set. Elapsed: 0.162 sec. Processed 131.17 million rows, 393.50 MB (811.05 million rows/s., 2.43 GB/s.)

Trying a GROUP BY

  SELECT
      pickup_location_id AS location_id,
      avg(passenger_count),
      count()
  FROM tripdata
  WHERE toYear(pickup_date) = 2016
  GROUP BY location_id LIMIT 10
  ...

  10 rows in set. Elapsed: 0.251 sec. Processed 131.17 million rows, 655.83 MB (522.62 million rows/s., 2.61 GB/s.)

A typical JOIN query

  SELECT
      zone,
      avg(passenger_count),
      count()
  FROM tripdata
  INNER JOIN taxi_zones ON taxi_zones.location_id = pickup_location_id
  WHERE toYear(pickup_date) = 2016
  GROUP BY zone
  LIMIT 10

  10 rows in set. Elapsed: 0.803 sec. Processed 131.17 million rows, 655.83 MB (163.29 million rows/s., 816.44 MB/s.)

Optimization Techniques

As we have seen, ClickHouse is already fast. So how do we make it even faster? Next I will cover some of the optimization techniques used with ClickHouse, roughly grouped into the following areas:

  • Server configuration parameters

  • Schema

  • Column storage

  • Queries

Number of query processing threads

  SELECT avg(passenger_count)
  FROM tripdata
  SETTINGS max_threads = 1
  ...

  1 rows in set. Elapsed: 4.855 sec. Processed 1.31 billion rows, 1.31 GB (270.04 million rows/s., 270.04 MB/s.)

  SELECT avg(passenger_count)
  FROM tripdata
  SETTINGS max_threads = 8
  ...

  1 rows in set. Elapsed: 1.092 sec. Processed 1.31 billion rows, 1.31 GB (1.20 billion rows/s., 1.20 GB/s.)

max_threads controls the maximum number of query processing threads; the recommended value is half the number of available cores.

Schema optimization

The main points are:

  • Data types

  • Indexes

  • Dictionaries

  • Arrays

  • Materialized views and aggregating engines

Let's look at these in turn.

>> Data types

For details, see:

https://www.percona.com/blog/2019/02/15/clickhouse-performance-uint32-vs-uint64-vs-float32-vs-float64/

>> Materialized views backed by the SummingMergeTree engine

  CREATE MATERIALIZED VIEW tripdata_mv
  ENGINE = SummingMergeTree
  PARTITION BY toYYYYMM(pickup_date)
  ORDER BY (pickup_location_id, dropoff_location_id, vendor_id) AS
  SELECT
      pickup_date,
      vendor_id,
      pickup_location_id,
      dropoff_location_id,
      sum(passenger_count) AS passenger_count_sum,
      sum(trip_distance) AS trip_distance_sum,
      sum(fare_amount) AS fare_amount_sum,
      sum(tip_amount) AS tip_amount_sum,
      sum(tolls_amount) AS tolls_amount_sum,
      sum(total_amount) AS total_amount_sum,
      count() AS trips_count
  FROM tripdata
  GROUP BY
      pickup_date,
      vendor_id,
      pickup_location_id,
      dropoff_location_id

  • A materialized view acts as an INSERT trigger on the source table.

  • SummingMergeTree automatically aggregates data in the background.

Since the view only captures rows inserted after it was created, we backfill it from the existing data:

  INSERT INTO tripdata_mv SELECT
      pickup_date,
      vendor_id,
      pickup_location_id,
      dropoff_location_id,
      passenger_count,
      trip_distance,
      fare_amount,
      tip_amount,
      tolls_amount,
      total_amount,
      1
  FROM tripdata;

  Ok.

  0 rows in set. Elapsed: 303.664 sec. Processed 1.31 billion rows, 50.57 GB (4.32 million rows/s., 166.54 MB/s.)

Querying the materialized view:

  SELECT count()
  FROM tripdata_mv

  ┌──count()─┐
  │ 20742525 │
  └──────────┘

  1 rows in set. Elapsed: 0.015 sec. Processed 20.74 million rows, 41.49 MB (1.39 billion rows/s., 2.78 GB/s.)


  SELECT zone,
      sum(passenger_count_sum) / sum(trips_count),
      sum(trips_count)
  FROM tripdata_mv
  INNER JOIN taxi_zones ON taxi_zones.location_id = pickup_location_id
  WHERE toYear(pickup_date) = 2016
  GROUP BY zone
  LIMIT 10

  10 rows in set. Elapsed: 0.036 sec. Processed 3.23 million rows, 64.57 MB (89.14 million rows/s., 1.78 GB/s.)

As you can see, queries against the materialized view are very fast.

Finally, the original slides illustrate how the materialized view aggregates data in real time as new rows arrive.

Column storage optimization

  • Compression

  • The LowCardinality data type

  • Column codecs (see the sketch right after this list)
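As a quick illustration of column codecs (this table and its columns are hypothetical, not from the talk; the codec names are standard ClickHouse codecs):

  CREATE TABLE codec_example (
      ts DateTime CODEC(DoubleDelta, LZ4),   -- monotonically increasing timestamps
      value Float64 CODEC(Gorilla, ZSTD),    -- slowly changing floating-point gauge
      note String CODEC(ZSTD(3))             -- trade CPU for better compression
  ) ENGINE = MergeTree
  ORDER BY ts;

DoubleDelta works well for steadily increasing timestamps, Gorilla for slowly changing floating-point values, and ZSTD compresses better than the default LZ4 at some CPU cost.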

>> LowCardinality example

Let's look at an example of the LowCardinality data type applied to one billion rows.

  :) create table test_lc (
       a String,
       a_lc LowCardinality(String) DEFAULT a
     ) Engine = MergeTree
     PARTITION BY tuple() ORDER BY tuple();

  :) INSERT INTO test_lc (a)
     SELECT concat('openconfig-interfaces:interfaces/interface/subinterfaces/subinterface/state/index',
                   toString(rand() % 1000))
     FROM system.numbers LIMIT 1000000000;

  ┌─table───┬─name─┬─type───────────────────┬─compressed─┬─uncompressed─┐
  │ test_lc │ a    │ String                 │ 4663631515 │  84889975226 │
  │ test_lc │ a_lc │ LowCardinality(String) │ 2010472937 │   2002717299 │
  └─────────┴──────┴────────────────────────┴────────────┴──────────────┘

LowCardinality applies dictionary encoding to the column.

The compressed and uncompressed sizes of the two columns differ dramatically: the LowCardinality column takes far less storage.
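For reference, a size comparison like the table above can be produced from the system.columns table; this is a sketch using the data_compressed_bytes and data_uncompressed_bytes columns available in recent ClickHouse versions:

  SELECT
      table,
      name,
      type,
      data_compressed_bytes AS compressed,
      data_uncompressed_bytes AS uncompressed
  FROM system.columns
  WHERE table = 'test_lc'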

Now let's compare query performance:

  :) select a a, count(*) from test_lc group by a order by count(*) desc limit 10;
  ...

  10 rows in set. Elapsed: 11.627 sec. Processed 1.00 billion rows, 92.89 GB (86.00 million rows/s., 7.99 GB/s.)

  :) select a_lc a, count(*) from test_lc group by a order by count(*) desc limit 10;
  ...

  10 rows in set. Elapsed: 1.569 sec. Processed 1.00 billion rows, 3.42 GB (637.50 million rows/s., 2.18 GB/s.)

The query on the a_lc column reads far less data and runs much faster.


>> Array example

Let's look at an example of the Array data type applied to one billion rows.

Create the table:

  create table test_array (
      s String,
      a Array(LowCardinality(String)) default arrayDistinct(splitByChar(',', s))
  ) Engine = MergeTree PARTITION BY tuple() ORDER BY tuple();

Insert data:

  INSERT INTO test_array (s)
  WITH ['Percona', 'Live', 'Altinity', 'ClickHouse', 'MySQL', 'Oracle', 'Austin', 'Texas',
        'PostgreSQL', 'MongoDB'] AS keywords
  SELECT concat(keywords[((rand(1) % 10) + 1)], ',',
                keywords[((rand(2) % 10) + 1)], ',',
                keywords[((rand(3) % 10) + 1)], ',',
                keywords[((rand(4) % 10) + 1)])
  FROM system.numbers LIMIT 1000000000;

(The original slides show a sample of the generated rows and the on-disk storage of the s and a columns.)
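To peek at the data yourself, a trivial query (illustrative only) is enough; each row has the raw comma-separated string s next to its de-duplicated array a:

  SELECT s, a
  FROM test_array
  LIMIT 3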

Query:

  :) select count() from test_array where s like '%ClickHouse%';

  ┌───count()─┐
  │ 343877409 │
  └───────────┘

  1 rows in set. Elapsed: 7.363 sec. Processed 1.00 billion rows, 39.20 GB (135.81 million rows/s., 5.32 GB/s.)

  :) select count() from test_array where has(a,'ClickHouse');

  ┌───count()─┐
  │ 343877409 │
  └───────────┘

  1 rows in set. Elapsed: 8.428 sec. Processed 1.00 billion rows, 11.44 GB (118.66 million rows/s., 1.36 GB/s.)

As the results show, LIKE on the raw string is still very effective (even slightly faster here), but the array version reads 11.44 GB instead of 39.20 GB, a large reduction in I/O.

Query optimization example: JOIN optimization

Consider the following query:

  SELECT zone,
      avg(passenger_count),
      count()
  FROM tripdata
  INNER JOIN taxi_zones ON taxi_zones.location_id = pickup_location_id
  WHERE toYear(pickup_date) = 2016
  GROUP BY zone
  LIMIT 10

  10 rows in set. Elapsed: 0.803 sec. Processed 131.17 million rows, 655.83 MB (163.29 million rows/s., 816.44 MB/s.)

Anyone who writes SQL regularly will spot the issue: the full fact table is joined before aggregation. The optimized version looks like this:

  SELECT zone,
      sum(pc_sum) / sum(pc_cnt) AS pc_avg,
      sum(pc_cnt)
  FROM
  (
      SELECT
          pickup_location_id,
          sum(passenger_count) AS pc_sum,
          count() AS pc_cnt
      FROM tripdata
      WHERE toYear(pickup_date) = 2016
      GROUP BY pickup_location_id
  )
  INNER JOIN taxi_zones ON taxi_zones.location_id = pickup_location_id
  GROUP BY zone LIMIT 10

  10 rows in set. Elapsed: 0.248 sec. Processed 131.17 million rows, 655.83 MB (529.19 million rows/s., 2.65 GB/s.)

The optimized SQL pre-aggregates in a subquery, which reduces the data scanned in parallel, and only the small GROUP BY result (one row per pickup_location_id) participates in the join.

ClickHouse Integrations

ClickHouse has a rich ecosystem of tools:

  • Client libraries: JDBC, ODBC, Python, Golang, ...

  • A Kafka table engine for consuming data from Kafka (see the sketch after this list)

  • Visualization tools: Grafana, Tableau, Tabix, Superset

  • Data science stack integration: Pandas, Jupyter Notebooks

  • ClickHouse Operator: creates, configures, and manages ClickHouse clusters running on Kubernetes
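As a rough sketch of the Kafka path (the broker address, topic, consumer group, and columns below are hypothetical, not from the talk), a Kafka engine table subscribes to a topic and a materialized view moves the rows into a MergeTree table:

  -- Kafka consumer table: rows stream in from the topic
  CREATE TABLE events_queue (
      ts DateTime,
      message String
  ) ENGINE = Kafka
  SETTINGS kafka_broker_list = 'kafka:9092',
           kafka_topic_list = 'events',
           kafka_group_name = 'clickhouse_consumer',
           kafka_format = 'JSONEachRow';

  -- Destination table for persistent storage
  CREATE TABLE events (ts DateTime, message String)
  ENGINE = MergeTree ORDER BY ts;

  -- Materialized view pumps rows from the queue into the destination
  CREATE MATERIALIZED VIEW events_consumer TO events
  AS SELECT ts, message FROM events_queue;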

Integration with MySQL

  • MySQL external dictionaries (pull data from MySQL into ClickHouse)

  • MySQL table engine and table function (query/insert); a small example follows this list

  • MySQL binary log replication

  • ProxySQL support for ClickHouse

  • ClickHouse support for the MySQL wire protocol
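For instance, the mysql() table function lets ClickHouse query a remote MySQL table directly; the host, schema, and credentials below are placeholders:

  SELECT count()
  FROM mysql('mysql-host:3306', 'shop', 'orders', 'reader', 'secret')

The MySQL table engine works the same way but persists the connection definition as a ClickHouse table.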

Integration with PostgreSQL

  • ODBC external dictionaries (pull data from PostgreSQL into ClickHouse)

  • ODBC table engine and table function (query/insert)

  • Logical replication

Logical replication uses a publish/subscribe model: a publication is created on the upstream node and a subscription on the downstream node. See https://github.com/mkabilov/pg2ch

  • Foreign Data Wrapper (FDW), sketched below

FDW is the Postgres implementation of the SQL/MED (SQL Management of External Data) standard. It defines a uniform set of interfaces that let extensions integrate deeply with the Postgres planner, executor, scans, updates, and statistics, so external data sources can be queried and manipulated directly with SQL. With an FDW for MySQL, for example, you can query, sort, group, filter, join, and even insert into or update MySQL data as if it were a local table. Beyond writing your own, the community already offers many FDW extensions.

Percona-Lab's implementation: https://github.com/Percona-Lab/clickhousedb_fdw
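The general FDW workflow on the Postgres side looks roughly like the sketch below. The DDL statements are standard Postgres; the server options and the foreign table definition are assumptions for illustration, so check the clickhousedb_fdw README for the exact options it accepts:

  CREATE EXTENSION clickhousedb_fdw;

  -- Server options (host, dbname) are illustrative assumptions
  CREATE SERVER clickhouse_svr
      FOREIGN DATA WRAPPER clickhousedb_fdw
      OPTIONS (host '127.0.0.1', dbname 'nyc_taxi_rides');

  CREATE USER MAPPING FOR CURRENT_USER SERVER clickhouse_svr;

  -- Hypothetical foreign table mirroring part of the ClickHouse schema
  CREATE FOREIGN TABLE tripdata_ch (
      pickup_date date,
      passenger_count integer
  ) SERVER clickhouse_svr;

  SELECT avg(passenger_count) FROM tripdata_ch;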

ClickHouse Operator

If you are interested, check out the open source project on GitHub: https://github.com/Altinity/clickhouse-operator

I will cover the ClickHouse Operator in more depth, with hands-on examples, in a later post.


That wraps up today's ClickHouse walkthrough. If you found it interesting, stay tuned for more.
