hadoop FairSchedulerConfiguration 源码

  • 2022-10-20
  • 浏览 (238)

hadoop FairSchedulerConfiguration 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import static org.apache.hadoop.yarn.util.resource.ResourceUtils.RESOURCE_REQUEST_VALUE_PATTERN;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Private
@Evolving
public class FairSchedulerConfiguration extends Configuration {

  /** Logger for this class; declared public, so other classes can reach it. */
  public static final Logger LOG = LoggerFactory.getLogger(
      FairSchedulerConfiguration.class.getName());

  /**
   * Property key of the memory increment for resource requests granted by the
   * FairScheduler. This property is looked up in the yarn-site.xml.
   * @deprecated The preferred way to configure the increment is by using the
   * yarn.resource-types.{RESOURCE_NAME}.increment-allocation property,
   * for memory: yarn.resource-types.memory-mb.increment-allocation
   */
  @Deprecated
  public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_MB =
    YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-mb";
  /**
   * Fallback value passed when {@link #RM_SCHEDULER_INCREMENT_ALLOCATION_MB}
   * is looked up in {@link #getIncrementAllocation()}.
   * @deprecated see {@link #RM_SCHEDULER_INCREMENT_ALLOCATION_MB}
   */
  @Deprecated
  public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB = 1024;
  /**
   * Property key of the vcores increment for resource requests granted by the
   * FairScheduler. This property is looked up in the yarn-site.xml.
   * @deprecated The preferred way to configure the increment is by using the
   * yarn.resource-types.{RESOURCE_NAME}.increment-allocation property,
   * for CPU: yarn.resource-types.vcores.increment-allocation
   */
  @Deprecated
  public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES =
    YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-vcores";
  /**
   * Fallback value passed when
   * {@link #RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES} is looked up in
   * {@link #getIncrementAllocation()}.
   * @deprecated see {@link #RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES}
   */
  @Deprecated
  public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = 1;

  /**
   * Threshold for container size for making a container reservation, as a
   * multiple of the increment allocation. Only container sizes above this are
   * allowed to reserve a node.
   */
  public static final String
      RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE =
      YarnConfiguration.YARN_PREFIX +
          "scheduler.reservation-threshold.increment-multiple";
  /**
   * Fallback value for
   * {@link #RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE}.
   */
  public static final float
      DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE = 2f;

  /** Common prefix of the FairScheduler-specific property keys in this class. */
  private static final String CONF_PREFIX =  "yarn.scheduler.fair.";

  /**
   * Used during FS->CS conversion. When enabled, background threads are
   * not started. This property should NOT be used by end-users!
   */
  public static final String MIGRATION_MODE = CONF_PREFIX + "migration.mode";

  /**
   * Disables checking whether a placement rule is terminal or not. Only
   * used during migration mode. This property should NOT be used by end users!
   */
  public static final String NO_TERMINAL_RULE_CHECK = CONF_PREFIX +
      "no-terminal-rule.check";

  /** Property key naming the allocation file. */
  public static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file";
  /** Default allocation file name. */
  protected static final String DEFAULT_ALLOCATION_FILE = "fair-scheduler.xml";
  
  /**
   * Whether pools can be created that were not specified in the FS
   * configuration file.
   */
  public static final String ALLOW_UNDECLARED_POOLS = CONF_PREFIX +
      "allow-undeclared-pools";
  /** Fallback value for {@link #ALLOW_UNDECLARED_POOLS}. */
  public static final boolean DEFAULT_ALLOW_UNDECLARED_POOLS = true;
  
  /**
   * Whether to use the user name as the queue name (instead of "default") if
   * the request does not specify a queue.
   */
  public static final String  USER_AS_DEFAULT_QUEUE = CONF_PREFIX +
      "user-as-default-queue";
  /** Fallback value for {@link #USER_AS_DEFAULT_QUEUE}. */
  public static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true;

  /** Shared fallback value for the node and rack locality thresholds. */
  protected static final float  DEFAULT_LOCALITY_THRESHOLD = -1.0f;

  /** Cluster threshold for node locality. */
  public static final String LOCALITY_THRESHOLD_NODE = CONF_PREFIX +
      "locality.threshold.node";
  /** Fallback value for {@link #LOCALITY_THRESHOLD_NODE}. */
  public static final float  DEFAULT_LOCALITY_THRESHOLD_NODE =
		  DEFAULT_LOCALITY_THRESHOLD;

  /** Cluster threshold for rack locality. */
  public static final String LOCALITY_THRESHOLD_RACK = CONF_PREFIX +
      "locality.threshold.rack";
  /** Fallback value for {@link #LOCALITY_THRESHOLD_RACK}. */
  public static final float  DEFAULT_LOCALITY_THRESHOLD_RACK =
		  DEFAULT_LOCALITY_THRESHOLD;

  /**
   * Delay for node locality.
   * Only used when {@link #CONTINUOUS_SCHEDULING_ENABLED} is enabled.
   * @deprecated Continuous scheduling is known to cause locking issues inside
   * the scheduler in larger clusters (more than 100 nodes); use
   * {@link #ASSIGN_MULTIPLE} to improve container allocation ramp up.
   */
  @Deprecated
  protected static final String LOCALITY_DELAY_NODE_MS = CONF_PREFIX +
      "locality-delay-node-ms";
  /**
   * Fallback value for {@link #LOCALITY_DELAY_NODE_MS}.
   * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation
   */
  @Deprecated
  protected static final long DEFAULT_LOCALITY_DELAY_NODE_MS = -1L;

