7.2.5 Shuffle读写数据的源码解析
1. Shuffle写数据的源码解析
从SparkShuffle的整体框架中可以看到,在ShuffleManager提供了Shuffle相关数据块的写入与读取,即,对应的接口getWriter与getReader。
在解析Shuffle框架数据读取过程中,可以构建一个具有ShuffleDependency的RDD,查看执行过程中,Shuffle框架中的数据读写接口getWriter与getReader如何使用,通过这种具体案例的方式来加深对源码的理解。
Spark中具体的执行机制可以参考本书的其他章节,在此仅分析与Shuffle直接相关的内容。通过DAG调度机制的解析,可以知道Spark中一个作业可以根据宽依赖切分Stages,而在Stages中,相应的Tasks也包含两种,即ResultTask与ShuffleMapTask。其中,一个ShuffleMapTask会基于ShuffleDependency中指定的分区器,将一个RDD的元素拆分到多个buckets中,此时通过ShuffleManager的getWriter接口来获取数据与buckets的映射关系。而ResultTask对应的是一个将输出返回给应用程序Driver端的Task,在该Task执行过程中,最终都会调用RDD的compute对内部数据进行计算,而在带有ShuffleDependency的RDD中,在compute计算时,会通过ShuffleManager的getReader接口,获取上一个Stage的Shuffle输出结果作为本次Task的输入数据。
首先查看ShuffleMapTask中的数据写流程,具体代码如下所示:
1. override def runTask(context: TaskContext): MapStatus = { 2. …… 3. // 首先从SparkEnv获取ShuffleManager。 4. // 然后从ShuffleDependency中获取注册到ShuffleManager时所得到的shuffleHandle。 5. // 根据shuffleHandle和当前Task对应的分区ID,获取ShuffleWriter 6. // 最后根据获取的ShuffleWriter,调用其write接口,写入当前分区的数据。 7. var writer: ShuffleWriter[Any, Any] = null 8. try { 9. val manager = SparkEnv.get.shuffleManager 10. writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) 11. writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) 12. writer.stop(success = true).get 13. } catch { 14. …… 15. } 16. } |
对应具体的Shuffle读数据的实现机制,现有支持的三种方式在细节上都有所差异,具体源码解析可以参考后续针对这几种方式的各章节。
1. Shuffle读数据的源码解析
对应的数据读取器,从RDD的5个抽象接口可知,RDD的数据流最终会经过算子操作,即RDD中的compute方法,下面以包含宽依赖的RDD,CoGroupedRDD为例,查看如何获取Shuffle的数据。具体代码如下所示:
1. // 对指定分区进行计算的抽象接口,以下为CoGroupedRDD具体子类中该方法的实现 2. override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = { 3. val split = s.asInstanceOf[CoGroupPartition] 4. val numRdds = dependencies.length 5. 6. // A list of (rdd iterator, dependency number) pairs 7. val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)] 8. for ((dep, depNum) <- dependencies.zipWithIndex) dep match { 9. case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked => 10. val dependencyPartition = split.narrowDeps(depNum).get.split 11. // Read them from the parent 12. val it = oneToOneDependency.rdd.iterator(dependencyPartition, context) 13. rddIterators += ((it, depNum)) 14. 15. case shuffleDependency: ShuffleDependency[_, _, _] => 17. // 首先从SparkEnv获取ShuffleManager。 18. // 然后从ShuffleDependency中获取注册到ShuffleManager时 19. //所得到的shuffleHandle。根据shuffleHandle和当前Task对应的分区ID, 20. // 获取ShuffleWriter。 21. // 最后根据获取的ShuffleReader,调用其read接口,读取Shuffle的Map输出。 16. val it = SparkEnv.get.shuffleManager 17. .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context) 18. .read() 19. rddIterators += ((it, depNum)) 20. } 21. 22. val map = createExternalMap(numRdds) 23. for ((it, depNum) <- rddIterators) { 24. map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum)))) 25. } 26. context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled) 27. context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled) 28. context.internalMetricsToAccumulators( 29. InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes) 30. new InterruptibleIterator(context, 31. map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]]) 32. } |
从代码中可以看到,带宽依赖的RDD的compute操作中,最终是通过SparkEnv中的ShuffleManager实例的getReader方法,获取数据的读取器的,然后再次调用读取器的read读取指定分区范围的Shuffle数据。
注意,是带宽依赖的RDD,而非ShuffleRDD,除了ShuffleRDD之外,还有其他RDD也可以带上宽依赖的,比如前面给出的CoGroupedRDD。
目前支持的几种具体Shuffle实现机制在读取数据的处理上都是一样的,从源码角度可以看到,当前继承了ShuffleReader这一数据读取器的接口的具体子类,只有BlockStoreShuffleReader,因此本章内容仅在此对各种Shuffle实现机制的数据读取进行解析,后续各实现机制中不再重复描述。
源码解析的第一步仍然是查看该类的描述信息,具体如下所示:
1. /** 2. * 通过从其他节点上请求读取Shuffle数据来接收并读取指定范围[起始分区, 结束分区) ——对应为左闭右开区间。 3. * Fetches and reads the partitions in range [startPartition, endPartition) from a shuffle by 4. * requesting them from other nodes' block stores. 5. */ |
从注释上可以看出,读取器负责上一Stage为下一Stage输出数据块的读取。从前面对ShuffleReader接口的解析可知,继承的具体子类需要实现真正的数据读取操作,即实现read方法。因此该方法便是需要重点关注的源码,一些关键的代码如下所示:
1. /** 为该Reduce任务读取并合并key-values 值。 2. * Read the combined key-values for this reduce task */ 3. override def read(): Iterator[Product2[K, C]] = { 4. // 真正的数据Iterator读取是通过ShuffleBlockFetcherIterator来完成的。 5. val blockFetcherItr = new ShuffleBlockFetcherIterator( 6. context, 7. blockManager.shuffleClient, 8. blockManager, 9. // 可以看到,当ShuffleMapTask完成后注册到mapOutputTracker的元数据信息 10. // 同样会通过mapOutputTracker来获取,在此同时还指定了获取的分区范围 11. // 通过该方法的返回值类型, 12. mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition), 13. // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility 14. // 默认读取时的数据大小限制为48m,对应后续并行的读取, 15. // 都是一种数据读取的控制策略,一方面可以避免目标机器占用过多带宽, 16. // 同时也可以启动并行机制,加快读取速度。 17. SparkEnv.get.conf.getSizeAsMb("spark.Reduce.maxSizeInFlight", "48m") * 1024 * 1024) 18. 19. // Wrap the streams for compression based on configuration 20. // 在此针对前面获取的各个数据块唯一标识ID信息极其对应的输入流进行处理 21. val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) => 22. blockManager.wrapForCompression(blockId, inputStream) 23. } 24. …… 25. // 对读取到的数据进行聚合处理 26. val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) { 27. // 如果在Map端已经做了聚合的优化操作,则对读取到的聚合结果进行聚合, 28. // 注意此时的聚合的操作与数据类型和Map端未做优化的时候是不同的。 29. if (dep.mapSideCombine) { 30. // We are reading values that are already combined 31. val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]] 32. // 针对Map端各分区针对Key进行合并后的结果再次聚合, 33. // Map的合并可以大大减少网络传输的数据量 34. dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context) 35. } else { 36. // We don't know the value type, but also don't care -- the dependency *should* 37. // have made sure its compatible w/ this aggregator, which will convert the value 38. // type to the combined type C 39. val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]] 40. // 针对未合并的keyValues的值进行聚合 41. dep.aggregator.get.combineValuesByKey(keyValuesIterator, context) 42. } 43. } else { 44. require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") 45. interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]] 46. } 47. 48. // 在基于Sort的Shuffle实现过程中,默认仅仅是基于PartitionId进行排序, 49. // 在分区的内部数据是没有排序的,因此添加了keyOrdering变量, 50. // 提供是否需要针对分区内的数据进行排序的标识信息 51. // Sort the output if there is a sort ordering defined. 52. dep.keyOrdering match { 53. case Some(keyOrd: Ordering[K]) => 54. // 为了减少内存的压力,避免GC开销,引入了外部排序器对数据进行排序 55. // 当内存不足以容纳排序的数据量时,会根据配置的spark.shuffle.spill属性 56. // 来决定是否需要spill到磁盘中,默认情况下会打开spill开关, 57. // 不打开的话在数据量比较大时会引发内存溢出问题(Out of Memory,OOM) 58. // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled, 59. // the ExternalSorter won't spill to disk. 60. val sorter = 61. new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = Some(ser)) 62. …… 63. case None => 64. // 不需要排序分区内部数据时直接返回 65. aggregatedIter 66. } 67. } |
下面进一步解析数据读取的部分细节,首先是数据块获取、读取的ShuffleBlockFetcherIterator类,在类的构造体中调用了initialize方法(构造体中的表达式会在构造实例时执行),该方法中会根据数据块所在位置(本地节点或远程节点)分别进行读取,其中关键代码如下所示:
1. private[this] def initialize(): Unit = { 2. …… 3. // 本地与远程的数据读取方式不同,因此先进行拆分, 4. // 注意拆分时会考虑一次获取的数据大小(拆分时会同时考虑并行数)封装请求, 5. // 最后会将剩余不足该大小的数据获取也封装为一个请求 6. // Split local and remote blocks. 7. val remoteRequests = splitLocalRemoteBlocks() 8. // Add the remote requests into our queue in a random order 9. // 存入需要远程读取的数据块请求信息 10. fetchRequests ++= Utils.randomize(remoteRequests) 11. 12. 13. // Send out initial requests for blocks, up to our maxBytesInFlight 14. // 发送数据获取请求 15. fetchUpToMaxBytes() 16. …… 17. // 除了远程数据获取之外,下面是获取本地数据块的方法调用 18. // Get Local Blocks 19. fetchLocalBlocks() 20. …… 21. } |
与Hadoop一样,Spark计算框架也是基于数据本地性,即移动数据而非计算的原则,因此在获取数据块时,也会考虑数据本地性,尽量从本地读取已有的数据块,然后再远程读取。
另外,数据块的本地性是通过ShuffleBlockFetcherIterator实例构建时所传入的位置信息来判断的,而该信息由MapOutputTracker实例的getMapSizesByExecutorId方法提供,可以参考该方法的返回值类型查看相关的位置信息,返回值类型为:Seq[(BlockManagerId, Seq[(BlockId, Long)])],其中BlockManagerId是BlockManager的唯一标识信息,BlockId是数据块的唯一信息,对应的Seq[(BlockId,Long)])表示一组数据块标识ID及其数据块大小的元组信息。
最后简单分析下如何设置分区内部的排序标识,当需要对分区内的数据进行排序时,会设置RDD中的宽依赖(ShuffleDependency)实例的keyOrdering变量,下面以基于排序的OrderedRDDFunctions提供的sortByKey方法给出解析,具体代码如下所示:
1. def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) 2. : RDD[(K, V)] = self.withScope 3. { 4. // 注意,这里设置了该方法构建的RDD所使用的分区器 5. // 根据Range而非Hash进行分区,对应的Range信息需要计算并将结果 6. // 反馈到Driver端,因此对应调用RDD中的Action,即会触发一个Job的执行 7. val part = new RangePartitioner(numPartitions, self, ascending) 8. // 在构建RDD实例之后,设置Key的排序算法,即Ordering实例 9. new ShuffledRDD[K, V, V](self, part) 10. .setKeyOrdering(if (ascending) ordering else ordering.reverse) 11. } |
当需要对分区内部的数据进行排序时,构建RDD的同时会设置Key值的排序算法,结合前面的read方法中的第52行代码,当指定Key值的排序算法时,就会使用外部排序器对分区内的数据进行排序。




