暂无图片
暂无图片
3
暂无图片
暂无图片
暂无图片

ClickHouse多种实时更新方法总结

ClickHouse研发笔记 2021-06-07
10283

ClickHouse本身对update的执行是低效的,因为ClickHouse的MergeTree存储一旦生成一个Data Part,这个Part就不支持更改,而是需要删除旧Part, 重写整个Part。所以从MergeTree存储内核层面,ClickHouse就不擅长做数据更新删除操作。

本文讲述的方法包括采用系统自带的Update,采用ReplacingMergeTree+Select+Final,采用补充字段+group by,采用AggregateFunction等不同的方法来达到实时的目的。

具体将从用法以及实时性,各自的优劣势进行对比分析。

方法一、UPDATE+Optimize

该方法适合的MergeTree家族的Engine。

UPDATE/DELETE
操作默认情况下是异步的,他们的更新触发时机只能发生在分区合并的时候 。所以更新之后立马查询还会看到旧数据。因此更新完就要立马执行optimize table xxx
,随后才执行`select查询即可看到最新的数据。

1.1 用法

Alter table xxx update col = xxx where xxx;
Optimize table xxx;

1.2 实时性

更新:非实时

查询:在Optimize
之后的查询是实时的

1.3 优缺点

更新需要2步操作才能生效。而且每次执行optimize table
会重写更改数据的partition。如果数据量很大就会非常耗时。对实时性要求较高的场景不适用。

方法二、UPDATE+ SETTING mutations_sync

该方法适合的MergeTree家族的Engine。

参数mutations_sync默认为0,表示异步。1表示等待当前server完成后返回,2表示等待所有副本数据都更新后再返回。

2.1 用法

Alter table xxx update col = xxx where xxx mutations_sync = 1/2

2.2 实时性

更新完之后所有查询都能立马感知到最新的数据。

2.3 优缺点

其原理和方法一中执行optimize table
是一样的,都是需要去重写where
条件成立的那些partition。UPDATE
操作本身会比较耗时。如果数据量大,执行起来就会很慢。

针对方法一和方法二的补充说明:

UPDATE col1=xxx where col3=xxx
操作在重写partition的时候只会去重写col1这个列,其他不受影响的列是通过hardlink直接映射到新partition里的。所以真正磁盘读的内容是col1和col3, 写的内容是col1.
DELETE where col3=xxx
如果where条件成立是会涉及到所有的column的,因此所有的列都是rewrite,没有hardlink。只有条件不成立的时候会整个partiiton hardlink过去,磁盘没有什么开销的。
Update
操作适合批量处理,不适合单行操作,因为每个update请求都会生成对应的mutation放到被处理队列中,个数受限,合并效率低。
可以优化的方法:多分区。由于更新操作只会涉及到where条件成立的那些partition, 因此如果更新是有规律的。比如按照月来更新,那么就可以对col3进行按月分区。这样每次更新就只会对应一个分区,更新的数据量极具下降,性能就成倍提升。(比如原先3年的数据,不分区的话每次更新都会重写所有数据,按月更新的话就只会涉及到1/36的数据。性能就提升36倍。)




后续的几种方法都是用insert来替代update。对应clickhouse而言,批量插入性能是非常好的,所以更新操作用insert替代会很快就返回。以下几种方法的区别在于:

1.insert需要插入更新未涉及到的列吗?2.查询的时候如何查才能保证得到的是最终数据。

方法三、INSERT+Final

该方法适用于xxxMergeTree,如ReplacingMergeTree, 不支持MergeTree。当 FINAL 被指定,ClickHouse会在返回结果之前完全合并数据,从而执行给定表引擎合并期间发生的所有数据转换。

3.1 用法

每次insert需要把所有列的数据补全.

CREATE TABLE table (....) ReplacingMergeTree(create_time) order by xxx;


INSERT INTO table (* EXCEPT(col1, col3)), col1, col3 SELECT * EXCEPT(col1,col3), col1_newV, col3 FROM table WHERE col3=xxx;


SELECT * FROM table final;

3.2 实时性

更新是异步的,查询的时候只有加上final才会得到最新的数据,如果不加final就还会看到旧数据,除非后台merge过了。

3.3 优缺点

查询时会比较慢,因为

1)需要对所有记录按照排序键来sort merge。

2) 除了读取查询的列还会读取主键列。

3)在读表数据的时候,有多少个partition就有多少个并发的线程分别去读取,如果分区个数只有2个,就只会启动2个线程并发去读,不会启动更多的线程。而不加final,如果系统有20 cores就会启动20个线程并发读。

方法四、INSERT+额外的标记位列

该方法适用于ReplacingMergeTree 通过新增两个列: deleted来表明是更新还是删除,create_time列来表明操作时间的先后顺序,比如有对同一记录的多次更新。

4.1 用法

建表:

CREATE TABLE test_a(
user_id UInt64,
score String,
deleted UInt8 DEFAULT 0,
create_time DateTime DEFAULT toDateTime(0)
)ENGINE= ReplacingMergeTree(create_time)
ORDER BY user_id

查询的时候,执行对相同key的聚合操作,通过argMax(xxx, create_time)
得到最后一次的更新或删除。Having
用来过滤掉被删除掉的数据。

SELECT
user_id ,
argMax(score, create_time) AS score,
argMax(deleted, create_time) AS deleted,
max(create_time) AS ctime
FROM test_a
GROUP BY user_id
HAVING deleted = 0

删除操作:只需要把对应记录的deleted=0
,加上当前的操作时间即可

单条删除:insert into test_a(user_id, deleted, create_time) values(id号,1, now());
批量删除:insert into test_a(user_id, deleted, create_time) select user_id , 1 now() from test_a where user_id in (…..);

更新操作:关键在于每次插入要填充所有的列。

insert into test_b (* except(score,create_time,delete), create_time, score) select * except(score,create_time,delete),now() as create_time,'aaa' as score from test_b where user_id in (…);



4.2 实时性

更新转为insert是实时返回了,性能很快。查询的时候通过聚合也可以看到最新的结果。

4.3 优缺点

所有查询会变慢,特别是当聚合列distinct值多了之后。本人的测试:在一个大宽表100列做测试,String user_id的不同值越4kw。单机内存128GB,跑上述聚合查询内存爆了。每个列都需要用argMax
来得到最新的数据,argMax
聚合函数的中间状态约需要72Byte.因此=聚合所需内存的粗略计算公式:4kw个不同的聚合key*(user_id String本身的大小+ 100列*72)=268GB。此处都没有考虑聚合时用到的哈希表的装载率问题。所以该方法不适用于多聚合列的大宽表。严重影响性能,甚至内存小的机器根本跑不出结果。

相关链接: ClickHouse准实时数据更新的新思路[1]

方法五、AggregatingMergeTree+物化视图

该思路主要是有一个基表存放原始数据以及后续更新转换为insert的数据。中间有一个物化视图当做触发器,每次有新数据到基表就将insert进来的数据进行聚合,存储到最上层的AggregatingMergeTree.所以最上层的的AggregatingMergeTree在没有merge之前是最原始的数据,每次批量更新转插入的数据的中间聚合状态。具体可看原链接:有限更新 Limited Update[2]

5.1 用法

基表的创建:

CREATE TABLE IF NOT EXISTS local_dat_update
(
pk1 UInt64 COMMENT '主键',
pk2 String COMMENT '主键',
ver DateTime64 COMMENT '版本,业务提供',
col1 Int32 DEFAULT 0 COMMENT '选取!=0的最大版本',
col2 String DEFAULT '' COMMENT '选取!=空字符串的最大版本',
col3 DateTime DEFAULT 0 COMMENT '选取!=0的最大版本'
) ENGINE = ReplacingMergeTree
PARTITION BY toYYYYMM(kafka_time)
ORDER BY (kafka_topic, kafka_partition, kafka_offset);

上层聚合表的创建:

CREATE TABLE IF NOT EXISTS default.local_agg_update
(
pk1 UInt64,
pk2 String,
version AggregateFunction(max, DateTime64),
col1 AggregateFunction(argMaxIf, Int32, DateTime64, UInt8),
col2 AggregateFunction(argMaxIf, String, DateTime64, UInt8),
col3 AggregateFunction(argMaxIf, DateTime, DateTime64, UInt8)
) ENGINE = AggregatingMergeTree
PARTITION BY (xxHash64(pk1, pk2) % 10)
ORDER BY (pk1, pk2);

中间层物化视图作为触发器:

CREATE MATERIALIZED VIEW IF NOT EXISTS mv_update
TO local_agg_update
AS
SELECT pk1 AS pk1,
pk2 AS pk2,
maxState(ver) AS version,
argMaxIfState(col1, ver, col1 != 0) AS col1,
argMaxIfState(col2, ver, col2 != '') AS col2,
argMaxIfState(col3, ver, col3 != 0) AS col3
FROM default.local_dat_update
GROUP BY pk1, pk2;

每批数据导入进去后,物化视图相当于触发器就会为这批数据生成一个中间聚合状态存储到上层的聚合表。

查询:每次查询看结果的时候需要给中间聚合状态加上Merge后缀得到最终的可视化结果,否则是一些二进制的数据。

SELECT pk1,
pk2,
maxMerge(version) AS version,
argMaxIfMerge(col1) AS col1,
argMaxIfMerge(col2) AS col2,
argMaxIfMerge(col3) AS col3
FROM default.all_agg_update
GROUP BY pk1, pk2;

5.2 实时性

查询每次都是实时的。

5.3 优缺点

优点:

1.对于只查主键,或者几个列的情况下比较实用。同时如果每次更新都是更新某一批数据,那么上层的聚合表后台merge的时候会让表的数据量就会越来越小,最终的查询就会很快。缺点:2.不好加一些判断条件,因此最上层的表是聚合State,字段存储的都是临时状态非最终状态,查询条件都不能直接写,需要先聚合,然后在过滤3.在分布式表的情况下,节点之间可能要大量传输这些聚合中间状态,

方法六、AggregatingMergeTree+SimpleAggregateFunction

该方法具体见:ClickHouse:抓住你的每个目标用户,人群圈选业务的大杀器[3], 以及PPT[4]

6.1 用法

除了主键外的所有列都需要用anyLast
聚合。创建基表:

CREATE TABLE IF NOT EXISTS tablexx ON CLUSTER default
(
user_id UInt64,
city_level SimpleAggregateFunction(anyLast, Nullable(Enum('一线城市' = 0, '二线城市' = 1, '三线城市' = 2, '四线城市' = 3))),
gender SimpleAggregateFunction(anyLast, Nullable(Enum('女' = 0, '男' = 1))),
reg_date SimpleAggregateFunction(anyLast, Datetime),
comment_like_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)),
others SimpleAggregateFunction(anyLast,Array(String))
)ENGINE = AggregatingMergeTree()
partition by toYYYYMMDD(reg_date) ORDER BY user_id;

如要更新reg_date列,则将 alter table tablexx update gender=xxx where user_id ...
 转为 INSERT INTO tablexx(user_id, gender) where user_id ...

这样对于要更新的列gender。argLast会读取到最新的,对于不需要更新的others列,argLast会读取到旧数据。就可以确保一条记录的所有列数据能够对应起来。

6.2 实时性

适用场景:允许一定的延迟,不要求完全实时。

6.3 优缺点

优点:每次更新的时候只需要关注更新的列+主键列。不用每次去获取其他列数据,性能会大大提升。缺点:1)删除无法用insert来替代,只能用delete命令执行。

2)该方法AggregatingMergeTree并不能保证任何时候的查询都是聚合过后的结果,并且也没有提供标志位用于查询数据的聚合状态与进度。

因此,为了确保数据在查询前处于已聚合的状态,还需手动下发optimize指令强制聚合过程的执行。所以阿里云采取的做法是自定配置optimize执行周期如每10分钟执行一次optimize命令。

总结

实时更新Update/Delete
操作的实现主要分为两类:

第一类:用系统自带的ALTER TABLE $table_name UPDATE/DELETE
 操作。可以适配到任一xxxMergeTree Engine. 具体又分为以下2种方式:

1.执行完Update/Delete
操作后,执行OPTIMIZE TABLE $table_name
. 因为默认的更新操作是异步的,所以强制执行optimize出发merge/mutations操作。
2.ALTER TABLE $table_name UPDATE/DELETE …settings mutations_sync = 1/2
 同步执行更新:强制本地更新操作等对应分区重写完之后再返回。

注:基于1/2的基础上,采用多分区来加速:如果更新操作是有规律的,比如总是按照某天来更新,那么可以设置分区键为每天,这样每次更新只会重写一个分区,从而加速。

第二类:用INSERT操作替代Update/Delete,最明显的优势就是批量更新转为批量插入,操作会非常快(两者的执行路径不一样)。就是基表的各列类型或者后续的查询可能需要做一些改变。此类操作基本不支持MergeTree。

1.ReplacingMergeTree: 查询的时候加上final, 通过排序来过滤重复的旧数据。缺点是每次Select都会排序,影响性能。2.MergeTree/ReplacingMergeTree通过标记位is_deleted+create_time。每次insert的数据得包含所有的列数据即需要用Insert select
来插入。每次查询的时候通过where is_deleted != 1
来过滤旧数据,group by +argMax
并得到多次更新中国最后一次更新的数据。缺点是:响应查询性能,并且在聚合列distinct值多的时候,以及arg个数Max多的时候,内存容易爆炸。[标记位的方法支持MergeTree,多了两个列相当于记录了所有的以往操作]
3.通过ReplacingMergeTree+物化视图+AggregatingMergeTree+AggregateFunction的方法:对基表ReplacingMergeTree的更新转为插入操作中,需要获得所有列的数据,即要查询旧数据。每次最终的查询需要用select xxx-Merge的组合函数group by key来实现。他的优势在于:可以做预聚合。适用于对同一条记录的多次修改的场景。问题是:1)本质上无法解决2中的多distinct group by key与argMax多聚合函数的问题,2)由于AggregatingFunction记录的是聚合中间状态是个二进制数据,无法对列做过滤操作。3)如果是分布式的环境,那么distinct group by key多了之后,网络传输开销就会很大。【这个是ClickHouse分布式环境下都有这个问题,所以分布式下表的分片要分好,每个节点做本地聚合的结果直接是最终结果的一个子集就比较合适。这样就不用传输聚合的中间状态】4.用阿里云的AggregatingMergeTree+SimpleAggregateFunction。anyLast官网介绍结果是不确定的,所以这个如何保证得到的是最后更新的列呢?实时更新方面:对于主键可以通过distinct(group by key)来查看最新的,而对于其他的数据还是需要optimize table 来得到最新的数据。merge合并速度不会更快,aggregatingMT解决的问题是:update转化为insert的时候,不需要更新哪些列,可以直接留空,不用写出来,从而提升写入性能。

References

[1]
 ClickHouse准实时数据更新的新思路: https://cloud.tencent.com/developer/article/1644570
[2]
 有限更新 Limited Update: https://github.com/wangxuanyue/ClickHouse_example/blob/main/Limited_Update.md
[3]
 ClickHouse:抓住你的每个目标用户,人群圈选业务的大杀器: https://developer.aliyun.com/article/781084?
[4]
 PPT: https://presentations.clickhouse.tech/meetup50/5_alibaba.pdf

[5]
 ClickHouse实时更新: https://altinity.com/blog/2020/4/14/handling-real-time-updates-in-clickhouse



文章转载自ClickHouse研发笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论