暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

分布式批处理之 MapReduce 之外

NebulaGraph 2024-01-22
249

之前的两篇文章《MapReduce 和分布式文件系统(上)》🔗 和《MapReduce 和分布式文件系统(下)》🔗 讲述了 MapReduce 和分布式文件系统的关系,在接下面的章节中,我们将讲述 MapReduce 之外的批处理。
尽管 MapReduce 在本世纪10年代最后几年中被炒的非常热,但它其实只是众多分布式系统编程模型中的一种。在面对不同的数据量、数据结构和数据处理类型时,很多其他计算模型可能更为合适。
但作为分布式系统之上的一种抽象,MapReduce 非常干净、简洁,很适合作为入门的学习对象,因此我们本章花了很多篇幅来讨论它。但需要指出,这里的简洁指的是易于理解,而非易于使用。恰恰相反,使用裸的 MapReduce 接口来完成复杂的处理任务时,实现会变得非常复杂。比如,你需要从头实现数据处理中常见的各种 join 算法。
为了降低直接使用裸 MapReduce 接口的复杂度,人们在其上封装了很多高层编程模型(Pig,Hive,Cascading 和 Crunch)。如果你知道 MapReduce 是怎么工作的,这些框架也很容易理解。且。有了这些高层框架,很多基本的批处理任务变得非常容易实现。
然而,MapReduce 执行模型存在很多问题,即使在其上多封装几层也并不能解决,反而会使得性能变得更差。一方面,MapReduce 容错性非常好,你可以使用该模型,在工作线程很不稳定的多租户系统上处理几乎任意尺度的数据(尽管非常慢)。另一方面,针对某些类型的数据处理,有些其他合适工具的速度可能要快上几个数量级。
在本章余下部分,我们会考察一些可替代 MapReduce 的批处理模型。从某种意义上来说,我们下一章要讨论的流处理模型,也可以认为是对针对批处理的一种优化。
中间状态的物化  


PART.01
之前讨论过,每个 MapReduce 任务都是互相独立的。每个任务和外界的唯一交互点就是分布式系统上的文件夹。如果想让某个任务的输出成为另一个任务的输入,你只需将第二个任务的输入文件夹配置为第一个任务的输出文件夹。跨 MapReduce 的调度框架必须保证:只有前序任务完全结束后,后序任务才能开始执行。
如果我们需要将前序任务的输出数据进行大范围发布,那么 MapReduce 的这种结果物化机制(持久化到分布式系统中)是合理的。在这种情况下,其他任务无须关心是哪个任务生产了这些数据,而只需通过名字定位到输出数据所在文件夹即可,这符合我们工程中常用的解耦和复用的理念。
然而,在大多数情况下,我们事先就明确地知道某个任务的输出只会为同一团队的另一个任务所使用。在这种情况下,保存到分布式文件系统上的两个任务间的数据其实只是一种中间状态(intermediate state):只是一种将数据从前序任务传递到后继任务的方式。在诸如推荐系统等复杂的数据流中,通常会包含 50~100 个 MapReduce 任务,其中绝大部分任务间的数据都属于数据流中间状态。
将中间状态写入文件的过程称为物化(materialization)。我们在之前聚合:数据立方和物化视图一节中也提到过相关概念——物化视图(materialized view)🔗。在当时上下文中,物化视图意味着将某些操作的结果写到外存中,而非每次都按需计算。
与 MapReduce 相对,在之前日志分析的例子中,我们使用 Unix 管道而非文件将不同的命令的输入输出进行耦合。Unix 管道并不会将中间结果物化,而是使用一个基于内存的小块缓存(buffer)将一个命令的输出导向另一个命令输入。
相比 Unix 管道,MapReduce 将工作流中间结果进行物化的方式有很多缺点:
  • 无谓等待。一个 MapReduce 任务只能在所有前置依赖任务完成后才能启动。然而由 Unix 管道缀连起来的命令却能够并行运行,只要一个任务开始产生输出,下一个任务就可以开始消费处理。由于机器配置和负载的不同,总会在某些机器上出现一些执行时间过长拖后腿的任务(struggler)。而 MapReduce 的这种等待机制,会让单个任务拖垮整个工作流。
  • Mapper 冗余。Mapper 职责非常简单,仅是读出前置 Reducer 产生的数据,并为之后 Reducer 的分片和排序做准备。在很多情况下,mapper 的职责其实可以并到前序任务的 Reducer 中:如果可以将 Reducer 的输出按照后继 Reducer 的要求准备好,则可将 Reducer 直接串起来,从而省去中间夹杂的 Mapper 阶段。
  • 数据冗余。在分布式文件系统中存储中间结果,意味着将数据在不同机器上冗余了几份。对于并不需要共享的中间结果来说,这种方式太过奢侈。

