

无谓等待。一个 MapReduce 任务只能在所有前置依赖任务完成后才能启动。然而由 Unix 管道缀连起来的命令却能够并行运行,只要一个任务开始产生输出,下一个任务就可以开始消费处理。由于机器配置和负载的不同,总会在某些机器上出现一些执行时间过长拖后腿的任务(struggler)。而 MapReduce 的这种等待机制,会让单个任务拖垮整个工作流。 Mapper 冗余。Mapper 职责非常简单,仅是读出前置 Reducer 产生的数据,并为之后 Reducer 的分片和排序做准备。在很多情况下,mapper 的职责其实可以并到前序任务的 Reducer 中:如果可以将 Reducer 的输出按照后继 Reducer 的要求准备好,则可将 Reducer 直接串起来,从而省去中间夹杂的 Mapper 阶段。 数据冗余。在分布式文件系统中存储中间结果,意味着将数据在不同机器上冗余了几份。对于并不需要共享的中间结果来说,这种方式太过奢侈。

为了解决 MapReduce 的这些问题,针对分布式系统中的批处理负载,人们开发了很多新的执行引擎。其中最知名的是 Spark、Tez 和 Flink。这几个处理引擎的设计有诸多不同之处,但有一点是相同的:他们将整个数据流看做一个任务,而非将其拆分成几个相对独立的子任务。
repartition + sort(sort merge join):一种方法是进行 repartition 并按 key 对 record 进行排序,就像 MapReduce 的 shuffle 阶段一样。该功能能够提供像 MapReduce 一样的 sort-merge join 和分区方式。 only repartition(partition hash join):另一种可能是接受多个输入,并且用同样的方式进行分区(partitioning),但是会跳过排序阶段。这对于分区哈希 join 很有用,因为该算子只关心记录的分区,但其顺序并不重要,因为总会过哈希表重新组织。 broadcast(broadcast hash join):对于广播哈希 join,一个算子的输出会被发送到多个待 join 分区算子。
按需 shuffle:对于排序等高代价负载,只有在需要的时候才会执行,而不是总强制发生在 map 和 reduce 之间。
省掉无用 Mapper:由于 map 本身并没有进行 repartition,因此可以将其合并到前一个算子中的 reduceer 阶段。
数据传输优化:由于所有 join 和依赖等数据拓扑是显式声明的,调度器可以事先知道哪些数据在哪里被需要。因此可以尽可能地做局部性优化(locality optimization)。例如,可以尽量将消费某分区数据的任务放到生产该数据的机器上执行,从而通过共享内存而非网络来共享数据。
中间结果只存一份:通常来说,只需要将算子的中间结果,在内存中或者本地硬盘中放一份就够了,而不用写到分布式文件系统中。在 MapReduce 中 Mapper 的输出其实也是用了此优化,只不过 dataflow 引擎将该思想扩展到了所有中间状态的存储中。
算子执行流水化:大部分算子只要有输入了就可以执行,而不用等到前置任务都完成了才能够执行。
进程复用:同一个工作流中,前面算子所使用的 JVM 进程池可以为之后算子所复用,而不用像 MapReduce 一样每个任务都要开一个新的 JVM 进程。
基于 MapReduce 的数据流引擎(如 Pig,Hive 或者 Cascading)
新型的的数据流引擎(如 Tez 或者 Spark)

