暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

spark内核源码深度剖析(十四):Shuffle原理剖析与源码分析

程序员雨衣 2019-11-01
348

在Spark中,什么情况下,会发生shuffle?reduceByKey、groupByKey、sortByKey、countByKey、join、cogroup等操作

Spark Shuffle操作的两个特点

第一个特点 在Spark早期版本中,那个bucket缓存是非常非常重要的,因为需要将一个ShuffleMapTask所有的数据都写入内存缓存之后,才会刷新到磁盘。但是这就有一个问题,如果map side数据过多,那么很容易造成内存溢出。所以spark在新版本中,优化了,默认那个内存缓存是100kb,然后呢,写入一点数据达到了刷新到磁盘的阈值之后,就会将数据一点一点地刷新到磁盘。这种操作的优点,是不容易发生内存溢出。缺点在于,如果内存缓存过小的话,那么可能发生过多的磁盘写io操作。所以,这里的内存缓存大小,是可以根据实际的业务情况进行优化的。第二个特点, 与MapReduce完全不一样的是,MapReduce它必须将所有的数据都写入本地磁盘文件以后,才能启动reduce操作,来拉取数据。为什么?因为mapreduce要实现默认的根据key的排序!所以要排序,肯定得写完所有数据,才能排序,然后reduce来拉取。但是Spark不需要,spark默认情况下,是不会对数据进行排序的。因此ShuffleMapTask每写入一点数据,ResultTask就可以拉取一点数据,然后在本地执行我们定义的聚合函数和算子,进行计算。spark这种机制的好处在于,速度比mapreduce快多了。但是也有一个问题,mapreduce提供的reduce,是可以处理每个key对应的value上的,很方便。但是spark中,由于这种实时拉取的机制,因此提供不了,直接处理key对应的values的算子,只能通过groupByKey,先shuffle,有一个MapPartitionsRDD,然后用map算子,来处理每个key对应的values。就没有mapreduce的计算模型那么方便。

普通的Shuffle操作原理剖析

每个ShuffleMapTask,都会为每个ResultTask创建一份bucket缓存,以及对应的ShuffleBlockFile磁盘文件

所以假设,如果有100个ShuffleMapTask,100个ResultTask,本地磁盘要产生10000个文件,磁盘IO过多,影响性能

ShuffleMapTask的输出,会作为MapStatus,发送到DAGScheduler的MapOutputTrackerMaster中,MapStatus包含了每个ResultTask要拉取的数据大大小

每个ResultTask会用BlockStoreShuffleFetcher去MapOutputTrackerMaster获取自己的要拉取的文件的信息,然后底层通过BlockManager将数据拉取过来

Map端的数据,可以理解为Shuffle的第一个RDD,MapPartitionsRDD 每个ResultTask拉取过来的数据,其实会组成一个内部的RDD,叫ShuffledRDD,优先放入内存,如果内存不够,那么写入磁盘

然后每个ResultTask针对数据进行聚合,最后生成MapPartitionsRDD,就是我们执行reduceByKey等操作希望获得的那个RDD

优化后的Shuffle原理剖析

Spark新版本中,引入了consolidation机制,也就说,提出了ShuffleGroup概念

一个ShuffleMapTask将数据写入ResultTask数量的本地文件,这个不会变,但是,当下一个ShuffleMapTask运行的时候,可以直接将数据写入之前的ShuffleMapTask的本地文件,相当于是,对多个ShuffleMapTask的输出进行了合并,从而大大减少了本地磁盘的数量

一组ShuffleGroup,每个文件中,都存储了多个ShuffleMapTask的数据,每个ShuffleMapTask的数据,叫做一个segment。此外,会通过一些索引,来标记每个ShuffleMapTask的输出在ShuffleBlockFile中的索引,以及偏移量等,来进行不同ShuffleMapTask的数据的区分

机器上,有两个cpu,也就说,4个ShuffleMapTask,有2个ShuffleMapTask是可以并行执行的,先执行的是黑色边框的两个ShuffleMapTask,再执行红色边框的两个ShuffleMapTask 并行执行的ShuffleMapTask,写入的文件,一定是不同的,当一批并行执行的ShuffleMapTask运行完之后,那么新的一批ShuffleMapTask启动起来并行执行的时候,优化机制就开始发挥作用了,开启优化Shuffle操作,主要是通过在SparkConf中,设置一个参数就可以 每一个节点上面,比如有2个cpu,100个ShuffleMapTask,那么此时,就会产生100*100个磁盘文件,就是10000个 开启了consolidation机制之后的Shuffle write ,每个节点上的磁盘文件,数量变成了cpu core的数量 * ShuffleMapTask的数量,同样的情况,每个节点只产生200个磁盘文件