数据流引擎

为了解决 MapReduce 的这些问题,针对分布式系统中的批处理负载,人们开发了很多新的执行引擎。其中最知名的是 Spark、Tez 和 Flink。这几个处理引擎的设计有诸多不同之处,但有一点是相同的:他们将整个数据流看做一个任务,而非将其拆分成几个相对独立的子任务

由于这些引擎会显式地考虑跨越多个阶段的全局数据流,因此也常被称为数据流引擎(dataflow engines)。和 MapReduce 一样,这些引擎也会对每个数据记录在单个线程中,重复调用用户的定制函数(包裹用户逻辑)。并且会将输入数据集进行切片(partition),并行地执行(数据并行),然后将一个函数的输出通过网络传递给下一个函数作为输入。
和 MapReduce 不同的是,这些函数可以进行更灵活地组织,而不需要严格遵循 map 或者 reduce 格式。我们成这些函数为算子(operators),且 dataflow 引擎会提供多种选择,以将一个算子的数据输出导入到下一个算子(类似数据流接线方式):
  • repartition + sort(sort merge join):一种方法是进行 repartition 并按 key 对 record 进行排序,就像 MapReduce 的 shuffle 阶段一样。该功能能够提供像 MapReduce 一样的 sort-merge join 和分区方式。
  • only repartition(partition hash join):另一种可能是接受多个输入,并且用同样的方式进行分区(partitioning),但是会跳过排序阶段。这对于分区哈希 join 很有用,因为该算子只关心记录的分区,但其顺序并不重要,因为总会过哈希表重新组织。
  • broadcast(broadcast hash join):对于广播哈希 join,一个算子的输出会被发送到多个待 join 分区算子。
这种风格的处理引擎思想来自于 Dryad 和 Nephele 等系统,相比 MapReduce 模型,有如下优点:
  • 按需 shuffle:对于排序等高代价负载,只有在需要的时候才会执行,而不是总强制发生在 map 和 reduce 之间。

  • 省掉无用 Mapper:由于 map 本身并没有进行 repartition,因此可以将其合并到前一个算子中的 reduceer 阶段。

  • 数据传输优化:由于所有 join 和依赖等数据拓扑是显式声明的,调度器可以事先知道哪些数据在哪里被需要。因此可以尽可能地做局部性优化(locality optimization)。例如,可以尽量将消费某分区数据的任务放到生产该数据的机器上执行,从而通过共享内存而非网络来共享数据。

  • 间结果只存一份:通常来说,只需要将算子的中间结果,在内存中或者本地硬盘中放一份就够了,而不用写到分布式文件系统中。在 MapReduce 中 Mapper 的输出其实也是用了此优化,只不过 dataflow 引擎将该思想扩展到了所有中间状态的存储中。

  • 算子执行流水化:大部分算子只要有输入了就可以执行,而不用等到前置任务都完成了才能够执行。

  • 进程复用:同一个工作流中,前面算子所使用的 JVM 进程池可以为之后算子所复用,而不用像 MapReduce 一样每个任务都要开一个新的 JVM 进程。

你可以使用数据流引擎实现和 MapReduce 数据流一样的计算逻辑,并且由于上面的优化,执行速度通常更快。由于算子是 map 和 reduce 的泛化,同样处理逻辑的代码,仅简单调整下配置,便可以无缝的跑在两种数据流引擎上:
  1. 基于 MapReduce 的数据流引擎(如 Pig,Hive 或者 Cascading)

  2. 新型的的数据流引擎(如 Tez 或者 Spark)

