暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spark(1.6 版本)系列:钨丝计划(Project Tungsten)之内存管理的模型及其源码的解析(四)

偷功 2016-12-04
670

4. 内存管理模型中的内存组织、管理模式

Spark是一个技术框架,数据以分区粒度进行处理,即每个分区对应一个处理的任务(Task),因此内存的组织与管理等可以通过与Task一一对应的TaskMemoryManager来理解。

下面首先给出TaskMemoryManagerMemoryManager间的关系图,如图3-6所示:


3-6TaskMemoryManagerMemoryManager的关系图

在图3-6,各个MemoryConsumer是具体处理时需要使用(消耗)内存块的实体,MemoryConsumer通过TaskMemoryManager提供的接口向MemoryManager申请或释放内存资源,即申请或释放内存块,TaskMemoryManager类中会管理全部MemoryConsumer,并对这些内存消耗实体所申请的内存块进行组织与管理,具体是通过通过PageTable的方式来实现。

首先查看下类的注释信息,原注释信息比较多,在此仅给出简单的中文描述,其他相关信息可以在后续内容中补充,具体代码如下所示:

1.         /**

2.          * Manages the memory allocated by an  individual task.

3.          * ……

4.          * 管理为单个 Task 所分配的内存。

5.          * 内存地址在不同的内存模式下的表示:

6.          *  1.  off-heap :直接使用 64-bit  表示内存地址。

7.          *  2.  on-heap :通过 base  object 和该对象中 64-bit  的偏移量来表示。

8.          * 通过封装类 MemoryBlock 统一表示内存块信息:

9.          *  1.  off-heap MemoryBlock   base  object null,偏移量对应 64-bit 的绝对地址。

10.       *  2.  on-heap MemoryBlock   base  object 保存对象的引用(该引用可以由 page 的索引从pageTable获取),

11.       *     偏移量对应数据在该对象中的偏移量。

12.       *

13.       *  通过这两种内存模式对应的编码方式,最终对外提供的编码格式为  13bit-pageNumber + 51bit-offset

14.       */

