暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Citus分布表总结

原创 chirpyli 2022-11-16
1647

Citus存在如下两种分布表

  • Distribute Table
  • Reference Table

分别都有各自的应用场景。下面我们将分别介绍如何使用分布表。

Distribute Table

创建分布表,可通过create_distributed_table函数创建,参数如下:
必填项:

  • table_name: 指定分布表名 (必须要指定)
  • distribution_column: 分布列,表按分布列分布在worker节点上 (必须要指定分布列)

可选配置项:

  • distribution_type: 分布方式,hash,ppend分布,如果不添加这个参数默认为hash分布。
  • colocate_with: 亲和性共存, 将表中与指定表(所设置的要与之亲和的表)分布列值相同的行分布到同一个Worker节点中。
  • shard_count: 设置分片数量,分布表要被切分为多少个shard。 可通过pg_dist_shard系统表查看分片信息。也可以通过set citus.shard_count的方式进行设置。

下面我们将详细讨论这些内容。

分布列

在其他分布式数据库中分布键的概念,例如Greenplum,分布列概念相似。在Citus中目前支持一个分布列。

create table t(a int, b int); select create_distributed_table('t', 'a'); -- 指定分布列a -- 等同于其他分布式数据库中类似的语法create table t(a int, b int) distributed by (a);

可以看到,相比普通表,分布式表增加了一个分布列(也可以理解为分布键)的概念,通过分布列将表分布到Worker节点。所以在创建分布表的时候,分布列的选择非常重要,分布列的选择,应该考虑到尽量均匀的分布在worker节点上(尽量避免Worker节点负载不均),除此之外还应注意后续进行Join等操作时要尽量避免数据重分布。因为数据重分布的代价是非常高昂的。另外还有一点就是数据亲和性问题,较好的亲和性,性能会有所提升。

分布方式

分布表的数据按分布列分布到Worker节点。每个分片(shard)都有唯一的shardid编号,每个shard对应Worker节点上名为tablename_shardid的表。分布数据时还可指定表的分布方式,目前支持hash以及append分布方式,默认为hash分布。

hash分布

对分布列计算hash值,按hash值进行range分割,hash分割的边界值保存在Coordinator节点的pg_dist_shard系统表中。

create table heilongjiang3(a int, b text); select create_distributed_table('heilongjiang3', 'a', 'hash', shard_count:=2); -- 2个分片,我们看一下其每个分片的hash取值 postgres@postgres=# select * from pg_dist_shard; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue ---------------+---------+--------------+---------------+--------------- heilongjiang3 | 102277 | t | -2147483648 | -1 -- 需要注意这是hash值的分界并不是分布列a(int)值的分界 heilongjiang3 | 102278 | t | 0 | 2147483647 (2 rows) -- Coordinator节点查询数据, 1个Coordinator节点, 2个Worker节点 postgres@postgres=# select * from heilongjiang3 ; a | b ---+------- 2 | aaaa 2 | b 1 | aaa 4 | aabaa 3 | baa 1 | a (6 rows) -- Worker节点1查询 postgres@postgres=# select * from heilongjiang3_102277 ; a | b ---+------- 1 | aaa 4 | aabaa 3 | baa 1 | a (4 rows) -- Workder节点2查询 postgres@postgres=# select * from heilongjiang3_102278 ; a | b ---+------ 2 | aaaa 2 | b (2 rows)
append分布

append分布,顾名思义,新增数据每次只在一个Worker节点上写入并写入到最新一个shard分片中,新增分片以轮巡的方式创建在不同的Worker节点上,适用于追加写入等场景。