  /**
   * Delay for rack locality.
   * Only used when {@link #CONTINUOUS_SCHEDULING_ENABLED} is enabled.
   * @deprecated Continuous scheduling is known to cause locking issues inside
   * the scheduler in larger clusters (more than 100 nodes); use
   * {@link #ASSIGN_MULTIPLE} to improve container allocation ramp up.
   */
  @Deprecated
  protected static final String LOCALITY_DELAY_RACK_MS = CONF_PREFIX +
      "locality-delay-rack-ms";
  /**
   * Fallback value for {@link #LOCALITY_DELAY_RACK_MS}.
   * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation
   */
  @Deprecated
  protected static final long DEFAULT_LOCALITY_DELAY_RACK_MS = -1L;

  /**
   * Enable continuous scheduling or not.
   * @deprecated Continuous scheduling is known to cause locking issues inside
   * the scheduler in larger clusters (more than 100 nodes); use
   * {@link #ASSIGN_MULTIPLE} to improve container allocation ramp up.
   */
  @Deprecated
  public static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX +
      "continuous-scheduling-enabled";
  /**
   * Fallback value for {@link #CONTINUOUS_SCHEDULING_ENABLED}.
   * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation
   */
  @Deprecated
  public static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false;

  /**
   * Sleep time of each pass in continuous scheduling (5ms by default).
   * Only used when {@link #CONTINUOUS_SCHEDULING_ENABLED} is enabled.
   * @deprecated Continuous scheduling is known to cause locking issues inside
   * the scheduler in larger clusters (more than 100 nodes); use
   * {@link #ASSIGN_MULTIPLE} to improve container allocation ramp up.
   */
  @Deprecated
  public static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX +
      "continuous-scheduling-sleep-ms";
  /**
   * Fallback value for {@link #CONTINUOUS_SCHEDULING_SLEEP_MS}.
   * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation
   */
  @Deprecated
  public static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5;

  /** Whether preemption is enabled. */
  public static final String  PREEMPTION = CONF_PREFIX + "preemption";
  /** Fallback value for {@link #PREEMPTION}. */
  public static final boolean DEFAULT_PREEMPTION = false;

  /**
   * Property key of the AM preemption switch that is consulted when no
   * queue-specific key is set (see {@link #getAMPreemptionEnabled(String)}).
   */
  protected static final String AM_PREEMPTION =
      CONF_PREFIX + "am.preemption";
  /**
   * Prefix of the queue-specific AM preemption keys; the queue name is
   * appended to it in {@link #getAMPreemptionEnabled(String)}.
   */
  protected static final String AM_PREEMPTION_PREFIX =
          CONF_PREFIX + "am.preemption.";
  /** Fallback value for both the global and queue-specific AM preemption keys. */
  protected static final boolean DEFAULT_AM_PREEMPTION = true;

  /**
   * Property key of the preemption threshold; going by the key name this is
   * a cluster utilization threshold (NOTE(review): confirm exact semantics).
   */
  protected static final String PREEMPTION_THRESHOLD =
      CONF_PREFIX + "preemption.cluster-utilization-threshold";
  /** Fallback value for {@link #PREEMPTION_THRESHOLD}. */
  protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;

  /**
   * Property key of the wait time before kill (presumably in milliseconds;
   * TODO confirm the unit against the scheduler code that consumes it).
   */
  public static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX +
      "waitTimeBeforeKill";
  /** Fallback value for {@link #WAIT_TIME_BEFORE_KILL}. */
  public static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;

  /**
   * Postfix for resource allocation increments in the
   * yarn.resource-types.{RESOURCE_NAME}.increment-allocation property.
   */
  static final String INCREMENT_ALLOCATION = ".increment-allocation";

  /**
   * Configurable delay (ms) before an app's starvation is considered after
   * it is identified. This is to give the scheduler enough time to
   * allocate containers post preemption. This delay is added to the
   * {@link #WAIT_TIME_BEFORE_KILL} and enough heartbeats.
   *
   * This is intended to be a backdoor on production clusters, and hence
   * intentionally not documented.
   */
  public static final String WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS =
      CONF_PREFIX + "waitTimeBeforeNextStarvationCheck";
  /** Fallback value for {@link #WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS}. */
  public static final long
      DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS = 10000;

  /** Whether to assign multiple containers in one check-in. */
  public static final String  ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
  /** Fallback value for {@link #ASSIGN_MULTIPLE}. */
  public static final boolean DEFAULT_ASSIGN_MULTIPLE = false;

  /** Whether to give more weight to apps requiring many resources. */
  public static final String  SIZE_BASED_WEIGHT = CONF_PREFIX +
      "sizebasedweight";
  /** Fallback value for {@link #SIZE_BASED_WEIGHT}. */
  public static final boolean DEFAULT_SIZE_BASED_WEIGHT = false;

  /**
   * Whether the maximum number of containers to assign on each check-in is
   * dynamic (a boolean switch; see {@link #MAX_ASSIGN} for the exact count
   * used when it is turned off).
   */
  public static final String DYNAMIC_MAX_ASSIGN =
      CONF_PREFIX + "dynamic.max.assign";
  /** Fallback value for {@link #DYNAMIC_MAX_ASSIGN}. */
  private static final boolean DEFAULT_DYNAMIC_MAX_ASSIGN = true;

  /**
   * Specify exact number of containers to assign on each heartbeat, if dynamic
   * max assign is turned off.
   */
  public static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
  /** Fallback value for {@link #MAX_ASSIGN}. */
  public static final int DEFAULT_MAX_ASSIGN = -1;

