Preface
This article takes a look at managed memory in Flink's TaskManager.
TaskManagerOptions
flink-core-1.7.2-sources.jar!/org/apache/flink/configuration/TaskManagerOptions.java

    @PublicEvolving
    public class TaskManagerOptions {

        //......

        /**
         * JVM heap size for the TaskManagers with memory size.
         */
        @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
        public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
                key("taskmanager.heap.size")
                .defaultValue("1024m")
                .withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
                        " the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
                        " YARN container, minus a certain tolerance value.");

        /**
         * Amount of memory to be allocated by the task manager's memory manager. If not
         * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
         */
        public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
                key("taskmanager.memory.size")
                .defaultValue("0")
                .withDescription("Amount of memory to be allocated by the task manager's memory manager." +
                        " If not set, a relative fraction will be allocated.");

        /**
         * Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
         * not set.
         */
        public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
                key("taskmanager.memory.fraction")
                .defaultValue(0.7f)
                .withDescription("The relative amount of memory (after subtracting the amount of memory used by network" +
                        " buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results." +
                        " For example, a value of `0.8` means that a task manager reserves 80% of its memory" +
                        " for internal data buffers, leaving 20% of free memory for the task manager's heap for objects" +
                        " created by user-defined functions. This parameter is only evaluated, if " + MANAGED_MEMORY_SIZE.key() +
                        " is not set.");

        /**
         * Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager
         * as well as the network buffers.
         **/
        public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
                key("taskmanager.memory.off-heap")
                .defaultValue(false)
                .withDescription("Memory allocation method (JVM heap or off-heap), used for managed memory of the" +
                        " TaskManager as well as the network buffers.");

        /**
         * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
         */
        public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
                key("taskmanager.memory.preallocate")
                .defaultValue(false)
                .withDescription("Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.");

        //......
    }

- taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (mainly used for sorting, hashing, and caching) and defaults to 0; taskmanager.heap.size sets the TaskManager's total memory, covering both heap and off-heap
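As a rough illustration of how these defaults interact, the sketch below looks options up in a plain map and falls back to the defaults listed in the excerpt. The class `TaskManagerMemoryDefaultsSketch` and its methods are hypothetical stand-ins, not Flink's `Configuration` API; only the keys and default values come from the source above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a configuration lookup; not part of Flink. */
final class TaskManagerMemoryDefaultsSketch {

    // Keys and default values copied from the TaskManagerOptions excerpt.
    static final Map<String, String> DEFAULTS = new HashMap<>();
    static {
        DEFAULTS.put("taskmanager.heap.size", "1024m");
        DEFAULTS.put("taskmanager.memory.size", "0");
        DEFAULTS.put("taskmanager.memory.fraction", "0.7");
        DEFAULTS.put("taskmanager.memory.off-heap", "false");
        DEFAULTS.put("taskmanager.memory.preallocate", "false");
    }

    /** Returns the user-supplied value, or the option's default when the key is absent. */
    static String effectiveValue(Map<String, String> userConfig, String key) {
        return userConfig.getOrDefault(key, DEFAULTS.get(key));
    }

    /** True when taskmanager.memory.size is left at its default "0", so the fraction applies. */
    static boolean sizedByFraction(Map<String, String> userConfig) {
        String key = "taskmanager.memory.size";
        return effectiveValue(userConfig, key).equals(DEFAULTS.get(key));
    }

    public static void main(String[] args) {
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("taskmanager.heap.size", "4096m");
        System.out.println(effectiveValue(userConfig, "taskmanager.heap.size"));
        System.out.println(sizedByFraction(userConfig));
    }
}
```

Comparing against the default string is the same test the Flink code further down uses to decide whether an explicit size was configured.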
TaskManagerServices.calculateHeapSizeMB
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

    public class TaskManagerServices {

        //......

        /**
         * Calculates the amount of heap memory to use (to set via -Xmx and -Xms)
         * based on the total memory to use and the given configuration parameters.
         *
         * @param totalJavaMemorySizeMB
         * 		overall available memory to use (heap and off-heap)
         * @param config
         * 		configuration object
         *
         * @return heap memory to use (in megabytes)
         */
        public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
            Preconditions.checkArgument(totalJavaMemorySizeMB > 0);

            // subtract the Java memory used for network buffers (always off-heap)
            final long networkBufMB =
                calculateNetworkBufferMemory(
                    totalJavaMemorySizeMB << 20, // megabytes to bytes
                    config) >> 20; // bytes to megabytes
            final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;

            // split the available Java memory between heap and off-heap

            final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);

            final long heapSizeMB;
            if (useOffHeap) {

                long offHeapSize;
                String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
                if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
                    try {
                        offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
                    } catch (IllegalArgumentException e) {
                        throw new IllegalConfigurationException(
                            "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
                    }
                } else {
                    offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
                }

                if (offHeapSize <= 0) {
                    // calculate off-heap section via fraction
                    double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
                    offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
                }

                TaskManagerServicesConfiguration
                    .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
                        TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                        "Managed memory size too large for " + networkBufMB +
                            " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
                            " MB JVM memory");

                heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
            } else {
                heapSizeMB = remainingJavaMemorySizeMB;
            }

            return heapSizeMB;
        }

        //......
    }

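- If taskmanager.memory.size is less than or equal to 0, managed memory is sized from taskmanager.memory.fraction instead, which defaults to 0.7
- If taskmanager.memory.off-heap is enabled and no explicit size is set, taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB) is the offHeapSize managed by the TaskManager's memory manager
- If taskmanager.memory.off-heap is enabled, the TaskManager's Xmx is taskmanager.heap.size - networkBufMB - offHeapSize; if it is disabled, Xmx is simply taskmanager.heap.size - networkBufMB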
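To make the arithmetic concrete, here is a minimal, dependency-free restatement of the calculation. `HeapSizeSketch` and `heapSizeMB` are hypothetical names, the network buffer size is taken as an input rather than computed, and the 102 MB figure in `main` is an assumed value chosen only for illustration.

```java
/** Simplified restatement of the arithmetic in calculateHeapSizeMB; not Flink code. */
final class HeapSizeSketch {

    /**
     * @param totalMB       total memory, as given by taskmanager.heap.size
     * @param networkBufMB  network buffer memory in MB (assumed to be computed elsewhere)
     * @param offHeap       value of taskmanager.memory.off-heap
     * @param managedSizeMB value of taskmanager.memory.size in MB, 0 when not set
     * @param fraction      value of taskmanager.memory.fraction
     * @return heap size in MB
     */
    static long heapSizeMB(long totalMB, long networkBufMB, boolean offHeap,
                           long managedSizeMB, double fraction) {
        if (totalMB <= 0) {
            throw new IllegalArgumentException("total memory must be positive");
        }
        long remainingMB = totalMB - networkBufMB;
        if (!offHeap) {
            // Managed memory lives on the heap, so the heap gets everything that is left.
            return remainingMB;
        }
        long offHeapMB = managedSizeMB;
        if (offHeapMB <= 0) {
            // No explicit size: derive the off-heap share from the fraction.
            offHeapMB = (long) (fraction * remainingMB);
        }
        if (offHeapMB >= remainingMB) {
            throw new IllegalArgumentException("managed memory size too large");
        }
        return remainingMB - offHeapMB;
    }

    public static void main(String[] args) {
        // 1024 MB total, 102 MB network buffers (assumed), fraction 0.7
        System.out.println(heapSizeMB(1024, 102, true, 0, 0.7));   // 277
        System.out.println(heapSizeMB(1024, 102, false, 0, 0.7));  // 922
    }
}
```

With off-heap enabled, 922 MB remain after the network buffers, 645 MB of that become managed off-heap memory, and 277 MB are left for the heap.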
TaskManagerServices.createMemoryManager
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

    public class TaskManagerServices {

        //......

        /**
         * Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}.
         *
         * @param taskManagerServicesConfiguration to create the memory manager from
         * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
         * @param maxJvmHeapMemory the maximum JVM heap size
         * @return Memory manager
         * @throws Exception
         */
        private static MemoryManager createMemoryManager(
                TaskManagerServicesConfiguration taskManagerServicesConfiguration,
                long freeHeapMemoryWithDefrag,
                long maxJvmHeapMemory) throws Exception {
            // computing the amount of memory to use depends on how much memory is available
            // it strictly needs to happen AFTER the network stack has been initialized

            // check if a value has been configured
            long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();

            MemoryType memType = taskManagerServicesConfiguration.getMemoryType();

            final long memorySize;

            boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();

            if (configuredMemory > 0) {
                if (preAllocateMemory) {
                    LOG.info("Using {} MB for managed memory." , configuredMemory);
                } else {
                    LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
                }
                memorySize = configuredMemory << 20; // megabytes to bytes
            } else {
                // similar to #calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig)
                float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();

                if (memType == MemoryType.HEAP) {
                    // network buffers allocated off-heap -> use memoryFraction of the available heap:
                    long relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction);
                    if (preAllocateMemory) {
                        LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
                            memoryFraction , relativeMemSize >> 20);
                    } else {
                        LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
                            "memory will be allocated lazily." , memoryFraction , relativeMemSize >> 20);
                    }
                    memorySize = relativeMemSize;
                } else if (memType == MemoryType.OFF_HEAP) {
                    // The maximum heap memory has been adjusted according to the fraction (see
                    // calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
                    // maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
                    // directMemorySize = jvmTotalNoNet * memoryFraction
                    long directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);
                    if (preAllocateMemory) {
                        LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
                            memoryFraction, directMemorySize >> 20);
                    } else {
                        LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
                            " memory will be allocated lazily.", memoryFraction, directMemorySize >> 20);
                    }
                    memorySize = directMemorySize;
                } else {
                    throw new RuntimeException("No supported memory type detected.");
                }
            }

            // now start the memory manager
            final MemoryManager memoryManager;
            try {
                memoryManager = new MemoryManager(
                    memorySize,
                    taskManagerServicesConfiguration.getNumberOfSlots(),
                    taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(),
                    memType,
                    preAllocateMemory);
            } catch (OutOfMemoryError e) {
                if (memType == MemoryType.HEAP) {
                    throw new Exception("OutOfMemory error (" + e.getMessage() +
                        ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
                } else if (memType == MemoryType.OFF_HEAP) {
                    throw new Exception("OutOfMemory error (" + e.getMessage() +
                        ") while allocating the TaskManager off-heap memory (" + memorySize +
                        " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
                } else {
                    throw e;
                }
            }
            return memoryManager;
        }

        //......
    }

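- TaskManagerServices provides the private static method createMemoryManager, which builds a MemoryManager from the configuration. If taskmanager.memory.size is configured (configuredMemory > 0), memorySize is that value converted to bytes; otherwise memorySize is recomputed according to the MemoryType. Either way, the result is passed to the MemoryManager constructor
- When memType is MemoryType.HEAP, memorySize is relativeMemSize, where relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction)
- When memType is MemoryType.OFF_HEAP, memorySize is directMemorySize, where directMemorySize = jvmTotalNoNet * memoryFraction. Since maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), it follows that directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction)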
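The three cases can be restated as a small self-contained sketch. `ManagedMemorySizeSketch`, its local `MemoryType` enum, and `managedMemoryBytes` are hypothetical names that only mirror the expressions in the excerpt; the heap sizes in `main` are made-up inputs.

```java
/** Simplified restatement of the size selection in createMemoryManager; not Flink code. */
final class ManagedMemorySizeSketch {

    /** Local enum that mirrors the two MemoryType names used in the excerpt. */
    enum MemoryType { HEAP, OFF_HEAP }

    static long managedMemoryBytes(long configuredMB, MemoryType type, float fraction,
                                   long freeHeapBytes, long maxHeapBytes) {
        if (configuredMB > 0) {
            // An explicit taskmanager.memory.size wins: megabytes to bytes.
            return configuredMB << 20;
        }
        if (type == MemoryType.HEAP) {
            // long * float is evaluated in float precision, as in the original expression.
            return (long) (freeHeapBytes * fraction);
        }
        // OFF_HEAP: maxHeap = total * (1 - fraction), so total * fraction can be
        // recovered as maxHeap / (1 - fraction) * fraction.
        return (long) (maxHeapBytes / (1.0 - fraction) * fraction);
    }

    public static void main(String[] args) {
        long mb = 1L << 20;
        // Explicit size of 512 MB.
        System.out.println(managedMemoryBytes(512, MemoryType.HEAP, 0.7f, 0, 0));
        // On-heap: half of 1000 MB of free heap.
        System.out.println(managedMemoryBytes(0, MemoryType.HEAP, 0.5f, 1000 * mb, 0));
        // Off-heap: a 300 MB max heap with fraction 0.7 implies roughly 700 MB of managed memory.
        System.out.println(managedMemoryBytes(0, MemoryType.OFF_HEAP, 0.7f, 0, 300 * mb));
    }
}
```

Because the fraction is a float, the off-heap result lands a few bytes away from the exact 700 MB rather than exactly on it.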
TaskManagerServicesConfiguration
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java

    public class TaskManagerServicesConfiguration {

        //......

        /**
         * Utility method to extract TaskManager config parameters from the configuration and to
         * sanity check them.
         *
         * @param configuration The configuration.
         * @param remoteAddress identifying the IP address under which the TaskManager will be accessible
         * @param localCommunication True, to skip initializing the network stack.
         *                                      Use only in cases where only one task manager runs.
         * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
         */
        public static TaskManagerServicesConfiguration fromConfiguration(
                Configuration configuration,
                InetAddress remoteAddress,
                boolean localCommunication) throws Exception {

            // we need this because many configs have been written with a "-1" entry
            int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
            if (slots == -1) {
                slots = 1;
            }

            final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);
            String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);

            if (localStateRootDir.length == 0) {
                // default to temp dirs.
                localStateRootDir = tmpDirs;
            }

            boolean localRecoveryMode = configuration.getBoolean(
                CheckpointingOptions.LOCAL_RECOVERY.key(),
                CheckpointingOptions.LOCAL_RECOVERY.defaultValue());

            final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
                configuration,
                localCommunication,
                remoteAddress,
                slots);

            final QueryableStateConfiguration queryableStateConfig =
                    parseQueryableStateConfiguration(configuration);

            // extract memory settings
            long configuredMemory;
            String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
            if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
                try {
                    configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
                } catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException(
                        "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
                }
            } else {
                configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
            }

            checkConfigParameter(
                configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) ||
                    configuredMemory > 0, configuredMemory,
                TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                "MemoryManager needs at least one MB of memory. " +
                    "If you leave this config parameter empty, the system automatically " +
                    "pick a fraction of the available memory.");

            // check whether we use heap or off-heap memory
            final MemoryType memType;
            if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
                memType = MemoryType.OFF_HEAP;
            } else {
                memType = MemoryType.HEAP;
            }

            boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE);

            float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
            checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction,
                TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
                "MemoryManager fraction of the free memory must be between 0.0 and 1.0");

            long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis();

            return new TaskManagerServicesConfiguration(
                remoteAddress,
                tmpDirs,
                localStateRootDir,
                localRecoveryMode,
                networkConfig,
                queryableStateConfig,
                slots,
                configuredMemory,
                memType,
                preAllocateMemory,
                memoryFraction,
                timerServiceShutdownTimeout,
                ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
        }

        //......
    }

- TaskManagerServicesConfiguration provides the static method fromConfiguration, which builds a TaskManagerServicesConfiguration from a Configuration. memType is derived from taskmanager.memory.off-heap: MemoryType.OFF_HEAP if it is true, MemoryType.HEAP otherwise
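The memory-related checks in fromConfiguration can be sketched without any Flink dependency. `MemorySettingsValidationSketch` and its methods are hypothetical; the size check takes the raw string and the already parsed MB value as separate inputs, since the parsing itself is left to Flink's MemorySize.

```java
/** Simplified restatement of the memory checks in fromConfiguration; not Flink code. */
final class MemorySettingsValidationSketch {

    /** Local enum that mirrors the two MemoryType names used in the excerpt. */
    enum MemoryType { HEAP, OFF_HEAP }

    /** taskmanager.memory.off-heap = true selects OFF_HEAP, anything else HEAP. */
    static MemoryType memoryType(boolean offHeap) {
        return offHeap ? MemoryType.OFF_HEAP : MemoryType.HEAP;
    }

    /** The fraction must lie strictly between 0 and 1. */
    static void checkFraction(float fraction) {
        if (!(fraction > 0.0f && fraction < 1.0f)) {
            throw new IllegalArgumentException(
                "taskmanager.memory.fraction must be between 0.0 and 1.0, got " + fraction);
        }
    }

    /**
     * Accepts the setting if it was left at the default string "0",
     * or if the parsed value is at least one MB.
     */
    static void checkManagedSize(String rawValue, long parsedMB) {
        boolean leftAtDefault = rawValue.equals("0");
        if (!(leftAtDefault || parsedMB > 0)) {
            throw new IllegalArgumentException(
                "taskmanager.memory.size needs at least one MB, got " + rawValue);
        }
    }

    public static void main(String[] args) {
        System.out.println(memoryType(true));
        checkFraction(0.7f);          // passes
        checkManagedSize("0", 0);     // passes: left at default
        checkManagedSize("512m", 512); // passes: explicit positive size
    }
}
```

One consequence of comparing against the default string is that a raw value which differs from "0" but still parses to 0 MB is treated as an explicit setting and rejected.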
Summary
- TaskManager managed memory comes in two types, heap and off-heap. taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (mainly used for sorting, hashing, and caching) and defaults to 0; taskmanager.heap.size sets the TaskManager's total memory, heap plus off-heap. If taskmanager.memory.size is less than or equal to 0, managed memory is sized from taskmanager.memory.fraction, which defaults to 0.7. If taskmanager.memory.off-heap is enabled, taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB) is the offHeapSize managed by the memory manager, and the TaskManager's Xmx is taskmanager.heap.size - networkBufMB - offHeapSize
- TaskManagerServices provides the private static method createMemoryManager, which builds a MemoryManager from the configuration; it recomputes memorySize according to the MemoryType and passes it to the MemoryManager constructor. When memType is MemoryType.HEAP, memorySize is relativeMemSize, where relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction). When memType is MemoryType.OFF_HEAP, memorySize is directMemorySize, where directMemorySize = jvmTotalNoNet * memoryFraction; since maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), it follows that directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction)
- TaskManagerServicesConfiguration provides the static method fromConfiguration, which builds a TaskManagerServicesConfiguration from a Configuration. memType is derived from taskmanager.memory.off-heap: MemoryType.OFF_HEAP if it is true, MemoryType.HEAP otherwise