
前言:前段时间读了一下Facebook在2019年顶会ICDE的一篇论文,忙了一段时间,最近几天终于有机会整理一下,相当于又翻译了一遍,但是也不仅仅是翻译,因为生硬的翻译可能并不能完整的表述出准确的意思,当然,英文阅读水平有限,如有歧义欢迎指正。【关注并回复prestopdf获取论文】
前言:前段时间读了一下Facebook在2019年顶会ICDE的一篇论文,忙了一段时间,最近几天终于有机会整理一下,相当于又翻译了一遍,但是也不仅仅是翻译,因为生硬的翻译可能并不能完整的表述出准确的意思,当然,英文阅读水平有限,如有歧义欢迎指正。【关注并回复prestopdf获取论文】
作者:小舰 中国人民大学计算机硕士
来源:DLab数据实验室(ID:rucdlab)
这篇文章是Facebook在2019年的顶会ICDE上的综述类的文章,本文比较系统的讲解了Presto的特性、运行机制、优化措施等,与大家分享。
一、起源
首先来简述一下Presto的发展起源,Presto其实是由FaceBook开源的一个MPP计算引擎,主要用来以解决 Facebook 海量 Hadoop 数据仓库的低延迟交互分析问题,Facebook版本的Presto更多的是以解决企业内部需求功能为主,也叫PrestoDB,版本号以0.xxx来划分。后来,Presto其中的几个人出来创建了更通用的Presto分支,取名Presto SQL,版本号以xxx来划分,例如345版本,这个开源版本也是更为被大家通用的版本。前一段时间,为了更好的与Facebook的Presto进行区分,Presto SQL将名字改为Trino,除了名字改变了其他都没变。不管是Presto DB还是Presto SQL,它们”本是同根生“,因此它们的大部分的机制原理是一样的。本文参照ICDE的一篇论文[1]来详细介绍一下Presto引擎。
二、特点及场景介绍
1.特点
Presto引擎相较于其他引擎的特点正如文章标题描述的这样,多源、即席。多源就是它可以支持跨不同数据源的联邦查询,即席即实时计算,将要做的查询任务实时拉取到本地进行现场计算,然后返回计算结果。除此之外,对于引擎本身,它有几个值得关注的特点:
(1)多租户:它支持并发执行数百个内存、I/O以及CPU密集型的负载查询,并支持集群规模扩展到上千个节点;
(2)联邦查询:它可以由开发者利用开放接口自定义开发针对不同数据源的连接器(Connector),从而支持跨多种不同数据源的联邦数据查询;
(3)内在特性:为了保证引擎的高效性,Presto还进行了一些优化,例如基于JVM运行,Code- Generation等。
2.场景
Presto的应用场景非常广泛,接下来我们主要介绍几种使用比较广泛的场景进行介绍。
(1)交互式分析:交互式分析主要包括一些数据探查类的操作,可能是一些简单的、低数据量的查询,对于响应时间要求较高。交互式查询也是Presto主打的应用场景,Presto的即席计算特性和内部设计机制就是为了能够更好地支持用户进行交互式分析。可以类比用户基于Hive交互式查询HDFS中的数据,用户可以基于Presto查询各种不同的数据源的数据。
(2)批量ETL:ETL是最典型的一类批量操作,它一般针对大数据量进行的一些列转换、计算等操作,一般耗时较长,对响应时间要求较低。
(3)除此之外, Facebook的A/B Test基础架构是基于Presto构建的,用户希望在数小时内获得测试结果,并希望数据完整和准确;另外,Facebook为外部开发人员和广告客户提供的一些自定义报告工具[2]也是基于Presto构建的。
三、整体架构

图1 Presto架构图
如上图所示,Presto主要是由Client、Coordinator、Worker以及Connector等几部分构成,这几个部分互相配合完成“一条SQL语句从提交到获取查询结果”整个生命周期。
1.SQL语句提交
用户或应用通过Presto的JDBC接口或者CLI来提交SQL查询,提交的SQL最终传递给Coordinator进行下一步处理;
2.词/语法分析:
首先会对接收到的查询语句进行词法分析和语法分析,形成一棵抽象语法树(AST);然后,会通过分析抽象语法树来形成逻辑查询计划,如图2、图3所示。

图2 查询SQL
3.生成逻辑计划:
图2是TPC-H测试基准中的一条SQL语句,表达的是两表连接同时带有分组聚合计算的例子,经过词法语法分析后,得到AST,然后进一步分析得到如下的逻辑计划。