  /** The update interval for calculating resources in FairScheduler. */
  public static final String UPDATE_INTERVAL_MS =
      CONF_PREFIX + "update-interval-ms";
  /** Fallback value for {@link #UPDATE_INTERVAL_MS}. */
  public static final int DEFAULT_UPDATE_INTERVAL_MS = 500;

  /** Ratio of nodes available for an app to make a reservation on. */
  public static final String RESERVABLE_NODES =
          CONF_PREFIX + "reservable-nodes";
  /** Fallback value for {@link #RESERVABLE_NODES}. */
  public static final float RESERVABLE_NODES_DEFAULT = 0.05f;

  /** Message prefix used by {@code createConfigException}. */
  private static final String INVALID_RESOURCE_DEFINITION_PREFIX =
          "Error reading resource config--invalid resource definition: ";
  /**
   * Regex for a percentage: an optionally negative number with an optional
   * fractional part, followed by '%'. Group 1 captures the signed number.
   * A resource name may be appended to it before compilation.
   */
  private static final String RESOURCE_PERCENTAGE_PATTERN =
      "^(-?(\\d+)(\\.\\d*)?)\\s*%\\s*";
  /**
   * Regex for an absolute value: group 1 captures the optionally negative
   * integer part, group 2 an optional fractional part. The resource suffix
   * (e.g. "mb") is appended to it before compilation.
   */
  private static final String RESOURCE_VALUE_PATTERN =
      "^(-?\\d+)(\\.\\d*)?\\s*";
  /**
   * For resources separated by spaces instead of a comma. Matches one
   * number followed by a run of lowercase letters.
   */
  private static final String RESOURCES_WITH_SPACES_PATTERN =
      "-?\\d+(?:\\.\\d*)?\\s*[a-z]+\\s*";

  /**
   * Creates a configuration by delegating to the no-argument superclass
   * constructor.
   */
  public FairSchedulerConfiguration() {
    super();
  }
  
  /**
   * Creates a configuration by handing {@code conf} to the superclass
   * constructor.
   *
   * @param conf the configuration passed to the superclass constructor
   */
  public FairSchedulerConfiguration(Configuration conf) {
    super(conf);
  }

