Citus存在如下两种分布表
- Distribute Table
- Reference Table
分别都有各自的应用场景。下面我们将分别介绍如何使用分布表。
Distribute Table
创建分布表,可通过create_distributed_table
函数创建,参数如下:
必填项:
- table_name: 指定分布表名 (必须要指定)
- distribution_column: 分布列,表按分布列分布在worker节点上 (必须要指定分布列)
可选配置项:
- distribution_type: 分布方式,hash,ppend分布,如果不添加这个参数默认为hash分布。
- colocate_with: 亲和性共存, 将表中与指定表(所设置的要与之亲和的表)分布列值相同的行分布到同一个Worker节点中。
- shard_count: 设置分片数量,分布表要被切分为多少个shard。 可通过pg_dist_shard系统表查看分片信息。也可以通过
set citus.shard_count
的方式进行设置。
下面我们将详细讨论这些内容。
分布列
在其他分布式数据库中分布键的概念,例如Greenplum,分布列概念相似。在Citus中目前支持一个分布列。
create table t(a int, b int);
select create_distributed_table('t', 'a'); -- 指定分布列a
-- 等同于其他分布式数据库中类似的语法create table t(a int, b int) distributed by (a);
可以看到,相比普通表,分布式表增加了一个分布列(也可以理解为分布键)的概念,通过分布列将表分布到Worker节点。所以在创建分布表的时候,分布列的选择非常重要,分布列的选择,应该考虑到尽量均匀的分布在worker节点上(尽量避免Worker节点负载不均),除此之外还应注意后续进行Join等操作时要尽量避免数据重分布。因为数据重分布的代价是非常高昂的。另外还有一点就是数据亲和性问题,较好的亲和性,性能会有所提升。
分布方式
分布表的数据按分布列分布到Worker节点。每个分片(shard)都有唯一的shardid编号,每个shard对应Worker节点上名为tablename_shardid的表。分布数据时还可指定表的分布方式,目前支持hash以及append分布方式,默认为hash分布。
hash分布
对分布列计算hash值,按hash值进行range分割,hash分割的边界值保存在Coordinator节点的pg_dist_shard系统表中。
create table heilongjiang3(a int, b text);
select create_distributed_table('heilongjiang3', 'a', 'hash', shard_count:=2);
-- 2个分片,我们看一下其每个分片的hash取值
postgres@postgres=# select * from pg_dist_shard;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------+---------+--------------+---------------+---------------
heilongjiang3 | 102277 | t | -2147483648 | -1 -- 需要注意这是hash值的分界并不是分布列a(int)值的分界
heilongjiang3 | 102278 | t | 0 | 2147483647
(2 rows)
-- Coordinator节点查询数据, 1个Coordinator节点, 2个Worker节点
postgres@postgres=# select * from heilongjiang3 ;
a | b
---+-------
2 | aaaa
2 | b
1 | aaa
4 | aabaa
3 | baa
1 | a
(6 rows)
-- Worker节点1查询
postgres@postgres=# select * from heilongjiang3_102277 ;
a | b
---+-------
1 | aaa
4 | aabaa
3 | baa
1 | a
(4 rows)
-- Workder节点2查询
postgres@postgres=# select * from heilongjiang3_102278 ;
a | b
---+------
2 | aaaa
2 | b
(2 rows)
append分布
append分布,顾名思义,新增数据每次只在一个Worker节点上写入并写入到最新一个shard分片中,新增分片以轮巡的方式创建在不同的Worker节点上,适用于追加写入等场景。
-- 建表,以append方式分布
postgres@postgres=# create table ta(a int, b int);
CREATE TABLE
postgres@postgres=# select create_distributed_table('ta','a','append');
create_distributed_table
--------------------------
(1 row)
-- 刚刚创建表,没有数据,也没有shard, 此时Worker节点上是没有相关shard表存在的,与hash分布的方式不同
postgres@postgres=# select * from pg_dist_shard;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
--------------+---------+--------------+---------------+---------------
(0 rows)
-- 为append分布表创建shard, 此时只在某一个Worker节点创建分片
postgres@postgres=# select master_create_empty_shard('ta');
master_create_empty_shard
---------------------------
102281
(1 row)
-- 只有1个shard
postgres@postgres=# select * from pg_dist_shard;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
--------------+---------+--------------+---------------+---------------
ta | 102281 | t | |
(1 row)
postgres@postgres=# insert into ta values(1,1);
INSERT 0 1
postgres@postgres=# select * from ta;
a | b
---+---
1 | 1
(1 row)
-- 继续为append分布表创建shard, 这时在另一个worker节点中创建了新的分片
postgres@postgres=# select master_create_empty_shard('ta');
master_create_empty_shard
---------------------------
102282
(1 row)
-- 可以看到有2个shard了。
postgres@postgres=# select * from pg_dist_shard;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
--------------+---------+--------------+---------------+---------------
ta | 102281 | t | |
ta | 102282 | t | |
(2 rows)
分片数量和副本数
分布表有两个相关的配置参数,分片数量和副本数。
- 分片数量: citus.shard_count ,配置分布表的分片数量,即可在postgres.conf配置文件中设置,也可以建表时指定某个分布表的分片数量。
- 副本数: citus.shard_replication_factor , 对分布表进行分片后的shard,支持多副本shard
-- 默认副本数1
postgres@postgres=# show citus.shard_replication_factor ;
citus.shard_replication_factor
---------------------------------
1
(1 row)
-- 创建分布表,指定shard_count = 2;
create table heilongjiang3(a int, b text);
select create_distributed_table('heilongjiang3', 'a', shard_count:=2);
-- 2个分片
postgres@postgres=# select * from pg_dist_shard;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------+---------+--------------+---------------+---------------
heilongjiang3 | 102277 | t | -2147483648 | -1
heilongjiang3 | 102278 | t | 0 | 2147483647
(2 rows)
-- 每个分片1个副本
postgres@postgres=# select * from pg_dist_placement;
placementid | shardid | shardstate | shardlength | groupid
-------------+---------+------------+-------------+---------
300 | 102277 | 1 | 0 | 1
301 | 102278 | 1 | 0 | 2
(2 rows)
-- 将shard_replication_factor副本数设置为2
postgres@postgres=# set citus.shard_replication_factor = 2;
SET
postgres@postgres=# show citus.shard_replication_factor ;
citus.shard_replication_factor
---------------------------------
2
(1 row)
-- 建表
create table heilongjiang2(a int, b text);
-- 创建分布表, 设定分片数量4
select create_distributed_table('heilongjiang2', 'a', shard_count:=4);
-- 可以看到4个分片
postgres@postgres=# select * from pg_dist_shard;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------+---------+--------------+---------------+---------------
heilongjiang2 | 102273 | t | -2147483648 | -1073741825
heilongjiang2 | 102274 | t | -1073741824 | -1
heilongjiang2 | 102275 | t | 0 | 1073741823
heilongjiang2 | 102276 | t | 1073741824 | 2147483647
(4 rows)
-- 可以看到每个分片2个副本
postgres@postgres=# select * from pg_dist_placement;
placementid | shardid | shardstate | shardlength | groupid
-------------+---------+------------+-------------+---------
292 | 102273 | 1 | 0 | 1
293 | 102273 | 1 | 0 | 2
294 | 102274 | 1 | 0 | 2
295 | 102274 | 1 | 0 | 1
296 | 102275 | 1 | 0 | 1
297 | 102275 | 1 | 0 | 2
298 | 102276 | 1 | 0 | 2
299 | 102276 | 1 | 0 | 1
(8 rows)
colocate_with
我们前面将分布列的时候讲过,分布列的选择要考虑到后续Join等操作时要减少数据重分布的问题。当多表Join时,因为不同的表分布列不同,Join时很可能需要重分布,为了避免后续重分布等情况,我们可以考虑将表与表之间建立一种colocate亲和关系,即:如果不同分布式表所执行的分布列类型、shard分片数量及副本数量都相同,则这些分布表会按相同的hash值范围分片,并把相同范围的分片存储在相同的worker节点上,这样分布列值相同的行会亲和性的共存在同一个worker节点上。这样可以认为表与表有亲和性, 这样在进行Join时很多情况就无需进行重分布了。
同样的概念,我们在建表时可以指定与那个表建立亲和关系,也就是与之按相同的分布列类型、相同的方式分布、相同的分片数量、相同的副本数量进行分布。我们举个例子:
postgres@postgres=# create table heilongjiang(a int, b text);
CREATE TABLE
-- 指定与heilongjiang3亲和关系
postgres@postgres=# select create_distributed_table('heilongjiang', 'a', colocate_with => 'heilongjiang3');
create_distributed_table
--------------------------
(1 row)
-- 可以看到与heilongjiang3相同的分布,
postgres@postgres=# select * from pg_dist_shard;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------+---------+--------------+---------------+---------------
heilongjiang3 | 102277 | t | -2147483648 | -1
heilongjiang3 | 102278 | t | 0 | 2147483647
heilongjiang | 102279 | t | -2147483648 | -1
heilongjiang | 102280 | t | 0 | 2147483647
(4 rows)
-- 我们简单验证一下。分布列值为2的行与表heilongjiang3分布到了同一个worker节点上。
postgres@postgres=# select * from heilongjiang_102280 ;
a | b
---+-----
2 | new
(1 row)
postgres@postgres=# select * from heilongjiang3_102278 ;
a | b
---+------
2 | aaaa
2 | b
(2 rows)
需要注意的是,分布表之间的亲和性共存关系,在集群扩容数据重均衡时,亲和性依旧有效,citus会保证亲和关系一致并且不同表的相关的shard会一起迁移到新节点。
Reference Table
相比Distribute Table需要将数据分片保存在Worker节点上,Reference Table不会进行分片,每个Worker节点都存储一份相同的表数据,并通过2PC保证多个副本的强一致性。
适用场景: 小表,高频使用的表,因为所有Worker节点都保存相同的表数据,所以在Join查询时可以避免进行数据重分布以及跨节点获取数据。
可使用create_reference_table
函数进行创建,不需要分布列。
-- 建表
CREATE TABLE nation (
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152));
-- 创建Reference Table
postgres@postgres=# SELECT create_reference_table('nation');
create_reference_table
------------------------
(1 row)
--
postgres@postgres=# select * from pg_dist_shard;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
--------------+---------+--------------+---------------+---------------
nation | 102248 | t | |
(1 row)
-- 可在Worker节点查看到nation_102248表
-- public | nation_102248 | table | postgres
我们向Reference Table中插入数据,并查看一下,验证一下。
-- Coordinator节点插入数据并查看
postgres@postgres=# insert into nation values(1,'china',100, 'a');
INSERT 0 1
postgres@postgres=# insert into nation values(2,'usa',90, 'b');
INSERT 0 1
postgres@postgres=# select * from nation;
n_nationkey | n_name | n_regionkey | n_comment
-------------+---------------------------+-------------+-----------
1 | china | 100 | a
2 | usa | 90 | b
(2 rows)
-- 在某一Worker节点查看数据
postgres@postgres=# select * from nation_102248;
n_nationkey | n_name | n_regionkey | n_comment
-------------+---------------------------+-------------+-----------
1 | china | 100 | a
2 | usa | 90 | b
(2 rows)
需要注意的是分区表是不支持创建Reference Table的,另外也不支持range分布以及append分布。使用的时候需要注意。
CREATE TABLE ningbo (
a int,
b int,
c text
) PARTITION BY HASH(a,b)
(
PARTITION p1 VALUES WITH (MODULUS 3, REMAINDER 0),
PARTITION p2 VALUES WITH (MODULUS 3, REMAINDER 1),
PARTITION p3 VALUES WITH (MODULUS 3, REMAINDER 2)
);
-- 错误示例
postgres@postgres=# select create_reference_table('ningbo');
ERROR: distributing partitioned tables in only supported for hash-distributed tables
-- 错误示例
postgres@postgres=# select create_distributed_table('ningbo', 'a', 'append');
ERROR: distributing partitioned tables in only supported for hash-distributed tables
总结
以上我们梳理了分布表的用法,相比单机数据库,分布式数据库中需要根据不同的情形使用不同的表,不同的分布方式,选择合适的分布列,相比单机需要考虑的更多。