图3 逻辑计划
上图就是一棵逻辑计划树,每个节点代表一个物理或逻辑操作,每个节点的子节点作为该节点的输入。逻辑计划只是一个单纯描述SQL的执行逻辑,但是并不包括具体的执行信息,例如该操作是在单节点上执行还是可以在多节点并行执行,再例如什么时候需要进行数据的shuffle操作等。
4.查询优化:
那接下来需要怎样的处理呢?Coordinator的查询优化器会对逻辑计划进行优化。Coordinator将一系列的优化策略(例如剪枝操作、谓词下推、条件下推等)应用于与逻辑计划的各个子计划,从而将逻辑计划转换成更加适合物理执行的结构,形成更加高效的执行策略。Presto已经提供了基于统计信息的优化器,优化器可以通过API提供一些表和列的统计信息来帮助制定合适的join策略等。下面具体来说说优化器在几个方面所做的工作:
(1)自适应:Presto的Connector可以通过Data Layout API提供数据的物理分布信息(例如数据的位置、分区、排序、分组以及索引等属性),如果一个表有多种不同的数据存储分布方式,Connector也可以将所有的数据布局全部返回,这样Presto优化器就可以根据query的特点来选择最高效的数据分布来读取数据并进行处理。举个例子,在OLAP场景中,可能优先会从列存格式的数据源中读取数据。
(2)谓词下推:谓词下推是一个应用非常普遍的优化方式,就是将一些条件或者列尽可能的下推到叶子结点,最终将这些交给数据源去执行,从而可以大大减少计算引擎和数据源之间的I/O,提高效率。

图4 图3的逻辑计划进一步转换后的执行计划(未进行)
(3)节点间并行:不同stage之间的数据shuffle会带来很大的内存和CPU开销,因此,将shuffle数优化到最小是一个非常重要的目标。围绕这个目标,Presto可以借助下面信息:
数据布局信息:上面我们提到的数据物理分布信息同样可以用在这里以减少shuffle数。例如,如果进行join连接的两个表的字段同属于分区字段,则可以将连接操作在在各个节点分别进行,从而可以大大减少数据的shuffle。再比如两个表的连接键加了索引,可以考虑采用嵌套循环的连接策略。
(4)节点内并行:优化器通过在节点内部使用多线程的方式来提高节点内对并行度,而且节点内并行由于具有更小的延迟会比节点间并行效率更高。节点内并行对以下两种场景会有比较大的性能提升。
1)交互式分析:交互式查询的负载大部分是一次执行的短查询,查询负载一般不会经过优化,这就会导致数据倾斜的现象时有发生。典型的表现为少量的节点被分到了大量的数据。
2)批量ETL:这类的查询特点是任务会不加过滤的从叶子结点拉取大量的数据到上层节点进行转换操作,致使上层节点压力非常大。
针对以上两种场景遇到的问题,节点内的多线程机制可以在一定程度上缓解并发瓶颈。这样,引擎就可以通过多线程来运行单个操作符序列(或pipeline),如图5所示的,pipeline1和2通过多线程并行执行来加速build端的hash-join。

