Spark Memory Management
        When a Spark application runs, the cluster launches two kinds of JVM processes, the Driver and the Executors:
                Driver: creates the SparkContext, submits the application, and dispatches tasks.
                Executor: runs the tasks, returns the results to the Driver, and provides in-memory storage for RDDs that need to be persisted.
        Memory management on the Driver side is fairly simple, so the Spark memory management discussed here refers to memory management on the Executor side.


Spark memory management comes in two flavors: static memory management and unified memory management. Static memory management was used before Spark 1.6; unified memory management was introduced in Spark 1.6.

    Static memory management: the sizes of the storage, execution, and other memory regions stay fixed while the application runs, but the user can configure them before the application starts.
    Unified memory management: unlike static management, the storage and execution regions share the same space and can borrow free memory from each other.

    Spark 1.6 and later use unified memory management by default; static memory management can be re-enabled with the following parameter:
            spark.memory.useLegacyMode=true   (default: false)
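
    For example, a minimal sketch of opting back into static memory management when building the SparkConf (the application name and master URL are placeholders, not from the original text):

import org.apache.spark.{SparkConf, SparkContext}

// Re-enable the pre-1.6 static memory manager for this application.
val conf = new SparkConf()
  .setAppName("legacy-memory-demo")           // placeholder application name
  .setMaster("local[*]")                      // placeholder master URL
  .set("spark.memory.useLegacyMode", "true")  // default is false (unified management)
val sc = new SparkContext(conf)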

1. Static Memory Management

Source code of the static memory manager class (StaticMemoryManager):

/**
 * A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
 * // The heap is split into disjoint execution and storage regions
 * The sizes of the execution and storage regions are determined through
 * `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` respectively.
 * The two regions are cleanly separated such that neither usage can borrow memory from the other.
 */
private[spark] class StaticMemoryManager(

    The total memory is first divided into 3 parts (a worked split for a hypothetical heap follows the list):

    (1) Storage memory region:

            takes 60% of the heap

            set by spark.storage.memoryFraction, default 0.6

    (2) Shuffle memory region:

            takes 20% of the heap

            set by spark.shuffle.memoryFraction, default 0.2

    (3) Task computation memory region:

            takes 100% - 60% - 20% = 20% of the heap

            cannot be set directly; once the other two are set, the remainder belongs to this region
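
    To make the split concrete, here is a small self-contained sketch that reproduces the static three-way split for a hypothetical 10 GB executor heap (the heap size and object name are illustrative):

object StaticSplitDemo extends App {
  val heap = 10L * 1024 * 1024 * 1024      // hypothetical 10 GB executor heap
  val storage = (heap * 0.6).toLong        // spark.storage.memoryFraction = 0.6 -> 6 GB
  val shuffle = (heap * 0.2).toLong        // spark.shuffle.memoryFraction = 0.2 -> 2 GB
  val task = heap - storage - shuffle      // remaining 20% -> 2 GB for task computation
  println(s"storage=$storage shuffle=$shuffle task=$task")
}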

        


The storage memory region (60% of the heap) is itself divided into two parts:

  /**
   * Return the total amount of memory available for the storage region, in bytes.
   */
  private def getMaxStorageMemory(conf: SparkConf): Long = {
    // Maximum memory available to the Executor (JVM max heap)
    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    // Fraction of the heap given to the storage region
    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
    // Fraction of the storage region that is safe to use
    val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
    // Multiply to get the safely usable storage memory
    (systemMaxMemory * memoryFraction * safetyFraction).toLong
  }

        1. The safely usable region: stores persisted RDD data and broadcast variables, and provides memory for unrolling blocks

                    takes 90% of this region (60% * 90% of the heap)

                    set by spark.storage.safetyFraction, default 0.9

                    it is further divided into two parts

                            · memory for unrolling blocks (e.g., when deserializing block data into memory)

                                    takes 20% of this region (60% * 90% * 20% of the heap)

                                    set by spark.storage.unrollFraction, default 0.2

  // Max number of bytes worth of blocks to evict when unrolling
  private val maxUnrollMemory: Long = {
    (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
  }

                            · storage for persisted RDD data and broadcast variables

                                    takes the remaining 80% of this region (60% * 90% * 80% of the heap)

                                    cannot be set directly; it is what is left of the 90% once the unroll memory is taken out

        2. The reserved region, kept back to guard against OOM and for the JVM's own use

                    takes 10% of this region

                    cannot be set directly; it is 100% - 90%
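
        Plugging the defaults into getMaxStorageMemory gives these concrete numbers for the same hypothetical 10 GB heap (a sketch mirroring the source defaults, not output from the original):

val heap = 10L * 1024 * 1024 * 1024                     // hypothetical 10 GB executor heap
val maxStorageMemory = (heap * 0.6 * 0.9).toLong        // safely usable storage: 5.4 GB (54% of the heap)
val maxUnrollMemory = (maxStorageMemory * 0.2).toLong   // unroll memory: 1.08 GB (10.8% of the heap)
val rddCacheMemory = maxStorageMemory - maxUnrollMemory // RDD cache + broadcasts: 4.32 GB (43.2% of the heap)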


The shuffle memory region (20% of the heap) is itself divided into two parts:

 /**
   * Return the total amount of memory available for the execution region, in bytes.
   */
  private def getMaxExecutionMemory(conf: SparkConf): Long = {
    // Maximum memory available to the Executor (JVM max heap)
    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    // Fraction of the heap given to the shuffle (execution) region
    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
    // Fraction of the shuffle region that is safe to use
    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
    // Multiply to get the safely usable execution memory
    (systemMaxMemory * memoryFraction * safetyFraction).toLong
  }

        1. Memory used for shuffle aggregation

                    takes 80% of this region (20% * 80% of the heap = 16%)

                    set by spark.shuffle.safetyFraction, default 0.8

        2. Reserved memory, kept back to guard against OOM

                   takes 20% of this region

                   cannot be set directly; it is 100% - 80%
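
        The matching numbers for the shuffle region on the same hypothetical 10 GB heap:

val heap = 10L * 1024 * 1024 * 1024                              // hypothetical 10 GB executor heap
val maxExecutionMemory = (heap * 0.2 * 0.8).toLong               // safely usable for shuffle aggregation: 1.6 GB (16%)
val executionReserved = (heap * 0.2).toLong - maxExecutionMemory // reserved against OOM: 0.4 GB (4%)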



2. Unified Memory Management

Source code of the unified memory manager class (UnifiedMemoryManager):

/**
 * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
 * either side can borrow memory from the other.
 *
 * The region shared between execution and storage is a fraction of (the total heap space - 300MB)
 * configurable through `spark.memory.fraction` (default 0.75). The position of the boundary
 * within this space is further determined by `spark.memory.storageFraction` (default 0.5).
 * This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default.
 *
 * Storage can borrow as much execution memory as is free until execution reclaims its space.
 * When this happens, cached blocks will be evicted from memory until sufficient borrowed
 * memory is released to satisfy the execution memory request.
 *
 * Similarly, execution can borrow as much storage memory as is free. However, execution
 * memory is *never* evicted by storage due to the complexities involved in implementing this.
 * The implication is that attempts to cache blocks may fail if execution has already eaten
 * up most of the storage space, in which case the new blocks will be evicted immediately
 * according to their respective storage levels.
 *
 * @param storageRegionSize Size of the storage region, in bytes.
 *                          This region is not statically reserved; execution can borrow from
 *                          it if necessary. Cached blocks can be evicted only if actual
 *                          storage memory usage exceeds this region.
 */
private[spark] class UnifiedMemoryManager private[memory] (

    Memory is divided into 2 parts:

  /**
   * Return the total amount of memory shared between execution and storage, in bytes.
   */
  private def getMaxMemory(conf: SparkConf): Long = {
    // Maximum memory available to the Executor (JVM max heap)
    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    // Reserved memory: RESERVED_SYSTEM_MEMORY_BYTES (300MB), overridable only for tests
    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
    val minSystemMemory = reservedMemory * 1.5
    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please use a larger heap size.")
    }
    // Usable memory for Spark = JVM heap minus the 300MB reserved for the system
    val usableMemory = systemMemory - reservedMemory
    // Fraction of usable memory shared between task execution and storage
    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
    (usableMemory * memoryFraction).toLong
  }
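
        Tracing getMaxMemory with the defaults on a hypothetical 4 GB executor heap (an illustrative walk-through, not output from the original):

val systemMemory = 4096L * 1024 * 1024              // hypothetical 4 GB executor heap
val reservedMemory = 300L * 1024 * 1024             // RESERVED_SYSTEM_MEMORY_BYTES = 300 MB
val minSystemMemory = (reservedMemory * 1.5).toLong // 450 MB: smaller heaps are rejected outright
val usableMemory = systemMemory - reservedMemory    // 4096 MB - 300 MB = 3796 MB
val maxMemory = (usableMemory * 0.75).toLong        // spark.memory.fraction = 0.75 -> 2847 MB shared region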

       

    (1) Memory reserved for the JVM / system

                       300MB by default (RESERVED_SYSTEM_MEMORY_BYTES; overridable for tests via spark.testing.reservedMemory)

    (2) Memory for task computation and storage

                        size: Executor memory - 300MB

                        it is further divided into 2 regions

                                1. The unified region, shared by storage and execution

                                            takes 75% of the usable memory

                                            set by spark.memory.fraction, default 0.75

                                            it is itself divided into 2 parts

                                                      · storage for persisted RDD data and broadcast variables

                                                                     takes 50% of the unified region

                                                                     set by spark.memory.storageFraction, default 0.5

  def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
    val maxMemory = getMaxMemory(conf)
    new UnifiedMemoryManager(
      conf,
      maxMemory = maxMemory,
      // Size of the storage region
      storageRegionSize =
        (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
      numCores = numCores)
  }

                                                     ·  shuffle (execution) memory

                                                                      takes 50% of the unified region

                                                                      cannot be set directly; it is whatever remains of the unified region

                               2. Task computation region (the user's own data structures and Spark internal metadata)

                                            takes 25% of the usable memory

                                            cannot be set directly; it is 100% - 75%
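
                               Taken together, the storage half of the unified region is 0.75 * 0.5 = 37.5% of the usable memory by default, which matches the class comment above. Both fractions can be tuned when the configuration is built; a minimal sketch (the values are illustrative, not recommendations):

import org.apache.spark.SparkConf

val tunedConf = new SparkConf()
  .set("spark.memory.fraction", "0.75")        // share of (heap - 300MB) given to the unified region
  .set("spark.memory.storageFraction", "0.5")  // storage's share of the unified region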