源码

Shuffle写

入口

  1. // 这个writer默认是HashShuffleWriter

  2. writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])

看HashShuffleWriter的 writer
方法

  1. // 将每个ShuffleMapTask计算出来的新的rdd的partition数据,写入本地磁盘

  2. override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {

  3. // 首先判断,是否需要map端本地进行聚合,这里的话,如果是reduceByKey这种操作,它的dep.aggregator.isDefined、dep.mapSideCombine都是true

  4. // 那么就会进行map端的本地聚合

  5. val iter = if (dep.aggregator.isDefined) {

  6. if (dep.mapSideCombine) {

  7. // 这里就会执行本地聚合,比如本地有(hello,1)、(hello,1),那么此时就会聚合成(hello,2)

  8. dep.aggregator.get.combineValuesByKey(records, context)

  9. } else {

  10. records

  11. }

  12. } else {

  13. require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")

  14. records

  15. }


  16. // 需要本地聚合,那么先本地聚合,然后遍历数据,对每个数据,调用partitioner,默认是HashPartitioner

  17. // 生成bucketId,也就是决定了,每一份数据,要写入哪个bucket中

  18. for (elem <- iter) {

  19. val bucketId = dep.partitioner.getPartition(elem._1)

  20. // 获取到了bucketId之后,会调用shuffleBlockManager.forMapTask()方法,来生成bucketId对应的writer,然后用writer将数据写入bucket

  21. shuffle.writers(bucketId).write(elem)

  22. }

  23. }

上面代码的shuffle其实是

  1. private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,

  2. writeMetrics)

看看 forMapTask()
方法

  1. // 给每一个mao task 获取一个ShuffleWriterGroup

  2. def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,

  3. writeMetrics: ShuffleWriteMetrics) = {

  4. new ShuffleWriterGroup {

  5. shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))

  6. private val shuffleState = shuffleStates(shuffleId)

  7. private var fileGroup: ShuffleFileGroup = null


  8. // 对应之前讲解的,shuffle有两种模式,一种是普通的,一种是优化后的

  9. // 这里会判断,如果开启了consolidation机制,也就是consolidateShuffleFiles为true的话,那么实际上,不会给每个bucket都获取一个独立的文件

  10. // 而是为这个bucket获取一个ShuffleGroup的writer

  11. val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {

  12. fileGroup = getUnusedFileGroup()

  13. Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>

  14. // 首先,用shuffleId、mapId、bucketId(也就是reduceId,一个bucket对应一个reduce)生成一个唯一的ShuffleBlockId

  15. // 然后用buckId,来调用ShuffleFileGroup的apply()函数,为bucket获取一个ShuffleFileGroup

  16. val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)

  17. // 然后调用BlockManager的getDiskWriter()方法,针对ShuffleFileGroup获取一个writer

  18. // 这里,就明白了,如果开启了consolidation机制,实际上,对于每一个bucket,都会获取一个针对ShuffleFileGroup的wtriter,而不是一个独立的ShuffleBlockFile的writer

  19. // 这样就实现了所谓的,多个ShuffleMapTask的输出数据的合并

  20. blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,

  21. writeMetrics)

  22. }

  23. } else {

  24. // 如果没有开启consolidation机制,也就是普通的shuffle操作的话

  25. Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>

  26. // 同样生成一个ShuffleBlockId

  27. val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)

  28. // 然后调用BlockManager的diskBlockManager,获取一个代表了要写入本地磁盘文件的blockFile

  29. val blockFile = blockManager.diskBlockManager.getFile(blockId)

  30. // Because of previous failures, the shuffle file may already exist on this machine.

  31. // If so, remove it.

  32. // 而且会判断,如果blockFile要是存在的话,还得删除它

  33. if (blockFile.exists) {

  34. if (blockFile.delete()) {

  35. logInfo(s"Removed existing shuffle file $blockFile")

  36. } else {

  37. logWarning(s"Failed to remove existing shuffle file $blockFile")

  38. }

  39. }

  40. // 然后调用BlockManager的getDiskWriter()方法,针对那个blockFile生成writer

  41. blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)

  42. }

  43. // 所以使用这种普通的shuffle操作的话,对于每一个ShuffleMapTask输出的bucket,都会在本地获取一个单独的ShuffleBlockFile

  44. }

shuffle读

入口

  1. override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {

  2. // ResultTask或者ShuffleMapTask在执行到ShuffleRDD时,肯定会调用ShuffleRDD的computer()方法,来计算当前这个RDD的partition的数据

  3. // 在这里,会调用ShuffleManager的getReader()方法,获取一个HashShuffleReader,然后调用它的read()方法,拉取该ResultTask ShuffleMapTask需要聚合的数据

  4. val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]

  5. SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)

  6. .read()

  7. .asInstanceOf[Iterator[(K, C)]]

  8. }