图5 pipeline1和2通过多线程并行执行来加速build端的hash-join
当然,除了上述列举的Presto优化器已经实现的优化策略,Presto也正在积极探索Cascades framework(一个基于代价估算的优化器),相信未来优化器会得到进一步的改进。
5.查询调度:
Presto通过Coordinator将stage以task的形式分发到worker节点,coordinator将task以stage为单位进行串联,通过将不同stage按照先后执行顺序串联成一棵执行树,确保数据流能够顺着stage进行流动。
Presto引擎处理一条查询需要进行两套调度,第一套是如何调度stage的执行顺序,第二套是判断每个stage有多少需要调度的task以及每个task应该分发到哪个worker节点上进行处理。
(1)stage调度
Presto支持两种stage调度策略:All-at-once和 Phased两种。All-at- once策略针对所有的stage进行统一调度,不管stage之间的数据流顺序,只要该stage里的task数据准备好了就可以进行处理;Phased策略是需要以stage调度的有向图为依据按序执行,只要前序任务执行完毕开会开始后续任务的调度执行。例如一个hash-join操作,在hash表没有准备好之前,Presto不会调度left side表。
(2)task调度
在进行task调度的时候,调度器会首先区分task所在的stage是哪一类stage:Leaf Stage和intermediate stage。Leaf Stage负责通过Connector从数据源读取数据,intermediate stage负责处理来此其他上游stage的中间结果;
1)leaf stages:在分发leaf stages中的task到worker节点的时候需要考虑网络和connector的限制。例如蚕蛹shared- nothing部署的时候,worker节点和存储是同地协作,这时候调度器就可以根据connector data Layout API来决定将task分发到哪些worker节点。资料表明在一个生产集群大部分的CPU消耗都是花费在了对从connector读取到的数据的解压缩、编码、过滤以及转换等操作上,因此对于此类操作,要尽可能的提高并行度,调动所有的worker节点来并行处理。
2)intermediate stages:这里的task原则上可以被分发到任意的worker节点,但是Presto引擎仍然需要考虑每个stage的task数量,这也会取决于一些相关配置,当然,有时候引擎也可以在运行的时候动态改变task数。
(3)split调度
当Leaf stage中的一个task在一个工作节点开始执行的时候,它会收到一个或多个split分片,不同connector的split分片所包含的信息也不一样,最简单的比如一个分片会包含该分片IP以及该分片相对于整个文件的偏移量。对于Redis这类的键值数据库,一个分片可能包含表信息、键值格式以及要查询的主机列表。Leaf stage中的task必须分配一个或多个split才能够运行,而intermediate stage中的task则不需要。
1)split分配
当task任务分配到各个工作节点后,coordinator就开始给每个task分配split了。Presto引擎要求Connector将小批量的split以懒加载的方式分配给task。这是一个非常好的特点,会有如下几个方面的优点:
a)解耦时间:将前期的split准备工作与实际的查询执行时间分开;
b)减少不必要的数据加载:有时候一个查询可能刚出结果但是还没完全查询完就被取消了,或者会通过一些limit条件限制查询到部分数据就结束了,这样的懒加载方式可以很好的避免过多加载数据;
c)维护split队列:工作节点会为分配到工作进程的split维护一个队列,Coordinator会将新的split分配给具有最短队列的task,Coordinator分给最短的。
d)减少元数据维护:这种方式可以避免在查询的时候将所有元数据都维护在内存中,例如对于Hive Connector来讲,处理Hive查询的时候可能会产生百万级的split,这样就很容易把Coordinator的内存给打满。
当然,这种方式也不是没有缺点,他的缺点是可能会导致难以准确估计和报告查询进度。
6.查询执行
(1)本地数据流动
一旦一个split被分配给线程,它就会被driver loop执行。Presto的driver loog比递归迭代器Volcano模型更复杂,但提供了重要的功能。driver loop处理的是以页(page)为单位的数据单元,当Connector 数据源API收到一个split的时候会返回一个页(page),然后operators会将这个page进行计算等操作,然后输出这个page(图6展示了内存中page的具体结构),driver loop会不断的讲page在operator算子之间“搬运”来进行page的处理。

