暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

spark内核源码深度剖析(八):Master资源调度算法原理剖析与源码分析

程序员雨衣 2019-10-18
227

Master资源调度

首先判断,master状态不是ALIVE的话,直接返回 也就是说,standby master是不会进行Application等资源调度的

首先调度Driver 只有用yarn-cluster模式提交的时候,才会注册driver,因为standalone和yarn-client模式,都会在本地直接启动driver,而不会来注册driver,就更不可能让master来调度driver了

Application的调度机制 首先,Application的调度算法有两种,一种是spreadOutApps,另一种是非spreadOutApps 默认是spreadOutApps

通过spreadOutApps这种算法,其实会将每个Application,要启动的Executor,都平均分布到各个worker上去 比如有20个cpu core要分配,有10个worker,那么实际上会循环两遍worker,每次循环,给每个worker分配一个core,最后每个worker分配了两个core

所以,比如,spark-submit里,配置的是要10个executor,每个要2个core,那么总共是20个core,但这种算法下,其实总共只会启动2个executor,每个有10个core非spreadOutApps调度算法,将每一个application,尽可能少的分配到Worker上去

这种算法和spreadOutApps算法正好相反,每个application都尽可能分配到尽量少的worker上去 比如总共有10个worker,每个有10个core,Application总共要分配20个core

那么其实只会分配到两个worker上,每个worker都占满10个core,那么其余的application,就只能分配到下一个worker了