Spark、Flink 和 Tez 都会避免将中间状态写到 HDFS 中,因此他们采用了完全不同的容错方式:如果某个机器上的中间结果丢了,就回溯工作流的算子依赖(DAG 依赖),找到最近可用的数据按照工作流重新计算(最差的情况会一直找到输入数据,而输入数据通常存在于 HDFS 上)。
为了能够通过重新计算来容错,框架必须跟踪每一部分数据的计算轨迹(DGA 依赖,或者说数据谱系,data lineage)——涉及哪些输入分片、应用了哪些算子。Spark 使用弹性分区数据集(RDD)抽象来追踪数据的祖先;Flink 使用了快照来记录所有算子状态,以从最近的检查点(checkpoint)重启运行出错的算子。
当通过重算来容错时,最重要的是要明确计算过程(即算子)是否为确定性的(deterministic):即,给定同样的输入数据,多次运行同一算子总会产生同样的输出吗?当算子的数据已经发到下游后出错时,该问题变的非常重要。如果算子重新运行时产生的数据和之前不一致,则下游算子很难在新老数据间进行冲突处理。对于非确定性算子的容错方案,通常是将下游算子也都清空状态一并重启。
为了规避这种级联重启,用户需要确保每个算子逻辑是确定的。但需要注意的是,计算过程中有很多情况会引入不确定性:
很多编程语言不保证哈希表遍历顺序的稳定 很多概率和统计算法会显式地依赖随机数 所有使用系统时钟或者外部数据源的算子也是非确定的
这些导致不确定性的原因需要从算子逻辑中移除,以保证能够通过重算进行容错。比如,可以通过使用固定种子的伪随机算法来消除随机数的不确定性。
当然,通过重算数据进行出错恢复并非总是正确选择。在中间数据相比原始输出要小很多、计算过程非常耗 CPU 等情况下,相比重算,将中间数据物化到文件中代价可能会更低。

但有些算子,比如排序,不可避免的需要等待所有输出,才可以开始处理并产生输出。这是因为后来的数据,可能会具有较小的 key,因此需要被放到输出流前面。所有需要排序的算子都需要等待输入数据到齐,但其他大部分算子都是可以流水化执行的。

在图状数据建模一节🔗中我们讨论过使用图模型对数据进行建模、使用图查询语言对图中的点边属性进行查询。但第二章相关讨论主要集中在偏 OLTP 方向——对符合要求的小数据集的查询。
在批处理的上下文中,我们可以重新审视图模型——也就是常说的图计算,在全图做一些离线处理和分析。这种需求通常来自推荐系统(比如购物平台的“你可能喜欢”模块)、排名系统中。比如,最出名的图分析算法—— PageRank,最初就是一个对网页权重的排名算法。该算法的基本原理是根据链接到网页的数量和质量,来评估每个网页的受欢迎程度,以最终决定搜索引擎对搜索结果的展示排名。
大部分图计算的算法都是迭代式的,其基本思路是:
1. 每次遍历一条边
2. 和起点进行 join,以传递、连接某些信息
3. 重复 1、2 直到满足某种条件。比如
1. 遍历完了所有边
2. 某些指标开始收敛
在数据模型和查询语言🔗的例子中,就是沿着 localion_in 的边来找到所有从属于北美大陆的地点列表。
如果我们想用 Hadoop 生态来进行图计算,使用分布式文件系统存储图数据很容易(比如使用文件来顺序的存点和边),但是使用 MapReduce 来处理这些图数据,就很难表达“不断迭代处理,直到某些条件满足时停止”的语义。因为 MapReduce 只着眼于数据的单次处理,而很难表达这种递归或者迭代的语义。这种迭代风格的算法通常包含以下几步:
执行一轮:全局调度器针对算法的一个步骤调度一个批处理任务。 条件检查:在一次迭代执行完成后,调度器会检查某些条件是否满足,来判断算法是否可以停止。(比如是否还有边需要遍历、结果指标是否收敛等等)。 继续执行:如果结束条件不满足,全局调度器就继续步骤 1 ,调度一轮新的批处理任务。