15.      public class  TaskMemoryManager {

下面从三个方面对TaskMemoryManager进行解析,包含内存地址的编码与解码、PageTable的组织与管理,以及内存的分配与释放。

首先解析内存地址的编码与解码部分。从TaskMemoryManager类的注释部分可以知道,off-heapon-heap两种内存模式最终对外都是是采用一致的编码格式,即对应13bitpageNumber(页码)和 51bitoffset(偏移量),可以通过图3-7来描述对应的编码方式:


3-7 Page的编码方式

下面分别对TaskMemoryManager类中与编码与解码相关的几个接口进行解析,编码接口主要有两个,其源码与解析如下所示:

1.           /**

2.            * Given a memory page and offset within  that page, encode this address into a 64-bit long.

3.            * This address will remain valid as long  as the corresponding page has not been freed.

4.            *

5.            * @param page a data page allocated by  {@link TaskMemoryManager#allocatePage}/

6.            * @param offsetInPage an offset in this  page which incorporates the base offset. In other

7.            *   words,   this should be the  value that you would pass as the base offset into an

8.            *                      UNSAFE call (e.g.  page.baseOffset() + something).

9.            * @return an encoded page address.

10.         *

11.         * 将针对某个 Page 的地址进行编码:

12.         * on-heap offsetInPage 是针对 base object 的偏移量。

13.         * off-heap :此时, offsetInPage 是绝对地址,因此编码到 Page方式的地址时,

14.         *            需要将绝对地址转换为相对于已有的PageMemoryBlock)中的绝对

15.         *地址 offset 的相对地址。最后将得到的两个偏移量和 Page Number 一起组装到 13 + 51 bits 64 bit 中。

16.         */

17.        public long  encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {

18.          if (tungstenMemoryMode ==  MemoryMode.OFF_HEAP) {

19.            // 如果是off-heap,则对应的offsetInPage64bit的绝对地址,需要转换为Page

20.            // 编码能容纳的51bit编码中,因此此时需要将其转换为Page内的相对地址,

21.            // 即页内的偏移地址。

22.            // In off-heap mode, an offset is an  absolute address that may require a full 64 bits to

23.            // encode. Due to our page size  limitation, though, we can convert this into an offset that's

24.            // relative to the page's base offset;  this relative offset will fit in 51 bits.

25.            offsetInPage -= page.getBaseOffset();

26.          }

27.          return encodePageNumberAndOffset(page.pageNumber,  offsetInPage);

28.        }

29.       

30.        @VisibleForTesting

31.        public static long  encodePageNumberAndOffset(int pageNumber, long offsetInPage) {

32.          assert (pageNumber != -1) :  "encodePageNumberAndOffset called with invalid page";

33.      // 13bit的页码与51bit的页内偏移量组装成64bit的编码地址。

34.          return (((long) pageNumber) <<  OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);

35.        }

通过pageNumber可以找到最终的PagePage内部会根据off-heapon-heap两种模式分别存储Page对应内存块的起始地址(或对象内偏移地址),因此编码后的地址可以通过查找到Page,最终解码出原始地址。对应的的解码的源码及其解析如下所示:

1.           @VisibleForTesting

2.           public static int decodePageNumber(long  pagePlusOffsetAddress) {

3.            // 通过13bit掩码解析出编码地址中的页码信息,即对应的高13bit内容

4.             return (int) ((pagePlusOffsetAddress  & MASK_LONG_UPPER_13_BITS) >>> OFFSET_BITS);

5.           }

6.          

7.           private static long decodeOffset(long  pagePlusOffsetAddress) {

8.           // 通过51bit掩码解析出编码地址中的页码信息,即对应的低51bit内容

9.             return (pagePlusOffsetAddress &  MASK_LONG_LOWER_51_BITS);

10.        }

TaskMemoryManager类中另外还提供了针对on-heap内存模式下,获取baseobject的接口,对应的源码及其解析如下所示:

1.           /**

2.            * Get the page associated with an address  encoded by

3.         *获取编码地址相关的base object

4.            * {@link  TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}

5.            */

6.           public Object getPage(long  pagePlusOffsetAddress) {

7.             if (tungstenMemoryMode ==  MemoryMode.ON_HEAP) {

8.               // 首先从地址中解析出页码

9.               final int pageNumber =  decodePageNumber(pagePlusOffsetAddress);

10.            assert (pageNumber >= 0 &&  pageNumber < PAGE_TABLE_SIZE);

11.            // 根据页码从pageTable变量中获取对应的内存块

12.            final MemoryBlock page =  pageTable[pageNumber];

13.            assert (page != null);

14.            assert (page.getBaseObject() != null);

15.      // 获取内存块对应的 BaseObject

16.            return page.getBaseObject();

17.          } else {

18.            // off-heap内存模式下MemoryBlock只需要保存一个绝对地址,因此对应

19.            // base object null

20.            return null;

21.          }

22.        }

下面开始解析PageTable的组织与管理方面的内容,在解析之前先给出以Page Table方式进行组织与管理的大致描述图,如图3-8所示:


3-8 PageTable组织与管理描述图

在图3-8中,右侧是分配的内存块,即当前需要管理的Page,在TaskMemoryManager中,通过Page Table来存放内存块,同时,通过变量allocatedPages中指定值为Page Number(页码)的下标(索引)对应的值是否为1来表示当前Page Number对应的Page Table中的Page是否已经存放了对应的内存块,即每当分配到一个内存块时,从allocatedPages获取一个值为0的位置(页码),并将该位置作为内存块放入到Page Table中的位置。

简单点描述的话,就是allocatedPages中各个位置上的值为10来表示在PageTable中相同位置是否已经放置了内存块(Page)。

而对应在PageTable中已经存放的内存块,实际上就是对应了右侧已经分配的内存块。

当针对一个PageEncode(页地址编码)时,首先从中获取Page Number,根据该值从Page Table中获取确定的内存块(MemoryBlockPage),找到确定内存块之后,再通过页地址编码中的offset(具体两种内存模式下的概念如图3-8所示)确定的内存块中的相关偏移量,如果是off-heap,则该offset是相对于内存块(从前面分析可知,内存块本身的信息也与内存模式相关)中的绝对地址的相对地址,如果是on-heap,则该offset是相对于内存块的base object中的偏移量。

相关的源码主要涉及TaskMemoryManager类的两个成员变量,如下所示:

1.         // 对应图3-8Page Table

2.         private final MemoryBlock[]  pageTable = new MemoryBlock[PAGE_TABLE_SIZE];

3.          

4.         // 对应图3-8allocatedPages

5.         private final BitSet  allocatedPages = new BitSet(PAGE_TABLE_SIZE);

PageTable的组织与管理中关于页码的偏移量已经在上一部分给出详细描述,而对应的具体的管理操作则与实际的内存分配与解析部分相关,通过图3-8对大致的管理有一定概念后,再继续通过内存分配与解析部分,来详细解析具体的管理细节。

下面开始分析TaskMemoryManager类提供的内存分配与解析部分。关于这部分内容,主要参考allocatePagefreePage两个方法,对应allocatePage内部如何申请内存,以及申请内存时采用的spill策略等细节,大家可以继续深入,比如查看acquireExecutionMemory的具体源码来加深理解。

allocatePage方法的源码及其解析如下所示:

1.           /**

2.            * Allocate a block of memory that will be  tracked in the MemoryManager's page table; this

3.            * is intended for allocating large blocks  of Tungsten memory that will be shared between

4.            * operators.

5.            * Returns `null` if there was not enough  memory to allocate the page. May return a page

6.            * that contains fewer bytes than  requested, so callers should verify the size of returned

7.            * pages.

8.            * 分配一块内存,并通过 MemoryManager(实际上是在 TaskMemoryManager 中)的

9.            * Page Table 进行跟踪;分配的是 Execution 部分的内存。

10.         * Project Tungsten 的内存包含 off-heap on-heap 两种模式,由底层

11.         * tungstenMemoryMode(在MemoryManager中设置)控制具体分配的

12.      *MemoryAllocator 子类。

13.         */

14.        public MemoryBlock allocatePage(long size,  MemoryConsumer consumer) {

15.          // 页大小的限制

16.          if (size > MAXIMUM_PAGE_SIZE_BYTES) {

17.            throw new IllegalArgumentException(

18.              "Cannot allocate a page with  more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");

19.          }

20.       

21.          // 特定的内存消费者 consumer 以指定的内存模式 tungstenMemoryMode

22.          // 申请一定的内存量

23.          long acquired =  acquireExecutionMemory(size, tungstenMemoryMode, consumer);

24.          if (acquired <= 0) {

25.            return null;

26.          }

27.       

28.          final int pageNumber;

29.          synchronized (this) {

30.            // 获取当前未被占用的页码

31.            pageNumber =  allocatedPages.nextClearBit(0);

32.            if (pageNumber >= PAGE_TABLE_SIZE) {

33.              releaseExecutionMemory(acquired,  tungstenMemoryMode, consumer);

34.              throw new IllegalStateException(

35.                "Have already allocated a  maximum of " + PAGE_TABLE_SIZE + " pages");

36.            }

37.            // 设置该页码已经被占用(即设置对应页码位置的值)

38.            allocatedPages.set(pageNumber);

39.          }

40.       

41.          // 开始通过 MemoryAllocator 真正分配内存

42.          // 注意 : acquireExecutionMemory 中通过 ExecutionMemoryPool 进行分配时,

43.          // 仅仅是内存使用大小上的控制,并没有真正分配内存。

44.          // 有兴趣的话,可以查看对 acquireExecutionMemory 的调用点(其中

45.          // 可以指定与tungstenMemoryMode不同的其他内存模式,

46.          // 此时是不存在真正的内存分配的)

47.          final MemoryBlock page =  memoryManager.tungstenMemoryAllocator().allocate(acquired);

48.       

49.          // 分配得到内存块之后,会设置该内存块对应的 pageNumber

50.          // 即此时设置 MemoryBlock 在其管理的 Page Table 中的位置。

51.          page.pageNumber = pageNumber;

52.          pageTable[pageNumber] = page;

53.          if (logger.isTraceEnabled()) {

54.            logger.trace("Allocate page number  {} ({} bytes)", pageNumber, acquired);

55.          }

56.          return page;

57.        }

其中,MAXIMUM_PAGE_SIZE_BYTES是页内数据量大小的现在,从之前MemoryBlock提供的从long型数组转换得到MemoryBlock接口,可以知道当前连续的内存块是通过long型数组来获取的,因此对应的内存块的大小也会受到数组的最大长度的现在。

至于对应在具体的处理过程中,对页内的数据量大小是否还有其他现在,可以参考具体的处理细节,下一章节会给出一个具体处理过程的源码解析,其中会包含这部分内容。

由于分配的细节比较多,这里给出主要的过程描述:

  • 首先通过acquireExecutionMemory方法,向ExecutionMemoryPool申请内存(根据统一或静态两种具体实现给出):这一部分主要是判断当前可用内存是否满足申请需求,并根据申请结果修改当前内存池可用内存信息(实际是当前使用内存量信息)。

  • 从当前Page Table中找出一个可用位置,用于存放所申请的内存块(MemoryBlockPage)。

  • 准备好前两步后,开始通过MemoryAllocator真正分区内存块。

  • 将分配的内存块放入Page Table

在整个过程中,allocatedPagespageTable这两个成员变量的使用是体现PageTable组织与管理的关键所在。

下面解析freePage的源码,如下所示:

1.           /**

2.            * Free a block of memory allocated via  {@link TaskMemoryManager#allocatePage}.

3.            * 更新 Page Table 相关信息,通过 MemoryAllocator 释放 Page 的内存,

4.            * 最后通过 MemoryManager 修改 ExecutorManagerPool 中内存使用量(即释放)。

5.            */

6.           public void freePage(MemoryBlock page,  MemoryConsumer consumer) {

7.             // 首先确认当前释放的内存块是在 Page Table 的管理中的,即页码必须有效

8.             assert (page.pageNumber != -1) :

9.               "Called freePage() on memory that  wasn't allocated with allocatePage()";

10.          assert(allocatedPages.get(page.pageNumber));

11.          pageTable[page.pageNumber] = null;

12.       

13.          // allocatedPages 是控制 pageTable 中对应位置是否可用的,

14.          // 需要考虑释放与分配时的并发性,因此需同步处理

15.          synchronized (this) {

16.            allocatedPages.clear(page.pageNumber);

17.          }

18.          if (logger.isTraceEnabled()) {

19.            logger.trace("Freed page number {}  ({} bytes)", page.pageNumber, page.size());

20.          }

21.       

22.          // 通过当前内存模式对应的 MemoryAllocator 真正释放该内存块

23.          long pageSize = page.size();

24.          memoryManager.tungstenMemoryAllocator().free(page);

25.       

26.          // 对应 ExecutionMemoryPool 部分的内存释放,

27.          // 参考前面 acquireExecutionMemory 解析一起了解

28.          releaseExecutionMemory(pageSize,  tungstenMemoryMode, consumer);

29.        }

释放Page的逻辑实际上可以参考申请Page,大部分都是步骤相反而已。

 


文章转载自偷功,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论