Tez 是一个依赖 YARN 的 shuffle 服务在节点间进行数据拷贝的轻量级库;而 Spark 和 Flink 是各有其自身一套完整的网络通信层、调度模块和用户 API 的重量级框架。我们稍后将会讨论这些高层接口(high-level API)。
容错
所有中间状态持久化到分布式文件系统中的一个好处是——持久性(durable),这会使得 MapReduce 的容错方式变得非常简单:如果某个任务挂了,仅需要在其他机器上重新启动,并从文件系统中读取相同的输入即可。

Spark、Flink 和 Tez 都会避免将中间状态写到 HDFS 中,因此他们采用了完全不同的容错方式:如果某个机器上的中间结果丢了,就回溯工作流的算子依赖(DAG 依赖),找到最近可用的数据按照工作流重新计算(最差的情况会一直找到输入数据,而输入数据通常存在于 HDFS 上)。

为了能够通过重新计算来容错,框架必须跟踪每一部分数据的计算轨迹(DGA 依赖,或者说数据谱系,data lineage)——涉及哪些输入分片、应用了哪些算子。Spark 使用弹性分区数据集(RDD)抽象来追踪数据的祖先;Flink 使用了快照来记录所有算子状态,以从最近的检查点(checkpoint)重启运行出错的算子。

当通过重算来容错时,最重要的是要明确计算过程(即算子)是否为确定性的(deterministic):即,给定同样的输入数据,多次运行同一算子总会产生同样的输出吗?当算子的数据已经发到下游后出错时,该问题变的非常重要。如果算子重新运行时产生的数据和之前不一致,则下游算子很难在新老数据间进行冲突处理。对于非确定性算子的容错方案,通常是将下游算子也都清空状态一并重启。

为了规避这种级联重启,用户需要确保每个算子逻辑是确定的。但需要注意的是,计算过程中有很多情况会引入不确定性:

  1. 很多编程语言不保证哈希表遍历顺序的稳定
  2. 很多概率和统计算法会显式地依赖随机数
  3. 所有使用系统时钟或者外部数据源的算子也是非确定的

这些导致不确定性的原因需要从算子逻辑中移除,以保证能够通过重算进行容错。比如,可以通过使用固定种子的伪随机算法来消除随机数的不确定性。

当然,通过重算数据进行出错恢复并非总是正确选择。在中间数据相比原始输出要小很多、计算过程非常耗 CPU 等情况下,相比重算,将中间数据物化到文件中代价可能会更低。

物化的一些讨论
回到 Unix 哲学上,MapReduce 可类比为将每个命令的输出都写入临时文件中,而现代数据流引擎则更像 Unix 管道。Flink 更是直接基于流水线的思想构建的:即,将上游算子的输出增量地送给下游算子,下游算子一经收到数据,便可开始着手处理

但有些算子,比如排序,不可避免的需要等待所有输出,才可以开始处理并产生输出。这是因为后来的数据,可能会具有较小的 key,因此需要被放到输出流前面。所有需要排序的算子都需要等待输入数据到齐,但其他大部分算子都是可以流水化执行的。

当工作流任务完成后,其输出通常要进行持久化,以让用户能够引用并使用——最常见的,就是写回分布式文件系统。因此,当使用数据流引擎时,数据流的输入和最终输出通常都会物化在 HDFS 上。和 MapReduce 一样,数据流任务的输入也是不可变的,输出不会在原地更新,而会写入其他地方。相比 MapReduce,这些数据流引擎的提升就是避免将所有子任务的中间状态也写入分布式文件系统中。
图计算和迭代处理  

PART.02

图状数据建模一节🔗中我们讨论过使用图模型对数据进行建模、使用图查询语言对图中的点边属性进行查询。但第二章相关讨论主要集中在偏 OLTP 方向——对符合要求的小数据集的查询。

在批处理的上下文中,我们可以重新审视图模型——也就是常说的图计算,在全图做一些离线处理和分析。这种需求通常来自推荐系统(比如购物平台的“你可能喜欢”模块)、排名系统中。比如,最出名的图分析算法—— PageRank,最初就是一个对网页权重的排名算法。该算法的基本原理是根据链接到网页的数量和质量,来评估每个网页的受欢迎程度,以最终决定搜索引擎对搜索结果的展示排名。

DAG 和图计算