图6 一个page里的不同块类型
(2)Shuffle
Presto被设计成最小化端到端延迟以及最大化资源利用,它的节点间数据流动机制就反映了这个设计。Presto用http协议来shuffle交换缓冲区内的中间结果。task产生的数据存储缓冲区供其他的节点消费,其他节点通过http协议的长轮询机制来请求中间结果。服务器会保存数据,直到客户端带着上一次请求获得的token请求下一个数据片段。
Presto引擎会通过调整并发度来平衡输入/输出缓冲区的利用率。它会持续监控输出缓冲区,当使用持续保持高位时,会通过减少split的数量来压低并发度,这有利于提高网络资源共享的公平性。
(3)写操作
对于ETL任务,经常需要讲产生的数据写入另外的表中,那么影响向远端存储写性能的一个很重要的因素就是就是并发度,例如通过Connector Data Sink API进行写数据的总的线程数。那自然就会出现一个新的问题:如果写的并发太大,每一个并发都会创建一个新文件,那不可避免会出现小文件过多的问题,另一方面,如果将并发调小,又会影响吞吐量,是的任务运行效率降低。因此,Presto再次采取了一种自适应的方法,通过设置一个缓冲区监控的阀值,一旦某个stage写入缓冲区的数据超过了这个阀值,引擎就会在更多的节点上启动新的任务来动态增加写并发。这种方式对于写任务繁重的批量ETL任务非常有效。
7.资源管理
Presto适用于多租户部署的一个很重要的因素就是它完全整合了细粒度资源管理系统。一个单集群可以并发执行上百条查询以及最大化的利用CPU、IO和内存资源。
(1)CPU调度
Presto首要任务是优化所有集群的吞吐量,例如在处理数据是的CPU总利用量。本地(节点级别)调度又为低成本的计算任务的周转时间优化到更低,以及对于具有相似CPU需求的任务采取CPU公平调度策略。一个task的资源使用是这个线程下所有split的执行时间的累计,为了最小化协调时间,Presto的CPU使用最小单位为task级别并且进行节点本地调度。
Presto通过在每个节点并发调度任务来实现多租户,并且使用合作的多任务模型。任何一个split任务在一个运行线程中只能占中最大1秒钟时长,超时之后就要放弃该线程重新回到队列。如果该任务的缓冲区满了或者OOM了,即使还没有到达占用时间也会被切换至另一个任务,从而最大化CPU资源的利用。
当一个split离开了运行线程,Presto需要去定哪一个task(包含一个或多个split)排在下一位运行。
Presto通过合计每个task任务的总CPU使用时间,从而将他们分到五个不同等级的队列而不是仅仅通过提前预测一个新的查询所需的时间的方式。如果累积的Cpu使用时间越多,那么它的分层会越高。Presto会为每一个曾分配一定的CPU总占用时间。
调度器也会自适应的处理一些情况,如果一个操作占用超时,调度器会记录他实际占用线程的时长,并且会临时减少它接下来的执行次数。这种方式有利于处理多种多样的查询类型。给一些低耗时的任务更高的优先级,这也符合低耗时任务往往期望尽快处理完成,而高耗时的任务对时间敏感性低的实际。
(2)内存管理
在像Presto这样的多租户系统中,内存是主要的资源管理挑战之一。
1)内存池
在Presto中,内存被分成用户内存和系统内存,这两种内存被保存在内存池中。用户内存是指用户可以仅根据系统的基本知识或输入数据进行推理的内存使用情况(例如,聚合的内存使用与其基数成比例)。另一方面,系统内存是实现决策(例如shuffle缓冲区)的副产品,可能与查询和输入数据量无关。换句话说,用户内存是与任务运行有关的,我们可以通过自己的程序推算出来运行时会用到的内存,系统内存可能更多的是一些不可变的。
Presto引擎对单独对用户内存和总的内存(用户+系统)进行不同的规则限制,如果一个查询超过了全局总内存或者单个节点内存限制,这个查询将会被杀掉。当一个节点的内存耗尽时,该查询的预留内存会因为任务停止而被阻塞。
有时候,集群的内存可能会因为数据倾斜等原因造成内存不能充分利用,那么Presto提供了两种机制来缓解这种问题--溢写和保留池。
2)溢写
当某一个节点内存用完的时候,引擎会启动内存回收程序,现将执行的任务序列进行升序排序,然后找到合适的task任务进行内存回收(也就是将状态进行溢写磁盘),知道有足够的内存来提供给任务序列的后一个请求。
3)预留池
如果集群的没有配置溢写策略,那么当一个节点内存用完或者没有可回收的内存的时候,预留内存机制就来解除集群阻塞了。这种策略下,查询内存池被进一步分成了两个池:普通池和预留池。这样当一个查询把普通池的内存资源用完之后,会得到所有节点的预留池内存资源的继续加持,这样这个查询的内存资源使用量就是普通池资源和预留池资源的加和。为了避免死锁,一个集群中同一时间只有一个查询可以使用预留池资源,其他的任务的预留池资源申请会被阻塞。这在某种情况下是优点浪费,集群可以考虑配置一下去杀死这个查询而不是阻塞大部分节点。
8.容错
Presto可以对一些临时的报错采用低级别的重试来恢复,然而,截止2018年还并没有任何有效的内嵌容错机制来解决coordinator或者worker节点坏掉的情况。Presto依靠的是客户端的自动重跑失败查询。
在Facebook的产品应用中,也是根据不同的场景来采取不同的可用性模式。交互式查询和批量ETL会使用主备Coordinator的方式,A/B Test 会采用多活集群的模式。同时辅以外部的监控系统将一些频繁产生错误的节点及时移出集群,修复好的节点重新加入集群。这些方式只能从不同程度上降低不可用风险,但是并不能解决。
标准检查点或者部分修复技术是计算代价比较高的,而且很难在这种一旦结果可用就返回给客户端(即时查询类)的系统中实现。基于复制的容错机制会造成较大的资源消耗,考虑到成本,这些技术的期望值并不明确,特别是考虑到节点平均故障时间、集群大小约1000个节点规模,统计数据显示大多数查询在几个小时内完成(包括批处理ETL)。
当然,Presto也在积极的提高对长查询的容错机制,例如评估添加可选检查点和限制重新启动到执行计划的子树,这些子树可能不会以流水线方式运行。
四、总结
以上是Facebook从一条SQL语句在Presto引擎被处理的整个生命周期的角度进行的总结,通过这篇文章我们可以大致了解Presto的整个运行机制了。英文阅读水平有限,如有歧义欢迎交流~
参考文献:
[1] Raghav Sethi, Martin Traverso, Dain Sundstrom, David Phillips, Wenlei Xie, Yutian Sun, Nezih Yegitbasi, Haozhun Jin, Eric Hwang, Nileema Shingte, Christopher Berner: Presto: SQL on Everything. ICDE 2019: 1802-1813
[2] https://analytics.facebook.com