-- 建表,以append方式分布 postgres@postgres=# create table ta(a int, b int); CREATE TABLE postgres@postgres=# select create_distributed_table('ta','a','append'); create_distributed_table -------------------------- (1 row) -- 刚刚创建表,没有数据,也没有shard, 此时Worker节点上是没有相关shard表存在的,与hash分布的方式不同 postgres@postgres=# select * from pg_dist_shard; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue --------------+---------+--------------+---------------+--------------- (0 rows) -- 为append分布表创建shard, 此时只在某一个Worker节点创建分片 postgres@postgres=# select master_create_empty_shard('ta'); master_create_empty_shard --------------------------- 102281 (1 row) -- 只有1个shard postgres@postgres=# select * from pg_dist_shard; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue --------------+---------+--------------+---------------+--------------- ta | 102281 | t | | (1 row) postgres@postgres=# insert into ta values(1,1); INSERT 0 1 postgres@postgres=# select * from ta; a | b ---+--- 1 | 1 (1 row) -- 继续为append分布表创建shard, 这时在另一个worker节点中创建了新的分片 postgres@postgres=# select master_create_empty_shard('ta'); master_create_empty_shard --------------------------- 102282 (1 row) -- 可以看到有2个shard了。 postgres@postgres=# select * from pg_dist_shard; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue --------------+---------+--------------+---------------+--------------- ta | 102281 | t | | ta | 102282 | t | | (2 rows)
分片数量和副本数

分布表有两个相关的配置参数,分片数量和副本数。

  • 分片数量: citus.shard_count ,配置分布表的分片数量,即可在postgres.conf配置文件中设置,也可以建表时指定某个分布表的分片数量。
  • 副本数: citus.shard_replication_factor , 对分布表进行分片后的shard,支持多副本shard
-- 默认副本数1 postgres@postgres=# show citus.shard_replication_factor ; citus.shard_replication_factor --------------------------------- 1 (1 row) -- 创建分布表,指定shard_count = 2; create table heilongjiang3(a int, b text); select create_distributed_table('heilongjiang3', 'a', shard_count:=2); -- 2个分片 postgres@postgres=# select * from pg_dist_shard; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue ---------------+---------+--------------+---------------+--------------- heilongjiang3 | 102277 | t | -2147483648 | -1 heilongjiang3 | 102278 | t | 0 | 2147483647 (2 rows) -- 每个分片1个副本 postgres@postgres=# select * from pg_dist_placement; placementid | shardid | shardstate | shardlength | groupid -------------+---------+------------+-------------+--------- 300 | 102277 | 1 | 0 | 1 301 | 102278 | 1 | 0 | 2 (2 rows) -- 将shard_replication_factor副本数设置为2 postgres@postgres=# set citus.shard_replication_factor = 2; SET postgres@postgres=# show citus.shard_replication_factor ; citus.shard_replication_factor --------------------------------- 2 (1 row) -- 建表 create table heilongjiang2(a int, b text); -- 创建分布表, 设定分片数量4 select create_distributed_table('heilongjiang2', 'a', shard_count:=4); -- 可以看到4个分片 postgres@postgres=# select * from pg_dist_shard; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue ---------------+---------+--------------+---------------+--------------- heilongjiang2 | 102273 | t | -2147483648 | -1073741825 heilongjiang2 | 102274 | t | -1073741824 | -1 heilongjiang2 | 102275 | t | 0 | 1073741823 heilongjiang2 | 102276 | t | 1073741824 | 2147483647 (4 rows) -- 可以看到每个分片2个副本 postgres@postgres=# select * from pg_dist_placement; placementid | shardid | shardstate | shardlength | groupid -------------+---------+------------+-------------+--------- 292 | 102273 | 1 | 0 | 1 293 | 102273 | 1 | 0 | 2 294 | 102274 | 1 | 0 | 2 295 | 102274 | 1 | 0 | 1 296 | 102275 | 1 | 0 | 1 297 | 102275 | 1 | 0 | 2 298 | 102276 | 1 | 0 | 2 299 | 102276 | 1 | 0 | 1 (8 rows)
colocate_with

我们前面将分布列的时候讲过,分布列的选择要考虑到后续Join等操作时要减少数据重分布的问题。当多表Join时,因为不同的表分布列不同,Join时很可能需要重分布,为了避免后续重分布等情况,我们可以考虑将表与表之间建立一种colocate亲和关系,即:如果不同分布式表所执行的分布列类型、shard分片数量及副本数量都相同,则这些分布表会按相同的hash值范围分片,并把相同范围的分片存储在相同的worker节点上,这样分布列值相同的行会亲和性的共存在同一个worker节点上。这样可以认为表与表有亲和性, 这样在进行Join时很多情况就无需进行重分布了。