看看HashShuffleReader的 read()
方法

  1. /** Read the combined key-values for this reduce task */

  2. override def read(): Iterator[Product2[K, C]] = {

  3. val ser = Serializer.getSerializer(dep.serializer)

  4. // reduceTask在拉取数据时,其实会用BlockStoreShuffleFetcher来从DAGDcheduler的MapOutputTrackerMaster中获取自己想要的数据的信息

  5. // 然后底层,再通过blockManager从对应的位置,拉取需要的数据

  6. val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)


  7. val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {

  8. if (dep.mapSideCombine) {

  9. new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))

  10. } else {

  11. new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))

  12. }

  13. } else {

  14. require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")


  15. // Convert the Product2s to pairs since this is what downstream RDDs currently expect

  16. iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))

  17. }


  18. // Sort the output if there is a sort ordering defined.

  19. dep.keyOrdering match {

  20. case Some(keyOrd: Ordering[K]) =>

  21. // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,

  22. // the ExternalSorter won't spill to disk.

  23. val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))

  24. sorter.insertAll(aggregatedIter)

  25. context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)

  26. context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)

  27. sorter.iterator

  28. case None =>

  29. aggregatedIter

  30. }

  31. }

看看 BlockStoreShuffleFetcher.fetch()
方法

  1. private[hash] object BlockStoreShuffleFetcher extends Logging {

  2. def fetch[T](

  3. shuffleId: Int,

  4. reduceId: Int,

  5. context: TaskContext,

  6. serializer: Serializer)

  7. : Iterator[T] =

  8. {

  9. logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))

  10. // 拿到全局的blockManager

  11. val blockManager = SparkEnv.get.blockManager


  12. val startTime = System.currentTimeMillis

  13. // 拿到一个全局的MapOutputTracker的引用,调用其getServerStatuses()方法,传入了shuffleId和reduceId

  14. // shuffleId可以代表当前这个stage的上一个stage,shuffle是分为两个stage的,shuffle writer发生在上一个stage中,shuffle read 是发生在当前stage中的

  15. // 首先通过shuffleId可以限制到上一个stage的所有ShuffleMapTask的输出的MapStatus,接着,通过reduceId,也就是所谓的bucketId,来限制,从每个mapTask中,获取当前这个resultTask需要获取的每个ShuffleMapTask的输出文件的信息

  16. // getServerStatuses()方法,一定是走远程网络通信的,因为要联系Driver上的DAGScheduler的MapOutputTrackerMaster

  17. val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)

  18. logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(

  19. shuffleId, reduceId, System.currentTimeMillis - startTime))


  20. // 对刚才拉取的数据,进行一些数据结构上的转换操作

  21. val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]

  22. for (((address, size), index) <- statuses.zipWithIndex) {

  23. splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))

  24. }


  25. val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {

  26. case (address, splits) =>

  27. (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))

  28. }


  29. def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {

  30. val blockId = blockPair._1

  31. val blockOption = blockPair._2

  32. blockOption match {

  33. case Success(block) => {

  34. block.asInstanceOf[Iterator[T]]

  35. }

  36. case Failure(e) => {

  37. blockId match {

  38. case ShuffleBlockId(shufId, mapId, _) =>

  39. val address = statuses(mapId.toInt)._1

  40. throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)

  41. case _ =>

  42. throw new SparkException(

  43. "Failed to get block " + blockId + ", which is not a shuffle block", e)

  44. }

  45. }

  46. }

  47. }

  48. // ShuffleBlockFetcherIterator构造以后,在其内部,就直接根据拉取到的地理位置信息,通过BlockManager去远程ShuffleMapTask所在的节点的BlockManager去拉取数据

  49. val blockFetcherItr = new ShuffleBlockFetcherIterator(

  50. context,

  51. SparkEnv.get.blockManager.shuffleClient,

  52. blockManager,

  53. blocksByAddress,

  54. serializer,

  55. SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)

  56. val itr = blockFetcherItr.flatMap(unpackBlock)


  57. // 最后,将拉取到的数据,执行一些转换和封装,返回

  58. val completionIter = CompletionIterator[T, Iterator[T]](itr, {

  59. context.taskMetrics.updateShuffleReadMetrics()

  60. })


  61. new InterruptibleIterator[T](context, completionIter) {

  62. val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()

  63. override def next(): T = {

  64. readMetrics.incRecordsRead(1)

  65. delegate.next()

  66. }

  67. }

  68. }

  69. }


文章转载自程序员雨衣,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论