在 MapReduce 中,由于 Reducer 需要将具有同样 Key 的数据聚到一块,因此 Mapper 在处理完数据后,会将结果分别“发送” 给对应的 Reducer。Pregel 也有类似的思想——图中的点(vertex)可以“发消息”给其他点。但与 MapReduce 不同的是,由于有边的存在,点通常会顺着边发送消息。
在图计算的每一轮迭代中,会对每个点调用回调函数,处理该点收到的消息,这点和 MapReduce 中的 Reducer 很像。但 Reducer 不会保存跨 MapReduce 的状态,但在 Pregel 中,每个点是会保存跨轮次状态(历史处理结果)的,因此可以每次增量式得处理信息。边界情况,如果该节点没有收到任何消息,则无需调用回调函数。
如果你把每个点认为是一个 actor 的话,Pregel 在某种程度上很像我们之前提到的 Actor 模型🔗。但与 Actor 模型不同的是,Pregel 中点的状态和消息是持久化且容错的。此外,Pregel 的通信会以固定的形式执行:执行框架总是将消息从上一轮完全投递到下一轮中。而 Actor 模型则没有该保证。

Pregel 中限定只能通过消息传递(而不是通过主动拉取)来进行通信,因此可以方便的将消息 batch 起来以减少等待。确切的说,所有的等待只存在于相邻的两次迭代之间,因为消息总是从上一轮次的点发出,在通过网络全部发给发给对应节点后,才会开启下一个轮次的计算。这也是 BSP 模型的特点——计算是一轮一轮的,每轮之间存在着一个同步点。
即使在消息传输的过程中,可能会出现丢失、重复和不定时延迟,Pregel 仍然能够保证所有消息在目的节点上严格的被处理一次。和 MapReduce 一样,Pregel 会进行对上层无感的错误恢复,以期简化所有基于 Pregel 的上层算的实现。

每个顶点运行在哪个机器 每条消息路由到目标顶点
由于模型中的计算只针对单个计算顶点,换句话说,就是每个计算过程都是站在顶点的视角进行“思考”,也即,计算粒度是顶点。这给了框架以任何方式对图结构在不同机器进行划分的自由,理想情况下,将频繁交换信息的节点调度到相同机器上性能最好。但这很难,因为计算是动态的,我们很难事先预知通信的频繁程度,进而依此对顶点进行划分。在实践中,通常使用最简单粗暴的方式,将每个节点随机调度到机器上。
但这样的调度通常会导致大量的跨节点通信开销,甚而,节点间传递的消息规模甚至会比原图数据都大。这种数据传输的额外开销,会非常显著地降低分布式图算法的性能。

完善编程模型
提升处理性能
扩大处理领域
基于某些字段对数据集进行连接的 Join 算子
基于关键字对元组进行聚类的 Group 算子
基于条件对元组进行过滤的 Filter 算子
对元素进行聚合和统计的 Aggregate 算子
这些高层的 API 不仅让用户可以更高效的使用体验,还能够提升任务在物理层面的执行效率。

不同 Join 算法的选择对批处理任务的性能影响极大,但我们最好避免将选择的心智负担推给用户,而可以自动地根据情况进行优化。使用声明式风格的接口使这种自动优化称为可能:用户侧仅需要指定哪些数据集需要 Join,而查询优化器会根据数据特点动态的决定其最优 Join 方式。我们在数据查询语言一节🔗中讨论过这种思想。
但从另一方面来说,MapReduce 和其后继的数据流框架和 SQL 这种完全的声明式语言又不一样。MapReduce 是基于回调函数来构建的:对于任意的一条或一批数据,用户可以自定义处理函数(Mapper 或者 Reducer),调用任何库代码、决定其输出格式。这种方式的优点是,你可以复用很多现成的库来减少开发工作量,比如 Parsing、自然语言分析、图像分析和一些数理统计算法方面的库。
在很长一段时间内,能够自由地跑任意的代码是批处理系统和 MPP 数据库的一个重要区分点。尽管数据库也支持 UDF(user defined function),但使用起来较为复杂,且不能很好的和编程语言的包管理工具(比如 Maven 之于 Java,npm 之于 JavaScript,Rubygems 之于 Ruby)相整合。
然而,在 Join 之外,更进一步地引入声明式功能也对数据流工具有诸多好处。例如,一个过滤函数只有很简单的过滤条件(过滤行)、或只是从原数据集中选择几列(过滤列),则针对每条数据都调用一遍回调函数会有很大的额外性能损耗。如果这些简单的过滤和投影能够用声明式的方式表达,则优化器可以充分利用面向列的存储格式(参见列存🔗),只读取需要的列。Hive,Spark DataFrames 和 Impala 还使用了列式执行引擎(vectorized execution):
“以一种 CPU 缓存友好的方式,紧凑地进行迭代(每次取一个 Cache Line,使用 SIMD 指令进行运算),以减少函数调用次数。”
Spark 使用 JVM 字节码、Impala 使用 LLVM 来通过生成代码的方式优化这些 Join 内层循环。