上一小结提到的 Spark、Flink 和 Tez 等数据流引擎通常以有向无环图(directed acyclic graph,DAG)的形式组织一个计算任务中的算子。但这并不是图计算(graph processing),尽管数据在不同算子间进行流动时,会构成图一样的计算拓扑(SQL 的执行引擎实现也是类似),但这是数据在计算时形成的计算拓扑,而数据集本身的结构仍然是关系型的。但在图计算中,数据本身就具有图结构。比如 PageRank 中的网页间通过链接关系构成的引用图。这又是一个典型的容易引起混淆的、不同领域却具有相似命名的场景。

大部分图计算的算法都是迭代式的,其基本思路是:

1. 每次遍历一条边

2. 和起点进行 join,以传递、连接某些信息

3. 重复 1、2 直到满足某种条件。比如

    1. 遍历完了所有边

    2. 某些指标开始收敛

数据模型和查询语言🔗的例子中,就是沿着 localion_in 的边来找到所有从属于北美大陆的地点列表。

如果我们想用 Hadoop 生态来进行图计算,使用分布式文件系统存储图数据很容易(比如使用文件来顺序的存点和边),但是使用 MapReduce 来处理这些图数据,就很难表达“不断迭代处理,直到某些条件满足时停止”的语义。因为 MapReduce 只着眼于数据的单次处理,而很难表达这种递归或者迭代的语义。这种迭代风格的算法通常包含以下几步:

  1. 执行一轮:全局调度器针对算法的一个步骤调度一个批处理任务。
  2. 条件检查:在一次迭代执行完成后,调度器会检查某些条件是否满足,来判断算法是否可以停止。(比如是否还有边需要遍历、结果指标是否收敛等等)。
  3. 继续执行:如果结束条件不满足,全局调度器就继续步骤 1 ,调度一轮新的批处理任务。
使用 MapReduce 实现上述过程通常非常低效,因为 MapReduce 在设计时并没有专门面向迭代算法:即使一次迭代只需要增量地读图中的很小一部分数据,MapReduce 也总是无脑读全部输入(本质上是因为 MapReduce 无法针对数据文件中的图结构进行增量调度)。
Pregel 处理模型
BSP(bulk synchronous parallel 模型),作为一种专门为图批处理优化的计算模型,近年来变的越来越流行。Apache Giraph,Spark’s GraphX 和 Flink’s Gelly 都在 API 中实现了该计算模型。由于谷歌的一篇名为 Pregel 的论文将该图计算模型大规模的推广,因此 BSP 模型有时也被称为 Pregel 模型

在 MapReduce 中,由于 Reducer 需要将具有同样 Key 的数据聚到一块,因此 Mapper 在处理完数据后,会将结果分别“发送” 给对应的 Reducer。Pregel 也有类似的思想——图中的点(vertex)可以“发消息”给其他点。但与 MapReduce 不同的是,由于有边的存在,点通常会顺着边发送消息。

在图计算的每一轮迭代中,会对每个点调用回调函数,处理该点收到的消息,这点和 MapReduce 中的 Reducer 很像。但 Reducer 不会保存跨 MapReduce 的状态,但在 Pregel 中,每个点是会保存跨轮次状态(历史处理结果)的,因此可以每次增量式得处理信息。边界情况,如果该节点没有收到任何消息,则无需调用回调函数。

如果你把每个点认为是一个 actor 的话,Pregel 在某种程度上很像我们之前提到的 Actor 模型🔗。但与 Actor 模型不同的是,Pregel 中点的状态和消息是持久化且容错的。此外,Pregel 的通信会以固定的形式执行:执行框架总是将消息从上一轮完全投递到下一轮中。而 Actor 模型则没有该保证。

容错

Pregel 中限定只能通过消息传递(而不是通过主动拉取)来进行通信,因此可以方便的将消息 batch 起来以减少等待。确切的说,所有的等待只存在于相邻的两次迭代之间,因为消息总是从上一轮次的点发出,在通过网络全部发给发给对应节点后,才会开启下一个轮次的计算。这也是 BSP 模型的特点——计算是一轮一轮的,每轮之间存在着一个同步点。

即使在消息传输的过程中,可能会出现丢失、重复和不定时延迟,Pregel 仍然能够保证所有消息在目的节点上严格的被处理一次。和 MapReduce 一样,Pregel 会进行对上层无感的错误恢复,以期简化所有基于 Pregel 的上层算的实现。

