I recently came across a good talk on ClickHouse, ClickHouse Data Warehouse 101: The First Billion Rows, and wanted to walk through it with my own annotations. Some of its topics have already been covered in my earlier posts, so I will only touch on them briefly here; if you are interested, please refer to the original slides.
ClickHouse Overview
1. Features
ClickHouse is a powerful data warehouse that fits many production use cases.
SQL
Runs on anything from bare metal to the cloud
Columnar data storage
Parallel and vectorized query execution
Scales to petabytes
Open source under the Apache 2.0 license
Extremely fast
2. Table data
For query performance, a ClickHouse table is split into indexed, sorted, and compressed chunks of data (parts).
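To see these parts on a live server, you can query the system.parts table. A quick sketch against the tripdata table that is loaded later in this post:

SELECT partition, name, rows,
       formatReadableSize(data_compressed_bytes) AS compressed,
       formatReadableSize(data_uncompressed_bytes) AS uncompressed
FROM system.parts
WHERE database = 'nyc_taxi_rides' AND table = 'tripdata' AND active
ORDER BY partition
LIMIT 10

Each active part is one sorted, indexed, compressed chunk of the table; background merges gradually combine small parts into larger ones.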
3. Scale out
When a single server can no longer keep up, ClickHouse is easy to scale out.
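The usual building block for scaling out is the Distributed table engine, which fans queries out across shards. A minimal sketch, assuming a cluster named my_cluster is defined in the server configuration and each shard holds the local tripdata table defined later in this post:

-- an "umbrella" table that routes reads and writes to the shards
CREATE TABLE tripdata_dist AS tripdata
ENGINE = Distributed(my_cluster, nyc_taxi_rides, tripdata, rand())

Queries against tripdata_dist run on every shard in parallel and are merged on the initiating node.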
Data Loading
1. Basic ClickHouse schema design
A tabular data structure is the easiest to analyze.
CREATE TABLE tripdata (
`pickup_date` Date DEFAULT
toDate(tpep_pickup_datetime),
`id` UInt64,
`vendor_id` String,
`tpep_pickup_datetime` DateTime,
`tpep_dropoff_datetime` DateTime,
...
) ENGINE = MergeTree
PARTITION BY toYYYYMM(pickup_date)
ORDER BY (pickup_location_id, dropoff_location_id, vendor_id)
The table uses a time-based partition key plus a sorting key (the index within each data part).
2. Loading data with clickhouse-client
Input data in CSV format:
"Pickup_date","id","vendor_id","tpep_pickup_datetime"...
"2016-01-02",0,"1","2016-01-02 04:03:29","2016-01-02...
"2016-01-29",0,"1","2016-01-29 12:00:51","2016-01-29...
"2016-01-09",0,"1","2016-01-09 17:22:05","2016-01-09...
Reading CSV data that includes a header row:
clickhouse-client --database=nyc_taxi_rides --query='INSERT
INTO tripdata FORMAT CSVWithNames' < data.csv
Reading gzip-compressed CSV data that includes a header row (here decompressing data.csv.gz on the fly):
gzip -d -c data.csv.gz | clickhouse-client --database=nyc_taxi_rides
--query='INSERT INTO tripdata FORMAT CSVWithNames'
3. Altinity Datasets
The Altinity Datasets tool works on large numbers of data files in parallel:
Dump existing schema definitions and data to files
Load the files back into a database
Dump/load commands run in parallel
For details, see: https://github.com/Altinity/altinity-datasets
4. Loading 1.3 billion rows
$ time ad-cli dataset load nyc_taxi_rides --repo_path=/data1/sample-data
Creating database if it does not exist: nyc_timed
Executing DDL: /data1/sample-data/nyc_taxi_rides/ddl/taxi_zones.sql
...
Loading data: table=tripdata, file=data-200901.csv.gz
...
Operation summary: succeeded=193, failed=0
real 11m4.827s
user 63m32.854s
sys 2m41.235s
Machine configuration: Xeon(R) Platinum 8175M, 8 vCPU, 30 GB RAM, NVMe SSD
Row count:
:) select count() from tripdata;
SELECT count()
FROM tripdata
┌────count()─┐
│ 1310903963 │
└────────────┘
1 rows in set. Elapsed: 0.324 sec. Processed 1.31 billion rows, 1.31 GB (4.05
billion rows/s., 4.05 GB/s.)
From these numbers we can estimate the load rate: 1,310,903,963 rows / 11m4s ≈ 1,974,253 rows/sec.
Queries
Estimating the maximum possible performance
system.numbers is an internal ClickHouse table, a built-in row generator that is handy for benchmarking.
SELECT avg(number)
FROM
(
SELECT number
FROM system.numbers
LIMIT 1310903963
)
┌─avg(number)─┐
│ 655451981 │
└─────────────┘
1 rows in set. Elapsed: 3.420 sec. Processed 1.31 billion rows, 10.49 GB (383.29 million rows/s., 3.07 GB/s.)
Querying the loaded data
SELECT avg(passenger_count)
FROM tripdata
┌─avg(passenger_count)─┐
│ 1.6817462943317076 │
└──────────────────────┘
If you had to guess the timing before running this query, you would probably guess something close to the system.numbers figure above. In fact, the real query is even faster:
1 rows in set. Elapsed: 1.084 sec. Processed 1.31 billion rows, 1.31 GB (1.21 billion rows/s., 1.21 GB/s.)
Data types and cardinality matter a great deal here: reading 1.31 GB for 1.31 billion rows means passenger_count takes roughly one byte per value, which is why the scan is so cheap.
Adding a filter condition
SELECT avg(passenger_count)
FROM tripdata
WHERE toYear(pickup_date) = 2016
┌─avg(passenger_count)─┐
│ 1.6571129913837774 │
└──────────────────────┘
1 rows in set. Elapsed: 0.162 sec. Processed 131.17 million rows, 393.50 MB (811.05 million rows/s., 2.43 GB/s.)
Trying a GROUP BY
SELECT
pickup_location_id AS location_id,
avg(passenger_count),
count()
FROM tripdata
WHERE toYear(pickup_date) = 2016
GROUP BY location_id LIMIT 10
...
10 rows in set. Elapsed: 0.251 sec. Processed 131.17 million rows, 655.83 MB (522.62 million rows/s., 2.61 GB/s.)
A typical JOIN query
SELECT
zone,
avg(passenger_count),
count()
FROM tripdata
INNER JOIN taxi_zones ON taxi_zones.location_id = pickup_location_id
WHERE toYear(pickup_date) = 2016
GROUP BY zone
LIMIT 10
10 rows in set. Elapsed: 0.803 sec. Processed 131.17 million rows, 655.83 MB (163.29 million rows/s., 816.44 MB/s.)
Optimization Techniques
As the numbers above show, ClickHouse is already quite fast. How do we make it even faster? The rest of this post walks through several optimization techniques, roughly grouped into the following areas:
Server configuration parameters
Schema
Columnar storage
Queries
Number of query processing threads
SELECT avg(passenger_count)
FROM tripdata
SETTINGS max_threads = 1
...
1 rows in set. Elapsed: 4.855 sec. Processed 1.31 billion rows, 1.31 GB (270.04 million rows/s., 270.04 MB/s.)
SELECT avg(passenger_count)
FROM tripdata
SETTINGS max_threads = 8
...
1 rows in set. Elapsed: 1.092 sec. Processed 1.31 billion rows, 1.31 GB (1.20 billion rows/s., 1.20 GB/s.)
max_threads sets the maximum number of threads used to process a query; a common recommendation is half the number of available vCPUs, i.e. the number of physical cores.
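The current value of max_threads (or of any other setting) can be checked from SQL:

SELECT name, value, changed
FROM system.settings
WHERE name = 'max_threads'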
Schema optimization
This mainly covers the following points:
Data types
Indexes
Dictionaries
Arrays
Materialized views and aggregating engines
Let's look at each in turn.
>> Data types
For details, see:
https://www.percona.com/blog/2019/02/15/clickhouse-performance-uint32-vs-uint64-vs-float32-vs-float64/
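As a rough illustration of the kind of comparison made in that post (not its exact benchmark), a quick way to get a feel for it is to aggregate the same values cast to different types and compare the timings:

-- same values, different types
SELECT avg(toUInt32(number)) FROM numbers(100000000);
SELECT avg(toFloat64(number)) FROM numbers(100000000);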
>> Materialized views with the SummingMergeTree engine
CREATE MATERIALIZED VIEW tripdata_mv
ENGINE = SummingMergeTree
PARTITION BY toYYYYMM(pickup_date)
ORDER BY (pickup_location_id, dropoff_location_id, vendor_id) AS
SELECT
pickup_date,
vendor_id,
pickup_location_id,
dropoff_location_id,
sum(passenger_count) AS passenger_count_sum,
sum(trip_distance) AS trip_distance_sum,
sum(fare_amount) AS fare_amount_sum,
sum(tip_amount) AS tip_amount_sum,
sum(tolls_amount) AS tolls_amount_sum,
sum(total_amount) AS total_amount_sum,
count() AS trips_count
FROM tripdata
GROUP BY
pickup_date,
vendor_id,
pickup_location_id,
dropoff_location_id
A materialized view acts as an INSERT trigger on the source table.
SummingMergeTree aggregates the data automatically in the background.
Next, we populate the view with the data already in tripdata:
INSERT INTO tripdata_mv SELECT
pickup_date,
vendor_id,
pickup_location_id,
dropoff_location_id,
passenger_count,
trip_distance,
fare_amount,
tip_amount,
tolls_amount,
total_amount,
1
FROM tripdata;
Ok.
0 rows in set. Elapsed: 303.664 sec. Processed 1.31 billion rows, 50.57 GB (4.32 million rows/s., 166.54 MB/s.)
Querying the materialized view:
SELECT count()
FROM tripdata_mv
┌──count()─┐
│ 20742525 │
└──────────┘
1 rows in set. Elapsed: 0.015 sec. Processed 20.74 million rows, 41.49 MB (1.39 billion rows/s., 2.78 GB/s.)
SELECT zone,
sum(passenger_count_sum)/sum(trips_count),
sum(trips_count)
FROM tripdata_mv
INNER JOIN taxi_zones ON taxi_zones.location_id = pickup_location_id
WHERE toYear(pickup_date) = 2016
GROUP BY zone
LIMIT 10
10 rows in set. Elapsed: 0.036 sec. Processed 3.23 million rows, 64.57 MB (89.14 million rows/s., 1.78 GB/s.)
As you can see, queries against the materialized view are dramatically faster.
Finally, note that the view also aggregates new data in real time: because it acts as an INSERT trigger, every new row written to tripdata shows up in tripdata_mv immediately.
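A hypothetical illustration (the values below are made up, only a few of tripdata's columns are set, and the remaining columns are assumed to fall back to their defaults):

-- insert one new trip into the base table ...
INSERT INTO tripdata (vendor_id, tpep_pickup_datetime, tpep_dropoff_datetime,
                      pickup_location_id, dropoff_location_id, passenger_count)
VALUES ('1', '2016-06-01 12:00:00', '2016-06-01 12:20:00', 132, 138, 2);

-- ... and it is visible through the view right away, no manual refresh needed
SELECT sum(trips_count) AS trips, sum(passenger_count_sum) AS passengers
FROM tripdata_mv
WHERE pickup_location_id = 132 AND dropoff_location_id = 138 AND vendor_id = '1';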
Columnar storage optimization
Compression
The LowCardinality data type
Column encodings
>> A LowCardinality example
Here is an example of applying the LowCardinality data type to one billion rows.
:) create table test_lc (
a String, a_lc LowCardinality(String) DEFAULT a) Engine = MergeTree
PARTITION BY tuple() ORDER BY tuple();
:) INSERT INTO test_lc (a) SELECT concat('openconfig-interfaces:interfaces/interface/subinterfaces/subinterface/state/index', toString(rand() % 1000))
FROM system.numbers LIMIT 1000000000;
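The per-column sizes shown below were presumably taken from system.columns; a query along these lines produces the same layout:

SELECT table, name, type,
       data_compressed_bytes AS compressed,
       data_uncompressed_bytes AS uncompressed
FROM system.columns
WHERE database = currentDatabase() AND table = 'test_lc'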
┌─table───┬─name─┬─type───────────────────┬─compressed─┬─uncompressed─┐
│ test_lc │ a │ String │ 4663631515 │ 84889975226 │
│ test_lc │ a_lc │ LowCardinality(String) │ 2010472937 │ 2002717299 │
└─────────┴──────┴────────────────────────┴────────────┴──────────────┘
LowCardinality encodes the column values with a dictionary.
The difference between the two columns is striking: the plain String column is about 85 GB uncompressed (4.7 GB compressed), while the LowCardinality column is only about 2 GB either way, so LowCardinality greatly reduces storage.
Next, let's compare query performance:
:) select a a, count(*) from test_lc group by a order by count(*) desc limit 10;
......
10 rows in set. Elapsed: 11.627 sec. Processed 1.00 billion rows, 92.89 GB (86.00 million rows/s., 7.99 GB/s.)
:) select a_lc a, count(*) from test_lc group by a order by count(*) desc limit 10;
...
10 rows in set. Elapsed: 1.569 sec. Processed 1.00 billion rows, 3.42 GB (637.50 million rows/s., 2.18 GB/s.)
Queries against the a_lc column are clearly faster.
>> An Array example
Now let's look at the Array data type, again with one billion rows.
Create the table:
create table test_array (
s String,
a Array(LowCardinality(String)) default arrayDistinct(splitByChar(',', s))
) Engine = MergeTree PARTITION BY tuple() ORDER BY tuple();
Insert data:
INSERT INTO test_array (s)
WITH ['Percona', 'Live', 'Altinity', 'ClickHouse', 'MySQL', 'Oracle', 'Austin', 'Texas',
'PostgreSQL', 'MongoDB'] AS keywords
SELECT concat(keywords[((rand(1) % 10) + 1)], ',',
keywords[((rand(2) % 10) + 1)], ',',
keywords[((rand(3) % 10) + 1)], ',',
keywords[((rand(4) % 10) + 1)])
FROM system.numbers LIMIT 1000000000;
Data sample and storage: the original slides show a few of the generated rows and the on-disk sizes of the s and a columns as screenshots, which are not reproduced here.
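For reference, both can be inspected with queries of this sort (a sketch; output omitted):

-- a few generated rows
select s, a from test_array limit 3;

-- per-column storage, using the same system.columns trick as in the LowCardinality example
select name, type, data_compressed_bytes, data_uncompressed_bytes
from system.columns
where database = currentDatabase() and table = 'test_array';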
Queries:
:) select count() from test_array where s like '%ClickHouse%';
┌───count()─┐
│ 343877409 │
└───────────┘
1 rows in set. Elapsed: 7.363 sec. Processed 1.00 billion rows, 39.20 GB (135.81 million rows/s., 5.32 GB/s.)
:) select count() from test_array where has(a,'ClickHouse');
┌───count()─┐
│ 343877409 │
└───────────┘
1 rows in set. Elapsed: 8.428 sec. Processed 1.00 billion rows, 11.44 GB (118.66 million rows/s., 1.36 GB/s.)
As these two queries show, LIKE over the raw string is surprisingly effective (it is even slightly faster here), but the array version reads far less data, cutting the bytes scanned from 39.20 GB down to 11.44 GB.
Query optimization example: JOIN optimization
Consider the following query:
SELECT zone,
avg(passenger_count),
count()
FROM tripdata
INNER JOIN taxi_zones ON taxi_zones.location_id =
pickup_location_id
WHERE toYear(pickup_date) = 2016
GROUP BY zone
LIMIT 10
10 rows in set. Elapsed: 0.803 sec. Processed 131.17 million rows, 655.83 MB (163.29 million rows/s., 816.44 MB/s.)
Anyone who writes SQL regularly will spot the problem: every matching row is pushed through the join before the aggregation happens. Here is the optimized version:
SELECT zone,
sum(pc_sum) / sum(pc_cnt) AS pc_avg,
sum(pc_cnt)
FROM
(
SELECT
pickup_location_id,
sum(passenger_count) AS pc_sum,
count() AS pc_cnt
FROM tripdata
WHERE toYear(pickup_date) = 2016
GROUP BY pickup_location_id
)
INNER JOIN taxi_zones ON taxi_zones.location_id = pickup_location_id
GROUP BY zone LIMIT 10
10 rows in set. Elapsed: 0.248 sec. Processed 131.17 million rows, 655.83 MB (529.19 million rows/s., 2.65 GB/s.)
By aggregating in a subquery first, the optimized SQL joins only the small GROUP BY result (one row per pickup_location_id) against taxi_zones instead of the raw rows. This cuts the amount of data flowing through the join and brings the query from 0.803 s down to 0.248 s.
ClickHouse Integrations
ClickHouse has a rich ecosystem of tools:
Client libraries: JDBC, ODBC, Python, Golang, ...
The Kafka table engine can consume data directly from Kafka (see the sketch after this list)
Visualization tools: Grafana, Tableau, Tabix, Superset
Data science stack integration: Pandas, Jupyter Notebooks
ClickHouse Operator: creates, configures, and manages ClickHouse clusters running on Kubernetes
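For the Kafka item above, here is a minimal sketch of what such a consumer table looks like (broker address, topic, and columns are made-up placeholders):

-- a Kafka consumer table; in practice a materialized view moves its rows
-- into a MergeTree table such as tripdata
CREATE TABLE trips_queue (
    vendor_id String,
    tpep_pickup_datetime DateTime,
    passenger_count UInt8
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'trips',
         kafka_group_name = 'clickhouse_trips',
         kafka_format = 'JSONEachRow'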
Integration with MySQL
MySQL external dictionaries (pull data from MySQL into ClickHouse)
MySQL table engine and table function (query/insert; see the sketch after this list)
MySQL binary log replication
ProxySQL supports ClickHouse
ClickHouse speaks the MySQL wire protocol
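For the table engine / table function item above, a sketch of the mysql() table function (host, credentials, database, and table are placeholders):

-- query a MySQL table from ClickHouse without copying the data
SELECT count()
FROM mysql('mysql-host:3306', 'shop', 'orders', 'reader', 'secret')

The MySQL table engine works the same way, except the connection details are baked into a table definition instead of being passed per query.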
Integration with PostgreSQL
ODBC external dictionaries (pull data from PostgreSQL into ClickHouse)
ODBC table engine and table function (query/insert)
Logical replication
Logical replication uses a publish/subscribe model: a publication is created on the upstream node and a subscription on the downstream node. See https://github.com/mkabilov/pg2ch
Foreign Data Wrapper (FDW)
FDW is the Postgres implementation of the SQL/MED standard (SQL Management of External Data). It provides a uniform set of interfaces so that extensions can hook deeply into Postgres's optimizer, executor, scans, updates, and statistics, letting you query and manipulate external data sources directly with SQL. With an FDW for MySQL, for example, you can query, sort, group, filter, join, and even insert into and update MySQL data as if it were a local table. Besides rolling your own, the community already offers many FDW extensions.
Percona-Lab's implementation: https://github.com/Percona-Lab/clickhousedb_fdw
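To make the idea concrete, the PostgreSQL-side pattern for clickhousedb_fdw looks roughly like this; the option names are my assumption, so check the repository README for the exact ones:

-- run in PostgreSQL, not in ClickHouse
CREATE EXTENSION clickhousedb_fdw;
CREATE SERVER clickhouse_svr FOREIGN DATA WRAPPER clickhousedb_fdw
    OPTIONS (dbname 'nyc_taxi_rides');                        -- option names: assumption
CREATE USER MAPPING FOR CURRENT_USER SERVER clickhouse_svr;
CREATE FOREIGN TABLE tripdata_fdw (
    pickup_date date,
    passenger_count int
) SERVER clickhouse_svr OPTIONS (table_name 'tripdata');      -- option name: assumption
SELECT avg(passenger_count) FROM tripdata_fdw;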
ClickHouse Operator
If you are interested, take a look at the GitHub project: https://github.com/Altinity/clickhouse-operator
I will cover the ClickHouse Operator separately, with hands-on examples, in a later post.
That wraps up today's look at ClickHouse; if you found it interesting, stay tuned for the follow-ups.