  /**
   * Builds the minimum allocation from the scheduler's minimum memory and
   * vcores settings, each with its {@link YarnConfiguration} default.
   *
   * @return a resource holding the configured minimum memory and vcores
   */
  public Resource getMinimumAllocation() {
    return Resources.createResource(
        getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
        getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES));
  }

  /**
   * Builds the maximum allocation from the scheduler's maximum memory and
   * vcores settings, each with its {@link YarnConfiguration} default.
   *
   * @return a resource holding the configured maximum memory and vcores
   */
  public Resource getMaximumAllocation() {
    return Resources.createResource(
        getInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
        getInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES));
  }

  /**
   * Builds the allocation increment for every known resource type.
   *
   * <p>For each resource type the
   * yarn.resource-types.{RESOURCE_NAME}.increment-allocation property is
   * read; when set it must be in "value [unit]" format and is converted to
   * the resource's default unit. For memory and vcores, when that property
   * is not set, the deprecated
   * {@link #RM_SCHEDULER_INCREMENT_ALLOCATION_MB} and
   * {@link #RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES} properties (with their
   * defaults) are used instead. When both the new and the deprecated
   * property are set, the new one wins and a warning is logged.
   *
   * @return the increment allocation as a {@link Resource}
   * @throws IllegalArgumentException if a configured increment is not in
   * "value [unit]" format, or if the vcores increment does not fit into an
   * {@code int}
   */
  public Resource getIncrementAllocation() {
    Map<String, Long> increments = new HashMap<>();
    for (ResourceInformation resourceType
        : ResourceUtils.getResourceTypesArray()) {
      String name = resourceType.getName();
      String propertyKey = getAllocationIncrementPropKey(name);
      String propValue = get(propertyKey);
      if (propValue == null) {
        continue;
      }
      Matcher matcher = RESOURCE_REQUEST_VALUE_PATTERN.matcher(propValue);
      if (!matcher.matches()) {
        throw new IllegalArgumentException("Property " + propertyKey +
            " is not in \"value [unit]\" format: " + propValue);
      }
      long value = Long.parseLong(matcher.group(1));
      String unit = matcher.group(2);
      increments.put(name, getValueInDefaultUnits(value, unit, name));
    }

    // Memory and vcores are passed to Resource.newInstance as dedicated
    // arguments, so they are taken out of the map of remaining resources.
    final String memoryName = ResourceInformation.MEMORY_MB.getName();
    final Long configuredMemory = increments.remove(memoryName);
    final long memory;
    if (configuredMemory != null) {
      memory = configuredMemory;
      warnIfDeprecatedIncrementOverridden(memoryName,
          RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
    } else {
      memory = getLong(
          RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
          DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
    }

    final String vcoresName = ResourceInformation.VCORES.getName();
    final Long configuredVcores = increments.remove(vcoresName);
    final int vCores;
    if (configuredVcores != null) {
      long wideVcores = configuredVcores;
      vCores = (int) wideVcores;
      // A plain narrowing cast would silently wrap around; reject instead.
      if (vCores != wideVcores) {
        throw new IllegalArgumentException("Property " +
            getAllocationIncrementPropKey(vcoresName) +
            " does not fit into an integer: " + wideVcores);
      }
      warnIfDeprecatedIncrementOverridden(vcoresName,
          RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES);
    } else {
      vCores = getInt(
          RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES,
          DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES);
    }
    return Resource.newInstance(memory, vCores, increments);
  }

  /**
   * Logs a warning when the deprecated increment property is set although
   * the resource-type specific increment property takes precedence.
   *
   * @param resourceName name of the resource whose new-style property is set
   * @param deprecatedKey the deprecated property key being overridden
   */
  private void warnIfDeprecatedIncrementOverridden(String resourceName,
      String deprecatedKey) {
    String deprecatedValue = get(deprecatedKey);
    if (deprecatedValue != null) {
      String overridingKey = getAllocationIncrementPropKey(resourceName);
      LOG.warn("Configuration {}={} is overriding the {}={} property",
          overridingKey, get(overridingKey), deprecatedKey, deprecatedValue);
    }
  }

  /**
   * Converts {@code value} from {@code unit} into the default unit of the
   * named resource. A value with an empty unit is returned unchanged.
   *
   * @param value the numeric value to convert
   * @param unit the unit of {@code value}; may be empty but not null
   * @param resourceName the resource whose default unit is the target
   * @return the value expressed in the resource's default unit
   */
  private long getValueInDefaultUnits(long value, String unit,
      String resourceName) {
    if (unit.isEmpty()) {
      return value;
    }
    String defaultUnit = ResourceUtils.getDefaultUnit(resourceName);
    return UnitsConversionUtil.convert(unit, defaultUnit, value);
  }

  /**
   * Builds the increment-allocation property key for a resource, i.e.
   * {@link YarnConfiguration#RESOURCE_TYPES}, a dot, the resource name and
   * the {@link #INCREMENT_ALLOCATION} postfix.
   *
   * @param resourceName the resource name to embed in the key
   * @return the property key for that resource's allocation increment
   */
  private String getAllocationIncrementPropKey(String resourceName) {
    String resourceTypeKey =
        YarnConfiguration.RESOURCE_TYPES + "." + resourceName;
    return resourceTypeKey + INCREMENT_ALLOCATION;
  }

  /**
   * Reads {@link #RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE}.
   *
   * @return the configured multiple, with
   * {@link #DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE}
   * as the fallback
   */
  public float getReservationThresholdIncrementMultiple() {
    return getFloat(
      RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE,
      DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE);
  }

  /**
   * Reads {@link #LOCALITY_THRESHOLD_NODE}.
   *
   * @return the node locality threshold, with
   * {@link #DEFAULT_LOCALITY_THRESHOLD_NODE} as the fallback
   */
  public float getLocalityThresholdNode() {
    return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
  }

  /**
   * Reads {@link #LOCALITY_THRESHOLD_RACK}.
   *
   * @return the rack locality threshold, with
   * {@link #DEFAULT_LOCALITY_THRESHOLD_RACK} as the fallback
   */
  public float getLocalityThresholdRack() {
    return getFloat(LOCALITY_THRESHOLD_RACK, DEFAULT_LOCALITY_THRESHOLD_RACK);
  }

  /**
   * Whether continuous scheduling is turned on. Reads
   * {@link #CONTINUOUS_SCHEDULING_ENABLED} with
   * {@link #DEFAULT_CONTINUOUS_SCHEDULING_ENABLED} as the fallback.
   * @deprecated use {@link #ASSIGN_MULTIPLE} to improve container allocation
   * ramp up.
   * @return whether continuous scheduling is enabled
   */
  @Deprecated
  public boolean isContinuousSchedulingEnabled() {
    return getBoolean(CONTINUOUS_SCHEDULING_ENABLED,
        DEFAULT_CONTINUOUS_SCHEDULING_ENABLED);
  }

  /**
   * The sleep time of the continuous scheduler thread. Reads
   * {@link #CONTINUOUS_SCHEDULING_SLEEP_MS} with
   * {@link #DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS} as the fallback.
   * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation
   * @return sleep time in ms
   */
  @Deprecated
  public int getContinuousSchedulingSleepMs() {
    return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS,
        DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
  }

  /**
   * Delay in milliseconds for locality fallback node to rack. Reads
   * {@link #LOCALITY_DELAY_NODE_MS} with
   * {@link #DEFAULT_LOCALITY_DELAY_NODE_MS} as the fallback.
   * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation
   * @return delay in ms
   */
  @Deprecated
  public long getLocalityDelayNodeMs() {
    return getLong(LOCALITY_DELAY_NODE_MS, DEFAULT_LOCALITY_DELAY_NODE_MS);
  }

  /**
   * Delay in milliseconds for locality fallback rack to other. Reads
   * {@link #LOCALITY_DELAY_RACK_MS} with
   * {@link #DEFAULT_LOCALITY_DELAY_RACK_MS} as the fallback.
   * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation
   * @return delay in ms
   */
  @Deprecated
  public long getLocalityDelayRackMs() {
    return getLong(LOCALITY_DELAY_RACK_MS, DEFAULT_LOCALITY_DELAY_RACK_MS);
  }

  /**
   * Reads {@link #PREEMPTION}.
   *
   * @return whether preemption is enabled, with {@link #DEFAULT_PREEMPTION}
   * as the fallback
   */
  public boolean getPreemptionEnabled() {
    return getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
  }

  /**
   * Tells whether AM preemption is enabled for a queue. A queue-specific
   * property ({@link #AM_PREEMPTION_PREFIX} followed by the queue name)
   * takes precedence when it is set; otherwise the global
   * {@link #AM_PREEMPTION} property decides. Both fall back to
   * {@link #DEFAULT_AM_PREEMPTION}.
   *
   * @param queueName the queue name appended to the per-queue key prefix
   * @return whether AM preemption is enabled for the queue
   */
  public boolean getAMPreemptionEnabled(String queueName) {
    final String queueSpecificKey = AM_PREEMPTION_PREFIX + queueName;

    if (get(queueSpecificKey) == null) {
      // No per-queue override configured: use the global switch.
      return getBoolean(AM_PREEMPTION, DEFAULT_AM_PREEMPTION);
    }

    final boolean enabledForQueue =
        getBoolean(queueSpecificKey, DEFAULT_AM_PREEMPTION);
    LOG.debug("AM preemption enabled for queue {}: {}",
        queueName, enabledForQueue);
    return enabledForQueue;
  }

  /**
   * Reads {@link #PREEMPTION_THRESHOLD}.
   *
   * @return the preemption threshold, with
   * {@link #DEFAULT_PREEMPTION_THRESHOLD} as the fallback
   */
  public float getPreemptionUtilizationThreshold() {
    return getFloat(PREEMPTION_THRESHOLD, DEFAULT_PREEMPTION_THRESHOLD);
  }

  /**
   * Reads {@link #ASSIGN_MULTIPLE}.
   *
   * @return whether multiple containers may be assigned in one check-in,
   * with {@link #DEFAULT_ASSIGN_MULTIPLE} as the fallback
   */
  public boolean getAssignMultiple() {
    return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE);
  }

  /**
   * Reads {@link #DYNAMIC_MAX_ASSIGN}.
   *
   * @return whether max assign is dynamic, with
   * {@code DEFAULT_DYNAMIC_MAX_ASSIGN} as the fallback
   */
  public boolean isMaxAssignDynamic() {
    return getBoolean(DYNAMIC_MAX_ASSIGN, DEFAULT_DYNAMIC_MAX_ASSIGN);
  }

  /**
   * Reads {@link #MAX_ASSIGN}.
   *
   * @return the configured number of containers to assign, with
   * {@link #DEFAULT_MAX_ASSIGN} as the fallback
   */
  public int getMaxAssign() {
    return getInt(MAX_ASSIGN, DEFAULT_MAX_ASSIGN);
  }

  /**
   * Reads {@link #SIZE_BASED_WEIGHT}.
   *
   * @return whether size-based weight is enabled, with
   * {@link #DEFAULT_SIZE_BASED_WEIGHT} as the fallback
   */
  public boolean getSizeBasedWeight() {
    return getBoolean(SIZE_BASED_WEIGHT, DEFAULT_SIZE_BASED_WEIGHT);
  }

  /**
   * Reads {@link #WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS}.
   *
   * @return the delay in ms, with
   * {@link #DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS} as the
   * fallback
   */
  public long getWaitTimeBeforeNextStarvationCheck() {
    return getLong(WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS,
        DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS);
  }
  
  /**
   * Reads {@link #WAIT_TIME_BEFORE_KILL}.
   *
   * @return the configured wait time, with
   * {@link #DEFAULT_WAIT_TIME_BEFORE_KILL} as the fallback
   */
  public int getWaitTimeBeforeKill() {
    return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
  }

  /**
   * Reads {@link YarnConfiguration#RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME}.
   *
   * @return the configured flag, with
   * {@link YarnConfiguration#DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME}
   * as the fallback
   */
  public boolean getUsePortForNodeName() {
    return getBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
  }

  /**
   * Reads {@link #RESERVABLE_NODES}.
   *
   * @return the ratio of reservable nodes, with
   * {@link #RESERVABLE_NODES_DEFAULT} as the fallback
   */
  public float getReservableNodes() {
    return getFloat(RESERVABLE_NODES, RESERVABLE_NODES_DEFAULT);
  }

  /**
   * Parses a resource config value in one of three forms:
   * <ol>
   * <li>Percentage: &quot;50%&quot; or &quot;40% memory, 60% cpu&quot;</li>
   * <li>New style resources: &quot;vcores=10, memory-mb=1024&quot;
   * or &quot;vcores=60%, memory-mb=40%&quot;</li>
   * <li>Old style resources: &quot;1024 mb, 10 vcores&quot;</li>
   * </ol>
   * In new style resources, any resource that is not specified will be
   * set to {@link Long#MAX_VALUE} or 100%, as appropriate. Also, in the new
   * style resources, units are not allowed. Units are assumed from the resource
   * manager's settings for the resources when the value isn't a percentage.
   *
   * This overload delegates to
   * {@link #parseResourceConfigValue(String, long)} with
   * {@link Long#MAX_VALUE} as the value for unspecified resources.
   *
   * @param value the resource definition to parse
   * @return a {@link ConfigurableResource} that represents the parsed value
   * @throws AllocationConfigurationException if the raw value is not a valid
   * resource definition
   */
  public static ConfigurableResource parseResourceConfigValue(String value)
      throws AllocationConfigurationException {
    return parseResourceConfigValue(value, Long.MAX_VALUE);
  }

  /**
   * Parses a resource config value in one of three forms:
   * <ol>
   * <li>Percentage: &quot;50%&quot; or &quot;40% memory, 60% cpu&quot;</li>
   * <li>New style resources: &quot;vcores=10, memory-mb=1024&quot;
   * or &quot;vcores=60%, memory-mb=40%&quot;</li>
   * <li>Old style resources: &quot;1024 mb, 10 vcores&quot;</li>
   * </ol>
   * A value containing '=' is treated as new style; otherwise a value
   * containing '%' is treated as an old style percentage; anything else is
   * treated as an old style absolute resource.
   *
   * In new style resources, any resource that is not specified will be
   * set to {@code missing} or 100%, as appropriate. Also, in the new style
   * resources, units are not allowed. Units are assumed from the resource
   * manager's settings for the resources when the value isn't a percentage.
   *
   * The {@code missing} parameter is only used in the case of new style
   * resources without percentages. With new style resources with percentages,
   * any missing resources will be assumed to be 100% because percentages are
   * only used with maximum resource limits.
   *
   * @param value the resource definition to parse; {@code null}, empty and
   * whitespace-only definitions are rejected
   * @param missing the value to use for any unspecified resources
   * @return a {@link ConfigurableResource} that represents the parsed value
   * @throws AllocationConfigurationException if the raw value is not a valid
   * resource definition
   */
  public static ConfigurableResource parseResourceConfigValue(String value,
      long missing) throws AllocationConfigurationException {
    // A null definition used to escape as a NullPointerException from
    // value.trim(); report it through the declared exception instead.
    if (value == null) {
      throw new AllocationConfigurationException("Error reading resource "
          + "config--the resource string is null.");
    }

    if (value.trim().isEmpty()) {
      throw new AllocationConfigurationException("Error reading resource "
          + "config--the resource string is empty.");
    }

    try {
      if (value.contains("=")) {
        return parseNewStyleResource(value, missing);
      }
      if (value.contains("%")) {
        return parseOldStyleResourceAsPercentage(value);
      }
      return parseOldStyleResource(value);
    } catch (RuntimeException ex) {
      // Unchecked failures from the parsers (e.g. number format problems)
      // are surfaced as configuration errors with the cause preserved.
      throw new AllocationConfigurationException(
          "Error reading resource config", ex);
    }
  }

  /**
   * Parses a new style ("name=value, name=value") resource definition.
   * When the definition contains a '%', every value is parsed as a
   * percentage; otherwise every value is parsed as a non-negative integer
   * and the result starts out initialized with {@code missing}.
   *
   * @param value the complete resource definition
   * @param missing the value handed to the {@link ConfigurableResource}
   * constructor in the non-percentage case
   * @return the parsed resource
   * @throws AllocationConfigurationException if an entry is not of the form
   * name=value, a value cannot be parsed, or a resource name is unknown
   */
  private static ConfigurableResource parseNewStyleResource(String value,
          long missing) throws AllocationConfigurationException {

    final boolean percentageMode = value.contains("%");
    final ConfigurableResource parsed = percentageMode
        ? new ConfigurableResource()
        : new ConfigurableResource(missing);

    for (String entry : value.split(",")) {
      final String[] nameAndValue = entry.split("=");
      if (nameAndValue.length != 2) {
        throw createConfigException(value,
            "Every resource must be of the form: name=value.");
      }

      final String resourceName = nameAndValue[0].trim();
      final String resourceValue = nameAndValue[1].trim();
      try {
        if (!percentageMode) {
          parsed.setValue(resourceName,
              parseNewStyleResourceAsAbsoluteValue(value, resourceValue,
                  resourceName));
        } else {
          parsed.setPercentage(resourceName,
              parseNewStyleResourceAsPercentage(value, resourceName,
                  resourceValue));
        }
      } catch (ResourceNotFoundException ex) {
        final String reason = "The resource name, \"" + resourceName
            + "\" was not recognized. Please check the value of "
            + YarnConfiguration.RESOURCE_TYPES
            + " in the Resource Manager's configuration files.";
        throw createConfigException(value, reason, ex);
      }
    }
    return parsed;
  }

  /**
   * Parses one value of a new style percentage definition.
   *
   * @param value the complete resource definition, used in error messages
   * @param resource the resource name the value belongs to
   * @param resourceValue the textual value to parse as a percentage
   * @return the percentage as returned by {@code findPercentage}
   * @throws AllocationConfigurationException if the value is not a valid
   * percentage
   */
  private static double parseNewStyleResourceAsPercentage(
      String value, String resource, String resourceValue)
      throws AllocationConfigurationException {
    try {
      return findPercentage(resourceValue, resource);
    } catch (AllocationConfigurationException cause) {
      final String reason = "The resource values must all be percentages. \""
          + resourceValue + "\" is either not a non-negative number "
          + "or does not include the '%' symbol.";
      throw createConfigException(value, reason, cause);
    }
  }

  /**
   * Parses one value of a new style absolute definition as a non-negative
   * {@code long}.
   *
   * @param value the complete resource definition, used in error messages
   * @param resourceValue the textual value to parse
   * @param resourceName the resource name, used in error messages
   * @return the parsed, non-negative value
   * @throws AllocationConfigurationException if the value is not an integer
   * or is negative
   */
  private static long parseNewStyleResourceAsAbsoluteValue(String value,
      String resourceValue, String resourceName)
      throws AllocationConfigurationException {
    long amount;
    try {
      amount = Long.parseLong(resourceValue);
    } catch (NumberFormatException nfe) {
      final String reason = "The resource values must all be integers. \""
          + resourceValue + "\" is not an integer.";
      throw createConfigException(value, reason, nfe);
    }
    if (amount >= 0) {
      return amount;
    }
    throw new AllocationConfigurationException(
        "Invalid value of " + resourceName +
            ": " + amount + ", value should not be negative!");
  }

  /**
   * Parses an old style percentage definition after lower-casing it.
   *
   * @param value the resource definition
   * @return a {@link ConfigurableResource} built from the parsed percentages
   * @throws AllocationConfigurationException if a percentage is invalid
   */
  private static ConfigurableResource parseOldStyleResourceAsPercentage(
          String value) throws AllocationConfigurationException {
    final String lowerCased = StringUtils.toLowerCase(value);
    final double[] percentages = getResourcePercentage(lowerCased);
    return new ConfigurableResource(percentages);
  }

  /**
   * Parses an old style absolute definition such as "1024 mb, 10 vcores".
   * The lower-cased input is first split on commas; unless that yields
   * exactly two parts, the space-separated form is tried instead.
   *
   * @param input the resource definition
   * @return a {@link ConfigurableResource} holding the memory and vcores
   * @throws AllocationConfigurationException if exactly two resources cannot
   * be extracted, or a resource value is invalid
   */
  private static ConfigurableResource parseOldStyleResource(String input)
          throws AllocationConfigurationException {
    final String normalized = StringUtils.toLowerCase(input);

    String[] parts = normalized.split(",");
    if (parts.length != 2) {
      // Not comma separated: fall back to the space separated form.
      parts = findOldStyleResourcesInSpaceSeparatedInput(normalized);
    }
    if (parts.length != 2) {
      throw new AllocationConfigurationException(
          "Cannot parse resource values from input: " + input);
    }

    final int memoryMb = parseOldStyleResourceMemory(parts);
    final int vcoreCount = parseOldStyleResourceVcores(parts);
    final Resource resource = BuilderUtils.newResource(memoryMb, vcoreCount);
    return new ConfigurableResource(resource);
  }

  /**
   * Collects every match of {@code RESOURCES_WITH_SPACES_PATTERN} in the
   * input, in order of appearance.
   *
   * @param input the resource definition to scan
   * @return the matched substrings; empty when nothing matches
   */
  private static String[] findOldStyleResourcesInSpaceSeparatedInput(
      String input) {
    final List<String> matches = Lists.newArrayList();
    final Matcher scanner =
        Pattern.compile(RESOURCES_WITH_SPACES_PATTERN).matcher(input);
    for (boolean found = scanner.find(); found; found = scanner.find()) {
      matches.add(scanner.group(0));
    }
    return matches.toArray(new String[0]);
  }

  /**
   * Extracts the "mb" value from old style resources and rejects negative
   * values.
   *
   * @param resources the individual resource strings
   * @return the non-negative memory value
   * @throws AllocationConfigurationException if the value is missing,
   * invalid or negative
   */
  private static int parseOldStyleResourceMemory(String[] resources)
      throws AllocationConfigurationException {
    final int memoryMb = findResource(resources, "mb");
    if (memoryMb >= 0) {
      return memoryMb;
    }
    throw new AllocationConfigurationException(
        "Invalid value of memory: " + memoryMb +
            ", value should not be negative!");
  }

  /**
   * Extracts the "vcores" value from old style resources and rejects
   * negative values.
   *
   * @param resources the individual resource strings
   * @return the non-negative vcores value
   * @throws AllocationConfigurationException if the value is missing,
   * invalid or negative
   */
  private static int parseOldStyleResourceVcores(String[] resources)
      throws AllocationConfigurationException {
    final int vcoreCount = findResource(resources, "vcores");
    if (vcoreCount >= 0) {
      return vcoreCount;
    }
    throw new AllocationConfigurationException(
        "Invalid value of vcores: " + vcoreCount +
            ", value should not be negative!");
  }

  /**
   * Parses an old style percentage definition into an array with one slot
   * per countable resource type. A single value (e.g. "50%") is applied to
   * every slot. With several comma separated values, slot 0 receives the
   * "memory" percentage and slot 1 the "cpu" percentage; any further slots
   * keep their initial value of 0.0.
   *
   * @param val the lower-cased resource definition
   * @return the percentages, indexed by resource type
   * @throws AllocationConfigurationException if a percentage is missing or
   * invalid
   */
  private static double[] getResourcePercentage(String val)
      throws AllocationConfigurationException {
    final int typeCount = ResourceUtils.getNumberOfCountableResourceTypes();
    final double[] percentages = new double[typeCount];
    final String[] parts = val.split(",");

    if (parts.length != 1) {
      percentages[0] = findPercentage(parts, "memory");
      percentages[1] = findPercentage(parts, "cpu");
      return percentages;
    }

    // One percentage given: it applies uniformly to all resource types.
    final double uniform = findPercentage(parts, "");
    int slot = 0;
    while (slot < typeCount) {
      percentages[slot++] = uniform;
    }
    return percentages;
  }

  /**
   * Parses a single percentage string; the resource name is not added to
   * the matching pattern and is only used in error messages.
   *
   * @param resourceValue the textual percentage, e.g. "40%"
   * @param resource the resource name used in error messages
   * @return the percentage divided by 100
   * @throws AllocationConfigurationException if the value is not a valid,
   * non-negative percentage
   */
  private static double findPercentage(String resourceValue, String resource)
      throws AllocationConfigurationException {
    return findPercentageInternal(resource, resourceValue, false);
  }

  /**
   * Picks the first entry that mentions {@code resource} and parses it as
   * a percentage followed by the resource name.
   *
   * @param resourceValues the candidate resource strings
   * @param resource the resource name to look for, e.g. "memory"
   * @return the percentage divided by 100
   * @throws AllocationConfigurationException if no entry mentions the
   * resource or the entry is not a valid, non-negative percentage
   */
  private static double findPercentage(String[] resourceValues, String resource)
      throws AllocationConfigurationException {
    return findPercentageInternal(resource,
        findResourceFromValues(resourceValues, resource), true);
  }

  /**
   * Matches {@code resourceValue} against the percentage pattern and
   * converts the captured number into a fraction.
   *
   * @param resource the resource name; appended to the pattern when
   * {@code includeResourceInPattern} is set, and used in error messages
   * @param resourceValue the text to parse
   * @param includeResourceInPattern whether the resource name must follow
   * the percent sign
   * @return the captured number divided by 100
   * @throws AllocationConfigurationException if the text does not match or
   * the percentage is negative
   */
  private static double findPercentageInternal(String resource,
      String resourceValue, boolean includeResourceInPattern)
      throws AllocationConfigurationException {
    final String regex = includeResourceInPattern
        ? RESOURCE_PERCENTAGE_PATTERN + resource
        : RESOURCE_PERCENTAGE_PATTERN;
    final Matcher matcher = Pattern.compile(regex).matcher(resourceValue);

    if (!matcher.matches()) {
      final String reason = resource.equals("")
          ? "Invalid percentage: " + resourceValue
          : "Invalid percentage of " + resource + ": " + resourceValue;
      throw new AllocationConfigurationException(reason);
    }

    final double fraction = Double.parseDouble(matcher.group(1)) / 100.0;
    if (fraction < 0) {
      throw new AllocationConfigurationException("Invalid percentage: " +
          resourceValue + ", percentage should not be negative!");
    }
    return fraction;
  }

  /**
   * Creates an exception for an invalid resource definition without a
   * cause; delegates to the three-argument overload with a null cause.
   *
   * @param value the offending resource definition
   * @param message the explanation appended after the definition
   * @return the exception to throw
   */
  private static AllocationConfigurationException createConfigException(
          String value, String message) {
    return createConfigException(value, message, null);
  }

  /**
   * Creates an exception for an invalid resource definition. The message is
   * {@code INVALID_RESOURCE_DEFINITION_PREFIX}, the definition, ". " and the
   * explanation.
   *
   * @param value the offending resource definition
   * @param message the explanation appended after the definition
   * @param t the cause, or null when there is none
   * @return the exception to throw
   */
  private static AllocationConfigurationException createConfigException(
      String value, String message, Throwable t) {
    final String fullMessage =
        INVALID_RESOURCE_DEFINITION_PREFIX + value + ". " + message;
    return t == null
        ? new AllocationConfigurationException(fullMessage)
        : new AllocationConfigurationException(fullMessage, t);
  }

  /**
   * Reads {@link #UPDATE_INTERVAL_MS}.
   *
   * @return the update interval in ms, with
   * {@link #DEFAULT_UPDATE_INTERVAL_MS} as the fallback
   */
  public long getUpdateInterval() {
    return getLong(UPDATE_INTERVAL_MS, DEFAULT_UPDATE_INTERVAL_MS);
  }
  
  /**
   * Picks the first entry that mentions {@code resource} and extracts the
   * integer part of its leading number. A fractional part, if present, is
   * matched but not returned.
   *
   * @param resourceValues the candidate resource strings
   * @param resource the resource suffix to look for, e.g. "mb" or "vcores"
   * @return the integer part of the value for the resource
   * @throws AllocationConfigurationException if no entry mentions the
   * resource, the entry does not start with a number followed by the
   * resource suffix, or the number does not fit into an {@code int}
   */
  private static int findResource(String[] resourceValues, String resource)
      throws AllocationConfigurationException {
    String resourceValue = findResourceFromValues(resourceValues, resource);
    final Pattern pattern = Pattern.compile(RESOURCE_VALUE_PATTERN +
        resource);
    Matcher matcher = pattern.matcher(resourceValue);
    // "mb" is reported as "memory" in error messages.
    final String displayName = resource.equals("mb") ? "memory" : resource;
    if (!matcher.find()) {
      throw new AllocationConfigurationException("Invalid value of " +
          displayName + ": " + resourceValue);
    }
    try {
      return Integer.parseInt(matcher.group(1));
    } catch (NumberFormatException e) {
      // The pattern accepts arbitrarily many digits, so the number may be
      // out of int range; report that as a configuration error.
      throw new AllocationConfigurationException("Invalid value of " +
          displayName + ": " + resourceValue +
          ", value does not fit into an integer!", e);
    }
  }

  /**
   * Returns the first entry that contains {@code resource}, trimmed.
   *
   * @param resourceValues the candidate resource strings
   * @param resource the text to look for in each entry
   * @return the trimmed first entry containing {@code resource}
   * @throws AllocationConfigurationException if no entry contains it
   */
  private static String findResourceFromValues(String[] resourceValues,
      String resource) throws AllocationConfigurationException {
    for (int idx = 0; idx < resourceValues.length; idx++) {
      final String candidate = resourceValues[idx];
      if (!candidate.contains(resource)) {
        continue;
      }
      return candidate.trim();
    }
    throw new AllocationConfigurationException("Missing resource: " + resource);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AllocationConfiguration 源码

hadoop AllocationConfigurationException 源码

hadoop AllocationFileLoaderService 源码

hadoop ConfigurableResource 源码

hadoop FSAppAttempt 源码

hadoop FSContext 源码

hadoop FSLeafQueue 源码

hadoop FSOpDurations 源码

hadoop FSParentQueue 源码

hadoop FSPreemptionThread 源码

0  赞