源码剖析

  1. private def schedule() {

  2. // 首先判断,master状态不是ALIVE的话,直接返回

  3. // 也就是说,standby master是不会进行Application等资源调度的

  4. if (state != RecoveryState.ALIVE) { return }


  5. // First schedule drivers, they take strict precedence over applications

  6. // Randomization helps balance drivers

  7. // Random.shuffle的原理,就是对传入的集合的元素进行随机的打乱

  8. // 取出了Workers中所有之前注册上来的worker,进行过滤,必须状态ALIVE的worker

  9. // 对状态为ALIVE的worker,调用Random.shuffle方法进行随机的打乱

  10. val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))

  11. // 拿到worker数量

  12. val numWorkersAlive = shuffledAliveWorkers.size

  13. var curPos = 0


  14. // 首先调度Driver

  15. // 只有用yarn-cluster模式提交的时候,才会注册driver,因为standalone和yarn-client模式,都会在本地直接启动driver,而

  16. // 不会来注册driver,就更不可能让master来调度driver了


  17. // driver调度机制

  18. // 遍历waitingDrivers ArrayBuffer

  19. for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers

  20. // We assign workers to each waiting driver in a round-robin fashion. For each driver, we

  21. // start from the last worker that was assigned a driver, and continue onwards until we have

  22. // explored all alive workers.

  23. var launched = false

  24. var numWorkersVisited = 0

  25. // while的条件 numWorkersVisited小于numWorkersAlive 只要还有活着的worker没有遍历到,就继续遍历

  26. // 而且当前这个driver还没有被启动,也就是launched为false

  27. while (numWorkersVisited < numWorkersAlive && !launched) {

  28. val worker = shuffledAliveWorkers(curPos)

  29. numWorkersVisited += 1

  30. // 如果当前这个worker的空闲内存量大于等于driver需要的内存

  31. // 并且worker的空闲cpu数量大于等于driver所需要的CPU数量

  32. if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {

  33. // 启动driver

  34. launchDriver(worker, driver)

  35. // 将driver从waitingDrivers队列中移除

  36. waitingDrivers -= driver

  37. // launched设置为true

  38. launched = true

  39. }

  40. // 将指针指向下一个worker

  41. curPos = (curPos + 1) % numWorkersAlive

  42. }

  43. }


  44. // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app

  45. // in the queue, then the second app, etc.

  46. // Application的调度机制

  47. // 首先,Application的调度算法有两种,一种是spreadOutApps,另一种是非spreadOutApps

  48. // 默认是spreadOutApps

  49. if (spreadOutApps) {

  50. // Try to spread out each app among all the nodes, until it has all its cores

  51. // 首先,遍历waitingApps中的ApplicationInfo,并且过滤出Application还有需要调度的core的Application

  52. for (app <- waitingApps if app.coresLeft > 0) {

  53. // 从worker中过滤出状态为ALIVE的Worker

  54. // 再次过滤出可以被Application使用的Worker,Worker剩余内存数量大于等于Application的每一个Actor需要的内存数量,而且该Worker没有运行过该Application对应的Executor

  55. // 将Worker按照剩余cpu数量倒序排序

  56. val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)

  57. .filter(canUse(app, _)).sortBy(_.coresFree).reverse

  58. val numUsable = usableWorkers.length

  59. // 创建一个空数组,存储要分配给每个worker的cpu数量

  60. val assigned = new Array[Int](numUsable) // Number of cores to give on each node

  61. // 获取到底要分配多少cpu,取app剩余要分配的cpu的数量和worker总共可用cpu数量的最小值

  62. var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

  63. // 通过这种算法,其实会将每个Application,要启动的Executor,都平均分布到各个worker上去

  64. // 比如有20个cpu core要分配,有10个worker,那么实际上会循环两遍worker,每次循环,给每个worker分配一个core,最后每个worker分配了两个core

  65. // 所以,比如,spark-submit里,配置的是要10个executor,每个要2个core,那么总共是20个core,但这种算法下,其实总共只会启动2个executor,每个有10个core


  66. // while条件,只要 要分配的cpu,还未分配完,就继续循环

  67. var pos = 0

  68. while (toAssign > 0) {

  69. // 每一个Worker,如果空闲的cpu数量大于已经分配出去的cpu数量,也就是说worker还有可分配的cpu

  70. if (usableWorkers(pos).coresFree - assigned(pos) > 0) {

  71. // 将总共要分配的cpu数量-1,因为这里已经决定在这个worker上分配一个cpu了

  72. toAssign -= 1

  73. // 给这个worker分配的cpu数量,加1

  74. assigned(pos) += 1

  75. }

  76. // 指针移动到下一个worker

  77. pos = (pos + 1) % numUsable

  78. }

  79. // Now that we've decided how many cores to give on each node, let's actually give them

  80. // 给每个worker分配完Application要求的cpu core之后 遍历worker

  81. for (pos <- 0 until numUsable) {

  82. // 只要判断之前给这个worker分配到了core

  83. if (assigned(pos) > 0) {

  84. // 那么就在worker上启动Executor

  85. // 首先,在Application内部缓存结构中,添加Executor,并且创建ExecutorDesc对象,其中封装了,给这个Executor分配多少个cpu core

  86. // 这里,spark 1.3.0版本的Executor启动的内部机制

  87. // 在spark-submit脚本中,可以指定要多少个Executor,每个Executor需要多少个cpu,多少内存

  88. // 那么基于spreadOutApps机制,实际上,最终,Executor的实际数量,以及每个Executor的cpu,可能与配置是不一样的

  89. // 因为我们这里是基于总的cpu来分配的,就是说,比如要求3个Executor,每个要三个cpu,有9个worker,每个有1个cpu

  90. // 那么根据这种算法,会给每个worker分配一个core,然后给每个worker启动一个Executor

  91. // 最后会启动9个Executor,每个Executor有一个cpu core

  92. val exec = app.addExecutor(usableWorkers(pos), assigned(pos))

  93. // 在worker上启动Executor

  94. launchExecutor(usableWorkers(pos), exec)

  95. // 将application的状态设置为RUNNING

  96. app.state = ApplicationState.RUNNING

  97. }

  98. }

  99. }

  100. } else {

  101. // Pack each app into as few nodes as possible until we've assigned all its cores

  102. // 非spreadOutApps调度算法,将每一个application,尽可能少的分配到Worker上去

  103. // 这种算法和spreadOutApps算法正好相反,每个application都尽可能分配到尽量少的worker上去

  104. // 比如总共有10个worker,每个有10个core,Application总共要分配20个core

  105. // 那么其实只会分配到两个worker上,每个worker都占满10个core,那么其余的application,就只能分配到下一个worker了


  106. // 遍历worker,并且状态为ALIVE。还有空闲空间的worker

  107. for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {

  108. // 遍历application,并且是还有需要分配的core的application

  109. for (app <- waitingApps if app.coresLeft > 0) {

  110. // 判断,如果当前这个worker可以被application使用

  111. if (canUse(app, worker)) {

  112. // 取worker剩余cpu数量,与application要分配的cpu数量的最小值

  113. val coresToUse = math.min(worker.coresFree, app.coresLeft)

  114. // 如果worker剩余cpu为0,那么就不分配了

  115. if (coresToUse > 0) {

  116. // 给application添加一个executor

  117. val exec = app.addExecutor(worker, coresToUse)

  118. // 在worker上启动executor

  119. launchExecutor(worker, exec)

  120. // 将application的状态设置为RUNNING

  121. app.state = ApplicationState.RUNNING

  122. }

  123. }

  124. }

  125. }

  126. }

  127. }

看看launchDriver()方法

  1. // 在某个Worker上,启动driver

  2. def launchDriver(worker: WorkerInfo, driver: DriverInfo) {

  3. logInfo("Launching driver " + driver.id + " on worker " + worker.id)

  4. // 将driver加入worker内存缓存结构

  5. // 将worker内使用的内存和cpu数量,都加上driver需要内存和cpu数量

  6. worker.addDriver(driver)

  7. // 同时把worker也加入到driver内部的缓存结构中

  8. driver.worker = Some(worker)

  9. // 调用worker的actor,给他发送LaunchDriver消息,让worker来启动Driver

  10. worker.actor ! LaunchDriver(driver.id, driver.desc)

  11. // 将driver的状态设置为RUNNING

  12. driver.state = DriverState.RUNNING

  13. }

看看launchExecutor()方法

  1. def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {

  2. logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)

  3. // 将Executor加入worker内部的缓存

  4. worker.addExecutor(exec)

  5. // 向worker的actor发送LaunchExecutor消息

  6. worker.actor ! LaunchExecutor(masterUrl,

  7. exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)

  8. // 向Executor对应的application对应的driver,发送ExecutorAdded消息

  9. exec.application.driver ! ExecutorAdded(

  10. exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)

  11. }

看看canUse()方法

  1. def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {

  2. worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)

  3. }


文章转载自程序员雨衣,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论