暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spark(1.6 版本)系列:钨丝计划(Project Tungsten)之内存管理的模型及其源码的解析(二)

偷功 2016-12-02
520

2. MemoryManager的实现及其源码解析

MemoryManager目前实现了两种具体的内存管理模型,从Spark 1.6 版本开始,默认使用统一内存管理模型,对应的配置属性为"spark.memory.useLegacyMode",控制代码位于SparkEnv类中,代码如下所示:

1.         // Spark 1.5 及之前的版本所使用的内存管理模型对应配置属性

2.         // "spark.memory.useLegacyMode"ture。当前默认为false

3.          val useLegacyMemoryManager =  conf.getBoolean("spark.memory.useLegacyMode", false)

4.             val memoryManager: MemoryManager =

5.               if (useLegacyMemoryManager) {

6.                 // 使用静态内存管理模型

7.                 new StaticMemoryManager(conf,  numUsableCores)

8.               } else {

9.         // 使用统一内存管理模型

10.              UnifiedMemoryManager(conf,  numUsableCores)

11.            }

以上是具体内存管理模型选择的代码,下面开始分析内存管理相关的源码,首先查看MemoryManager的注释,如下所示:

1.         /**

2.          * An abstract memory manager that enforces  how memory is shared between execution and

3.          * storage.

4.          * In this context, execution memory refers  to that used for computation in shuffles, joins,

5.          * sorts and aggregations, while storage  memory refers to that used for caching and

6.          * propagating internal data across the  cluster. There exists one MemoryManager per JVM.

7.          *

8.          * 内存管理的抽象接口,用于指定如何在 Execution Storage 间共享内存。

9.          * Execution Memory是指用计算的内容,包括Shufflesjoinssorts以及aggregations

10.       * Storage Memory则是指用于缓存或内部数据传输过程中所使用的内存。

11.       * MemoryManagerJVM进程的对应关系为1:1。即一个JVM进程中的内存由一个

12.       *MemoryManager进行管理。

13.       */