容错的方式也很简洁——在每个迭代轮次末尾,将所有顶点的状态做 checkpoint,且持久化到外存。如果某个节点故障,内存中的状态丢失,最简单的恢复方式就是回滚该轮次所有计算,恢复到上一个 checkpoint,然后重启该轮次的所有计算。如果计算是确定性的,且消息也被记录了下来,则代价相对的小的方式是只对故障节点所包含部分的数据进行重新计算(就像之前讨论过的数据流工具,比如 Spark 中的 Partition 容错方式一样)。
并行执行
每个节点并不需要感知其所运行的物理机器;当其想要发消息时,只需要知道下游节点的 VertexID 即可(类似于 MapReduce 中使用 key 进行路由)。如何对图结构进行划分是框架的职责:
  1. 每个顶点运行在哪个机器
  2. 每条消息路由到目标顶点

由于模型中的计算只针对单个计算顶点,换句话说,就是每个计算过程都是站在顶点的视角进行“思考”,也即,计算粒度是顶点。这给了框架以任何方式对图结构在不同机器进行划分的自由,理想情况下,将频繁交换信息的节点调度到相同机器上性能最好。但这很难,因为计算是动态的,我们很难事先预知通信的频繁程度,进而依此对顶点进行划分。在实践中,通常使用最简单粗暴的方式,将每个节点随机调度到机器上。

这样的调度通常会导致大量的跨节点通信开销,甚而,节点间传递的消息规模甚至会比原图数据都大。这种数据传输的额外开销,会非常显著地降低分布式图算法的性能。

故此,如果能将待计算的图数据放进单机内存中,那使用单机图算法效率大概率要比分布式图批处理效率高。即使图结构比单机存大一些,但可以放到单机硬盘上,相较分布式系统,像 GraphChi 这样的单机处理引擎也往往是更好的选择。只有单机存储无法容纳所有数据,再来考虑类似 Pregel 的分布式图计算框架。也因此,最大限度地并行化图计算过程、提升性能的算法,仍然是一个然活跃的研究领域。
高层 API 和语言  

PART.03
在 MapReduce 流行这些年之后,针对大数据集的分布式批处理执行引擎已经逐渐成熟。到现在(2017年)已经有比较成熟的基础设施可以在上千台机器上处理 PB 量级的数据。因此,针对这个量级的基本数据处理问题可以认为已经被解决,大家的注意力开始转到其他问题上:
  1. 完善编程模型

  2. 提升处理性能

  3. 扩大处理领域

之前我们讨论过,由于 MapReduce 提供的编程接口实在太过难用,像 Hive, Pig,Cascading 和 Crunch 等处理 API 和框架逐渐流行。Apache Tez 更进一步,可以让原来的代码不做过多改动就可以迁移。Spark 和 Flink 也各自有其高层的数据流 API,基本借鉴自 FlumeJava。
这些数据流工具基本都是用关系型的算子来表达计算过程:
  1. 基于某些字段对数据集进行连接的 Join 算子

  2. 基于关键字对元组进行聚类的 Group 算子

  3. 基于条件对元组进行过滤的 Filter 算子

  4. 对元素进行聚合和统计的 Aggregate 算子

等等。这些算子内部实现时,会用到我们本章之前提到的各种 join 和 group 算法。
除了能够显著降低使用方的代码量外,这些高层的框架通常还支持交互式的使用因此,你可以在 shell 中增量式的构建分析代码,且能够方便的多次跑以查看运行结果。当我们拿到一个新的数据集,需要做实验探索该如何对其进行分析时,这种交互式的方式非常方便。这其实也是我们之前讨论过的 Unix 编程哲学的一个体现。

这些高层的 API 不仅让用户可以更高效的使用体验,还能够提升任务在物理层面的执行效率。

向声明式方向靠拢
相比直接实现代码进行 Join,使用关系型的 Join 算子给了处理框架分析数据集特点、选择最高效 Join 算的优化空间。Hive,Spark 和 Flink 都有基于代价的优化器,可以对执行路径进行优化。甚至,可以交换 Join 的顺序,来最小化中间数据集的物化。

不同 Join 算法的选择对批处理任务的性能影响极大,但我们最好避免将选择的心智负担推给用户,而可以自动地根据情况进行优化。使用声明式风格的接口使这种自动优化称为可能:用户侧仅需要指定哪些数据集需要 Join,而查询优化器会根据数据特点动态的决定其最优 Join 方式。我们在数据查询语言一节🔗中讨论过这种思想。