同样的概念,我们在建表时可以指定与那个表建立亲和关系,也就是与之按相同的分布列类型、相同的方式分布、相同的分片数量、相同的副本数量进行分布。我们举个例子:

postgres@postgres=# create table heilongjiang(a int, b text); CREATE TABLE -- 指定与heilongjiang3亲和关系 postgres@postgres=# select create_distributed_table('heilongjiang', 'a', colocate_with => 'heilongjiang3'); create_distributed_table -------------------------- (1 row) -- 可以看到与heilongjiang3相同的分布, postgres@postgres=# select * from pg_dist_shard; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue ---------------+---------+--------------+---------------+--------------- heilongjiang3 | 102277 | t | -2147483648 | -1 heilongjiang3 | 102278 | t | 0 | 2147483647 heilongjiang | 102279 | t | -2147483648 | -1 heilongjiang | 102280 | t | 0 | 2147483647 (4 rows) -- 我们简单验证一下。分布列值为2的行与表heilongjiang3分布到了同一个worker节点上。 postgres@postgres=# select * from heilongjiang_102280 ; a | b ---+----- 2 | new (1 row) postgres@postgres=# select * from heilongjiang3_102278 ; a | b ---+------ 2 | aaaa 2 | b (2 rows)

需要注意的是,分布表之间的亲和性共存关系,在集群扩容数据重均衡时,亲和性依旧有效,citus会保证亲和关系一致并且不同表的相关的shard会一起迁移到新节点。

Reference Table

相比Distribute Table需要将数据分片保存在Worker节点上,Reference Table不会进行分片,每个Worker节点都存储一份相同的表数据,并通过2PC保证多个副本的强一致性。

适用场景: 小表,高频使用的表,因为所有Worker节点都保存相同的表数据,所以在Join查询时可以避免进行数据重分布以及跨节点获取数据。

可使用create_reference_table函数进行创建,不需要分布列。

-- 建表 CREATE TABLE nation ( n_nationkey integer not null, n_name char(25) not null, n_regionkey integer not null, n_comment varchar(152)); -- 创建Reference Table postgres@postgres=# SELECT create_reference_table('nation'); create_reference_table ------------------------ (1 row) -- postgres@postgres=# select * from pg_dist_shard; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue --------------+---------+--------------+---------------+--------------- nation | 102248 | t | | (1 row) -- 可在Worker节点查看到nation_102248表 -- public | nation_102248 | table | postgres 我们向Reference Table中插入数据,并查看一下,验证一下。 -- Coordinator节点插入数据并查看 postgres@postgres=# insert into nation values(1,'china',100, 'a'); INSERT 0 1 postgres@postgres=# insert into nation values(2,'usa',90, 'b'); INSERT 0 1 postgres@postgres=# select * from nation; n_nationkey | n_name | n_regionkey | n_comment -------------+---------------------------+-------------+----------- 1 | china | 100 | a 2 | usa | 90 | b (2 rows) -- 在某一Worker节点查看数据 postgres@postgres=# select * from nation_102248; n_nationkey | n_name | n_regionkey | n_comment -------------+---------------------------+-------------+----------- 1 | china | 100 | a 2 | usa | 90 | b (2 rows)

需要注意的是分区表是不支持创建Reference Table的,另外也不支持range分布以及append分布。使用的时候需要注意。

CREATE TABLE ningbo ( a int, b int, c text ) PARTITION BY HASH(a,b) ( PARTITION p1 VALUES WITH (MODULUS 3, REMAINDER 0), PARTITION p2 VALUES WITH (MODULUS 3, REMAINDER 1), PARTITION p3 VALUES WITH (MODULUS 3, REMAINDER 2) ); -- 错误示例 postgres@postgres=# select create_reference_table('ningbo'); ERROR: distributing partitioned tables in only supported for hash-distributed tables -- 错误示例 postgres@postgres=# select create_distributed_table('ningbo', 'a', 'append'); ERROR: distributing partitioned tables in only supported for hash-distributed tables

总结

以上我们梳理了分布表的用法,相比单机数据库,分布式数据库中需要根据不同的情形使用不同的表,不同的分布方式,选择合适的分布列,相比单机需要考虑的更多。

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论