A Look at the Managed Memory of the Flink TaskManager
Published: 2019-06-25


This article takes a look at the managed memory of the Flink TaskManager, based on the Flink 1.7.2 sources.

TaskManagerOptions

flink-core-1.7.2-sources.jar!/org/apache/flink/configuration/TaskManagerOptions.java

@PublicEvolving
public class TaskManagerOptions {

    //......

    /**
     * JVM heap size for the TaskManagers with memory size.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
    public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
            key("taskmanager.heap.size")
            .defaultValue("1024m")
            .withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
                    " the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
                    " YARN container, minus a certain tolerance value.");

    /**
     * Amount of memory to be allocated by the task manager's memory manager. If not
     * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
     */
    public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
            key("taskmanager.memory.size")
            .defaultValue("0")
            .withDescription("Amount of memory to be allocated by the task manager's memory manager." +
                " If not set, a relative fraction will be allocated.");

    /**
     * Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
     * not set.
     */
    public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
            key("taskmanager.memory.fraction")
            .defaultValue(0.7f)
            .withDescription("The relative amount of memory (after subtracting the amount of memory used by network" +
                " buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results." +
                " For example, a value of `0.8` means that a task manager reserves 80% of its memory" +
                " for internal data buffers, leaving 20% of free memory for the task manager's heap for objects" +
                " created by user-defined functions. This parameter is only evaluated, if " +
                MANAGED_MEMORY_SIZE.key() + " is not set.");

    /**
     * Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager
     * as well as the network buffers.
     **/
    public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
            key("taskmanager.memory.off-heap")
            .defaultValue(false)
            .withDescription("Memory allocation method (JVM heap or off-heap), used for managed memory of the" +
                " TaskManager as well as the network buffers.");

    /**
     * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
     */
    public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
            key("taskmanager.memory.preallocate")
            .defaultValue(false)
            .withDescription("Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.");

    //......
}
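  • taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (used mainly for sorting, hashing and caching); it defaults to 0, meaning "not set". taskmanager.heap.size, despite its name, sets the TaskManager's total memory, covering both heap and off-heap.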
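Both taskmanager.heap.size (default "1024m") and taskmanager.memory.size (default "0") are string options, and the Flink code below reads the latter with MemorySize.parse(..., MEGA_BYTES).getMebiBytes(), so a bare number means megabytes. A minimal sketch of that conversion, assuming a simplified unit grammar (only b/k/m/g suffixes); MemorySizeSketch and parseMebiBytes are hypothetical names, not Flink's MemorySize class:

```java
import java.util.Locale;

/**
 * Hypothetical, simplified stand-in for Flink's
 * MemorySize.parse(text, MEGA_BYTES).getMebiBytes(): a bare number is read
 * as megabytes, and only the b/k/m/g suffixes are understood here.
 */
final class MemorySizeSketch {
    private MemorySizeSketch() {}

    static long parseMebiBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        int end = 0;
        while (end < s.length() && Character.isDigit(s.charAt(end))) {
            end++;
        }
        if (end == 0) {
            throw new IllegalArgumentException("Missing number in: " + text);
        }
        long value = Long.parseLong(s.substring(0, end));
        String unit = s.substring(end).trim();
        long bytes;
        switch (unit) {
            case "":    // no unit: fall back to megabytes, like the MEGA_BYTES default
            case "m":
            case "mb":
                bytes = value << 20;
                break;
            case "b":
            case "bytes":
                bytes = value;
                break;
            case "k":
            case "kb":
                bytes = value << 10;
                break;
            case "g":
            case "gb":
                bytes = value << 30;
                break;
            default:
                throw new IllegalArgumentException("Unknown unit '" + unit + "' in: " + text);
        }
        return bytes >> 20; // bytes -> mebibytes, truncating
    }
}
```

For example, parseMebiBytes("1024m") and parseMebiBytes("1024") both give 1024, while "2g" gives 2048.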

TaskManagerServices.calculateHeapSizeMB

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {

    //......

    /**
     * Calculates the amount of heap memory to use (to set via -Xmx and -Xms)
     * based on the total memory to use and the given configuration parameters.
     *
     * @param totalJavaMemorySizeMB
     *         overall available memory to use (heap and off-heap)
     * @param config
     *         configuration object
     *
     * @return heap memory to use (in megabytes)
     */
    public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
        Preconditions.checkArgument(totalJavaMemorySizeMB > 0);

        // subtract the Java memory used for network buffers (always off-heap)
        final long networkBufMB =
            calculateNetworkBufferMemory(
                totalJavaMemorySizeMB << 20, // megabytes to bytes
                config) >> 20; // bytes to megabytes
        final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;

        // split the available Java memory between heap and off-heap

        final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);

        final long heapSizeMB;
        if (useOffHeap) {

            long offHeapSize;
            String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
            if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
                try {
                    offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
                } catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException(
                        "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
                }
            } else {
                offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
            }

            if (offHeapSize <= 0) {
                // calculate off-heap section via fraction
                double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
                offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
            }

            TaskManagerServicesConfiguration
                .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
                    TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                    "Managed memory size too large for " + networkBufMB +
                        " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
                        " MB JVM memory");

            heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
        } else {
            heapSizeMB = remainingJavaMemorySizeMB;
        }

        return heapSizeMB;
    }

    //......
}
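  • If taskmanager.memory.size is less than or equal to 0 (i.e. left at its default), the managed memory size is derived from taskmanager.memory.fraction instead, which defaults to 0.7.
  • If taskmanager.memory.off-heap is enabled, taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB) becomes offHeapSize, the off-heap memory managed by the TaskManager's memory manager.
  • If taskmanager.memory.off-heap is enabled, the TaskManager's -Xmx becomes taskmanager.heap.size - networkBufMB - offHeapSize; otherwise it is taskmanager.heap.size - networkBufMB, because network buffers are always allocated off-heap.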
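The rules above boil down to a few lines of arithmetic. A minimal sketch of calculateHeapSizeMB, assuming networkBufMB is already known (in Flink it comes from calculateNetworkBufferMemory, which is not shown here) and that the configured managed size is already in megabytes; HeapSizeSketch and its parameters are hypothetical, and IllegalArgumentException stands in for Flink's IllegalConfigurationException:

```java
/**
 * Hypothetical sketch of the arithmetic in TaskManagerServices.calculateHeapSizeMB (Flink 1.7.2).
 * networkBufMB is passed in directly instead of being derived by calculateNetworkBufferMemory.
 */
final class HeapSizeSketch {
    private HeapSizeSketch() {}

    static long heapSizeMB(long totalJavaMemorySizeMB, long networkBufMB,
                           boolean offHeap, long managedSizeMB, float fraction) {
        if (totalJavaMemorySizeMB <= 0) {
            throw new IllegalArgumentException("total memory must be positive");
        }
        // network buffers are always off-heap, so they never count towards -Xmx
        long remainingMB = totalJavaMemorySizeMB - networkBufMB;
        if (!offHeap) {
            return remainingMB; // managed memory is carved out of the heap later on
        }
        long offHeapMB = managedSizeMB;
        if (offHeapMB <= 0) {
            // taskmanager.memory.size not set: fall back to taskmanager.memory.fraction
            double f = fraction; // widened to double, as in the original
            offHeapMB = (long) (f * remainingMB);
        }
        if (offHeapMB >= remainingMB) {
            throw new IllegalArgumentException("Managed memory size too large: " + offHeapMB
                + " MB, only " + remainingMB + " MB left after network buffers");
        }
        return remainingMB - offHeapMB; // the value used for -Xmx / -Xms
    }
}
```

For example, with 1024 MB in total, a hypothetical 100 MB of network buffers and the default fraction 0.7f, off-heap mode reserves 646 MB for the memory manager and heapSizeMB(1024, 100, true, 0, 0.7f) returns 278; with off-heap disabled it returns 924.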

TaskManagerServices.createMemoryManager

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {

    //......

    /**
     * Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}.
     *
     * @param taskManagerServicesConfiguration to create the memory manager from
     * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
     * @param maxJvmHeapMemory the maximum JVM heap size
     * @return Memory manager
     * @throws Exception
     */
    private static MemoryManager createMemoryManager(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            long freeHeapMemoryWithDefrag,
            long maxJvmHeapMemory) throws Exception {
        // computing the amount of memory to use depends on how much memory is available
        // it strictly needs to happen AFTER the network stack has been initialized

        // check if a value has been configured
        long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();

        MemoryType memType = taskManagerServicesConfiguration.getMemoryType();

        final long memorySize;

        boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();

        if (configuredMemory > 0) {
            if (preAllocateMemory) {
                LOG.info("Using {} MB for managed memory." , configuredMemory);
            } else {
                LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
            }
            memorySize = configuredMemory << 20; // megabytes to bytes
        } else {
            // similar to #calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig)
            float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();

            if (memType == MemoryType.HEAP) {
                // network buffers allocated off-heap -> use memoryFraction of the available heap:
                long relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
                        memoryFraction , relativeMemSize >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
                        "memory will be allocated lazily." , memoryFraction , relativeMemSize >> 20);
                }
                memorySize = relativeMemSize;
            } else if (memType == MemoryType.OFF_HEAP) {
                // The maximum heap memory has been adjusted according to the fraction (see
                // calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
                // maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
                // directMemorySize = jvmTotalNoNet * memoryFraction
                long directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
                        memoryFraction, directMemorySize >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
                        " memory will be allocated lazily.", memoryFraction, directMemorySize >> 20);
                }
                memorySize = directMemorySize;
            } else {
                throw new RuntimeException("No supported memory type detected.");
            }
        }

        // now start the memory manager
        final MemoryManager memoryManager;
        try {
            memoryManager = new MemoryManager(
                memorySize,
                taskManagerServicesConfiguration.getNumberOfSlots(),
                taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(),
                memType,
                preAllocateMemory);
        } catch (OutOfMemoryError e) {
            if (memType == MemoryType.HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() +
                    ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
            } else if (memType == MemoryType.OFF_HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() +
                    ") while allocating the TaskManager off-heap memory (" + memorySize +
                    " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
            } else {
                throw e;
            }
        }
        return memoryManager;
    }

    //......
}
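  • TaskManagerServices provides the private static method createMemoryManager, which creates the MemoryManager from the configuration. If taskmanager.memory.size was set (configuredMemory > 0), that value is used directly after converting megabytes to bytes; otherwise memorySize is recomputed according to the MemoryType. The result is passed to the MemoryManager constructor.
  • When memType is MemoryType.HEAP, memorySize is relativeMemSize, where relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction).
  • When memType is MemoryType.OFF_HEAP, memorySize is directMemorySize = jvmTotalNoNet * memoryFraction. Since calculateHeapSizeMB already set maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), it follows that directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction).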
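The three cases above can be isolated from the logging and MemoryManager construction. A minimal sketch, assuming the inputs are already extracted from TaskManagerServicesConfiguration; ManagedMemorySketch, MemType and managedMemoryBytes are hypothetical names (Flink's own enum is MemoryType):

```java
/**
 * Hypothetical sketch of how createMemoryManager (Flink 1.7.2) derives the managed memory size in bytes.
 */
final class ManagedMemorySketch {
    enum MemType { HEAP, OFF_HEAP }

    private ManagedMemorySketch() {}

    static long managedMemoryBytes(long configuredMB, MemType memType, float fraction,
                                   long freeHeapBytes, long maxJvmHeapBytes) {
        if (configuredMB > 0) {
            return configuredMB << 20; // explicit taskmanager.memory.size wins, MB -> bytes
        }
        if (memType == MemType.HEAP) {
            // a fraction of the currently free heap; long * float is evaluated
            // in float precision, exactly as in the original expression
            return (long) (freeHeapBytes * fraction);
        }
        // OFF_HEAP: maxJvmHeap = jvmTotalNoNet * (1 - fraction), hence
        // jvmTotalNoNet * fraction = maxJvmHeap / (1 - fraction) * fraction
        return (long) (maxJvmHeapBytes / (1.0 - fraction) * fraction);
    }
}
```

Assuming maxJvmHeapBytes equals the -Xmx computed by calculateHeapSizeMB, a 300 MB heap with fraction 0.7f yields roughly 700 MB of managed off-heap memory, i.e. the 70% share reserved earlier, up to floating-point rounding.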

TaskManagerServicesConfiguration

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java

public class TaskManagerServicesConfiguration {

    //......

    /**
     * Utility method to extract TaskManager config parameters from the configuration and to
     * sanity check them.
     *
     * @param configuration The configuration.
     * @param remoteAddress identifying the IP address under which the TaskManager will be accessible
     * @param localCommunication True, to skip initializing the network stack.
     *                                      Use only in cases where only one task manager runs.
     * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
     */
    public static TaskManagerServicesConfiguration fromConfiguration(
            Configuration configuration,
            InetAddress remoteAddress,
            boolean localCommunication) throws Exception {

        // we need this because many configs have been written with a "-1" entry
        int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
        if (slots == -1) {
            slots = 1;
        }

        final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);
        String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);

        if (localStateRootDir.length == 0) {
            // default to temp dirs.
            localStateRootDir = tmpDirs;
        }

        boolean localRecoveryMode = configuration.getBoolean(
            CheckpointingOptions.LOCAL_RECOVERY.key(),
            CheckpointingOptions.LOCAL_RECOVERY.defaultValue());

        final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
            configuration,
            localCommunication,
            remoteAddress,
            slots);

        final QueryableStateConfiguration queryableStateConfig =
                parseQueryableStateConfiguration(configuration);

        // extract memory settings
        long configuredMemory;
        String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
        if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
            try {
                configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
            } catch (IllegalArgumentException e) {
                throw new IllegalConfigurationException(
                    "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
            }
        } else {
            configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
        }

        checkConfigParameter(
            configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) ||
                configuredMemory > 0, configuredMemory,
            TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
            "MemoryManager needs at least one MB of memory. " +
                "If you leave this config parameter empty, the system automatically " +
                "pick a fraction of the available memory.");

        // check whether we use heap or off-heap memory
        final MemoryType memType;
        if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
            memType = MemoryType.OFF_HEAP;
        } else {
            memType = MemoryType.HEAP;
        }

        boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE);

        float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
        checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction,
            TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
            "MemoryManager fraction of the free memory must be between 0.0 and 1.0");

        long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis();

        return new TaskManagerServicesConfiguration(
            remoteAddress,
            tmpDirs,
            localStateRootDir,
            localRecoveryMode,
            networkConfig,
            queryableStateConfig,
            slots,
            configuredMemory,
            memType,
            preAllocateMemory,
            memoryFraction,
            timerServiceShutdownTimeout,
            ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
    }

    //......
}
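  • TaskManagerServicesConfiguration provides the static method fromConfiguration, which builds a TaskManagerServicesConfiguration from a Configuration. memType is determined by taskmanager.memory.off-heap: MemoryType.OFF_HEAP if it is true, MemoryType.HEAP otherwise. The method also checks that taskmanager.memory.size is either left at its default or positive, and that taskmanager.memory.fraction lies strictly between 0.0 and 1.0.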
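The memory-related part of fromConfiguration can be sketched on its own. A minimal sketch, assuming a plain Map<String, String> in place of Flink's Configuration and a bare megabyte number for taskmanager.memory.size (no MemorySize.parse); MemorySettingsSketch, MemType and fromMap are hypothetical names:

```java
import java.util.Map;

/**
 * Hypothetical sketch of the memory-related checks in
 * TaskManagerServicesConfiguration.fromConfiguration (Flink 1.7.2).
 * taskmanager.memory.size is read as a bare megabyte number here.
 */
final class MemorySettingsSketch {
    enum MemType { HEAP, OFF_HEAP }

    final long configuredMB;
    final MemType memType;
    final float fraction;

    private MemorySettingsSketch(long configuredMB, MemType memType, float fraction) {
        this.configuredMB = configuredMB;
        this.memType = memType;
        this.fraction = fraction;
    }

    static MemorySettingsSketch fromMap(Map<String, String> conf) {
        String size = conf.getOrDefault("taskmanager.memory.size", "0");
        // only a value that differs from the default string "0" is parsed
        boolean isDefault = size.equals("0");
        long configuredMB = isDefault ? 0L : Long.parseLong(size.trim());
        if (!isDefault && configuredMB <= 0) {
            throw new IllegalArgumentException("MemoryManager needs at least one MB of memory.");
        }
        // taskmanager.memory.off-heap selects the memory type
        MemType memType = Boolean.parseBoolean(conf.getOrDefault("taskmanager.memory.off-heap", "false"))
            ? MemType.OFF_HEAP : MemType.HEAP;
        float fraction = Float.parseFloat(conf.getOrDefault("taskmanager.memory.fraction", "0.7"));
        if (!(fraction > 0.0f && fraction < 1.0f)) {
            throw new IllegalArgumentException("fraction must be between 0.0 and 1.0: " + fraction);
        }
        return new MemorySettingsSketch(configuredMB, memType, fraction);
    }
}
```

In this sketch, as in the code above, only the exact default string "0" counts as "not set", so a value such as "00" parses to 0 and is rejected.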

Summary

  • The TaskManager's managed memory comes in two types, heap and off-heap. taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (mainly for sorting, hashing and caching) and defaults to 0; taskmanager.heap.size sets the TaskManager's total heap plus off-heap memory. If taskmanager.memory.size is less than or equal to 0, the managed memory is sized by taskmanager.memory.fraction, which defaults to 0.7. If taskmanager.memory.off-heap is enabled, taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB) becomes the off-heap size (offHeapSize) managed by the memory manager, and the TaskManager's -Xmx becomes taskmanager.heap.size - networkBufMB - offHeapSize.
  • TaskManagerServices provides the private static method createMemoryManager, which creates the MemoryManager from the configuration: it recomputes memorySize according to the MemoryType and passes it to the MemoryManager constructor. For MemoryType.HEAP, memorySize is relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction). For MemoryType.OFF_HEAP, memorySize is directMemorySize = jvmTotalNoNet * memoryFraction; because maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), this gives directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction).
  • TaskManagerServicesConfiguration provides the static method fromConfiguration, which builds a TaskManagerServicesConfiguration from a Configuration; memType is MemoryType.OFF_HEAP if taskmanager.memory.off-heap is true and MemoryType.HEAP otherwise.


Reposted from: http://dplxo.baihongyu.com/