但从另一方面来说,MapReduce 和其后继的数据流框架和 SQL 这种完全的声明式语言又不一样。MapReduce 是基于回调函数来构建的:对于任意的一条或一批数据,用户可以自定义处理函数(Mapper 或者 Reducer),调用任何库代码、决定其输出格式。这种方式的优点是,你可以复用很多现成的库来减少开发工作量,比如 Parsing、自然语言分析、图像分析和一些数理统计算法方面的库。

在很长一段时间内,能够自由地跑任意的代码是批处理系统和 MPP 数据库的一个重要区分点。尽管数据库也支持 UDF(user defined function),但使用起来较为复杂,且不能很好的和编程语言的包管理工具(比如 Maven 之于 Java,npm 之于 JavaScript,Rubygems 之于 Ruby)相整合。

然而,在 Join 之外,更进一步地引入声明式功能也对数据流工具有诸多好处。例如,一个过滤函数只有很简单的过滤条件(过滤行)、或只是从原数据集中选择几列(过滤列),则针对每条数据都调用一遍回调函数会有很大的额外性能损耗。如果这些简单的过滤和投影能够用声明式的方式表达,则优化器可以充分利用面向列的存储格式(参见列存🔗),只读取需要的列。Hive,Spark DataFrames 和 Impala 还使用了列式执行引擎(vectorized execution):

以一种 CPU 缓存友好的方式,紧凑地进行迭代(每次取一个 Cache Line,使用 SIMD 指令进行运算),以减少函数调用次数。”

Spark 使用 JVM 字节码、Impala 使用 LLVM 来通过生成代码的方式优化这些 Join 内层循环。

通过在高层 API 中注入声明式的特性、在运行时使用优化器动态地优化,批处理框架长得越来越像 MPP 数据库(也获得了类似性能)。但同时,仍然保持原来允许运行任意库代码、读取任意格式数据的扩展性,让这些框架仍然可以保持原有的灵活性。
不同领域的特化
保留运行任意代码的自由度很有必要,但对于很多非常通用、反复出现的处理模式,我们有必要提供系统实现以方便用户复用。传统上,MPP 数据库通常充当商业智能(BI)分析和商业汇报领域的生态位,但这个方向只是批处理众多应用方向的一个。

另外一个越来越重要的方向是数值统计算法,其在推荐和分类的机器学习算法中常常用到。可复用的实现逐渐多了起来:例如 Mahout 在 MapReduce、Spark 和 Flink 之上实现了很多机器学习算法;MADlib 也在 MPP 数据库之上实现了类似的功能模块。

其他有用的算法还有—— k 最近邻算法(k-nearest neighbors)——一种在多维空间中搜索与给定数据条目相似度最高的数据算法,是一种近似性搜索算法。近似搜索对于基因组分析算法也很重要,因为在基因分析中,常需要找不同但类似的基因片段。近年来较火的向量数据库也是主要基于该算法。

批处理引擎被越来越多的用到不同领域算法的分布式执行上。随着批处理系统越来越多支持内置函数和高层声明式算子、MPP 数据库变的越来越可编程和灵活度高,他们开始长的越来越像——说到底,本质上他们都是用于存储和处理数据的系统。

小结

PART.04
在本章,我们探讨了批处理的话题。我们从 Unix 的命令行工具 awk、grep 和 sort 开始,探讨其背后的思想被如何应用到 MapReduce 框架和更近的数据流框架中。这些核心设计原则包括:
  1. 输入数据不可变
  2. 一个组件的输出可以喂给另一个组件成为输入
  3. 通过组合“解决好一件事的小工具”来解决复杂问题

在 Unix 世界中,让所有命令行具有可组合性的统一抽象是——文件和管道,在 MapReduce 中,这个抽象是分布式文件系统。之后我们注意到,数据流工具通过增加各自的“类管道”的数据传输方式,避免了将中间结果物化到分布式文件系统中的额外损耗,但最外侧的输入和输出仍然是在 HDFS 上。