另外一个越来越重要的方向是数值统计算法,其在推荐和分类的机器学习算法中常常用到。可复用的实现逐渐多了起来:例如 Mahout 在 MapReduce、Spark 和 Flink 之上实现了很多机器学习算法;MADlib 也在 MPP 数据库之上实现了类似的功能模块。
其他有用的算法还有—— k 最近邻算法(k-nearest neighbors)——一种在多维空间中搜索与给定数据条目相似度最高的数据算法,是一种近似性搜索算法。近似搜索对于基因组分析算法也很重要,因为在基因分析中,常需要找不同但类似的基因片段。近年来较火的向量数据库也是主要基于该算法。
批处理引擎被越来越多的用到不同领域算法的分布式执行上。随着批处理系统越来越多支持内置函数和高层声明式算子、MPP 数据库变的越来越可编程和灵活度高,他们开始长的越来越像——说到底,本质上他们都是用于存储和处理数据的系统。

输入数据不可变 一个组件的输出可以喂给另一个组件成为输入 通过组合“解决好一件事的小工具”来解决复杂问题
在 Unix 世界中,让所有命令行具有可组合性的统一抽象是——文件和管道,在 MapReduce 中,这个抽象是分布式文件系统。之后我们注意到,数据流工具通过增加各自的“类管道”的数据传输方式,避免了将中间结果物化到分布式文件系统中的额外损耗,但最外侧的输入和输出仍然是在 HDFS 上。
分布式处理框架最主要解决的两个问题是:
分片 在 MapReduce 中,会根据输入数据的文件块(file chunk)的数量来调度 mappers。mappers 的输出会在二次分片、排序、合并(我们通常称之为 shuffle)到用户指定数量的 Reducer 中。该过程是为了将所有相关的数据(如具有相同 key)集结到一块。 后 MapReduce 时代的数据流工具会尽量避免不必要的排序(因为代价太高了),但他们仍然使用了和 MapReduce 类似的分区方式。 容错 MapReduce 通过频繁的(每次 MapReduce 后)刷盘,从而可以避免重启整个任务,而只重新运行相关子任务就可以从其故障中快速恢复过来。但在错误频率很低的情况下,这种频繁刷盘做法代价很高。数据流工具通过尽可能的减少中间状态的刷盘(当然,shuffle 之后还是要刷的),并将其尽可能的保存在内存中,但这意味着一旦出现故障就要从头重算。算子的确定性可以减少重算的数据范围(确定性能保证只需要算失败分区,并且结果和其他分区仍然一致)。
接下来我们讨论了几种基于 MapReduce 的 Join 算法,这些算法也常被用在各种数据流工具和 MPP 数据库里。他们很好的说明了基于数据分区的算法的工作原理:
Sort-merge joins 分桶排序。将多个待 join 的输入数据使用一个 MapReduce 处理,在 Mapper 中提取待 join key ,然后通过再分区、排序和合并,会将具有相同 join key 的 records 送到同一个 Reducer 中进行 join。然后 Reducer 函数会将 join 结果进行输出。 Broadcast hash joins 小表广播。如果 join 中的一个表数据量很小,可以完全加载进内存的哈希表里,则不用对其进行分片。我们可以将大表进行分片,分发给各个 mapper,每个 Mapper 将小表加载到内存里,然后逐个遍历大表每个 record,提取相应 join key,再与小表中的记录值进行 Join。 Partitioned hash joins 分桶哈希。如果两个待 join 输入使用相同的方式进行分片(相同的 key、相同的哈希函数和分区数),则广播哈希算法可以在每个分区内单独应用。
分布式批处理引擎使用了受限的编程模型:回调函数需要是无状态的,且除了输出之外没有其他的副作用。在此设定下,框架可以向应用层屏蔽很多分布式系统的实现细节:当遇到宕机或者网络问题时,子任务可以安全的进行重试;失败任务的输出可以自由抛弃;如果有多个冗余计算过程都成功了,则只有其中一个可以作为输出对后面可见。
由于框架的存在,用户侧的批处理代码无需关心容错机制的实现细节:即使在物理上有大量错误重试的情况下,框架可以保证在逻辑上最终的输出和没有任何故障发生是一致的。这种可靠性语义保证(reliable semantics)通常远强于我们在在线服务中常见到的、将用户的请求写到数据库中的容错性。
批处理任务的基本特点是——读取输入,进行处理,产生输出的过程中,不会修改原数据。换句话说,输出是输入的衍生数据。其中一个重要特点是,输入数据是有界的(bounded):输入的大小是固定的、事先确定的(比如输入是包含一组日志的数据或者一个快照点的数据)。唯其有界,处理任务才能知道什么时候输入读取结束了、什么时候计算完成了。
但在下一章中,我们将会转到流处理(stream processing)上,其中,输入是无界的(unbounded)——你的任务面对的是不知道何时结束的无限数据流。在这种情况下,任何时刻都有可能有新的数据流入,任务会永不结束。我们之后可以看到,虽然批处理和流处理在某些方面有相似之处,但对于输入的无界假设,会在构建系统时对我们的设计产生诸多影响。