14.      private[spark] abstract class  MemoryManager(

15.      ……

MemoryManager类中提供的内存分配与释放的几个主要接口如下:

  • Storage部分内存的分配与释放接口:acquireStorageMemoryacquireUnrollMemoryreleaseStorageMemory以及releaseUnrollMemory

  • Execution部分内存的分配与释放接口:acquireExecutionMemoryreleaseExecutionMemory

具体分配与释放的实现由MemoryManager的具体子类提供。

两大实现子类的主要差别在于StorageExecution内存之间的边界是静态的还是动态可变的,下面分别简单描述下两大子类的实现细节。

StaticMemoryManager类的注释如下所示:

1.         /**

2.          * A [[MemoryManager]] that statically  partitions the heap space into disjoint regions.

3.          *

4.          * The sizes of the execution and storage  regions are determined through

5.          * `spark.shuffle.memoryFraction` and  `spark.storage.memoryFraction` respectively. The two

6.          * regions are cleanly separated such that  neither usage can borrow memory from the other.

7.         *静态划分StorageExecution内存之间的边界的一种内存管理实现。

8.          * StorageExecution内存大小分别由配置属性`spark.shuffle.memoryFraction`

9.         *`spark.storage.memoryFraction`各自指定,由于是静态划分边界,因此这两者之间不能

10.       *互相借用多余的内存。

11.       */

12.      private[spark] class  StaticMemoryManager(

静态内存管理模型中各部分内存的分配可以通过以下几个接口或成员变量查看:

  • maxUnrollMemoryunroll过程中可用的内存,占最大可用Storage内存的0.2

  • getMaxStorageMemory:获取分配给Storage使用的最大内存大小。

  • getMaxExecutionMemory:获取分配给Execution使用的最大内存大小。

其中,getMaxStorageMemory对应用于Storage的最大内存,具体配置如下所示:

1.           private def getMaxStorageMemory(conf:  SparkConf): Long = {

2.             val systemMaxMemory =  conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

3.             val memoryFraction =  conf.getDouble("spark.storage.memoryFraction", 0.6)

4.             val safetyFraction =  conf.getDouble("spark.storage.safetyFraction", 0.9)

5.             (systemMaxMemory * memoryFraction *  safetyFraction).toLong

6.           }

其中配置属性"spark.storage.memoryFraction"表示Storage内存占用全部内存(除预留给系统的内存外)的占比,"spark.storage.safetyFraction"对应为Storage内存的安全系数。

对应的getMaxExecutionMemory方法指明了用于Execution内存的相关配置属性,与Storage内存一样包含占总内存的占比(0.2)及对应的安全系数。

另外,除了Storage内存与Execution内存占用的0.6+0.2之外的剩余内存,作为系统预留内存。

通过StaticMemoryManager类简单分析静态内存管理模型后,继续查看统一内存管理模型,首先查看其类注释:

1.         /**

2.          * A [[MemoryManager]] that enforces a soft  boundary between execution and storage such

3.          * that either side can borrow memory from  the other.

4.          *

5.          * The region shared between execution and  storage is a fraction of (the total heap space -

6.          * 300MB) configurable through  `spark.memory.fraction` (default 0.75). The position of the

7.          * boundary within this space is further  determined by `spark.memory.storageFraction`

8.          * (default 0.5).This means the size of the  storage region is 0.75 * 0.5 = 0.375 of the heap

9.          * space by default.

10.       * Storage can borrow as much execution  memory as is free until execution reclaims its space.

11.       * When this happens, cached blocks will be  evicted from memory until sufficient borrowed

12.       * memory is released to satisfy the  execution memory request.

13.       *

14.       * Similarly, execution can borrow as much  storage memory as is free. However, execution

15.       * memory is *never* evicted by storage due  to the complexities involved in implementing

16.       * T this.he implication is that attempts to  cache blocks may fail if execution has already eaten

17.       * up most of the storage space, in which  case the new blocks will be evicted immediately

18.       * according to their respective storage  levels.

19.       *

20.       * @param storageRegionSize Size of the  storage region, in bytes.

21.       *                          This region is not  statically reserved; execution can borrow

22.       *                          from it if  necessary. Cached blocks can be evicted only if

23.       *                          actual storage  memory usage exceeds this region.

24.       *

25.       * UnifiedMemoryManager :是 MemoryManager 的一个具体子类,实现 Storage

26.      *Execution 间软边界(即动态边界)的内存管理模式。

27.       *    动态边界意味着 Storage Execution 的内存是可以相互借用的。

28.       * Storage Execution  :共享的内存通过配置 `spark.memory.fraction`  进行设置。

29.       *     两者间的内存分配通过配置  `spark.memory.storageFraction` 进行设置。

30.       * 内存借用:

31.       *  1 Storage 可以借用 Execution 中空闲的内存,但在 Execution 执行需要内存时

32.      会被回收(回收缓存内存直到满足执行申请的内存)。

33.       *  2)类似地,Execution 也可以借用 Storage 中空闲的内存。但是,出于实现的复

34.       *  杂性考虑(具体可以参考前面给出的官方设计文档

35.      *unified-memory-management-spark-10000.pdf),

36.       *       借用的内存`永远`不会因为 Storage 需要而进行回收。

37.       */

38.      private[spark] class  UnifiedMemoryManager private[memory] (

39.      ……

UnifiedMemoryManagerStaticMemoryManager一样实现了MemoryManager的几个内存分配、释放的接口,对应分配与释放接口的实现,在StaticMemoryManager相对比较简单,而在UnifiedMemoryManager中,由于考虑到动态借用的情况,实现相对比较复杂,具体细节可以参考官方提供的统一内存管理设计文档以及相关源码,比如针对各个Task如何保证其最小分配的内存(最少1/2N,其中N表示当前活动状态的Task个数,最大的Task个数可以从Executor分配的内核个数/每个Task占用的内核个数得到)等等。

下面简单分析下统一内存管理模型中,Storage内存与Execution内存等相关的配置。

查看方法,具体代码如下所示:

1.           /**

2.            * Return the total amount of memory shared  between execution and storage, in bytes.

3.         *返回ExecutionStorage共享的最大内存。

4.            */

5.           private def getMaxMemory(conf: SparkConf):  Long = {

6.             val systemMemory =  conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

7.         // 系统预留的内存大小,默认为300M

8.         // 结合下面最小系统内存的限定条件(硬编码方式,因此修改需要重新编译),

9.             // 可以人为修改该配置进行测试。

10.          val reservedMemory =  conf.getLong("spark.testing.reservedMemory",

11.            if  (conf.contains("spark.testing")) 0 else  RESERVED_SYSTEM_MEMORY_BYTES)

12.       // 当前最小的内存需要300 * 1.5,即450M,不满足该条件时会报错退出

13.       val minSystemMemory = reservedMemory * 1.5

14.          if (systemMemory < minSystemMemory) {

15.            throw new  IllegalArgumentException(s"System memory $systemMemory must " +

16.              s"be at least $minSystemMemory.  Please use a larger heap size.")

17.          }

18.          val usableMemory = systemMemory –  reservedMemory

19.          // 当前ExecutionStorage共享的最大内存占比默认为0.75,即

20.          // ExecutionStorage内存为可用内存的0.75

21.          // 用户内存为可用内存的(1-0.75= 0.25

22.          // 由于前面提到的Alexey Grishchenko给出的博客上对统一内存管理模型给出了

23.      // 非常详细、深入地解析,建议直接参考博客内容,同时结合此处源码进行理解。

24.          val memoryFraction =  conf.getDouble("spark.memory.fraction", 0.75)

25.          (usableMemory * memoryFraction).toLong

26.        }

另外,虽然ExecutionStorage之间共享内存,但仍然存在一个初始边界值,参考伴生对象UnifiedMemoryManagerapply工厂方法,具体代码如下所示:

1.           def apply(conf: SparkConf, numCores: Int):  UnifiedMemoryManager = {

2.             val maxMemory = getMaxMemory(conf)

3.             new UnifiedMemoryManager(

4.               conf,

5.               maxMemory = maxMemory,

6.         // 通过配置属性"spark.memory.storageFraction",可以设置ExecutionStorage

7.               // 共享内存的初始边界值,即默认初始时,各占总的内存的一半。

8.               storageRegionSize =

9.                 (maxMemory *  conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,

10.            numCores = numCores)

11.        }

另外,需要注意的是,前面Execution内存指的是on-heap部分的内存,在ProjectTungsten中引入了off-heap(堆外)内存,这部分内存大小的设置在基类MemoryManager中,对应代码如下所示:

1.         // 根据传入的内存大小或配置属性分别设置内存池管理的内存初始大小

2.         // 1. Storage部分的内存池初始大小设置

3.         storageMemoryPool.incrementPoolSize(storageMemory)

4.         // 2. on-heap部分的Execution内存池初始大小设置

5.         onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)

6.         // 3. 从配置属性中读取off-heap内存池的初始内存大小

7.         offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeap.size",  0))

当需要使用off-heap内存时,需要注意的是,除了需要修改off-heap内存池(offHeapExecutionMemoryPool)的内存初始值时(默认为0),还需要打开对应的控制开关,具体代码参考内存分配MemoryManager中内存模式的设置(该内存模式可以控制用于内存分配MemoryAllocator的具体子类。),对应代码如下所示:

1.         final val tungstenMemoryMode:  MemoryMode = {

2.         // 当需要使用off-heap内存模式时,需要通过"spark.memory.offHeap.enabled"

3.             // 配置属性打开开关,然后通过"spark.memory.offHeap.size"配置属性

4.             // 指定off-heap的内存大小。

5.             if  (conf.getBoolean("spark.memory.offHeap.enabled", false)) {

6.               require(conf.getSizeAsBytes("spark.memory.offHeap.size",  0) > 0,

7.                 "spark.memory.offHeap.size must  be > 0 when spark.memory.offHeap.enabled == true")

8.               MemoryMode.OFF_HEAP

9.             } else {

10.            MemoryMode.ON_HEAP

11.          }

12.        }

从图3-4可以看出,Execution内存根据不同的内存模式(on-heapoff-heap)可以有两种内存池管理方式,对应可以查看下Execution内存分配的方法(方法注释中给出了为Task分配内存的实现细节,有兴趣可以查看源码注释),关键的代码如下所示:

1.           override private[memory] def  acquireExecutionMemory(

2.               numBytes: Long,

3.               taskAttemptId: Long,

4.               memoryMode: MemoryMode): Long =  synchronized {

5.             assert(onHeapExecutionMemoryPool.poolSize  + storageMemoryPool.poolSize == maxMemory)

6.             assert(numBytes >= 0)

7.             memoryMode match {

8.               case MemoryMode.ON_HEAP =>

9.         ……

10.              // 当内存模式为on-heap时,使用onHeapExecutionMemoryPool内存池来管理

11.              onHeapExecutionMemoryPool.acquireMemory(

12.                numBytes, taskAttemptId,  maybeGrowExecutionPool, computeMaxExecutionPoolSize)

13.       

14.            case MemoryMode.OFF_HEAP =>

15.              //当内存模式为off-heap时,使用offHeapExecutionMemoryPool内存池来管理

16.              offHeapExecutionMemoryPool.acquireMemory(numBytes,  taskAttemptId)

17.          }

18.        }

MemoryMode是二选一,因此在启动off-heap内存模式时,可以将Storage的内存占比(对应配置属性"spark.memory.storageFraction")设置高一点,虽然在具体分配过程中Storage也可以向on-heap这部分Execution借用内存。

关于内存池部分,下面给出主要类的类图,具体实现可以结合类图阅读源码来加深理解,主要类图如图3-5所示:


3-5内存池相关类图

主要是通过内部池大小和使用的内存大小等进行控制,对应统一内存管理模型,需要考虑借用等具体实现(关键代码可以查看UnitedMemoryManagerStorageMemoryPool类的shrinkPoolToFreeSpace方法的调用)。

以上是对Tungsten的两种内存管理模型的简单解析,下面开始对内存管理模型的内部组织结构进行解析。


文章转载自偷功,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论