分布式处理框架最主要解决的两个问题是:

  • 分片
    在 MapReduce 中,会根据输入数据的文件块(file chunk)的数量来调度 mappers。mappers 的输出会在二次分片、排序、合并(我们通常称之为 shuffle)到用户指定数量的 Reducer 中。该过程是为了将所有相关的数据(如具有相同 key)集结到一块。
    后 MapReduce 时代的数据流工具会尽量避免不必要的排序(因为代价太高了),但他们仍然使用了和 MapReduce 类似的分区方式。
  • 容错
    MapReduce 通过频繁的(每次 MapReduce 后)刷盘,从而可以避免重启整个任务,而只重新运行相关子任务就可以从其故障中快速恢复过来。但在错误频率很低的情况下,这种频繁刷盘做法代价很高。数据流工具通过尽可能的减少中间状态的刷盘(当然,shuffle 之后还是要刷的),并将其尽可能的保存在内存中,但这意味着一旦出现故障就要从头重算。算子的确定性可以减少重算的数据范围(确定性能保证只需要算失败分区,并且结果和其他分区仍然一致)。

接下来我们讨论了几种基于 MapReduce 的 Join 算法,这些算法也常被用在各种数据流工具和 MPP 数据库里。他们很好的说明了基于数据分区的算法的工作原理:

  • Sort-merge joins
    分桶排序。将多个待 join 的输入数据使用一个 MapReduce 处理,在 Mapper 中提取待 join key ,然后通过再分区、排序和合并,会将具有相同 join key 的 records 送到同一个 Reducer 中进行 join。然后 Reducer 函数会将 join 结果进行输出。
  • Broadcast hash joins
    小表广播。如果 join 中的一个表数据量很小,可以完全加载进内存的哈希表里,则不用对其进行分片。我们可以将大表进行分片,分发给各个 mapper,每个 Mapper 将小表加载到内存里,然后逐个遍历大表每个 record,提取相应 join key,再与小表中的记录值进行 Join。
  • Partitioned hash joins
    分桶哈希。如果两个待 join 输入使用相同的方式进行分片(相同的 key、相同的哈希函数和分区数),则广播哈希算法可以在每个分区内单独应用。

分布式批处理引擎使用了受限的编程模型:回调函数需要是无状态的,且除了输出之外没有其他的副作用。在此设定下,框架可以向应用层屏蔽很多分布式系统的实现细节:当遇到宕机或者网络问题时,子任务可以安全的进行重试;失败任务的输出可以自由抛弃;如果有多个冗余计算过程都成功了,则只有其中一个可以作为输出对后面可见。

由于框架的存在,用户侧的批处理代码无需关心容错机制的实现细节:即使在物理上有大量错误重试的情况下,框架可以保证在逻辑上最终的输出和没有任何故障发生是一致的。这种可靠性语义保证(reliable semantics)通常远强于我们在在线服务中常见到的、将用户的请求写到数据库中的容错性。

批处理任务的基本特点是——读取输入,进行处理,产生输出的过程中,不会修改原数据。换句话说,输出是输入的衍生数据。其中一个重要特点是,输入数据是有界的(bounded):输入的大小是固定的、事先确定的(比如输入是包含一组日志的数据或者一个快照点的数据)。唯其有界,处理任务才能知道什么时候输入读取结束了、什么时候计算完成了。

但在下一章中,我们将会转到流处理(stream processing)上,其中,输入是无界的(unbounded)——你的任务面对的是不知道何时结束的无限数据流。在这种情况下,任何时刻都有可能有新的数据流入,任务会永不结束。我们之后可以看到,虽然批处理和流处理在某些方面有相似之处,但对于输入的无界假设,会在构建系统时对我们的设计产生诸多影响。


少侠,稍等~

这里有一份需要你的产品使用反馈的问卷,烦请花 2 分钟填写下问卷,帮助 NebulaGraph 改善周边工具使用体验~~ 谢谢你 🥺

Part.0
 推荐阅读
END

对图数据库 NebulaGraph 感兴趣?欢迎前往 GitHub ✨ 查看源码:https://github.com/vesoft-inc/nebula

想要一起提高文档的可读性么?一起来给『文档 nGQL 示例添加注释』吧~请瞄准那条让人费解的 nGQL 语句,留下你的讲解 (///▽///) 

点击下图了解活动详情 🥹

    

文章转载自NebulaGraph,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论