少侠,稍等~
这里有一份需要你的产品使用反馈的问卷,烦请花 2 分钟填写下问卷,帮助 NebulaGraph 改善周边工具使用体验~~ 谢谢你 🥺


一致性和共识:分布式的共识:原子提交和两阶段提交 🔗 一致性和共识:分布式系统离不开的顺序保证:顺序和因果、序列号和全序广播 🔗 一致性和共识:什么是分布式系统中的一致性?🔗 分布式系统中的麻烦事(下):真相由多数派定义?🔗 分布式系统中的麻烦事(中):为什么时钟它不可靠?🔗 分布式系统中的麻烦事(上):系统故障,以及不可靠的网络🔗 事务:分布式系统中的事务之最强隔离级别 🔗 事务:分布式系统中的事务之那些事务隔离级别 🔗 事务:分布式系统中的事务之那些棘手的概念们 🔗 分区:分布式系统中的副本和分片、KV 分片、分片均衡,以及路由请求 🔗 冗余:分布式数据之多副本的主从模型选择 🔗 冗余:分布式数据之多副本的读写流程 🔗 编码与演进:如何设计性能良好的编码?你该知道这几种数据流模型 🔗 编码与演进:性能好的编码如何设计?以常见的编码工具 JSON、CSV 等为例 🔗 存储与查询:数据库底层到底是如何处理查询和存储 🔗 数据模型和查询语言:以 图数据库 为例,讲解如何分析数据模型和考量查询语言 🔗 数据模型和查询语言:以 NoSQL 和 NewSQL 为例,讲解如何分析数据模型和考量查询语言 🔗 开篇:一个优秀的数据系统该有的三个特性:可靠、可扩展、可维护 🔗

对图数据库 NebulaGraph 感兴趣?欢迎前往 GitHub ✨ 查看源码:https://github.com/vesoft-inc/nebula;
想要一起提高文档的可读性么?一起来给『文档 nGQL 示例添加注释』吧~请瞄准那条让人费解的 nGQL 语句,留下你的讲解 (///▽///)






