暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

OceanBase 并行执行技术

2069
自 2019 年以来,OceanBase 的并行执行功能被快速应用于各个场景,其重要性逐步提升。
这是一篇非常详尽的并行执行用户使用文档,目的是给正在学习和使用 OceanBase 并行执行功能的团队和个人带来便利。

并行执行是让单个 SQL 语句使用多个 CPU 和 I/O 资源的能力。这篇文章将详细介绍并行执行的工作原理,并解释如何控制、管理和监控 OceanBase 数据库中的并行执行。文章包括以下几个部分:

  1. 并行执行概念
  2. 设定并行度
  3. 并发控制与排队
  4. 并行执行分类
  5. 并行执行控制参数
  6. 并行执行性能诊断
  7. 并行执行调优技巧


自 2019 年以来,OceanBase 的并行执行功能被快速应用于各个场景,其重要性逐步提升。
这是一篇非常详尽的并行执行用户使用文档,目的是给正在学习和使用 OceanBase 并行执行功能的团队和个人带来便利。

并行执行是让单个 SQL 语句使用多个 CPU 和 I/O 资源的能力。这篇文章将详细介绍并行执行的工作原理,并解释如何控制、管理和监控 OceanBase 数据库中的并行执行。文章包括以下几个部分:

  1. 并行执行概念
  2. 设定并行度
  3. 并发控制与排队
  4. 并行执行分类
  5. 并行执行控制参数
  6. 并行执行性能诊断
  7. 并行执行调优技巧

1 并行执行概念

并行执行是指将一个 SQL 查询任务分成多个子任务,并允许这些子任务在多个处理器上同时运行,以提高整个查询任务的执行效率。在现代计算机系统中,多核处理器、多线程和高速网络连接广泛应用,这使得并行执行成为了一种可行的高效率查询技术。

并行执行能够极大降低计算密集型大查询的响应时间,被广泛应用在离线数据仓库、实时报表、在线大数据分析等业务场景,同时还应用在批量导数,快速构建索引表等领域。

以下场景会从并行执行中获益:

  • 大表扫描,大表连接,大数据量排序,聚合
  • 大表 DDL 操作,如修改主键、改变列类型,建索引等
  • 从已有大数据建表(Create Table As Select)
  • 批量插入、删除、更新

本节包含如下内容:

  • 什么场景适用并行执行
  • 什么场景不适用并行执行
  • 硬件要求
  • 并行执行工作原理
  • 并行执行工作线程
  • 通过均衡负载来优化性能

1.1 什么场景适用并行执行

并行执行通过充分利用多个 CPU 和 IO 资源,以达到降低 SQL 执行时间的目的。

当满足下列条件时,使用并行执行会优于串行执行:

  • 访问的数据量大
  • SQL 并发低
  • 要求低延迟
  • 有充足的硬件资源

并行执行用多个处理器协同并发处理同一任务,在这样的系统中会有收益:

  • 多处理器系统(SMPs)、集群
  • IO 带宽足够
  • 内存富余(可用于处理内存密集型操作,如排序、建 hash 表等)
  • 系统负载不高,或有峰谷特征(如系统负载一般在 30% 以下)

如果你的系统不满足上述特征,那么并行执行可能不会带来显著收益。在一些高负载,小内存,或 IO 能力弱的系统里,并行执行甚至会带来负面效果。

并行执行不仅适用于离线数据仓库、实时报表、在线大数据分析等分析型系统,而且在 OLTP 领域也能发挥作用,可用于加速 DDL 操作、以及数据跑批工作等。但是,对于 OLTP 系统中的普通 SELECT 和 DML 语句,并行执行并不适用。

1.2 什么场景不适用并行执行

串行执行使用单个线程来执行数据库操作,在下面这些场景下使用串行执行优于并行执行:

  • Query 访问的数据量很小
  • 高并发
  • Query 执行时间小于 100 毫秒

并行执行一般不适用于如下场景:

  • 系统中的典型 SQL 执行时间都在毫秒级。并行查询本身有毫秒级的调度开销,对于短查询来说,并行执行带来的收益完全会被调度开销所抵消。
  • 系统负载本就很高。并行执行的设计目标就是去充分利用系统的空余资源,如果系统本身已经没有空余资源,那么并行执行并不能带来额外收益,相反还会影响系统整体性能。

1.3 硬件要求

并行执行对硬件没有特殊要求。需要注意的是,CPU 核数、内存大小、存储 I/O 性能、网络带宽都会影响并行执行性能,其中任意一项成为瓶颈都会拖累并行执行性能。

1.4 并行执行工作原理

并行执行将一个 SQL 查询任务分解成多个子任务,并调度这些子任务到多个处理器上运行。

本节包含如下内容:

  • SQL 语句的并行执行
  • 生产者消费者流水线模型
  • 并行的粒度
  • 生产者和消费者之间的数据分发方式
  • 生产者和消费者之间的数据传输机制

1.4.1 SQL 语句的并行执行

当一个 SQL 被解析为并行执行计划后,会按照下面的步骤执行:

  1. SQL 主线程(接收、解析SQL的线程)根据计划形态预约并行执行需要的线程资源。这些线程资源可能来自集群中的多台机器。
  2. SQL 主线程打开并行调度算子(PX COORDINATOR)。
  3. 并行调度算子解析计划,将它们切分成多个操作步骤,按照自底向上的顺序调度执行这些操作。每个操作都会尽可能并行执行。
  4. 当所有操作并行执行完成后,并行调度算子会接收计算结果,并将结果吐给它的上层算子(如 Aggregate 算子),串行完成剩余不可并行的计算(如最终的 SUM 计算)。

1.4.2 生产者-消费者流水线模型

并行执行使用生产者-消费者模型来进行流水执行。并行调度算子解析计划,将它们切分成多个操作步骤,每个操作步骤称之为一个 DFO(Data Flow Operation)

一般情况下,并行调度算子在同一时刻会启动两个 DFO,DFO 之间会以生产者-消费者的模式连接起来,这称为 DFO 间的并行执行。每个 DFO 会使用一组线程来执行,这称为 DFO 内的并行执行,这个 DFO 使用的线程数称为 DOP(Degree Of Parallisim)

上一阶段的消费者 DFO 会成为下一阶段的生产者 DFO。在并行调度算子的协调下,会同时启动消费者 DFO 和生产者 DFO。

下图中:

(1)DFO A 生成的数据会立即传输给DFO B 进行计算;

(2)DFO B 完成计算后,会将数据暂存在当前线程中,等待它的上层 DFO C 启动;

(3)当 DFO B 收到 DFO C 启动完成通知后,会将自己的角色转变成生产者,开始向 DFO C 传输数据,DFO C 收到数据后开始计算。

考虑下面的查询:

create table game (round int primary key, team varchar(10), score int)
    partition by hash(round) partitions 3;

insert into game values (1, "CN", 4), (2, "CN", 5), (3, "JP", 3);
insert into game values (4, "CN", 4), (5, "US", 4), (6, "JP", 4);

select /*+ parallel(3) */ team, sum(score) total from game group by team;

查询语句对应的执行计划:

OceanBase(admin@test)>explain select /*+ parallel(3) */ team, sum(score) total from game group by team;
+---------------------------------------------------------------------------------------------------------+
| Query Plan                                                                                              |
+---------------------------------------------------------------------------------------------------------+
| =================================================================                                       |
| |ID|OPERATOR                     |NAME    |EST.ROWS|EST.TIME(us)|                                       |
| -----------------------------------------------------------------                                       |
| |0 |PX COORDINATOR               |        |1       |4           |                                       |
| |1 | EXCHANGE OUT DISTR          |:EX10001|1       |4           |                                       |
| |2 |  HASH GROUP BY              |        |1       |4           |                                       |
| |3 |   EXCHANGE IN DISTR         |        |3       |3           |                                       |
| |4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|3       |3           |                                       |
| |5 |     HASH GROUP BY           |        |3       |2           |                                       |
| |6 |      PX BLOCK ITERATOR      |        |1       |2           |                                       |
| |7 |       TABLE SCAN            |game    |1       |2           |                                       |
| =================================================================                                       |
| Outputs & filters:                                                                                      |
| -------------------------------------                                                                   |
|   0 - output([INTERNAL_FUNCTION(game.team, T_FUN_SUM(T_FUN_SUM(game.score)))]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(game.team, T_FUN_SUM(T_FUN_SUM(game.score)))]), filter(nil), rowset=256 |
|       dop=3                                                                                             |
|   2 - output([game.team], [T_FUN_SUM(T_FUN_SUM(game.score))]), filter(nil), rowset=256                  |
|       group([game.team]), agg_func([T_FUN_SUM(T_FUN_SUM(game.score))])                                  |
|   3 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256                             |
|   4 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256                             |
|       (#keys=1, [game.team]), dop=3                                                                     |
|   5 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256                             |
|       group([game.team]), agg_func([T_FUN_SUM(game.score)])                                             |
|   6 - output([game.team], [game.score]), filter(nil), rowset=256                                        |
|   7 - output([game.team], [game.score]), filter(nil), rowset=256                                        |
|       access([game.team], [game.score]), partitions(p[0-2])                                             |
|       is_index_back=false, is_global_index=false,                                                       |
|       range_key([game.round]), range(MIN ; MAX)always true                                              |
+---------------------------------------------------------------------------------------------------------+
29 rows in set (0.003 sec)

select语句的执行计划会首先对 game 表做全表扫描,按照 team 做分组求和,然后算出每个 team 的总分数。这个查询的执行示意图如下:

从图中可以看到,这个查询实际使用了 6 个线程。

  • 第一步:前三个线程负责 game 表扫描,并且在每个线程内对 game.team 数据做预聚合;
  • 第二步:后三个线程负责对预聚合的数据做最终聚合;
  • 第三步:最终聚合结果发给并行调度器,由它返回给客户端。

第一步的数据发给第二步时,需要用 game.team 字段做 hash,决定将预聚合数据发给哪个线程。

1.4.3 并行的粒度

并行数据扫描的基本工作单元称为 granule。OceanBase 将表扫描工作划分成多个 granule,每个 granule 描述了一段扫描任务的范围。因为 granule 不会跨表分区 ,所以每段扫描任务一定是位于一个分区内。

根据 granule 的粒度特征,可以划分为两类:

  • Partition Granule

Partition granule 描述的范围是一整个分区,扫描任务涉及到多少个分区,就会划分出多少个 partition granule。这里的分区既可以是主表分区,也可以是索引分区。

Partition Granule 最常用的应用场景是 partition wise join,两张表里对应的分区通过 partition granule 可以确保被同一个工作线程处理。

  • Block Granule

Block granule 描述的范围是一个分区中的一段连续数据。数据扫描场景里,一般都是使用 block granule 划分数据。每个分区都会被划分为若干个 block,这些 block 再以一定的规则串联起来形成一个任务队列,供并行工作线程消费。

在给定并行度的情况下,为了确保扫描任务的均衡,优化器会自动选择将数据划分成分区粒度(Partition Granule)或块粒度(Block Granule)。如果选择了 Block Granule,并行执行框架会在运行时决策 block 的划分,总体原则是确保一个 block 既不会太大,也不会太小。太大,可能导致数据倾斜,让部分线程少干活;太小,会导致频繁的扫描切换开销。

划分好分区粒度后,每个粒度对应一个扫描任务。Table Scan 扫描算子会一个接一个地处理这些扫描任务,处理完一个之后,接着处理下一个,直到所有任务处理完毕。

1.4.4 生产者和消费者之间的数据分发方式

数据分发方式指的是,数据从一组并行执行工作线程(生产者)发送给另一组(消费者)时使用的方法。优化器会使用一系列的优化策略,决策使用哪种数据重分布方式,以达到最优的性能。

并行执行中的常见数据分发方式包括:

  • Hash Distribution

使用 Hash distribution 发送数据时,生产者根据 distribution key 对数据行计算 hash 值并取模,算出发给哪个消费者工作线程。大部分情况下,使用 hash distribution 能将数据较为均匀的分发给多个消费者线程。

  • Pkey Distribution

使用 Pkey Distribution 时,生产者计算出数据行对应的目标表所在分区,然后将行数据发给处理这个分区的消费者线程。

Pkey Distribution 常用于 partitial partitions wise join 场景。在 partitial partitions wise join 场景下,消费者侧的数据不需要做重分布,就可以和生产者侧的数据做 Partition Wise Join。这种方式可以减少网络通信量,提升性能。

  • Pkey Hash Distribution

使用 Pkey Hash Distribution 时,生产者首先需要计算出数据行对应的目标表所在的分区。然后,根据 distribution key 对数据行进行 hash 计算,以便决定将其发给哪一个消费者线程来处理。

Pkey Hash Distribution 常常应用于 Parallel DML 场景中。在这种场景下,一个分区可以被多个线程并发更新,因此需要使用 Pkey Hash Distribution 来确保相同值的数据行被同一个线程处理,不同值的数据行尽可能均分到多个线程处理。

  • Broadcast Distribution

使用 broadcast distribution 时,生产者将每一个数据行发送消费者端的每一个线程,使得消费者端每一个线程都拥有全量的生产者端数据。

Broadcast distribution 常用于将小表数据复制到所有执行 join 的节点,然后做本地 join 操作。这种方式可以减少网络通信量。

  • Broadcast to Host Distribution(简称 BC2HOST)

使用 broadcast to host distribution 时,生产者将每一个数据行发送消费者端的每一个节点上,使得消费者端每一个节点都拥有全量的生产者端数据。然后,节点里的消费者线程协同处理这份数据。

Broadcast to host distribution 常用于 NESTED LOOP JOIN、SHARED HASH JOIN 场景。NESTED LOOP JOIN 场景里,消费端的每个线程会从共享数据里取一部分数据行作为驱动数据,去目标表里做 join 操作;SHARED HASH JOIN 场景里,消费端的每个线程会基于共享数据协同构建 hash 表,避免每个线程独立构建相同 hash 表导致的不必要开销。

  • Range Distribution

使用 Range distribution 时,生产者将数据按照 range 范围做划分,让不同消费者线程处理不同范围的数据。

Range distribution 常用于排序场景,各个消费者线程只需排序好发给自己的数据,数据就能在全局范围内有序。

  • Random Distribution

使用 Random distribution 时,生产者将数据随机打散,发给消费者线程,使得每个消费者线程处理的数据数量几乎一致,从而达到均衡负载的目的。

Random distribution 常用于多线程并行 UNION ALL 场景,该场景只要求数据打散,负载均衡,数据之间无其它关联约束。

  • Hybrid Hash Distribution

1.4.5 生产者和消费者之间的数据传输机制

并行调度算子在同一时刻会启动两个 DFO,DFO 之间会以生产者-消费者的模式连接起来并行执行。为了方便在生产者和消费者之间传输数据,需要创建一个传输网络。

例如,生产者 DFO 以 DOP = 2 来做数据扫描,消费者 DFO 以 DOP = 3 来做数据聚合计算,每个生产者线程都会创建 3 个虚拟链接去连接消费者线程,总计会创建 6 个虚拟链接。如下图:

生产者和消费者之间创建的虚拟传输网络被称为数据传输层(Data Transfer Layer,简称 DTL)。OceanBase 并行执行框架中,所有的控制消息和行数据都通过 DTL 进行收发。每个工作线程可以对外建立数千个虚拟链接,具有高度的可扩展性。除此之外,DTL 还具有数据缓冲、批量数据发送和自动流量控制等能力。

当 DTL 链接的两端位于同一个节点时,DTL 会通过内存拷贝的方式来传递消息;当 DTL 链接的两端位于不同节点时,DTL 会通过网络通信的方式来传递消息。

1.5 并行执行工作线程

一个并行查询会使用两类线程:1个主线程,若干个并行工作线程。其中主线程和普通的 TP 查询使用的线程没有任何区别,来自普通工作线程池,并行工作线程来自专用线程池。

OceanBase 使用专用线程池模型来分配并行工作线程。每个租户在其所属的各个节点里都有一个租户专属的并行执行线程池,并行查询工作线程都通过这个线程池来分配。

并行调度算子在调度每个 DFO 之前,会去线程池中申请线程资源。当 DFO 执行完成时,会立即源释放线程资源。

线程池的初始大小为 0,按需增长,没有上限。为了避免空闲线程数过多,线程池引入自动回收机制。对于任意线程:

  • 如果空闲时间超过 10 分钟,并且线程池中剩余线程数大于 8 个,则被回收销毁;
  • 如果空闲时间超过 60 分钟,则无条件销毁

虽然线程池的大小没有上限,但是通过下面两个机制,能在绝大多数场景里形成事实上限:

  1. 并行执行开始执行前,需要通过 Admission 模块预约线程资源,预约成功后才能投入执行。通过这个机制,能限制并发查询数量。Admission 模块的详细内容参考本文中 《3 并发控制与排队》一节。
  2. 查询每次从线程池申请线程时,单次申请的线程数量不会超过 N, N 等于租户 UNIT 的 MIN_CPU 乘以 px_workers_per_cpu_quota,如果超过,则最多只分配 N 个线程。px_workers_per_cpu_quota 是租户级配置项,默认值为 10。例如,一个 DFO 的 DOP = 100,它需要从 A 节点申请 30 个线程,从 B 节点申请 70 个线程,UNIT 的 MIN_CPU = 4,px_workers_per_cpu_quota = 10,那么 N = 4 * 10 = 40。最终这个 DFO 在 A 节点上实际申请到 30 个线程,B 节点上实际申请到 40 个线程,它的实际 DOP 为 70。

1.6 通过均衡负载来优化性能

为了达到最优性能,所有工作线程分到的工作任务应该尽量相等。

SQL 使用 Block Granule 划分任务的时候,工作任务会动态地分配到工作线程之间。这样可以最小化工作负载不均衡问题,即一些工作线程的工作量不会明显超过其它工作线程。SQL 使用 Partition Granule 划分任务时,可以通过让任务数是工作线程数的整数倍来优化性能。这适用于 Partition Wise Join 和并行 DML 场景。

举个例子,假设一个表有 16 个分区,每个分区的数据量差不多。你可以使用 16 工作线程(DOP 等于 16)以大约十六分之一的时间完成工作,你也可以使用五个工作线程以五分之一的时间完成工作,或使用两个线程以一半的时间完成工作。

但是,如果你使用 15 个线程来处理 16 个分区,则第一个线程完成一个分区的工作后,就开始处理第 16 个分区。而其他线程完成工作后,它们变为空闲状态。当每个分区的数据量差不多时,这种配置会导致性能不优;当每个分区的数据量有所差异时,实际性能则会因情况而异。

类似地,假设你使用 6 个线程来处理 16 个分区,每个分区的数据量差不多。在这种情况下,每个线程在完成其第一个分区后,会处理第二个分区,但只有四个线程会处理第三个分区,而其他两个线程会保持空闲。

一般来说,你不能假设在给定数量的分区(N)和给定数量的工作线程(P)上执行并行操作所花费的时间等于 N 除以 P。这个公式没有考虑到一些线程可能需要等待其他线程完成最后的分区。但是,通过选择适当的 DOP,你可以最小化工作负载不均衡问题并优化性能。

2 设定并行度

并行度(degree of parallelism,简称DOP)指的是单个 DFO 在执行时使用的工作线程数。并行执行的设计目的就是为了高效利用多核资源。OceanBase 并行执行框架提供了多种方式指定并行度,既可以手工指定,也可以让数据库帮你自动选择。本章从以下几个方面介绍并行度相关内容:

  • 手工指定并行度
  • 自动并行度
  • 可变并行度

2.1 手工指定并行度

可以对一张表指定并行度,使得对这张表的扫描总是使用并行执行。

2.1.1 指定并行度的几种方式

表属性指定并行度

下面的语句分别指定一张主表和一个索引的扫描并行度。

ALTER TABLE table_name PARALLEL 4;
ALTER TABLE table_name ALTER INDEX index_name PARALLEL 2;

如果一个 SQL 中只涉及一张表,当 SQL 中查询了主表时,除了主表所在的 DFO,其余 DFO 也会使用并行度 4 来执行;当 SQL 查询了索引表时,除了索引表所在的 DFO,其余 DFO 也会使用并行度 2 来执行。

如果一个 SQL 中涉及多张表,则会使用 PARALLEL 最大值作为整个计划的 DOP。

PARALLEL HINT 指定并行度

可以通过全局 PARALLEL HINT 来指定整个 SQL 的并行度,也可以通过表级的 PARALLEL HINT 来指定特定表的并行度。当一个 SQL 里给定了多个表的 PARALLEL HINT,那么各个表所在 DFO 的 DOP 由各个表的 Parallel 值决定。如果一个 DFO 里包含多张表,则会取他们的 PARALLEL 最大值作为 DFO 的 DOP。

OceanBase(TEST@TEST)>create table t1 (c1 int, c2 int);
Query OK, 0 rows affected (0.167 sec)

OceanBase(TEST@TEST)>explain select /*+ parallel(3) */ sum(c1) from t1;
+----------------------------------------------------------------------+
| Query Plan                                                           |
+----------------------------------------------------------------------+
| =========================================================            |
| |ID|OPERATOR             |NAME    |EST.ROWS|EST.TIME(us)|            |
| ---------------------------------------------------------            |
| |0 |SCALAR GROUP BY      |        |1       |2           |            |
| |1 | PX COORDINATOR      |        |3       |2           |            |
| |2 |  EXCHANGE OUT DISTR |:EX10000|3       |2           |            |
| |3 |   MERGE GROUP BY    |        |3       |1           |            |
| |4 |    PX BLOCK ITERATOR|        |1       |1           |            |
| |5 |     TABLE SCAN      |T1      |1       |1           |            |
| =========================================================            |
| Outputs & filters:                                                   |
| -------------------------------------                                |
|   0 - output([T_FUN_SUM(T_FUN_SUM(T1.C1))]), filter(nil), rowset=256 |
|       group(nil), agg_func([T_FUN_SUM(T_FUN_SUM(T1.C1))])            |
|   1 - output([T_FUN_SUM(T1.C1)]), filter(nil), rowset=256            |
|   2 - output([T_FUN_SUM(T1.C1)]), filter(nil), rowset=256            |
|       dop=3                                                          |
|   3 - output([T_FUN_SUM(T1.C1)]), filter(nil), rowset=256            |
|       group(nil), agg_func([T_FUN_SUM(T1.C1)])                       |
|   4 - output([T1.C1]), filter(nil), rowset=256                       |
|   5 - output([T1.C1]), filter(nil), rowset=256                       |
|       access([T1.C1]), partitions(p0)                                |
|       is_index_back=false, is_global_index=false,                    |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true    |
+----------------------------------------------------------------------+
24 rows in set (0.002 sec)
OceanBase(TEST@TEST)>create table t1 (c1 int, c2 int);
Query OK, 0 rows affected (0.167 sec)

OceanBase(TEST@TEST)>create table t2 (c1 int, c2 int);
Query OK, 0 rows affected (0.150 sec)

OceanBase(TEST@TEST)>select /*+ parallel(t1 3) */ * from t1, t2 where t1.c2 = t2.c1;
Empty set (0.041 sec)

OceanBase(TEST@TEST)>explain select /*+ parallel(t1 3) */ * from t1, t2 where t1.c2 = t2.c1;
+----------------------------------------------------------------------------------------+
| Query Plan                                                                             |
+----------------------------------------------------------------------------------------+
| =================================================================                      |
| |ID|OPERATOR                     |NAME    |EST.ROWS|EST.TIME(us)|                      |
| -----------------------------------------------------------------                      |
| |0 |PX COORDINATOR               |        |1       |7           |                      |
| |1 | EXCHANGE OUT DISTR          |:EX10002|1       |6           |                      |
| |2 |  HASH JOIN                  |        |1       |5           |                      |
| |3 |   EXCHANGE IN DISTR         |        |1       |2           |                      |
| |4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|1       |2           |                      |
| |5 |     PX BLOCK ITERATOR       |        |1       |1           |                      |
| |6 |      TABLE SCAN             |T1      |1       |1           |                      |
| |7 |   EXCHANGE IN DISTR         |        |1       |4           |                      |
| |8 |    EXCHANGE OUT DISTR (HASH)|:EX10001|1       |4           |                      |
| |9 |     PX PARTITION ITERATOR   |        |1       |2           |                      |
| |10|      TABLE SCAN             |T2      |1       |2           |                      |
| =================================================================                      |
| Outputs & filters:                                                                     |
| -------------------------------------                                                  |
|   0 - output([INTERNAL_FUNCTION(T1.C1, T1.C2, T2.C1, T2.C2)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1, T1.C2, T2.C1, T2.C2)]), filter(nil), rowset=256 |
|       dop=3                                                                            |
|   2 - output([T1.C2], [T2.C1], [T1.C1], [T2.C2]), filter(nil), rowset=256              |
|       equal_conds([T1.C2 = T2.C1]), other_conds(nil)                                   |
|   3 - output([T1.C2], [T1.C1]), filter(nil), rowset=256                                |
|   4 - output([T1.C2], [T1.C1]), filter(nil), rowset=256                                |
|       (#keys=1, [T1.C2]), dop=3                                                        |
|   5 - output([T1.C2], [T1.C1]), filter(nil), rowset=256                                |
|   6 - output([T1.C2], [T1.C1]), filter(nil), rowset=256                                |
|       access([T1.C2], [T1.C1]), partitions(p0)                                         |
|       is_index_back=false, is_global_index=false,                                      |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true                      |
|   7 - output([T2.C1], [T2.C2]), filter(nil), rowset=256                                |
|   8 - output([T2.C1], [T2.C2]), filter(nil), rowset=256                                |
|       (#keys=1, [T2.C1]), dop=1                                                        |
|   9 - output([T2.C1], [T2.C2]), filter(nil), rowset=256                                |
|       force partition granule                                                          |
|  10 - output([T2.C1], [T2.C2]), filter(nil), rowset=256                                |
|       access([T2.C1], [T2.C2]), partitions(p0)                                         |
|       is_index_back=false, is_global_index=false,                                      |
|       range_key([T2.__pk_increment]), range(MIN ; MAX)always true                      |
+----------------------------------------------------------------------------------------+
39 rows in set (0.002 sec)

对于 DML 语句,使用上面的 HINT,只会使得 DML 语句的查询部分走并行,写入部分还是串行。如果希望写入部分也并行,则需要还添加一个 HINT ENABLE_PARALLEL_DML,例如:

insert /*+ parallel(3) enable_parallel_dml */ into t3 select * from t1;

注意:并行 DML 必须指定全局 parallel hint。仅指定表级 parallel hint 无法让写入部分串行。例如,下面的 SQL 不会开启 DML 并行:

insert /*+ parallel(t3 3) enable_parallel_dml */ into t3 select * from t1;

SESSION 上指定并行度

在当前 session 上指定并行度后,session 上的所有查询语句都将以这个并行度执行。需要注意,即使是单行查询的 SQL,也会使用该并行度,可能导致性能下降。

set _force_parallel_query_dop = 3

对于 DML 语句,使用上面的命令,只会使得 DML 语句的查询部分走并行,写入部分还是串行。如果希望写入部分也并行,则需要用下面的命令:

set _force_parallel_dml_dop = 3

2.1.2 并行度优先级

全局 HINT > TABLE HINT > SESSION 并行度 > TABLE DOP

对比全局 HINT 和 TABLE HINT 的优先级。可以看到,有全局 HINT 时,TABLE HINT 不生效。

OceanBase(TEST@TEST)>explain select /*+ parallel(2) parallel(t1 3) */ * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |2           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |2           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=2                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

OceanBase(TEST@TEST)>explain select /*+ parallel(t1 3) */ * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |2           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |1           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=3                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

对比 session 和 table hint 的优先级。可以看到,使用 table 级的 HINT 时,session 设定的 DOP 不生效。

OceanBase(TEST@TEST)>alter session force parallel query parallel 4;
Query OK, 0 rows affected (0.001 sec)

OceanBase(TEST@TEST)>explain select /*+ parallel(t1 3) */ * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |2           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |1           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=3                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

OceanBase(TEST@TEST)>explain select * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |1           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |1           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=4                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

对比 session 和 table parallel 的优先级。可以看到 table parallel 属性的优先级低于 session 指定 dop 的优先级。

OceanBase(TEST@TEST)>alter table t1 parallel 5;
Query OK, 0 rows affected (0.135 sec)

OceanBase(TEST@TEST)>explain select * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |1           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |1           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=5                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

OceanBase(TEST@TEST)>alter session force parallel query parallel 4;
Query OK, 0 rows affected (0.000 sec)

OceanBase(TEST@TEST)>explain select * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |1           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |1           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=4                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

2.1.3 并行度取值

先讲一个简单原则:1. parallel 的设定目的是为了把 CPU 用满。2. parallel 不是越大越好。举个例子:当租户 CPU 为 20 的时:

  • 对于单表简单操作(如单表扫描过滤、单表增删改)parallel 理论值为 20。
  • 对于多表 join 的查询、带全局索引的 PDML,parallel 理论值为 10。
  • 对于一些更加复杂的计划,形态为右深树时,parallel 理论值为 7 左右。

对上面的例子做解释

  • 对于单表操作,只有一个 DFO,可以把 20 个 CPU 全部给这个 DFO 用。
  • 对于多表 join,同一时间会启动 2 个 DFO,形成数据流水线(一个 DFO 是 producer、一个 DFO 是consumer),所有每个 DFO 分 10 个 CPU。
  • 对于更加复杂的右深树计划,我们同一时间会启动 3 个 DFO,此时每个 DFO 大约分 7 个 CPU 可以保证他们都能跑得比较快。

但是,以上只是一般原则,实际操作中还是应该做微调。解释如下:

  • 对于单表操作,只有一个 DFO,可以把 20 个 CPU 全部给这个 DFO 用。

假设这个 DFO 大部分时间是做 IO,那么不妨把并发度设置得比 20 稍高一些,比如 25,这样可以充分利用 CPU

  • 对于多表 join,同一时间会启动 2 个 DFO,形成数据流水线(一个 DFO 是 producer、一个 DFO 是consumer),所有每个 DFO 分 10 个 CPU。

我们实际并不能保证每个 DFO 都能把分给他的 CPU 压满。所以,把并发度适当稍微调整高一些,比如 15 也许能更充分地利用 CPU。 ** 但是,不能无限往上加。把并发度加到 50,根本不会有收益,相反还会增加线程调度开销、框架调度开销。

  • 对于更加复杂的右深树计划,我们同一时间会启动 3 个 DFO,此时每个 DFO 大约分 7 个 CPU 可以保证他们都能跑得比较快。

一个计划里,并不是处处都要启动 3 个 DFO,3 DFO 的情况可能是局部的。大部分时候可能还是同时启动 2 个 DFO。 所以这时候把并发设置为 10 也许比 7 要好。

微调后,当租户 CPU 为 20 的时,一个可能的情况如下:

  • 对于单表简单操作(如单表扫描过滤、单表增删改)parallel 实践值为 30。
  • 对于多表 join 的查询、带全局索引的 PDML,parallel 实践值为 15。
  • 对于一些更加复杂的计划,形态为右深树时,parallel 实践值为 10 ~ 15 左右。

3 并发控制与排队

在一定场景下,并行查询会因为等待线程资源而排队。

3.1 并行执行并发控制

我们通过租户级变量 PARALLEL_SERVERS_TARGET 指定租户在每个节点上最多可提供的并行执行工作线程数。在并行查询开始之前,从所有相关 observer 预约工作线程资源,只要其中任何一个 observer 不能为并行查询提供足够资源,就不会投入执行。该并行查询会被丢回查询队列排队,等待下次执行时重新尝试获取线程资源,直到能获取到足够工作线程资源才能获准执行。整个查询执行完成后,立即释放预约的工作线程资源。

这种“尝试预约工作线程资源-资源不足丢回查询队列-再次获得执行机会-再次尝试预约工作线程资源”的过程我们称之为并行查询排队。管理全部 observer 工作线程资源预约的模块称为并行执行资源管理器。

并行执行资源管理器为了计算每个并行查询需要的工作线程数,会将查询计划做 DFO 划分,模拟调度 DFO 过程,根据 parallel hint、table parallel 等参数计算出该查询在每个 observer 上需要的最大线程数。这组线程数我们称之为“资源向量”。

资源向量是逻辑概念,用于控制并发与排队。使用资源向量从并行执行资源管理器中预约到足够工作线程资源后,并行查询会投入执行。在执行过程中,尽管随着不同 DFO 的调度执行,会不断有物理线程的获取和释放,但是逻辑上的线程资源并不会归还给并行执行资源管理器。只有在并行查询完全执行完成后,这组资源向量才会归还给并行执行资源管理器。

当大量并发查询从并行执行资源管理器预约线程资源时,采取先来先服务的策略,直至资源分配殆尽,无法满足任何一个查询的资源需求为止。之后的查询都会丢回查询队列排队,再次调度时重试获取资源。

3.2 并行执行工作线程分配

在租户的每个 observer 上都有一个并行执行线程池,用于执行并行查询任务。执行任务时,如果线程池里线程数量不足,会动态扩容线程池。如果线程池里的线程空闲时间超过 10 分钟,会触发自动缩容到 10 个线程;如果线程池里的线程空闲时间超过 60 分钟,会触发进一步缩容,可能缩容到 0 个线程。

并行查询一旦获得调度执行后,每个 DFO 总是可以在它涉及到的 observer 的并行执行线程池里获得需要的并行线程资源。需要注意的是,默认情况下,每个 DFO 在一个 observer 上分配的线程数,不得大于租户 MIN CPU * 10,如果它提出的资源需求大于这个值,会被自动降低为 MIN CPU * 10。

3.3 两级资源控制模型

对于任意并行查询,它会经历两级资源控制:

  • 全局控制:在执行资源管理器的控制下,预约包含足够执行线程的资源向量
  • 局部控制:在并行执行线程池的控制下,分配期望的物理线程数

全局控制会考虑分布式场景下的资源获取,局部控制仅考虑单机线程池的资源分配,二者各司其职。前者确保Query 通过检查后一定能够执行下去,不会在运行时遇到拿不到资源的问题,后者确保极端情况下单个 Query 的 DFO 不会申请远大于能有效利用的物理线程数,造成线程资源浪费。一个并行查询,只要通过了全局控制阶段,就可以顺利执行,无论并发多大,都不会遇到物理线程数不足的问题。

3.4 并行执行资源管理器相关视图

并行执行资源管理器拥有全局视角,通过视图 GV$OB_PX_TARGET_MONITOR能看到租户内每个 observer 的线程预约状态。关于视图字段详细含义,参考视图手册。

OceanBase(admin@oceanbase)>select  * from GV$OB_PX_TARGET_MONITOR;
+--------------+----------+-----------+-----------+-----------------+--------------+-----------+-------------+------------------+-------------------+------------------------------+
| SVR_IP       | SVR_PORT | TENANT_ID | IS_LEADER | VERSION         | PEER_IP      | PEER_PORT | PEER_TARGET | PEER_TARGET_USED | LOCAL_TARGET_USED | LOCAL_PARALLEL_SESSION_COUNT |
+--------------+----------+-----------+-----------+-----------------+--------------+-----------+-------------+------------------+-------------------+------------------------------+
| 192.168.11.2 |    19512 |      1004 | N         | 555393108309134 | 192.168.11.1 |     19510 |          10 |                6 |                 0 |                            0 |
| 192.168.11.2 |    19512 |      1004 | N         | 555393108309134 | 192.168.11.2 |     19512 |          10 |                0 |                 0 |                            0 |
| 192.168.11.1 |    19510 |      1004 | Y         | 555393108309134 | 192.168.11.1 |     19510 |          10 |                6 |                 6 |                            1 |
| 192.168.11.1 |    19510 |      1004 | Y         | 555393108309134 | 192.168.11.2 |     19512 |          10 |                0 |                 0 |                            1 |
+--------------+----------+-----------+-----------+-----------------+--------------+-----------+-------------+------------------+-------------------+------------------------------+
4 rows in set (0.002 sec)

在一个瞬态里,不同 observer 看到的全局状态可能不一致,但后台每 500 毫秒就会同步一次全局状态,总体上各个 observer 看到的状态会基本一致,不会有太大偏差。

4 并行执行分类

OceanBase 支持多种语句的并行,本章按照如下内容分别介绍:

  • 并行查询
  • 并行 DML
  • 并行 DDL
  • 并行 LOAD DATA

4.1 并行查询

你可以在下面几种场景里使用并行查询:

  • select 语句,以及 select 子查询
  • DML 语句(INSERT,UPDATE,DELETE)的查询部分
  • 外表查询

并行查询的决策分为两部分:

  1. 决定走并行查询。如果查询中使用了 PARALLEL HINT,SESSION 上开启了并行查询,或TABLE 属性指定了并行,那么将会开启并行查询。
  2. 决定并行度。并行查询中,每个 DFO 的并行度可以不一样。
  • 对于基表扫描或索引扫描 DFO,其并行度由 PARALLEL HINT、SESSION 并行属性,或TABLE 属性来决定。
  • 对于基表扫描或索引扫描 DFO,如果运行时检测到它访问的数据不足一个宏块,那么它的运行时并发度会被局部自动降低。
  • 对于 JOIN 等中间节点,其 DFO 并行度继承左孩子 DFO 的并行度。
  • 部分 DFO 不允许并行执行(如计算 ROWNUM 的节点),那么他们的并行度会被强制设为 1。

4.2 并行 DML

大部分场景下,可以使用并行 DML (Parallel DML,简称 PDML)加速数据导入、更新、删除操作。

4.2.1 DML 并行度

DML 的并行度和查询部分的并行度一致。开启并行 DML 时,查询部分总是自动开启并行。读出的数据会根据待更新表的分区位置做重分布,然后由多个线程并行 DML,每个线程负责若干个分区。

并行度和目标表的分区数之间有倍数关系时,一般可以达到最佳性能。当并发度高于分区数时,会有多个线程处理同一个分区的数据;当并发度低于分区数时,单个线程可能处理多个分区的数据,并且每个线程处理的分区不重合。当并行度大于目标表分区数时,建议并行度是分区数的整数倍。

一般来说,同时向一个分区插入数据的线程数不要超过 4 个,超过这个值后扩展性并不好,日志同步会成为瓶颈,另外还有一些分区级别的锁同步开销。当并行度小于目标表分区数时,建议分区数是并行度的整数倍。这样,每个线程处理的分区数差不多,可以避免插入工作量倾斜。

4.2.2 索引表处理策略

并行 DML 支持自动维护索引表。

当索引表为本地索引时,并行 DML 在更新主表时,存储层会自动维护本地索引。

当索引表为全局索引时,并行 DML 框架会生成特定的计划来维护全局索引。假设有两个全局索引,处理流程如下:

  1. 首先,会用 DFO1 更新主表;
  2. 然后,DFO1 将全局索引1、全局索引2需要的数据发给 DFO2,用 DFO2 更新全局索引表1;
  3. 最后,DFO2 将全局索2引需要的数据发给 DFO3,用 DFO3 更新全局索引表2。

以上策略对所有 INSERT、DELETE、UPDATE 语句有效,对于 MERGE 语句,处理方式略有不同,所有的索引维护操作会集中到一个 DFO 中处理,如下图所示:

  1. 首先,会用 DFO1 更新主表;
  2. 然后,DFO1 将全局索引1、全局索引2需要的数据发给 DFO2,DFO2 内部会逐个完成全部全局索引的维护操作。

4.2.3 更新分区键的处理策略

对于 UPDATE 语句来说,当主表或全局索引表的分区键被更新时,需要把旧数据从旧分区中删除,然后把新数据插入到新分区中。这个过程通常被称作 row movement。

当发生 row movement 时,需要将 UPDATE 操作拆分成两步操作:先 DELETE,然后 INSERT。具体地,会将出现 row movement 的 UPDATE DFO 拆分成两个 DFO,第一个 DFO 负责 DELETE,第二个 DFO 负责 INSERT。并且,为了避免主键冲突,必须确保 DELETE DFO 完全执行完成后才能开始执行 INSERT DFO。

4.2.4 事务处理

OceanBase 并行 DML 和普通 DML 语句一样,完全支持事务处理。并行 DML 语句可以和其它查询语句一起出现在同一个事务中,执行完成并行 DML 语句后无需立即提交事务,就可以在后继的查询语句中读取到 DML 语句的结果。

在 OBServer v4.1 版本之前,当并行 DML 执行时间超长时,需要给租户配置项 undo retention 设置合适的值,否则可能发生 -4138 (OB_SNAPSHOT_DISCARDED) 错误,导致 SQL 在内部反复重试,直至超时。undo retention 字面意思是 Undo 的保留位点,即从当前时间回溯多长时间的 Undo 日志是保留下来的。对于 OceanBase 数据库来说,是将该时段的所有数据多版本保留下来。当并行 DML 执行时间超过 undo retention 设定的时间时,多版本数据可能被淘汰,当 DML 中的任何后继操作试图访问淘汰的多版本数据时,就会触发 OB_SNAPSHOT_DISCARDED 报错。undo retention 的默认值是 30 分钟,这意味着在默认情况下,如果并行 DML 语句 30 分钟内不能执行完成,无论语句的超时时间设定为多少,语句都可能执行超时并报错。一般来说,如果业务中的最长并行 DML 的执行时间为 2h 时, undo retention 可以设置为 2.5h。undo retention 不能随意设置为极大值,那会导致多版本数据无法回收,打爆磁盘。

从 OBServer v4.1 版本起,并行 DML 的执行不再依赖 undo retention 设定。多版本数据会根据事务版本号回收,只要事务还活跃,事务对应的版本号能读到的内容就不会回收。不过,数据盘满的场景是例外,此时还是会强行回收多版本数据,并行 DML 会收到 OB_SNAPSHOT_DISCARDED 报错,并自动重试整个 SQL。

4.2.5 旁路导入

当内存空间不足时,并行 DML 容易报告内存不足错误。没有走旁路导入路径的并行 DML,数据首先会写入 Memtable,然后通过转储、合并写入磁盘。因为并行 DML 写入数据到 Memtable 的速度极快,当写入速度快过转储速度时,内存就会不断增长,最终触发内存不足报错。

为了解决这个问题,OceanBase v4.1 的存储层提供了旁路导入功能。当并行 DML 使用旁路导入功能执行 INSERT 语句时,数据会绕过 Memtable 直接写入磁盘,不仅避免了内存不足的问题,而且还能提升数据导入性能。

用户通过 APPEND HINT 开启旁路导入功能。旁路导入开始前,要求提交上一个事务,并且设置 autocommit = 1。在 v4.2 版本中,旁路导入功能必须配合并行 DML 才能正常工作,如果没有通过 HINT 或 session 开启并行 DML,则旁路导入的 HINT 会被自动忽略。语法示例如下:

set autocommit = 1;
insert /*+ append enable_parallel_dml parallel(3) */ into t1 select * from t2;

预计在未来版本中,旁路导入功能会放松对事务的要求,可以出现在事务中的任意位置。

4.2.6 无法并行的 DML 操作

为了保证正确的 DML 语义,如下场景中,查询部分可以并行,但 DML 部分无法并行:

  • 如果目标表包含 local unique index,则 DML 部分不可并行,查询部分依然可以并行
  • INSERT ON DUPLICATE KEY UPDATE 语句的 DML 部分不可并行
  • 如果目标表包含 Trigger、外键,则 DML 部分不可并行
  • 如果 MERGE INTO 语句的目标表中包含 global unique index,则 DML 部分不可并行
  • 如果 DML 启用了 IGNORE 模式,则 DML 部分不可并行

如果发现 DML 没有走并行,可以通过 explain extended 来查看 Note 字段,确定未走并行的原因。

4.2.7 Row Movement 操作

当更新分区表的分区键时,可能使数据从一个分区搬迁到另一个分区。在 Oracle 模式下通过下面的命令可以禁止数据跨分区搬迁:

create table t1 (c1 int primary key, c2 int) partition by hash(c1) partitions 3;
alter table t1 disable row movement;

OceanBase(TEST@TEST)>update t1 set c1 = c1 + 100000000;
ORA-14402: updating partition key column would cause a partition change

但是,并行 DML 会忽略表的 row movement 属性,总是允许更新分区键。

4.3 并行 DDL

支持并行执行的 DDL 语句包括:

  • CREATE TABLE AS SELECT
  • ALTER TABLE
  • CREATE INDEX

4.3.0 原理

所有并行 DDL 都是通过特定的 Parallel DML 完成。例如,创建索引本质是创建一个索引空表,然后并行地从主表查询出索引列数据,最后并行地插入到索引表中。

4.3.1 通过 HINT 指定并行度

目前(v4.2)仅 CREATE INDEX 语句支持通过 PARALLEL HINT 开启并行,其余 DDL 都只支持 SESSION 变量和 TABLE PARALLEL 属性来开启。

CREATE /*+ PARALLEL(3) */ INDEX IDX ON T1(C2);

4.3.2 通过 SESSION 变量指定并行度

上述所有 DDL 语句都支持通过 SESSION 变量来指定并行度。指定并行度后,该 SESSION 上的所有 DDL 都自动按照该并行度并行执行,并且查询部分和修改部分都使用相同的并行度。

SET _FORCE_PARALLEL_DDL_DOP = 3;
CREATE TABLE T1 (C1 int, C2 int, C3 int, C4 int);
CREATE INDEX IDX ON T1(C2);
-- v4.2 中,CREATE TABLE AS SELECT 开并行,
-- 用的是 “SET _FORCE_PARALLEL_DML_DOP”
-- 而不是 “SET _FORCE_PARALLEL_DDL_DOP”
-- 后继版本可能会修改成后者
SET _FORCE_PARALLEL_DML_DOP = 3;
CREATE TABLE T1 (C1 int, C2 int, C3 int, C4 int);
CREATE TABLE T2 AS SELECT * FROM T1;

4.3.3 通过表 PARALLEL 属性指定并行度

本节内容实测并没有走并行,需要跟进是否符合设计。验证方法:

select plan_operation, count(*) threads from oceanbase.gv$sql_plan_monitor where trace_id = last_trace_id() group by plan_line_id, plan_operation order by plan_line_id;

DDL 相关表上有 PARALLEL 属性时,可以使用 SET 语句设定 SESSION 变量来开启并行执行。例如:

SET _ENABLE_PARALLEL_DDL = 1;
CREATE TABLE T1 (C1 int, C2 int, C3 int, C4 int) PARALLEL = 3;
CREATE INDEX IDX ON T1(C2) PARALLEL = 2;
-- v4.2 中,CREATE TABLE AS SELECT 开并行,
-- 用的是 “SET _ENABLE_PARALLEL_DML”
-- 而不是 “SET _ENABLE_PARALLEL_DDL”
-- 后继版本可能会修改成后者
SET _ENABLE_PARALLEL_DML = 1;
CREATE TABLE T1 (C1 int, C2 int, C3 int, C4 int) PARALLEL = 3;
CREATE TABLE T2 PARALLEL 2 AS SELECT * FROM T1;

4.3.4 优先级

如果同时指定了 PARALLEL HINT、FORCE SESSION PARALLEL、表级 PARALLEL 属性中的两个或多个,那么它们的优先级如下:

PARALLEL HINT 优先级 > FORCE SESSION PARALLEL 优先级 > 表 PARALLEL 属性优先级

4.3.5 旁路导入

CREATE INDEX 语句无论是否开启并行,总是会走旁路导入(Direct Write,Bypass memtable)。

CREATE TABLE AS SELECT 语句目前(v4.2)还不支持旁路导入功能,如果数据量比较大,建议先建立空表,然后使用并行 DML 旁路导入模式并行插入。

4.4 并行 LOAD DATA

LOAD DATA 的实现不是基于并行 DML,它的实现方式是:先用多个线程并行切分 csv 文件,拼成多个 insert 语句,然后用一定的并发度分发执行这些 insert 语句。

LOAD DATA /*+ parallel(2) */ infile "test.csv" INTO TABLE t1 FIELDS TERMINATED BY ',' ENCLOSED BY '"';

上述语句中,PARALLEL 选项指定加载数据的并行度,如果没有指定 PARALLEL HINT,则默认以 PARALLEL 为 4 来并行执行 LOAD DATA。 PARALLEL 建议取值范围是 [0, 租户的最大CPU数]。

4.5 局部无法并行的场景

  • 最顶层 DFO 无需并行,它负责和客户端交互,以及执行最顶层无需并行的部分操作,如 LIMIT、PX COORDINATOR 等
  • 包含 TABLE UDF 时,含该 UDF 的 DFO 只能串行执行,其余部分依然可以并行

5 并行执行控制参数

OceanBase 提供了一组参数来控制并行执行的初始化和调优。在 OceanBase 启动时,可以根据租户的 CPU 数量和租户配置项 px_workers_per_cpu_quota计算出默认并行执行控制参数。用户也可以不使用默认值,在启动时手工指定参数值,还可以根据实际场景在后期手工增加或减小参数值。

并行执行功能默认是开启的。本文从以下几个方面说明并行执行参数控制技巧:

  • 并行执行默认参数
  • 并行执行相关参数调优

5.1 并行执行默认参数

并行执行的参数能够控制并行线程数、并行执行排队等行为。具体总结为下表:

参数名称默认值级别说明
px_workers_per_cpu_quota10租户级配置项每个 CPU 上可以分配的并行执行线程数,取值范围 [1,20]
parallel_servers_targetMIN CPU * px_workers_per_cpu_quota租户级变量表示租户在每个节点上可申请的并行执行线程数量。
parallel_degree_policyMANUAL租户级/Session级变量自动 DOP 开启开关,当设置为 AUTO 时,会开启自动 DOP,优化器根据统计信息自动计算 Query 的并行度。当设置为 MANUAL 时,根据 HINT、TABLE PARALLEL 属性、SESSION 级 DOP 等控制并行度。
_parallel_max_active_sessions0租户级配置项TPCH 测试中,Power Run 需要的并行度比较高, Throughput Run 需要的并行度比较低。但是,TPCH 测试规范不允许通过 SQL 来动态更改并行度。为了达到动态更改并行度的目的,增加一个配置项 _parallel_max_active_sessions (pmas)当 pmas 等于 0 时,并行执行的活跃 session 数不受限制;当 pmas 大于 0 时,当前并行执行最大活跃 session 数会限制在 pmas 值以下,其它并行执行 session 线程会被挂起,等待被唤醒。某个 query 执行成功后,会唤醒它们尝试继续执行。

OceanBase 为了降低用户使用并行执行的门槛,将并行执行参数个数降到了最低,使用默认参数即可实现开箱即用。在特殊场景下,用户可以通过改变默认参数实现应用调优。

px_workers_per_cpu_quota

这个参数表示在每个 CPU 上可以分配的并行执行线程数。当分配给租户的 MIN CPU 为 N 时,如果并行负载均匀,那么每个节点上可以分配的线程数为 N * px_workers_per_cpu_quota。如果并行负载访问的数据分布不均匀,那么某些节点上实际分配的线程数会短时间大于 N * px_workers_per_cpu_quota,当负载结束后,这些多分配出来的线程会被自动回收。

px_workers_per_cpu_quota 对 parallel_servers_target 默认值的影响仅仅发生在创建租户时,租户创建完成后,修改 px_workers_per_cpu_quota 值并不会改变 parallel_servers_target 值。

一般情况下,不需要更改 px_workers_per_cpu_quota 的默认值。当未开启资源隔离特性时,如果发生并行执行占用了全部 CPU 资源的情况,可以尝试通过降低 px_workers_per_cpu_quota 参数的值来缓解该问题。

parallel_servers_target

这个参数表示租户在每个节点上可申请的并行执行线程资源数量。当资源耗尽时,并行执行请求需要排队。关于排队的概念,请参考本文中《3 并发控制与排队》一节。

并行执行的 CPU 使用率非常低,可能是因为 parallel_servers_target 值太小,导致 SQL 的 DOP 被降级,实际分配的线程数小于期望的数量。OBServer v3.2.3 版本之前,parallel_servers_target 的默认值非常小,可以通过调大 parallel_servers_target 值来解决,建议设置为 MIN CPU * 10。 OBServer v3.2.3 版本起,parallel_servers_target 的默认值就是 MIN CPU * 10,一般不会遇到这个问题。

MIN CPU 表示租户的 min_cpu 值,一般在创建租户时指定。

设定好 parallel_servers_target 值后,重新连接,执行下面的命令查看最新值:

show variables like 'parallel_servers_target';

从运维的角度,如何给 parallel_servers_target 设置一个最大值,避免以后频繁调整呢?理论上,可以将 parallel_servers_target 设置得无穷大,这时会面临一个问题:低效。所有的查询都不排队,它们都开始执行,并争抢 CPU 时间片、争抢磁盘 IO、网络 IO。

从吞吐量的角度看,这个问题不算严重。从单个 SQL 延迟的角度看,这种争抢会严重影响延迟。综合考虑 CPU 和 IO 的利用率,我们可以以租户 UNIT 的 MIN CPU 为基准,一般把 parallel_servers_target 设置为 MIN CPU * 10 即可;少数 IO 密集型场景,CPU 可能用不满,可以将 parallel_servers_target 设置为 MIN CPU * 20。

parallel_degree_policy

这个参数是自动 DOP 开启开关,当设置为 AUTO 时,会开启自动 DOP,优化器根据统计信息自动计算 Query 的并行度。当设置为 MANUAL 时,根据 HINT、TABLE PARALLEL 属性、SESSION 级 DOP 等控制并行度。

OBServer v4.2 起,如果用户不熟悉并行度的设置规则,可以设置 parallel_degree_policy 为 AUTO,让优化器帮忙自动选择并行度。关于自动并行度的计算规则,参考本文的 《2 设定并行度》一节。OBServer v4.2 之前的版本,不支持 parallel_degree_policy 开关,不支持自动 DOP 功能,需要手工设置。

5.2 并行执行相关参数调优

ob_sql_work_area_percentage

租户级变量,用于限制 SQL 模块可以使用的最大内存。它是一个百分值,表示占租户总内存的比值,默认为 5,表示占租户内存的 5%。当 SQL 使用的内存超过限制时,会触发内存落盘。sql work area 内存的实际使用量,可以在 observer.log 日志中搜索 WORK_AREA。例如:

[MEMORY] tenant_id=1001 ctx_id=WORK_AREA hold=2,097,152 used=0 limit=157,286,400

读多写少场景下,如果 SQL 模块内存受限导致数据落盘,可以适当增大 ob_sql_work_area_percentage 值。

workarea_size_policy

OceanBase 实现了全局自适应的内存管理,当 workarea_size_policy 设置为 AUTO 时,执行框架会以最优的策略为各个算子(如 HashJoin、GroupBy、Sort 等)分配内存,开启自适应的数据落盘策略。如果 workarea_size_policy 设置为 MANUAL,则用户需要手工设置 _hash_area_size_sort_area_size

_hash_area_size

租户级配置项,用于手动指定每个算子的 Hash 算法最大可用内存,默认值为 128MB。超过指定值后,会启用内存落盘功能。对于 Hash 算法相关算子有效,如 Hash Join、Hash Groupby、Hash Distinct 等。一般情况下,不需要修改本配置项,建议 workarea_size_policy 保持为 AUTO。如果认定 Hash 算法中系统自动内存落盘功能不满足业务需要,可以先将 workarea_size_policy 设置为 MANUAL,然后手工指定 _hash_area_size值。

_sort_area_size

租户级配置项,用于手动指定每个算子的 Sort 算法最大可用内存,默认值为 128MB。超过指定值后,会启用内存落盘功能。主要在 Sort 算子中使用。一般情况下,不需要修改本配置项,建议 workarea_size_policy 保持为 AUTO。如果认定 Sort 算法中系统自动内存落盘功能不满足业务需要,可以先将 workarea_size_policy 设置为 MANUAL,然后手工指定 _sort_area_size值。

_px_shared_hash_join

SESSION 级系统变量,用于决定Hash Join 是否采用共享哈希表方式进行优化。默认值为 true,表示默认开启 shared hash join 算法。Hash Join 在并行场景下,每个并行线程都是独立计算 hash 表。考虑到左表使用 Broadcast 重分布方式时,因为所有并行工作线程计算出的哈希表是相同的,所以每台机器只需要构造一个 hash 表,由所有工作线程共享,这样可以提高 CPU Cache 友好性。一般情况下,不需要修改本配置项。

5.3 并行 DML 相关的参数调优

OBServer v4.1 起,如果没有事务要求,向表中导入数据时建议使用 INSERT INTO SELECT 加上旁路导入功能,一次性将数据插入到新表。这种方式既可以缩短导入时间,也可以避免写入太快导致内存不足的问题。

6 并行执行诊断

诊断并行执行问题,可以从两个大的方面入手。首先从系统整体上判断,比如确认网络、磁盘 IO、CPU 是不是被打满;然后从具体 SQL 着手,找到问题 SQL 在哪里,它的内部状态如何。

6.1 系统诊断

在业务比较繁忙的系统重出现性能问题时,首先需要在系统层面做初步诊断。一般有两种途径:

  • OCP(OceanBase Cloud Platform),支持可视化观测系统性能
  • tsar 等命令行系统工具,支持查询网络、磁盘、CPU等的历史监控数据

tsar 是一个系统监控和性能分析工具,它可以提供关于 CPU、磁盘、网络等方面的详细信息。以下是 tsar 命令的几个常见用法:

tsar --cpu
tsar --io
tsar --traffic

除了以上示例外,tsar 还支持其他选项和参数,例如通过参数-d 2 可以查出两天前到现在的数据,-i 1 表示以每次1分钟作为采集显示。

tsar -n 2 -i 1 --cpu

如果出现磁盘或网络打爆,则优先从硬件容量过小或并发压力过大角度解决问题。

6.2 SQL 诊断

当遇到并行执行问题时,可以从 SQL 层面、并行执行线程层面、算子层面逐层检查。

6.2.1 确认 SQL 还在执行

确认 SQL 在正常执行:关注 TIME 字段,每次查询 GV$OB_PROCESSLIST 视图 TIME 字段都在增长,并且 STATE 为 ACTIVE,说明 Query 还在执行。

确认 SQL 是否在反复重试:如果 SQL 是因为反复重试导致没有返回结果,RETRY_CNT、RETRY_INFO 字段会有相关信息。其中 RETRY_CNT 是表示重试了多少次了,RETRY_INFO 是最后一次重试的原因。没有重试发生的时候,RETRY_CNT 为 0。TOTAL_TIME 字段表示包含每次重试在内的累计执行时间。如果发现 SQL 在反复重试,则根据 RETRY_INFO 中给出的错误码判断是否需要干预。OBServer v4.1 之前,最常见的一个错误是 -4138(OB_SNAPSHOT_DISCARDED),遇到这种情况,按照本文中 《4 并行执行分类》中的 4.2.4 节指示,调大 undo_retention 值即可解决。对于其它错误,如 -4038(OB_NOT_MASTER)等,无需任何处理,一般可以自动重试成功。如果重试次数总是大于1,并且确认系统整体状态平稳,可以联系 OceanBase 研发做进一步判断。

-- MySQL 模式
SELECT
  TENANT,INFO,TRACE_ID,STATE,TIME,TOTAL_TIME,RETRY_CNT,RETRY_INFO
FROM
  oceanbase.GV$OB_PROCESSLIST;
  

如果发现 GV$OB_PROCESSLIST 里还有对应的 SQL,但状态被标记为 SESSION_KILLED,并且一直没有退出,那么需要联系 OceanBase 研发,报告 bug。这可能是因为:

  • 有逻辑没有正确检测 SESSION KILLED 状态,未能及时退出执行流程

6.2.2 确认 SQL 还在执行并行查询

OBServer 集群中,所有活跃的并行执行线程都可以通过 GV$OB_PX_WORKER_STAT视图查看到。

-- MySQL 模式
OceanBase(admin@oceanbase)>select * from oceanbase.GV$OB_PX_WORKER_STAT;
SESSION_ID: 3221520411
 TENANT_ID: 1002
    SVR_IP: 192.168.0.1
  SVR_PORT: 19510
  TRACE_ID: Y4C360B9E1F4D-0005F9A76E9E66B2-0-0
     QC_ID: 1
    SQC_ID: 0
 WORKER_ID: 0
    DFO_ID: 0
START_TIME: 2023-04-23 17:29:17.372461

-- Oracle 模式
OceanBase(root@SYS)>select * from SYS.GV$OB_PX_WORKER_STAT;
SESSION_ID: 3221520410
 TENANT_ID: 1004
    SVR_IP: 192.168.0.1
  SVR_PORT: 19510
  TRACE_ID: Y4C360B9E1F4D-0005F9A76E9E66B1-0-0
     QC_ID: 1
    SQC_ID: 0
 WORKER_ID: 0
    DFO_ID: 0
START_TIME: 2023-04-23 17:29:15.372461

结合从 GV$OB_PROCESSLIST 拿到的 TRACE_ID,通过这个视图可以看到 SQL 当前正在执行哪些 DFO,执行了多久等信息。

如果这个视图里什么也查不到,但 GV$OB_PROCESSLIST 里依然可以看到相应 SQL,可能的原因包括:

  • 所有 DFO 均已执行完成,结果集较大,当前正处在向客户端吐数据阶段
  • 除了最顶层 DFO 外,其余所有 DFO 均已执行完成

6.2.3 确认每个算子的执行状况

通过 oceanbase.GV$SQL_PLAN_MONITOR (MySQL) 或 SYS.GV$SQL_PLAN_MONITOR(Oracle)可以查看每个并行工作线程中每个算子的执行状态。从 OBServer v4.1 起,GV$SQL_PLAN_MONITOR包含两部分数据:

  • 已经执行完成的算子。所谓执行完成是指这个算子已经调用过 close 接口,在当前线程中不再处理任何数据。
  • 正在执行的算子。所谓正在执行是指这个算子还没有调用 close 接口,正在处理数据过程中。读取这部分算子的数据,需要在查询 GV$SQL_PLAN_MONITOR 视图的 where 条件中指定 request_id < 0。在使用 request_id < 0 条件查询本视图时,我们也称为访问 “Realtime SQL PLAN MONITOR”。本访问接口未来可能会变化。

OBServer 4.1 之前,仅支持查看已经执行完成的算子状态。

GV$SQL_PLAN_MONITOR中有几个重要的域:

  • TRACE_ID:它唯一标识了一条 SQL
  • PLAN_LINE_ID:算子在执行计划中的编号,对应于通过 explain 语句查看到的编号
  • PLAN_OPERATION:算子名称,如 TABLE SCAN、HASH JOIN
  • OUTPUT_ROWS:当前算子已经输出的行数
  • FIRST_CHANGE_TIME:算子吐出首行数据时间
  • LAST_CHANGE_TIME:算子吐出最后一行数据时间
  • FIRST_REFRESH_TIME:算子开始监控时间
  • LAST_REFRESH_TIME:算子结束监控时间

根据上面几个域,基本就能刻画出一个算子处理数据的主要动作了。举例几个场景:

  1. 查看一个已经执行完成的 SQL,每个算子使用了多少个线程来执行:
SELECT PLAN_LINE_ID, PLAN_OPERATION, COUNT(*) THREADS
FROM GV$SQL_PLAN_MONITOR
WHERE TRACE_ID = 'YA1E824573385-00053C8A6AB28111-0-0'
GROUP BY PLAN_LINE_ID, PLAN_OPERATION
ORDER BY PLAN_LINE_ID;

+--------------+------------------------+---------+
| PLAN_LINE_ID | PLAN_OPERATION         | THREADS |
+--------------+------------------------+---------+
|            0 | PHY_PX_FIFO_COORD      |       1 |
|            1 | PHY_PX_REDUCE_TRANSMIT |       2 |
|            2 | PHY_GRANULE_ITERATOR   |       2 |
|            3 | PHY_TABLE_SCAN         |       2 |
+--------------+------------------------+---------+
4 rows in set (0.104 sec)
  1. 查看正在执行的 SQL,当前正在执行哪些算子,使用了多少线程,已经吐出了多少行:
SELECT PLAN_LINE_ID, CONCAT(LPAD('', PLAN_DEPTH, ' '), PLAN_OPERATION) OPERATOR, COUNT(*) THREADS, SUM(OUTPUT_ROWS) ROWS
FROM GV$SQL_PLAN_MONITOR
WHERE TRACE_ID = 'YA1E824573385-00053C8A6AB28111-0-0' AND REQUEST_ID < 0
GROUP BY PLAN_LINE_ID, PLAN_OPERATION, PLAN_DEPTH
ORDER BY PLAN_LINE_ID;
  1. 查看一个已经执行完成的 SQL,每个算子处理了多少行数据,吐出了多少行数据:
SELECT PLAN_LINE_ID, CONCAT(LPAD('', PLAN_DEPTH, ' '), PLAN_OPERATION) OPERATOR, SUM(OUTPUT_ROWS) ROWS
FROM GV$SQL_PLAN_MONITOR
WHERE TRACE_ID = 'Y4C360B9E1F4D-0005F9A76E9E6193-0-0'
GROUP BY PLAN_LINE_ID, PLAN_OPERATION, PLAN_DEPTH
ORDER BY PLAN_LINE_ID;
+--------------+-----------------------------------+------+
| PLAN_LINE_ID | OPERATOR                          | ROWS |
+--------------+-----------------------------------+------+
|            0 | PHY_PX_MERGE_SORT_COORD           |    2 |
|            1 |  PHY_PX_REDUCE_TRANSMIT           |    2 |
|            2 |   PHY_SORT                        |    2 |
|            3 |    PHY_HASH_GROUP_BY              |    2 |
|            4 |     PHY_PX_FIFO_RECEIVE           |    2 |
|            5 |      PHY_PX_DIST_TRANSMIT         |    2 |
|            6 |       PHY_HASH_GROUP_BY           |    2 |
|            7 |        PHY_HASH_JOIN              | 2002 |
|            8 |         PHY_HASH_JOIN             | 2002 |
|            9 |          PHY_JOIN_FILTER          | 8192 |
|           10 |           PHY_PX_FIFO_RECEIVE     | 8192 |
|           11 |            PHY_PX_REPART_TRANSMIT | 8192 |
|           12 |             PHY_GRANULE_ITERATOR  | 8192 |
|           13 |              PHY_TABLE_SCAN       | 8192 |
|           14 |          PHY_GRANULE_ITERATOR     | 8192 |
|           15 |           PHY_TABLE_SCAN          | 8192 |
|           16 |         PHY_GRANULE_ITERATOR      | 8192 |
|           17 |          PHY_TABLE_SCAN           | 8192 |
+--------------+-----------------------------------+------+
18 rows in set (0.107 sec)

为了展示美观,上面使用了一个域 PLAN_DEPTH来做缩进处理,PLAN_DEPTH 表示这个算子在算子树中的深度。

注:

  1. 尚未调度的 DFO 的算子信息,不会出现在 GV$SQL_PLAN_MONITOR 中。
  2. 在一个 PL 中如果包含多条 SQL,它们的 TRACE_ID 相同

7 并行执行调优技巧

本章介绍一些基础的 OceanBase 并行执行调优技巧。调优是一个永无止境的话题,本章内容也会与时俱进,不断更新。

7.1 手动收集统计信息

如果优化器中保存的统计信息陈旧,可能导致生成的计划不优。OBServer v3.2 和 OBServer v4.1 分别提供了手动收集统计信息的接口:OceanBase 优化器统计信息 (4.x 版本)

OBServer v4.1 手动收集主表、索引表的语法如下:

-- 收集用户TEST的表T1的全局级别的统计信息,所有列的桶个数设定为auto策略:
call dbms_stats.gather_table_stats('TEST', 'T1', granularity=>'GLOBAL', method_opt=>'FOR ALL COLUMNS SIZE AUTO');
-- 收集用户TEST下表T1的索引IDX的索引统计信息,并行度4(IDX不唯一,需指定表名称)
call dbms_stats.gather_index_stats('TEST', 'IDX', degree=>4, tabname=>'T1');

7.2 修改分区方式使用 Partition Wise Join

PoC 场景中,如果有大表 JOIN,并且在业务允许的前提下,可以让大表使用相同的分区方式,并且将他们绑定到同一个表组上,这样可以实现性能最佳的 partition wise join。使用 partition wise join 时,并行度也要调整得和分区数相适应,这样可以获得最佳性能。

7.3 并行度与分区数适配

一般来说,并行度与分区数符合一定的整比例关系,能得到较好的性能。详细论述参考本文中《1.6 通过均衡负载来优化性能》一节。

7.4 创建索引

创建合适的索引,能减少数据的扫描量,可以提高并行执行性能。在哪些表、哪些列上建索引,没有一个通用的方案,需要基于具体 SQL 具体分析。

7.5 创建复制表

OBServer v4.2 及之后版本,通过创建复制表,能减少数据重分布,可以提高并行执行性能,详见 OceanBase 官方文档中创建表的《创建复制表》章节。基本语法举例如下:

create table dup_t1(c1 int) duplicate_scope = 'cluster';

广告时间

如果您从这篇文章的开头一直看到这里,说明您对 OceanBase 的 SQL 执行引擎和并行执行技术很感兴趣。

如果您希望进一步了解 OceanBase 中 SQL 执行引擎的设计与实现,欢迎阅读这两篇技术博客:《OceanBase 执行引擎的自适应技术》和《OceanBase 分布式下压技术》。

点赞

收藏1 并行执行概念

并行执行是指将一个 SQL 查询任务分成多个子任务,并允许这些子任务在多个处理器上同时运行,以提高整个查询任务的执行效率。在现代计算机系统中,多核处理器、多线程和高速网络连接广泛应用,这使得并行执行成为了一种可行的高效率查询技术。

并行执行能够极大降低计算密集型大查询的响应时间,被广泛应用在离线数据仓库、实时报表、在线大数据分析等业务场景,同时还应用在批量导数,快速构建索引表等领域。

以下场景会从并行执行中获益:

  • 大表扫描,大表连接,大数据量排序,聚合
  • 大表 DDL 操作,如修改主键、改变列类型,建索引等
  • 从已有大数据建表(Create Table As Select)
  • 批量插入、删除、更新

本节包含如下内容:

  • 什么场景适用并行执行
  • 什么场景不适用并行执行
  • 硬件要求
  • 并行执行工作原理
  • 并行执行工作线程
  • 通过均衡负载来优化性能

1.1 什么场景适用并行执行

并行执行通过充分利用多个 CPU 和 IO 资源,以达到降低 SQL 执行时间的目的。

当满足下列条件时,使用并行执行会优于串行执行:

  • 访问的数据量大
  • SQL 并发低
  • 要求低延迟
  • 有充足的硬件资源

并行执行用多个处理器协同并发处理同一任务,在这样的系统中会有收益:

  • 多处理器系统(SMPs)、集群
  • IO 带宽足够
  • 内存富余(可用于处理内存密集型操作,如排序、建 hash 表等)
  • 系统负载不高,或有峰谷特征(如系统负载一般在 30% 以下)

如果你的系统不满足上述特征,那么并行执行可能不会带来显著收益。在一些高负载,小内存,或 IO 能力弱的系统里,并行执行甚至会带来负面效果。

并行执行不仅适用于离线数据仓库、实时报表、在线大数据分析等分析型系统,而且在 OLTP 领域也能发挥作用,可用于加速 DDL 操作、以及数据跑批工作等。但是,对于 OLTP 系统中的普通 SELECT 和 DML 语句,并行执行并不适用。

1.2 什么场景不适用并行执行

串行执行使用单个线程来执行数据库操作,在下面这些场景下使用串行执行优于并行执行:

  • Query 访问的数据量很小
  • 高并发
  • Query 执行时间小于 100 毫秒

并行执行一般不适用于如下场景:

  • 系统中的典型 SQL 执行时间都在毫秒级。并行查询本身有毫秒级的调度开销,对于短查询来说,并行执行带来的收益完全会被调度开销所抵消。
  • 系统负载本就很高。并行执行的设计目标就是去充分利用系统的空余资源,如果系统本身已经没有空余资源,那么并行执行并不能带来额外收益,相反还会影响系统整体性能。

1.3 硬件要求

并行执行对硬件没有特殊要求。需要注意的是,CPU 核数、内存大小、存储 I/O 性能、网络带宽都会影响并行执行性能,其中任意一项成为瓶颈都会拖累并行执行性能。

1.4 并行执行工作原理

并行执行将一个 SQL 查询任务分解成多个子任务,并调度这些子任务到多个处理器上运行。

本节包含如下内容:

  • SQL 语句的并行执行
  • 生产者消费者流水线模型
  • 并行的粒度
  • 生产者和消费者之间的数据分发方式
  • 生产者和消费者之间的数据传输机制

1.4.1 SQL 语句的并行执行

当一个 SQL 被解析为并行执行计划后,会按照下面的步骤执行:

  1. SQL 主线程(接收、解析SQL的线程)根据计划形态预约并行执行需要的线程资源。这些线程资源可能来自集群中的多台机器。
  2. SQL 主线程打开并行调度算子(PX COORDINATOR)。
  3. 并行调度算子解析计划,将它们切分成多个操作步骤,按照自底向上的顺序调度执行这些操作。每个操作都会尽可能并行执行。
  4. 当所有操作并行执行完成后,并行调度算子会接收计算结果,并将结果吐给它的上层算子(如 Aggregate 算子),串行完成剩余不可并行的计算(如最终的 SUM 计算)。

1.4.2 生产者-消费者流水线模型

并行执行使用生产者-消费者模型来进行流水执行。并行调度算子解析计划,将它们切分成多个操作步骤,每个操作步骤称之为一个 DFO(Data Flow Operation)

一般情况下,并行调度算子在同一时刻会启动两个 DFO,DFO 之间会以生产者-消费者的模式连接起来,这称为 DFO 间的并行执行。每个 DFO 会使用一组线程来执行,这称为 DFO 内的并行执行,这个 DFO 使用的线程数称为 DOP(Degree Of Parallisim)

上一阶段的消费者 DFO 会成为下一阶段的生产者 DFO。在并行调度算子的协调下,会同时启动消费者 DFO 和生产者 DFO。

下图中:

(1)DFO A 生成的数据会立即传输给DFO B 进行计算;

(2)DFO B 完成计算后,会将数据暂存在当前线程中,等待它的上层 DFO C 启动;

(3)当 DFO B 收到 DFO C 启动完成通知后,会将自己的角色转变成生产者,开始向 DFO C 传输数据,DFO C 收到数据后开始计算。

考虑下面的查询:

create table game (round int primary key, team varchar(10), score int)
    partition by hash(round) partitions 3;

insert into game values (1, "CN", 4), (2, "CN", 5), (3, "JP", 3);
insert into game values (4, "CN", 4), (5, "US", 4), (6, "JP", 4);

select /*+ parallel(3) */ team, sum(score) total from game group by team;

查询语句对应的执行计划:

OceanBase(admin@test)>explain select /*+ parallel(3) */ team, sum(score) total from game group by team;
+---------------------------------------------------------------------------------------------------------+
| Query Plan                                                                                              |
+---------------------------------------------------------------------------------------------------------+
| =================================================================                                       |
| |ID|OPERATOR                     |NAME    |EST.ROWS|EST.TIME(us)|                                       |
| -----------------------------------------------------------------                                       |
| |0 |PX COORDINATOR               |        |1       |4           |                                       |
| |1 | EXCHANGE OUT DISTR          |:EX10001|1       |4           |                                       |
| |2 |  HASH GROUP BY              |        |1       |4           |                                       |
| |3 |   EXCHANGE IN DISTR         |        |3       |3           |                                       |
| |4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|3       |3           |                                       |
| |5 |     HASH GROUP BY           |        |3       |2           |                                       |
| |6 |      PX BLOCK ITERATOR      |        |1       |2           |                                       |
| |7 |       TABLE SCAN            |game    |1       |2           |                                       |
| =================================================================                                       |
| Outputs & filters:                                                                                      |
| -------------------------------------                                                                   |
|   0 - output([INTERNAL_FUNCTION(game.team, T_FUN_SUM(T_FUN_SUM(game.score)))]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(game.team, T_FUN_SUM(T_FUN_SUM(game.score)))]), filter(nil), rowset=256 |
|       dop=3                                                                                             |
|   2 - output([game.team], [T_FUN_SUM(T_FUN_SUM(game.score))]), filter(nil), rowset=256                  |
|       group([game.team]), agg_func([T_FUN_SUM(T_FUN_SUM(game.score))])                                  |
|   3 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256                             |
|   4 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256                             |
|       (#keys=1, [game.team]), dop=3                                                                     |
|   5 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256                             |
|       group([game.team]), agg_func([T_FUN_SUM(game.score)])                                             |
|   6 - output([game.team], [game.score]), filter(nil), rowset=256                                        |
|   7 - output([game.team], [game.score]), filter(nil), rowset=256                                        |
|       access([game.team], [game.score]), partitions(p[0-2])                                             |
|       is_index_back=false, is_global_index=false,                                                       |
|       range_key([game.round]), range(MIN ; MAX)always true                                              |
+---------------------------------------------------------------------------------------------------------+
29 rows in set (0.003 sec)

select语句的执行计划会首先对 game 表做全表扫描,按照 team 做分组求和,然后算出每个 team 的总分数。这个查询的执行示意图如下:

从图中可以看到,这个查询实际使用了 6 个线程。

  • 第一步:前三个线程负责 game 表扫描,并且在每个线程内对 game.team 数据做预聚合;
  • 第二步:后三个线程负责对预聚合的数据做最终聚合;
  • 第三步:最终聚合结果发给并行调度器,由它返回给客户端。

第一步的数据发给第二步时,需要用 game.team 字段做 hash,决定将预聚合数据发给哪个线程。

1.4.3 并行的粒度

并行数据扫描的基本工作单元称为 granule。OceanBase 将表扫描工作划分成多个 granule,每个 granule 描述了一段扫描任务的范围。因为 granule 不会跨表分区 ,所以每段扫描任务一定是位于一个分区内。

根据 granule 的粒度特征,可以划分为两类:

  • Partition Granule

Partition granule 描述的范围是一整个分区,扫描任务涉及到多少个分区,就会划分出多少个 partition granule。这里的分区既可以是主表分区,也可以是索引分区。

Partition Granule 最常用的应用场景是 partition wise join,两张表里对应的分区通过 partition granule 可以确保被同一个工作线程处理。

  • Block Granule

Block granule 描述的范围是一个分区中的一段连续数据。数据扫描场景里,一般都是使用 block granule 划分数据。每个分区都会被划分为若干个 block,这些 block 再以一定的规则串联起来形成一个任务队列,供并行工作线程消费。

在给定并行度的情况下,为了确保扫描任务的均衡,优化器会自动选择将数据划分成分区粒度(Partition Granule)或块粒度(Block Granule)。如果选择了 Block Granule,并行执行框架会在运行时决策 block 的划分,总体原则是确保一个 block 既不会太大,也不会太小。太大,可能导致数据倾斜,让部分线程少干活;太小,会导致频繁的扫描切换开销。

划分好分区粒度后,每个粒度对应一个扫描任务。Table Scan 扫描算子会一个接一个地处理这些扫描任务,处理完一个之后,接着处理下一个,直到所有任务处理完毕。

1.4.4 生产者和消费者之间的数据分发方式

数据分发方式指的是,数据从一组并行执行工作线程(生产者)发送给另一组(消费者)时使用的方法。优化器会使用一系列的优化策略,决策使用哪种数据重分布方式,以达到最优的性能。

并行执行中的常见数据分发方式包括:

  • Hash Distribution

使用 Hash distribution 发送数据时,生产者根据 distribution key 对数据行计算 hash 值并取模,算出发给哪个消费者工作线程。大部分情况下,使用 hash distribution 能将数据较为均匀的分发给多个消费者线程。

  • Pkey Distribution

使用 Pkey Distribution 时,生产者计算出数据行对应的目标表所在分区,然后将行数据发给处理这个分区的消费者线程。

Pkey Distribution 常用于 partitial partitions wise join 场景。在 partitial partitions wise join 场景下,消费者侧的数据不需要做重分布,就可以和生产者侧的数据做 Partition Wise Join。这种方式可以减少网络通信量,提升性能。

  • Pkey Hash Distribution

使用 Pkey Hash Distribution 时,生产者首先需要计算出数据行对应的目标表所在的分区。然后,根据 distribution key 对数据行进行 hash 计算,以便决定将其发给哪一个消费者线程来处理。

Pkey Hash Distribution 常常应用于 Parallel DML 场景中。在这种场景下,一个分区可以被多个线程并发更新,因此需要使用 Pkey Hash Distribution 来确保相同值的数据行被同一个线程处理,不同值的数据行尽可能均分到多个线程处理。

  • Broadcast Distribution

使用 broadcast distribution 时,生产者将每一个数据行发送消费者端的每一个线程,使得消费者端每一个线程都拥有全量的生产者端数据。

Broadcast distribution 常用于将小表数据复制到所有执行 join 的节点,然后做本地 join 操作。这种方式可以减少网络通信量。

  • Broadcast to Host Distribution(简称 BC2HOST)

使用 broadcast to host distribution 时,生产者将每一个数据行发送消费者端的每一个节点上,使得消费者端每一个节点都拥有全量的生产者端数据。然后,节点里的消费者线程协同处理这份数据。

Broadcast to host distribution 常用于 NESTED LOOP JOIN、SHARED HASH JOIN 场景。NESTED LOOP JOIN 场景里,消费端的每个线程会从共享数据里取一部分数据行作为驱动数据,去目标表里做 join 操作;SHARED HASH JOIN 场景里,消费端的每个线程会基于共享数据协同构建 hash 表,避免每个线程独立构建相同 hash 表导致的不必要开销。

  • Range Distribution

使用 Range distribution 时,生产者将数据按照 range 范围做划分,让不同消费者线程处理不同范围的数据。

Range distribution 常用于排序场景,各个消费者线程只需排序好发给自己的数据,数据就能在全局范围内有序。

  • Random Distribution

使用 Random distribution 时,生产者将数据随机打散,发给消费者线程,使得每个消费者线程处理的数据数量几乎一致,从而达到均衡负载的目的。

Random distribution 常用于多线程并行 UNION ALL 场景,该场景只要求数据打散,负载均衡,数据之间无其它关联约束。

  • Hybrid Hash Distribution

1.4.5 生产者和消费者之间的数据传输机制

并行调度算子在同一时刻会启动两个 DFO,DFO 之间会以生产者-消费者的模式连接起来并行执行。为了方便在生产者和消费者之间传输数据,需要创建一个传输网络。

例如,生产者 DFO 以 DOP = 2 来做数据扫描,消费者 DFO 以 DOP = 3 来做数据聚合计算,每个生产者线程都会创建 3 个虚拟链接去连接消费者线程,总计会创建 6 个虚拟链接。如下图:

生产者和消费者之间创建的虚拟传输网络被称为数据传输层(Data Transfer Layer,简称 DTL)。OceanBase 并行执行框架中,所有的控制消息和行数据都通过 DTL 进行收发。每个工作线程可以对外建立数千个虚拟链接,具有高度的可扩展性。除此之外,DTL 还具有数据缓冲、批量数据发送和自动流量控制等能力。

当 DTL 链接的两端位于同一个节点时,DTL 会通过内存拷贝的方式来传递消息;当 DTL 链接的两端位于不同节点时,DTL 会通过网络通信的方式来传递消息。

1.5 并行执行工作线程

一个并行查询会使用两类线程:1个主线程,若干个并行工作线程。其中主线程和普通的 TP 查询使用的线程没有任何区别,来自普通工作线程池,并行工作线程来自专用线程池。

OceanBase 使用专用线程池模型来分配并行工作线程。每个租户在其所属的各个节点里都有一个租户专属的并行执行线程池,并行查询工作线程都通过这个线程池来分配。

并行调度算子在调度每个 DFO 之前,会去线程池中申请线程资源。当 DFO 执行完成时,会立即源释放线程资源。

线程池的初始大小为 0,按需增长,没有上限。为了避免空闲线程数过多,线程池引入自动回收机制。对于任意线程:

  • 如果空闲时间超过 10 分钟,并且线程池中剩余线程数大于 8 个,则被回收销毁;
  • 如果空闲时间超过 60 分钟,则无条件销毁

虽然线程池的大小没有上限,但是通过下面两个机制,能在绝大多数场景里形成事实上限:

  1. 并行执行开始执行前,需要通过 Admission 模块预约线程资源,预约成功后才能投入执行。通过这个机制,能限制并发查询数量。Admission 模块的详细内容参考本文中 《3 并发控制与排队》一节。
  2. 查询每次从线程池申请线程时,单次申请的线程数量不会超过 N, N 等于租户 UNIT 的 MIN_CPU 乘以 px_workers_per_cpu_quota,如果超过,则最多只分配 N 个线程。px_workers_per_cpu_quota 是租户级配置项,默认值为 10。例如,一个 DFO 的 DOP = 100,它需要从 A 节点申请 30 个线程,从 B 节点申请 70 个线程,UNIT 的 MIN_CPU = 4,px_workers_per_cpu_quota = 10,那么 N = 4 * 10 = 40。最终这个 DFO 在 A 节点上实际申请到 30 个线程,B 节点上实际申请到 40 个线程,它的实际 DOP 为 70。

1.6 通过均衡负载来优化性能

为了达到最优性能,所有工作线程分到的工作任务应该尽量相等。

SQL 使用 Block Granule 划分任务的时候,工作任务会动态地分配到工作线程之间。这样可以最小化工作负载不均衡问题,即一些工作线程的工作量不会明显超过其它工作线程。SQL 使用 Partition Granule 划分任务时,可以通过让任务数是工作线程数的整数倍来优化性能。这适用于 Partition Wise Join 和并行 DML 场景。

举个例子,假设一个表有 16 个分区,每个分区的数据量差不多。你可以使用 16 工作线程(DOP 等于 16)以大约十六分之一的时间完成工作,你也可以使用五个工作线程以五分之一的时间完成工作,或使用两个线程以一半的时间完成工作。

但是,如果你使用 15 个线程来处理 16 个分区,则第一个线程完成一个分区的工作后,就开始处理第 16 个分区。而其他线程完成工作后,它们变为空闲状态。当每个分区的数据量差不多时,这种配置会导致性能不优;当每个分区的数据量有所差异时,实际性能则会因情况而异。

类似地,假设你使用 6 个线程来处理 16 个分区,每个分区的数据量差不多。在这种情况下,每个线程在完成其第一个分区后,会处理第二个分区,但只有四个线程会处理第三个分区,而其他两个线程会保持空闲。

一般来说,你不能假设在给定数量的分区(N)和给定数量的工作线程(P)上执行并行操作所花费的时间等于 N 除以 P。这个公式没有考虑到一些线程可能需要等待其他线程完成最后的分区。但是,通过选择适当的 DOP,你可以最小化工作负载不均衡问题并优化性能。


2 设定并行度

并行度(degree of parallelism,简称DOP)指的是单个 DFO 在执行时使用的工作线程数。并行执行的设计目的就是为了高效利用多核资源。OceanBase 并行执行框架提供了多种方式指定并行度,既可以手工指定,也可以让数据库帮你自动选择。本章从以下几个方面介绍并行度相关内容:

  • 手工指定并行度
  • 自动并行度
  • 可变并行度

2.1 手工指定并行度

可以对一张表指定并行度,使得对这张表的扫描总是使用并行执行。

2.1.1 指定并行度的几种方式

表属性指定并行度

下面的语句分别指定一张主表和一个索引的扫描并行度。

ALTER TABLE table_name PARALLEL 4;
ALTER TABLE table_name ALTER INDEX index_name PARALLEL 2;

如果一个 SQL 中只涉及一张表,当 SQL 中查询了主表时,除了主表所在的 DFO,其余 DFO 也会使用并行度 4 来执行;当 SQL 查询了索引表时,除了索引表所在的 DFO,其余 DFO 也会使用并行度 2 来执行。

如果一个 SQL 中涉及多张表,则会使用 PARALLEL 最大值作为整个计划的 DOP。

PARALLEL HINT 指定并行度

可以通过全局 PARALLEL HINT 来指定整个 SQL 的并行度,也可以通过表级的 PARALLEL HINT 来指定特定表的并行度。当一个 SQL 里给定了多个表的 PARALLEL HINT,那么各个表所在 DFO 的 DOP 由各个表的 Parallel 值决定。如果一个 DFO 里包含多张表,则会取他们的 PARALLEL 最大值作为 DFO 的 DOP。

OceanBase(TEST@TEST)>create table t1 (c1 int, c2 int);
Query OK, 0 rows affected (0.167 sec)

OceanBase(TEST@TEST)>explain select /*+ parallel(3) */ sum(c1) from t1;
+----------------------------------------------------------------------+
| Query Plan                                                           |
+----------------------------------------------------------------------+
| =========================================================            |
| |ID|OPERATOR             |NAME    |EST.ROWS|EST.TIME(us)|            |
| ---------------------------------------------------------            |
| |0 |SCALAR GROUP BY      |        |1       |2           |            |
| |1 | PX COORDINATOR      |        |3       |2           |            |
| |2 |  EXCHANGE OUT DISTR |:EX10000|3       |2           |            |
| |3 |   MERGE GROUP BY    |        |3       |1           |            |
| |4 |    PX BLOCK ITERATOR|        |1       |1           |            |
| |5 |     TABLE SCAN      |T1      |1       |1           |            |
| =========================================================            |
| Outputs & filters:                                                   |
| -------------------------------------                                |
|   0 - output([T_FUN_SUM(T_FUN_SUM(T1.C1))]), filter(nil), rowset=256 |
|       group(nil), agg_func([T_FUN_SUM(T_FUN_SUM(T1.C1))])            |
|   1 - output([T_FUN_SUM(T1.C1)]), filter(nil), rowset=256            |
|   2 - output([T_FUN_SUM(T1.C1)]), filter(nil), rowset=256            |
|       dop=3                                                          |
|   3 - output([T_FUN_SUM(T1.C1)]), filter(nil), rowset=256            |
|       group(nil), agg_func([T_FUN_SUM(T1.C1)])                       |
|   4 - output([T1.C1]), filter(nil), rowset=256                       |
|   5 - output([T1.C1]), filter(nil), rowset=256                       |
|       access([T1.C1]), partitions(p0)                                |
|       is_index_back=false, is_global_index=false,                    |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true    |
+----------------------------------------------------------------------+
24 rows in set (0.002 sec)
OceanBase(TEST@TEST)>create table t1 (c1 int, c2 int);
Query OK, 0 rows affected (0.167 sec)

OceanBase(TEST@TEST)>create table t2 (c1 int, c2 int);
Query OK, 0 rows affected (0.150 sec)

OceanBase(TEST@TEST)>select /*+ parallel(t1 3) */ * from t1, t2 where t1.c2 = t2.c1;
Empty set (0.041 sec)

OceanBase(TEST@TEST)>explain select /*+ parallel(t1 3) */ * from t1, t2 where t1.c2 = t2.c1;
+----------------------------------------------------------------------------------------+
| Query Plan                                                                             |
+----------------------------------------------------------------------------------------+
| =================================================================                      |
| |ID|OPERATOR                     |NAME    |EST.ROWS|EST.TIME(us)|                      |
| -----------------------------------------------------------------                      |
| |0 |PX COORDINATOR               |        |1       |7           |                      |
| |1 | EXCHANGE OUT DISTR          |:EX10002|1       |6           |                      |
| |2 |  HASH JOIN                  |        |1       |5           |                      |
| |3 |   EXCHANGE IN DISTR         |        |1       |2           |                      |
| |4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|1       |2           |                      |
| |5 |     PX BLOCK ITERATOR       |        |1       |1           |                      |
| |6 |      TABLE SCAN             |T1      |1       |1           |                      |
| |7 |   EXCHANGE IN DISTR         |        |1       |4           |                      |
| |8 |    EXCHANGE OUT DISTR (HASH)|:EX10001|1       |4           |                      |
| |9 |     PX PARTITION ITERATOR   |        |1       |2           |                      |
| |10|      TABLE SCAN             |T2      |1       |2           |                      |
| =================================================================                      |
| Outputs & filters:                                                                     |
| -------------------------------------                                                  |
|   0 - output([INTERNAL_FUNCTION(T1.C1, T1.C2, T2.C1, T2.C2)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1, T1.C2, T2.C1, T2.C2)]), filter(nil), rowset=256 |
|       dop=3                                                                            |
|   2 - output([T1.C2], [T2.C1], [T1.C1], [T2.C2]), filter(nil), rowset=256              |
|       equal_conds([T1.C2 = T2.C1]), other_conds(nil)                                   |
|   3 - output([T1.C2], [T1.C1]), filter(nil), rowset=256                                |
|   4 - output([T1.C2], [T1.C1]), filter(nil), rowset=256                                |
|       (#keys=1, [T1.C2]), dop=3                                                        |
|   5 - output([T1.C2], [T1.C1]), filter(nil), rowset=256                                |
|   6 - output([T1.C2], [T1.C1]), filter(nil), rowset=256                                |
|       access([T1.C2], [T1.C1]), partitions(p0)                                         |
|       is_index_back=false, is_global_index=false,                                      |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true                      |
|   7 - output([T2.C1], [T2.C2]), filter(nil), rowset=256                                |
|   8 - output([T2.C1], [T2.C2]), filter(nil), rowset=256                                |
|       (#keys=1, [T2.C1]), dop=1                                                        |
|   9 - output([T2.C1], [T2.C2]), filter(nil), rowset=256                                |
|       force partition granule                                                          |
|  10 - output([T2.C1], [T2.C2]), filter(nil), rowset=256                                |
|       access([T2.C1], [T2.C2]), partitions(p0)                                         |
|       is_index_back=false, is_global_index=false,                                      |
|       range_key([T2.__pk_increment]), range(MIN ; MAX)always true                      |
+----------------------------------------------------------------------------------------+
39 rows in set (0.002 sec)

对于 DML 语句,使用上面的 HINT,只会使得 DML 语句的查询部分走并行,写入部分还是串行。如果希望写入部分也并行,则需要还添加一个 HINT ENABLE_PARALLEL_DML,例如:

insert /*+ parallel(3) enable_parallel_dml */ into t3 select * from t1;

注意:并行 DML 必须指定全局 parallel hint。仅指定表级 parallel hint 无法让写入部分串行。例如,下面的 SQL 不会开启 DML 并行:

insert /*+ parallel(t3 3) enable_parallel_dml */ into t3 select * from t1;

SESSION 上指定并行度

在当前 session 上指定并行度后,session 上的所有查询语句都将以这个并行度执行。需要注意,即使是单行查询的 SQL,也会使用该并行度,可能导致性能下降。

set _force_parallel_query_dop = 3

对于 DML 语句,使用上面的命令,只会使得 DML 语句的查询部分走并行,写入部分还是串行。如果希望写入部分也并行,则需要用下面的命令:

set _force_parallel_dml_dop = 3

2.1.2 并行度优先级

全局 HINT > TABLE HINT > SESSION 并行度 > TABLE DOP

对比全局 HINT 和 TABLE HINT 的优先级。可以看到,有全局 HINT 时,TABLE HINT 不生效。

OceanBase(TEST@TEST)>explain select /*+ parallel(2) parallel(t1 3) */ * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |2           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |2           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=2                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

OceanBase(TEST@TEST)>explain select /*+ parallel(t1 3) */ * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |2           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |1           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=3                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

对比 session 和 table hint 的优先级。可以看到,使用 table 级的 HINT 时,session 设定的 DOP 不生效。

OceanBase(TEST@TEST)>alter session force parallel query parallel 4;
Query OK, 0 rows affected (0.001 sec)

OceanBase(TEST@TEST)>explain select /*+ parallel(t1 3) */ * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |2           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |1           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=3                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

OceanBase(TEST@TEST)>explain select * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |1           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |1           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=4                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

对比 session 和 table parallel 的优先级。可以看到 table parallel 属性的优先级低于 session 指定 dop 的优先级。

OceanBase(TEST@TEST)>alter table t1 parallel 5;
Query OK, 0 rows affected (0.135 sec)

OceanBase(TEST@TEST)>explain select * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |1           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |1           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=5                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

OceanBase(TEST@TEST)>alter session force parallel query parallel 4;
Query OK, 0 rows affected (0.000 sec)

OceanBase(TEST@TEST)>explain select * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |1           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |1           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=4                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

2.1.3 并行度取值

先讲一个简单原则:1. parallel 的设定目的是为了把 CPU 用满。2. parallel 不是越大越好。举个例子:当租户 CPU 为 20 的时:

  • 对于单表简单操作(如单表扫描过滤、单表增删改)parallel 理论值为 20。
  • 对于多表 join 的查询、带全局索引的 PDML,parallel 理论值为 10。
  • 对于一些更加复杂的计划,形态为右深树时,parallel 理论值为 7 左右。

对上面的例子做解释

  • 对于单表操作,只有一个 DFO,可以把 20 个 CPU 全部给这个 DFO 用。
  • 对于多表 join,同一时间会启动 2 个 DFO,形成数据流水线(一个 DFO 是 producer、一个 DFO 是consumer),所有每个 DFO 分 10 个 CPU。
  • 对于更加复杂的右深树计划,我们同一时间会启动 3 个 DFO,此时每个 DFO 大约分 7 个 CPU 可以保证他们都能跑得比较快。

但是,以上只是一般原则,实际操作中还是应该做微调。解释如下:

  • 对于单表操作,只有一个 DFO,可以把 20 个 CPU 全部给这个 DFO 用。

假设这个 DFO 大部分时间是做 IO,那么不妨把并发度设置得比 20 稍高一些,比如 25,这样可以充分利用 CPU

  • 对于多表 join,同一时间会启动 2 个 DFO,形成数据流水线(一个 DFO 是 producer、一个 DFO 是consumer),所有每个 DFO 分 10 个 CPU。

我们实际并不能保证每个 DFO 都能把分给他的 CPU 压满。所以,把并发度适当稍微调整高一些,比如 15 也许能更充分地利用 CPU。 ** 但是,不能无限往上加。把并发度加到 50,根本不会有收益,相反还会增加线程调度开销、框架调度开销。

  • 对于更加复杂的右深树计划,我们同一时间会启动 3 个 DFO,此时每个 DFO 大约分 7 个 CPU 可以保证他们都能跑得比较快。

一个计划里,并不是处处都要启动 3 个 DFO,3 DFO 的情况可能是局部的。大部分时候可能还是同时启动 2 个 DFO。 所以这时候把并发设置为 10 也许比 7 要好。

微调后,当租户 CPU 为 20 的时,一个可能的情况如下:

  • 对于单表简单操作(如单表扫描过滤、单表增删改)parallel 实践值为 30。
  • 对于多表 join 的查询、带全局索引的 PDML,parallel 实践值为 15。
  • 对于一些更加复杂的计划,形态为右深树时,parallel 实践值为 10 ~ 15 左右。


3 并发控制与排队

在一定场景下,并行查询会因为等待线程资源而排队。

3.1 并行执行并发控制

我们通过租户级变量 PARALLEL_SERVERS_TARGET 指定租户在每个节点上最多可提供的并行执行工作线程数。在并行查询开始之前,从所有相关 observer 预约工作线程资源,只要其中任何一个 observer 不能为并行查询提供足够资源,就不会投入执行。该并行查询会被丢回查询队列排队,等待下次执行时重新尝试获取线程资源,直到能获取到足够工作线程资源才能获准执行。整个查询执行完成后,立即释放预约的工作线程资源。

这种“尝试预约工作线程资源-资源不足丢回查询队列-再次获得执行机会-再次尝试预约工作线程资源”的过程我们称之为并行查询排队。管理全部 observer 工作线程资源预约的模块称为并行执行资源管理器。

并行执行资源管理器为了计算每个并行查询需要的工作线程数,会将查询计划做 DFO 划分,模拟调度 DFO 过程,根据 parallel hint、table parallel 等参数计算出该查询在每个 observer 上需要的最大线程数。这组线程数我们称之为“资源向量”。

资源向量是逻辑概念,用于控制并发与排队。使用资源向量从并行执行资源管理器中预约到足够工作线程资源后,并行查询会投入执行。在执行过程中,尽管随着不同 DFO 的调度执行,会不断有物理线程的获取和释放,但是逻辑上的线程资源并不会归还给并行执行资源管理器。只有在并行查询完全执行完成后,这组资源向量才会归还给并行执行资源管理器。

当大量并发查询从并行执行资源管理器预约线程资源时,采取先来先服务的策略,直至资源分配殆尽,无法满足任何一个查询的资源需求为止。之后的查询都会丢回查询队列排队,再次调度时重试获取资源。

3.2 并行执行工作线程分配

在租户的每个 observer 上都有一个并行执行线程池,用于执行并行查询任务。执行任务时,如果线程池里线程数量不足,会动态扩容线程池。如果线程池里的线程空闲时间超过 10 分钟,会触发自动缩容到 10 个线程;如果线程池里的线程空闲时间超过 60 分钟,会触发进一步缩容,可能缩容到 0 个线程。

并行查询一旦获得调度执行后,每个 DFO 总是可以在它涉及到的 observer 的并行执行线程池里获得需要的并行线程资源。需要注意的是,默认情况下,每个 DFO 在一个 observer 上分配的线程数,不得大于租户 MIN CPU * 10,如果它提出的资源需求大于这个值,会被自动降低为 MIN CPU * 10。

3.3 两级资源控制模型

对于任意并行查询,它会经历两级资源控制:

  • 全局控制:在执行资源管理器的控制下,预约包含足够执行线程的资源向量
  • 局部控制:在并行执行线程池的控制下,分配期望的物理线程数

全局控制会考虑分布式场景下的资源获取,局部控制仅考虑单机线程池的资源分配,二者各司其职。前者确保Query 通过检查后一定能够执行下去,不会在运行时遇到拿不到资源的问题,后者确保极端情况下单个 Query 的 DFO 不会申请远大于能有效利用的物理线程数,造成线程资源浪费。一个并行查询,只要通过了全局控制阶段,就可以顺利执行,无论并发多大,都不会遇到物理线程数不足的问题。

3.4 并行执行资源管理器相关视图

并行执行资源管理器拥有全局视角,通过视图 GV$OB_PX_TARGET_MONITOR能看到租户内每个 observer 的线程预约状态。关于视图字段详细含义,参考视图手册。

OceanBase(admin@oceanbase)>select  * from GV$OB_PX_TARGET_MONITOR;
+--------------+----------+-----------+-----------+-----------------+--------------+-----------+-------------+------------------+-------------------+------------------------------+
| SVR_IP       | SVR_PORT | TENANT_ID | IS_LEADER | VERSION         | PEER_IP      | PEER_PORT | PEER_TARGET | PEER_TARGET_USED | LOCAL_TARGET_USED | LOCAL_PARALLEL_SESSION_COUNT |
+--------------+----------+-----------+-----------+-----------------+--------------+-----------+-------------+------------------+-------------------+------------------------------+
| 192.168.11.2 |    19512 |      1004 | N         | 555393108309134 | 192.168.11.1 |     19510 |          10 |                6 |                 0 |                            0 |
| 192.168.11.2 |    19512 |      1004 | N         | 555393108309134 | 192.168.11.2 |     19512 |          10 |                0 |                 0 |                            0 |
| 192.168.11.1 |    19510 |      1004 | Y         | 555393108309134 | 192.168.11.1 |     19510 |          10 |                6 |                 6 |                            1 |
| 192.168.11.1 |    19510 |      1004 | Y         | 555393108309134 | 192.168.11.2 |     19512 |          10 |                0 |                 0 |                            1 |
+--------------+----------+-----------+-----------+-----------------+--------------+-----------+-------------+------------------+-------------------+------------------------------+
4 rows in set (0.002 sec)

在一个瞬态里,不同 observer 看到的全局状态可能不一致,但后台每 500 毫秒就会同步一次全局状态,总体上各个 observer 看到的状态会基本一致,不会有太大偏差。


4 并行执行分类

OceanBase 支持多种语句的并行,本章按照如下内容分别介绍:

  • 并行查询
  • 并行 DML
  • 并行 DDL
  • 并行 LOAD DATA

4.1 并行查询

你可以在下面几种场景里使用并行查询:

  • select 语句,以及 select 子查询
  • DML 语句(INSERT,UPDATE,DELETE)的查询部分
  • 外表查询

并行查询的决策分为两部分:

  1. 决定走并行查询。如果查询中使用了 PARALLEL HINT,SESSION 上开启了并行查询,或TABLE 属性指定了并行,那么将会开启并行查询。
  2. 决定并行度。并行查询中,每个 DFO 的并行度可以不一样。
  • 对于基表扫描或索引扫描 DFO,其并行度由 PARALLEL HINT、SESSION 并行属性,或TABLE 属性来决定。
  • 对于基表扫描或索引扫描 DFO,如果运行时检测到它访问的数据不足一个宏块,那么它的运行时并发度会被局部自动降低。
  • 对于 JOIN 等中间节点,其 DFO 并行度继承左孩子 DFO 的并行度。
  • 部分 DFO 不允许并行执行(如计算 ROWNUM 的节点),那么他们的并行度会被强制设为 1。

4.2 并行 DML

大部分场景下,可以使用并行 DML (Parallel DML,简称 PDML)加速数据导入、更新、删除操作。

4.2.1 DML 并行度

DML 的并行度和查询部分的并行度一致。开启并行 DML 时,查询部分总是自动开启并行。读出的数据会根据待更新表的分区位置做重分布,然后由多个线程并行 DML,每个线程负责若干个分区。

并行度和目标表的分区数之间有倍数关系时,一般可以达到最佳性能。当并发度高于分区数时,会有多个线程处理同一个分区的数据;当并发度低于分区数时,单个线程可能处理多个分区的数据,并且每个线程处理的分区不重合。当并行度大于目标表分区数时,建议并行度是分区数的整数倍。

一般来说,同时向一个分区插入数据的线程数不要超过 4 个,超过这个值后扩展性并不好,日志同步会成为瓶颈,另外还有一些分区级别的锁同步开销。当并行度小于目标表分区数时,建议分区数是并行度的整数倍。这样,每个线程处理的分区数差不多,可以避免插入工作量倾斜。

4.2.2 索引表处理策略

并行 DML 支持自动维护索引表。

当索引表为本地索引时,并行 DML 在更新主表时,存储层会自动维护本地索引。

当索引表为全局索引时,并行 DML 框架会生成特定的计划来维护全局索引。假设有两个全局索引,处理流程如下:

  1. 首先,会用 DFO1 更新主表;
  2. 然后,DFO1 将全局索引1、全局索引2需要的数据发给 DFO2,用 DFO2 更新全局索引表1;
  3. 最后,DFO2 将全局索2引需要的数据发给 DFO3,用 DFO3 更新全局索引表2。

以上策略对所有 INSERT、DELETE、UPDATE 语句有效,对于 MERGE 语句,处理方式略有不同,所有的索引维护操作会集中到一个 DFO 中处理,如下图所示:

  1. 首先,会用 DFO1 更新主表;
  2. 然后,DFO1 将全局索引1、全局索引2需要的数据发给 DFO2,DFO2 内部会逐个完成全部全局索引的维护操作。

4.2.3 更新分区键的处理策略

对于 UPDATE 语句来说,当主表或全局索引表的分区键被更新时,需要把旧数据从旧分区中删除,然后把新数据插入到新分区中。这个过程通常被称作 row movement。

当发生 row movement 时,需要将 UPDATE 操作拆分成两步操作:先 DELETE,然后 INSERT。具体地,会将出现 row movement 的 UPDATE DFO 拆分成两个 DFO,第一个 DFO 负责 DELETE,第二个 DFO 负责 INSERT。并且,为了避免主键冲突,必须确保 DELETE DFO 完全执行完成后才能开始执行 INSERT DFO。

4.2.4 事务处理

OceanBase 并行 DML 和普通 DML 语句一样,完全支持事务处理。并行 DML 语句可以和其它查询语句一起出现在同一个事务中,执行完成并行 DML 语句后无需立即提交事务,就可以在后继的查询语句中读取到 DML 语句的结果。

在 OBServer v4.1 版本之前,当并行 DML 执行时间超长时,需要给租户配置项 undo retention 设置合适的值,否则可能发生 -4138 (OB_SNAPSHOT_DISCARDED) 错误,导致 SQL 在内部反复重试,直至超时。undo retention 字面意思是 Undo 的保留位点,即从当前时间回溯多长时间的 Undo 日志是保留下来的。对于 OceanBase 数据库来说,是将该时段的所有数据多版本保留下来。当并行 DML 执行时间超过 undo retention 设定的时间时,多版本数据可能被淘汰,当 DML 中的任何后继操作试图访问淘汰的多版本数据时,就会触发 OB_SNAPSHOT_DISCARDED 报错。undo retention 的默认值是 30 分钟,这意味着在默认情况下,如果并行 DML 语句 30 分钟内不能执行完成,无论语句的超时时间设定为多少,语句都可能执行超时并报错。一般来说,如果业务中的最长并行 DML 的执行时间为 2h 时, undo retention 可以设置为 2.5h。undo retention 不能随意设置为极大值,那会导致多版本数据无法回收,打爆磁盘。

从 OBServer v4.1 版本起,并行 DML 的执行不再依赖 undo retention 设定。多版本数据会根据事务版本号回收,只要事务还活跃,事务对应的版本号能读到的内容就不会回收。不过,数据盘满的场景是例外,此时还是会强行回收多版本数据,并行 DML 会收到 OB_SNAPSHOT_DISCARDED 报错,并自动重试整个 SQL。

4.2.5 旁路导入

当内存空间不足时,并行 DML 容易报告内存不足错误。没有走旁路导入路径的并行 DML,数据首先会写入 Memtable,然后通过转储、合并写入磁盘。因为并行 DML 写入数据到 Memtable 的速度极快,当写入速度快过转储速度时,内存就会不断增长,最终触发内存不足报错。

为了解决这个问题,OceanBase v4.1 的存储层提供了旁路导入功能。当并行 DML 使用旁路导入功能执行 INSERT 语句时,数据会绕过 Memtable 直接写入磁盘,不仅避免了内存不足的问题,而且还能提升数据导入性能。

用户通过 APPEND HINT 开启旁路导入功能。旁路导入开始前,要求提交上一个事务,并且设置 autocommit = 1。在 v4.2 版本中,旁路导入功能必须配合并行 DML 才能正常工作,如果没有通过 HINT 或 session 开启并行 DML,则旁路导入的 HINT 会被自动忽略。语法示例如下:

set autocommit = 1;
insert /*+ append enable_parallel_dml parallel(3) */ into t1 select * from t2;

预计在未来版本中,旁路导入功能会放松对事务的要求,可以出现在事务中的任意位置。

4.2.6 无法并行的 DML 操作

为了保证正确的 DML 语义,如下场景中,查询部分可以并行,但 DML 部分无法并行:

  • 如果目标表包含 local unique index,则 DML 部分不可并行,查询部分依然可以并行
  • INSERT ON DUPLICATE KEY UPDATE 语句的 DML 部分不可并行
  • 如果目标表包含 Trigger、外键,则 DML 部分不可并行
  • 如果 MERGE INTO 语句的目标表中包含 global unique index,则 DML 部分不可并行
  • 如果 DML 启用了 IGNORE 模式,则 DML 部分不可并行

如果发现 DML 没有走并行,可以通过 explain extended 来查看 Note 字段,确定未走并行的原因。

4.2.7 Row Movement 操作

当更新分区表的分区键时,可能使数据从一个分区搬迁到另一个分区。在 Oracle 模式下通过下面的命令可以禁止数据跨分区搬迁:

create table t1 (c1 int primary key, c2 int) partition by hash(c1) partitions 3;
alter table t1 disable row movement;

OceanBase(TEST@TEST)>update t1 set c1 = c1 + 100000000;
ORA-14402: updating partition key column would cause a partition change

但是,并行 DML 会忽略表的 row movement 属性,总是允许更新分区键。

4.3 并行 DDL

支持并行执行的 DDL 语句包括:

  • CREATE TABLE AS SELECT
  • ALTER TABLE
  • CREATE INDEX

4.3.0 原理

所有并行 DDL 都是通过特定的 Parallel DML 完成。例如,创建索引本质是创建一个索引空表,然后并行地从主表查询出索引列数据,最后并行地插入到索引表中。

4.3.1 通过 HINT 指定并行度

目前(v4.2)仅 CREATE INDEX 语句支持通过 PARALLEL HINT 开启并行,其余 DDL 都只支持 SESSION 变量和 TABLE PARALLEL 属性来开启。

CREATE /*+ PARALLEL(3) */ INDEX IDX ON T1(C2);

4.3.2 通过 SESSION 变量指定并行度

上述所有 DDL 语句都支持通过 SESSION 变量来指定并行度。指定并行度后,该 SESSION 上的所有 DDL 都自动按照该并行度并行执行,并且查询部分和修改部分都使用相同的并行度。

SET _FORCE_PARALLEL_DDL_DOP = 3;
CREATE TABLE T1 (C1 int, C2 int, C3 int, C4 int);
CREATE INDEX IDX ON T1(C2);
-- v4.2 中,CREATE TABLE AS SELECT 开并行,
-- 用的是 “SET _FORCE_PARALLEL_DML_DOP”
-- 而不是 “SET _FORCE_PARALLEL_DDL_DOP”
-- 后继版本可能会修改成后者
SET _FORCE_PARALLEL_DML_DOP = 3;
CREATE TABLE T1 (C1 int, C2 int, C3 int, C4 int);
CREATE TABLE T2 AS SELECT * FROM T1;

4.3.3 通过表 PARALLEL 属性指定并行度

本节内容实测并没有走并行,需要跟进是否符合设计。验证方法:

select plan_operation, count(*) threads from oceanbase.gv$sql_plan_monitor where trace_id = last_trace_id() group by plan_line_id, plan_operation order by plan_line_id;

DDL 相关表上有 PARALLEL 属性时,可以使用 SET 语句设定 SESSION 变量来开启并行执行。例如:

SET _ENABLE_PARALLEL_DDL = 1;
CREATE TABLE T1 (C1 int, C2 int, C3 int, C4 int) PARALLEL = 3;
CREATE INDEX IDX ON T1(C2) PARALLEL = 2;
-- v4.2 中,CREATE TABLE AS SELECT 开并行,
-- 用的是 “SET _ENABLE_PARALLEL_DML”
-- 而不是 “SET _ENABLE_PARALLEL_DDL”
-- 后继版本可能会修改成后者
SET _ENABLE_PARALLEL_DML = 1;
CREATE TABLE T1 (C1 int, C2 int, C3 int, C4 int) PARALLEL = 3;
CREATE TABLE T2 PARALLEL 2 AS SELECT * FROM T1;

4.3.4 优先级

如果同时指定了 PARALLEL HINT、FORCE SESSION PARALLEL、表级 PARALLEL 属性中的两个或多个,那么它们的优先级如下:

PARALLEL HINT 优先级 > FORCE SESSION PARALLEL 优先级 > 表 PARALLEL 属性优先级

4.3.5 旁路导入

CREATE INDEX 语句无论是否开启并行,总是会走旁路导入(Direct Write,Bypass memtable)。

CREATE TABLE AS SELECT 语句目前(v4.2)还不支持旁路导入功能,如果数据量比较大,建议先建立空表,然后使用并行 DML 旁路导入模式并行插入。

4.4 并行 LOAD DATA

LOAD DATA 的实现不是基于并行 DML,它的实现方式是:先用多个线程并行切分 csv 文件,拼成多个 insert 语句,然后用一定的并发度分发执行这些 insert 语句。

LOAD DATA /*+ parallel(2) */ infile "test.csv" INTO TABLE t1 FIELDS TERMINATED BY ',' ENCLOSED BY '"';

上述语句中,PARALLEL 选项指定加载数据的并行度,如果没有指定 PARALLEL HINT,则默认以 PARALLEL 为 4 来并行执行 LOAD DATA。 PARALLEL 建议取值范围是 [0, 租户的最大CPU数]。

4.5 局部无法并行的场景

  • 最顶层 DFO 无需并行,它负责和客户端交互,以及执行最顶层无需并行的部分操作,如 LIMIT、PX COORDINATOR 等
  • 包含 TABLE UDF 时,含该 UDF 的 DFO 只能串行执行,其余部分依然可以并行


5 并行执行控制参数

OceanBase 提供了一组参数来控制并行执行的初始化和调优。在 OceanBase 启动时,可以根据租户的 CPU 数量和租户配置项 px_workers_per_cpu_quota计算出默认并行执行控制参数。用户也可以不使用默认值,在启动时手工指定参数值,还可以根据实际场景在后期手工增加或减小参数值。

并行执行功能默认是开启的。本文从以下几个方面说明并行执行参数控制技巧:

  • 并行执行默认参数
  • 并行执行相关参数调优

5.1 并行执行默认参数

并行执行的参数能够控制并行线程数、并行执行排队等行为。具体总结为下表:

参数名称默认值级别说明
px_workers_per_cpu_quota10租户级配置项每个 CPU 上可以分配的并行执行线程数,取值范围 [1,20]
parallel_servers_targetMIN CPU * px_workers_per_cpu_quota租户级变量表示租户在每个节点上可申请的并行执行线程数量。
parallel_degree_policyMANUAL租户级/Session级变量自动 DOP 开启开关,当设置为 AUTO 时,会开启自动 DOP,优化器根据统计信息自动计算 Query 的并行度。当设置为 MANUAL 时,根据 HINT、TABLE PARALLEL 属性、SESSION 级 DOP 等控制并行度。
_parallel_max_active_sessions0租户级配置项TPCH 测试中,Power Run 需要的并行度比较高, Throughput Run 需要的并行度比较低。但是,TPCH 测试规范不允许通过 SQL 来动态更改并行度。为了达到动态更改并行度的目的,增加一个配置项 _parallel_max_active_sessions (pmas)当 pmas 等于 0 时,并行执行的活跃 session 数不受限制;当 pmas 大于 0 时,当前并行执行最大活跃 session 数会限制在 pmas 值以下,其它并行执行 session 线程会被挂起,等待被唤醒。某个 query 执行成功后,会唤醒它们尝试继续执行。

OceanBase 为了降低用户使用并行执行的门槛,将并行执行参数个数降到了最低,使用默认参数即可实现开箱即用。在特殊场景下,用户可以通过改变默认参数实现应用调优。

px_workers_per_cpu_quota

这个参数表示在每个 CPU 上可以分配的并行执行线程数。当分配给租户的 MIN CPU 为 N 时,如果并行负载均匀,那么每个节点上可以分配的线程数为 N * px_workers_per_cpu_quota。如果并行负载访问的数据分布不均匀,那么某些节点上实际分配的线程数会短时间大于 N * px_workers_per_cpu_quota,当负载结束后,这些多分配出来的线程会被自动回收。

px_workers_per_cpu_quota 对 parallel_servers_target 默认值的影响仅仅发生在创建租户时,租户创建完成后,修改 px_workers_per_cpu_quota 值并不会改变 parallel_servers_target 值。

一般情况下,不需要更改 px_workers_per_cpu_quota 的默认值。当未开启资源隔离特性时,如果发生并行执行占用了全部 CPU 资源的情况,可以尝试通过降低 px_workers_per_cpu_quota 参数的值来缓解该问题。

parallel_servers_target

这个参数表示租户在每个节点上可申请的并行执行线程资源数量。当资源耗尽时,并行执行请求需要排队。关于排队的概念,请参考本文中《3 并发控制与排队》一节。

并行执行的 CPU 使用率非常低,可能是因为 parallel_servers_target 值太小,导致 SQL 的 DOP 被降级,实际分配的线程数小于期望的数量。OBServer v3.2.3 版本之前,parallel_servers_target 的默认值非常小,可以通过调大 parallel_servers_target 值来解决,建议设置为 MIN CPU * 10。 OBServer v3.2.3 版本起,parallel_servers_target 的默认值就是 MIN CPU * 10,一般不会遇到这个问题。

MIN CPU 表示租户的 min_cpu 值,一般在创建租户时指定。

设定好 parallel_servers_target 值后,重新连接,执行下面的命令查看最新值:

show variables like 'parallel_servers_target';

从运维的角度,如何给 parallel_servers_target 设置一个最大值,避免以后频繁调整呢?理论上,可以将 parallel_servers_target 设置得无穷大,这时会面临一个问题:低效。所有的查询都不排队,它们都开始执行,并争抢 CPU 时间片、争抢磁盘 IO、网络 IO。

从吞吐量的角度看,这个问题不算严重。从单个 SQL 延迟的角度看,这种争抢会严重影响延迟。综合考虑 CPU 和 IO 的利用率,我们可以以租户 UNIT 的 MIN CPU 为基准,一般把 parallel_servers_target 设置为 MIN CPU * 10 即可;少数 IO 密集型场景,CPU 可能用不满,可以将 parallel_servers_target 设置为 MIN CPU * 20。

parallel_degree_policy

这个参数是自动 DOP 开启开关,当设置为 AUTO 时,会开启自动 DOP,优化器根据统计信息自动计算 Query 的并行度。当设置为 MANUAL 时,根据 HINT、TABLE PARALLEL 属性、SESSION 级 DOP 等控制并行度。

OBServer v4.2 起,如果用户不熟悉并行度的设置规则,可以设置 parallel_degree_policy 为 AUTO,让优化器帮忙自动选择并行度。关于自动并行度的计算规则,参考本文的 《2 设定并行度》一节。OBServer v4.2 之前的版本,不支持 parallel_degree_policy 开关,不支持自动 DOP 功能,需要手工设置。

5.2 并行执行相关参数调优

ob_sql_work_area_percentage

租户级变量,用于限制 SQL 模块可以使用的最大内存。它是一个百分值,表示占租户总内存的比值,默认为 5,表示占租户内存的 5%。当 SQL 使用的内存超过限制时,会触发内存落盘。sql work area 内存的实际使用量,可以在 observer.log 日志中搜索 WORK_AREA。例如:

[MEMORY] tenant_id=1001 ctx_id=WORK_AREA hold=2,097,152 used=0 limit=157,286,400

读多写少场景下,如果 SQL 模块内存受限导致数据落盘,可以适当增大 ob_sql_work_area_percentage 值。

workarea_size_policy

OceanBase 实现了全局自适应的内存管理,当 workarea_size_policy 设置为 AUTO 时,执行框架会以最优的策略为各个算子(如 HashJoin、GroupBy、Sort 等)分配内存,开启自适应的数据落盘策略。如果 workarea_size_policy 设置为 MANUAL,则用户需要手工设置 _hash_area_size_sort_area_size

_hash_area_size

租户级配置项,用于手动指定每个算子的 Hash 算法最大可用内存,默认值为 128MB。超过指定值后,会启用内存落盘功能。对于 Hash 算法相关算子有效,如 Hash Join、Hash Groupby、Hash Distinct 等。一般情况下,不需要修改本配置项,建议 workarea_size_policy 保持为 AUTO。如果认定 Hash 算法中系统自动内存落盘功能不满足业务需要,可以先将 workarea_size_policy 设置为 MANUAL,然后手工指定 _hash_area_size值。

_sort_area_size

租户级配置项,用于手动指定每个算子的 Sort 算法最大可用内存,默认值为 128MB。超过指定值后,会启用内存落盘功能。主要在 Sort 算子中使用。一般情况下,不需要修改本配置项,建议 workarea_size_policy 保持为 AUTO。如果认定 Sort 算法中系统自动内存落盘功能不满足业务需要,可以先将 workarea_size_policy 设置为 MANUAL,然后手工指定 _sort_area_size值。

_px_shared_hash_join

SESSION 级系统变量,用于决定Hash Join 是否采用共享哈希表方式进行优化。默认值为 true,表示默认开启 shared hash join 算法。Hash Join 在并行场景下,每个并行线程都是独立计算 hash 表。考虑到左表使用 Broadcast 重分布方式时,因为所有并行工作线程计算出的哈希表是相同的,所以每台机器只需要构造一个 hash 表,由所有工作线程共享,这样可以提高 CPU Cache 友好性。一般情况下,不需要修改本配置项。

5.3 并行 DML 相关的参数调优

OBServer v4.1 起,如果没有事务要求,向表中导入数据时建议使用 INSERT INTO SELECT 加上旁路导入功能,一次性将数据插入到新表。这种方式既可以缩短导入时间,也可以避免写入太快导致内存不足的问题。


6 并行执行诊断

诊断并行执行问题,可以从两个大的方面入手。首先从系统整体上判断,比如确认网络、磁盘 IO、CPU 是不是被打满;然后从具体 SQL 着手,找到问题 SQL 在哪里,它的内部状态如何。

6.1 系统诊断

在业务比较繁忙的系统重出现性能问题时,首先需要在系统层面做初步诊断。一般有两种途径:

  • OCP(OceanBase Cloud Platform),支持可视化观测系统性能
  • tsar 等命令行系统工具,支持查询网络、磁盘、CPU等的历史监控数据

tsar 是一个系统监控和性能分析工具,它可以提供关于 CPU、磁盘、网络等方面的详细信息。以下是 tsar 命令的几个常见用法:

tsar --cpu
tsar --io
tsar --traffic

除了以上示例外,tsar 还支持其他选项和参数,例如通过参数-d 2 可以查出两天前到现在的数据,-i 1 表示以每次1分钟作为采集显示。

tsar -n 2 -i 1 --cpu

如果出现磁盘或网络打爆,则优先从硬件容量过小或并发压力过大角度解决问题。

6.2 SQL 诊断

当遇到并行执行问题时,可以从 SQL 层面、并行执行线程层面、算子层面逐层检查。

6.2.1 确认 SQL 还在执行

确认 SQL 在正常执行:关注 TIME 字段,每次查询 GV$OB_PROCESSLIST 视图 TIME 字段都在增长,并且 STATE 为 ACTIVE,说明 Query 还在执行。

确认 SQL 是否在反复重试:如果 SQL 是因为反复重试导致没有返回结果,RETRY_CNT、RETRY_INFO 字段会有相关信息。其中 RETRY_CNT 是表示重试了多少次了,RETRY_INFO 是最后一次重试的原因。没有重试发生的时候,RETRY_CNT 为 0。TOTAL_TIME 字段表示包含每次重试在内的累计执行时间。如果发现 SQL 在反复重试,则根据 RETRY_INFO 中给出的错误码判断是否需要干预。OBServer v4.1 之前,最常见的一个错误是 -4138(OB_SNAPSHOT_DISCARDED),遇到这种情况,按照本文中 《4 并行执行分类》中的 4.2.4 节指示,调大 undo_retention 值即可解决。对于其它错误,如 -4038(OB_NOT_MASTER)等,无需任何处理,一般可以自动重试成功。如果重试次数总是大于1,并且确认系统整体状态平稳,可以联系 OceanBase 研发做进一步判断。

-- MySQL 模式
SELECT
  TENANT,INFO,TRACE_ID,STATE,TIME,TOTAL_TIME,RETRY_CNT,RETRY_INFO
FROM
  oceanbase.GV$OB_PROCESSLIST;
  

如果发现 GV$OB_PROCESSLIST 里还有对应的 SQL,但状态被标记为 SESSION_KILLED,并且一直没有退出,那么需要联系 OceanBase 研发,报告 bug。这可能是因为:

  • 有逻辑没有正确检测 SESSION KILLED 状态,未能及时退出执行流程

6.2.2 确认 SQL 还在执行并行查询

OBServer 集群中,所有活跃的并行执行线程都可以通过 GV$OB_PX_WORKER_STAT视图查看到。

-- MySQL 模式
OceanBase(admin@oceanbase)>select * from oceanbase.GV$OB_PX_WORKER_STAT;
SESSION_ID: 3221520411
 TENANT_ID: 1002
    SVR_IP: 192.168.0.1
  SVR_PORT: 19510
  TRACE_ID: Y4C360B9E1F4D-0005F9A76E9E66B2-0-0
     QC_ID: 1
    SQC_ID: 0
 WORKER_ID: 0
    DFO_ID: 0
START_TIME: 2023-04-23 17:29:17.372461

-- Oracle 模式
OceanBase(root@SYS)>select * from SYS.GV$OB_PX_WORKER_STAT;
SESSION_ID: 3221520410
 TENANT_ID: 1004
    SVR_IP: 192.168.0.1
  SVR_PORT: 19510
  TRACE_ID: Y4C360B9E1F4D-0005F9A76E9E66B1-0-0
     QC_ID: 1
    SQC_ID: 0
 WORKER_ID: 0
    DFO_ID: 0
START_TIME: 2023-04-23 17:29:15.372461

结合从 GV$OB_PROCESSLIST 拿到的 TRACE_ID,通过这个视图可以看到 SQL 当前正在执行哪些 DFO,执行了多久等信息。

如果这个视图里什么也查不到,但 GV$OB_PROCESSLIST 里依然可以看到相应 SQL,可能的原因包括:

  • 所有 DFO 均已执行完成,结果集较大,当前正处在向客户端吐数据阶段
  • 除了最顶层 DFO 外,其余所有 DFO 均已执行完成

6.2.3 确认每个算子的执行状况

通过 oceanbase.GV$SQL_PLAN_MONITOR (MySQL) 或 SYS.GV$SQL_PLAN_MONITOR(Oracle)可以查看每个并行工作线程中每个算子的执行状态。从 OBServer v4.1 起,GV$SQL_PLAN_MONITOR包含两部分数据:

  • 已经执行完成的算子。所谓执行完成是指这个算子已经调用过 close 接口,在当前线程中不再处理任何数据。
  • 正在执行的算子。所谓正在执行是指这个算子还没有调用 close 接口,正在处理数据过程中。读取这部分算子的数据,需要在查询 GV$SQL_PLAN_MONITOR 视图的 where 条件中指定 request_id < 0。在使用 request_id < 0 条件查询本视图时,我们也称为访问 “Realtime SQL PLAN MONITOR”。本访问接口未来可能会变化。

OBServer 4.1 之前,仅支持查看已经执行完成的算子状态。

GV$SQL_PLAN_MONITOR中有几个重要的域:

  • TRACE_ID:它唯一标识了一条 SQL
  • PLAN_LINE_ID:算子在执行计划中的编号,对应于通过 explain 语句查看到的编号
  • PLAN_OPERATION:算子名称,如 TABLE SCAN、HASH JOIN
  • OUTPUT_ROWS:当前算子已经输出的行数
  • FIRST_CHANGE_TIME:算子吐出首行数据时间
  • LAST_CHANGE_TIME:算子吐出最后一行数据时间
  • FIRST_REFRESH_TIME:算子开始监控时间
  • LAST_REFRESH_TIME:算子结束监控时间

根据上面几个域,基本就能刻画出一个算子处理数据的主要动作了。举例几个场景:

  1. 查看一个已经执行完成的 SQL,每个算子使用了多少个线程来执行:
SELECT PLAN_LINE_ID, PLAN_OPERATION, COUNT(*) THREADS
FROM GV$SQL_PLAN_MONITOR
WHERE TRACE_ID = 'YA1E824573385-00053C8A6AB28111-0-0'
GROUP BY PLAN_LINE_ID, PLAN_OPERATION
ORDER BY PLAN_LINE_ID;

+--------------+------------------------+---------+
| PLAN_LINE_ID | PLAN_OPERATION         | THREADS |
+--------------+------------------------+---------+
|            0 | PHY_PX_FIFO_COORD      |       1 |
|            1 | PHY_PX_REDUCE_TRANSMIT |       2 |
|            2 | PHY_GRANULE_ITERATOR   |       2 |
|            3 | PHY_TABLE_SCAN         |       2 |
+--------------+------------------------+---------+
4 rows in set (0.104 sec)
  1. 查看正在执行的 SQL,当前正在执行哪些算子,使用了多少线程,已经吐出了多少行:
SELECT PLAN_LINE_ID, CONCAT(LPAD('', PLAN_DEPTH, ' '), PLAN_OPERATION) OPERATOR, COUNT(*) THREADS, SUM(OUTPUT_ROWS) ROWS
FROM GV$SQL_PLAN_MONITOR
WHERE TRACE_ID = 'YA1E824573385-00053C8A6AB28111-0-0' AND REQUEST_ID < 0
GROUP BY PLAN_LINE_ID, PLAN_OPERATION, PLAN_DEPTH
ORDER BY PLAN_LINE_ID;
  1. 查看一个已经执行完成的 SQL,每个算子处理了多少行数据,吐出了多少行数据:
SELECT PLAN_LINE_ID, CONCAT(LPAD('', PLAN_DEPTH, ' '), PLAN_OPERATION) OPERATOR, SUM(OUTPUT_ROWS) ROWS
FROM GV$SQL_PLAN_MONITOR
WHERE TRACE_ID = 'Y4C360B9E1F4D-0005F9A76E9E6193-0-0'
GROUP BY PLAN_LINE_ID, PLAN_OPERATION, PLAN_DEPTH
ORDER BY PLAN_LINE_ID;
+--------------+-----------------------------------+------+
| PLAN_LINE_ID | OPERATOR                          | ROWS |
+--------------+-----------------------------------+------+
|            0 | PHY_PX_MERGE_SORT_COORD           |    2 |
|            1 |  PHY_PX_REDUCE_TRANSMIT           |    2 |
|            2 |   PHY_SORT                        |    2 |
|            3 |    PHY_HASH_GROUP_BY              |    2 |
|            4 |     PHY_PX_FIFO_RECEIVE           |    2 |
|            5 |      PHY_PX_DIST_TRANSMIT         |    2 |
|            6 |       PHY_HASH_GROUP_BY           |    2 |
|            7 |        PHY_HASH_JOIN              | 2002 |
|            8 |         PHY_HASH_JOIN             | 2002 |
|            9 |          PHY_JOIN_FILTER          | 8192 |
|           10 |           PHY_PX_FIFO_RECEIVE     | 8192 |
|           11 |            PHY_PX_REPART_TRANSMIT | 8192 |
|           12 |             PHY_GRANULE_ITERATOR  | 8192 |
|           13 |              PHY_TABLE_SCAN       | 8192 |
|           14 |          PHY_GRANULE_ITERATOR     | 8192 |
|           15 |           PHY_TABLE_SCAN          | 8192 |
|           16 |         PHY_GRANULE_ITERATOR      | 8192 |
|           17 |          PHY_TABLE_SCAN           | 8192 |
+--------------+-----------------------------------+------+
18 rows in set (0.107 sec)

为了展示美观,上面使用了一个域 PLAN_DEPTH来做缩进处理,PLAN_DEPTH 表示这个算子在算子树中的深度。

注:

  1. 尚未调度的 DFO 的算子信息,不会出现在 GV$SQL_PLAN_MONITOR 中。
  2. 在一个 PL 中如果包含多条 SQL,它们的 TRACE_ID 相同


7 并行执行调优技巧

本章介绍一些基础的 OceanBase 并行执行调优技巧。调优是一个永无止境的话题,本章内容也会与时俱进,不断更新。

7.1 手动收集统计信息

如果优化器中保存的统计信息陈旧,可能导致生成的计划不优。OBServer v3.2 和 OBServer v4.1 分别提供了手动收集统计信息的接口:OceanBase 优化器统计信息 (4.x 版本)

OBServer v4.1 手动收集主表、索引表的语法如下:

-- 收集用户TEST的表T1的全局级别的统计信息,所有列的桶个数设定为auto策略:
call dbms_stats.gather_table_stats('TEST', 'T1', granularity=>'GLOBAL', method_opt=>'FOR ALL COLUMNS SIZE AUTO');
-- 收集用户TEST下表T1的索引IDX的索引统计信息,并行度4(IDX不唯一,需指定表名称)
call dbms_stats.gather_index_stats('TEST', 'IDX', degree=>4, tabname=>'T1');

7.2 修改分区方式使用 Partition Wise Join

PoC 场景中,如果有大表 JOIN,并且在业务允许的前提下,可以让大表使用相同的分区方式,并且将他们绑定到同一个表组上,这样可以实现性能最佳的 partition wise join。使用 partition wise join 时,并行度也要调整得和分区数相适应,这样可以获得最佳性能。

7.3 并行度与分区数适配

一般来说,并行度与分区数符合一定的整比例关系,能得到较好的性能。详细论述参考本文中《1.6 通过均衡负载来优化性能》一节。

7.4 创建索引

创建合适的索引,能减少数据的扫描量,可以提高并行执行性能。在哪些表、哪些列上建索引,没有一个通用的方案,需要基于具体 SQL 具体分析。

7.5 创建复制表

OBServer v4.2 及之后版本,通过创建复制表,能减少数据重分布,可以提高并行执行性能,详见 OceanBase 官方文档中创建表的《创建复制表》章节。基本语法举例如下:

create table dup_t1(c1 int) duplicate_scope = 'cluster';
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论