hadoop FSQueueConverter 源码

  • 2022-10-20
  • 浏览 (219)

haddop FSQueueConverter 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSQueueConverter.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion.CapacityConverter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion.CapacityConverterFactory;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
 * Converts a Fair Schedule queue hierarchy to Capacity Scheduler
 * configuration.
 *
 */
public class FSQueueConverter {
  public static final float QUEUE_MAX_AM_SHARE_DISABLED = -1.0f;
  private static final int MAX_RUNNING_APPS_UNSET = Integer.MAX_VALUE;
  private static final String FAIR_POLICY = "fair";
  private static final String FIFO_POLICY = "fifo";

  private final FSConfigToCSConfigRuleHandler ruleHandler;
  private Configuration capacitySchedulerConfig;
  private final boolean preemptionEnabled;
  private final boolean sizeBasedWeight;
  @SuppressWarnings("unused")
  private final Resource clusterResource;
  private final float queueMaxAMShareDefault;
  private final int queueMaxAppsDefault;
  private final boolean drfUsed;
  private final boolean usePercentages;

  private ConversionOptions conversionOptions;

  public FSQueueConverter(FSQueueConverterBuilder builder) {
    this.ruleHandler = builder.ruleHandler;
    this.capacitySchedulerConfig = builder.capacitySchedulerConfig;
    this.preemptionEnabled = builder.preemptionEnabled;
    this.sizeBasedWeight = builder.sizeBasedWeight;
    this.clusterResource = builder.clusterResource;
    this.queueMaxAMShareDefault = builder.queueMaxAMShareDefault;
    this.queueMaxAppsDefault = builder.queueMaxAppsDefault;
    this.conversionOptions = builder.conversionOptions;
    this.drfUsed = builder.drfUsed;
    this.usePercentages = builder.usePercentages;
  }

  public void convertQueueHierarchy(FSQueue queue) {
    List<FSQueue> children = queue.getChildQueues();
    final String queueName = queue.getName();

    emitChildQueues(queueName, children);
    emitMaxAMShare(queueName, queue);
    emitMaxParallelApps(queueName, queue);
    emitMaxAllocations(queueName, queue);
    emitPreemptionDisabled(queueName, queue);

    emitChildCapacity(queue);
    emitMaximumCapacity(queueName, queue);
    emitSizeBasedWeight(queueName);
    emitOrderingPolicy(queueName, queue);
    checkMaxChildCapacitySetting(queue);

    for (FSQueue childQueue : children) {
      convertQueueHierarchy(childQueue);
    }
  }

  /**
   * Generates yarn.scheduler.capacity.&lt;queue-name&gt;.queues.
   * @param queueName
   * @param children
   */
  private void emitChildQueues(String queueName, List<FSQueue> children) {
    ruleHandler.handleChildQueueCount(queueName, children.size());

    if (children.size() > 0) {
      String childQueues = children.stream()
          .map(child -> getQueueShortName(child.getName()))
          .collect(Collectors.joining(","));

      capacitySchedulerConfig.set(PREFIX + queueName + ".queues", childQueues);
    }
  }

  /**
   * &lt;maxAMShare&gt; 
   * ==> yarn.scheduler.capacity.&lt;queue-name&gt;.maximum-am-resource-percent.
   * @param queueName
   * @param queue
   */
  private void emitMaxAMShare(String queueName, FSQueue queue) {
    float queueMaxAmShare = queue.getMaxAMShare();

    // Direct floating point comparison is OK here
    if (queueMaxAmShare != 0.0f
        && queueMaxAmShare != queueMaxAMShareDefault
        && queueMaxAmShare != QUEUE_MAX_AM_SHARE_DISABLED) {
      capacitySchedulerConfig.setFloat(PREFIX + queueName +
          ".maximum-am-resource-percent", queueMaxAmShare);
    }

    if (queueMaxAmShare == QUEUE_MAX_AM_SHARE_DISABLED
        && queueMaxAmShare != queueMaxAMShareDefault) {
      capacitySchedulerConfig.setFloat(PREFIX + queueName +
          ".maximum-am-resource-percent", 1.0f);
    }
  }

  /**
   * &lt;maxRunningApps&gt;
   * ==> yarn.scheduler.capacity.&lt;queue-name&gt;.max-parallel-apps.
   * @param queueName
   * @param queue
   */
  private void emitMaxParallelApps(String queueName, FSQueue queue) {
    if (queue.getMaxRunningApps() != MAX_RUNNING_APPS_UNSET
        && queue.getMaxRunningApps() != queueMaxAppsDefault) {
      capacitySchedulerConfig.set(PREFIX + queueName + ".max-parallel-apps",
          String.valueOf(queue.getMaxRunningApps()));
    }
  }

  /**
   * &lt;maxResources&gt;
   * ==> yarn.scheduler.capacity.&lt;queue-name&gt;.maximum-capacity.
   * @param queueName
   * @param queue
   */
  private void emitMaximumCapacity(String queueName, FSQueue queue) {
    ConfigurableResource rawMaxShare = queue.getRawMaxShare();
    final Resource maxResource = rawMaxShare.getResource();

    if ((maxResource == null && rawMaxShare.getPercentages() != null)
        || isNotUnboundedResource(maxResource)) {
      ruleHandler.handleMaxResources();
    }

    capacitySchedulerConfig.set(PREFIX + queueName + ".maximum-capacity",
        "100");
  }

  /**
   * &lt;maxContainerAllocation&gt;
   * ==> yarn.scheduler.capacity.&lt;queue-name&gt;.maximum-allocation-mb
   * / vcores.
   * @param queueName
   * @param queue
   */
  private void emitMaxAllocations(String queueName, FSQueue queue) {
    Resource maxAllocation = queue.getMaximumContainerAllocation();

    if (isNotUnboundedResource(maxAllocation)) {
      long parentMaxVcores = Integer.MIN_VALUE;
      long parentMaxMemory = Integer.MIN_VALUE;

      if (queue.getParent() != null) {
        FSQueue parent = queue.getParent();
        Resource parentMaxAllocation = parent.getMaximumContainerAllocation();
        if (isNotUnboundedResource(parentMaxAllocation)) {
          parentMaxVcores = parentMaxAllocation.getVirtualCores();
          parentMaxMemory = parentMaxAllocation.getMemorySize();
        }
      }

      long maxVcores = maxAllocation.getVirtualCores();
      long maxMemory = maxAllocation.getMemorySize();

      // only emit max allocation if it differs from the parent's setting
      if (maxVcores != parentMaxVcores || maxMemory != parentMaxMemory) {
        capacitySchedulerConfig.set(PREFIX + queueName +
            ".maximum-allocation-mb", String.valueOf(maxMemory));

        capacitySchedulerConfig.set(PREFIX + queueName +
            ".maximum-allocation-vcores", String.valueOf(maxVcores));
      }
    }
  }

  /**
   * &lt;allowPreemptionFrom&gt;
   * ==> yarn.scheduler.capacity.&lt;queue-name&gt;.disable_preemption.
   * @param queueName
   * @param queue
   */
  private void emitPreemptionDisabled(String queueName, FSQueue queue) {
    if (preemptionEnabled && !queue.isPreemptable()) {
      capacitySchedulerConfig.set(PREFIX + queueName + ".disable_preemption",
          "true");
    }
  }

  /**
   * yarn.scheduler.fair.sizebasedweight ==>
   * yarn.scheduler.capacity.&lt;queue-path&gt;
   * .ordering-policy.fair.enable-size-based-weight.
   * @param queueName
   */
  private void emitSizeBasedWeight(String queueName) {
    if (sizeBasedWeight) {
      capacitySchedulerConfig.setBoolean(PREFIX + queueName +
          ".ordering-policy.fair.enable-size-based-weight", true);
    }
  }

  /**
   * &lt;schedulingPolicy&gt;
   * ==> yarn.scheduler.capacity.&lt;queue-path&gt;.ordering-policy.
   * @param queueName
   * @param queue
   */
  private void emitOrderingPolicy(String queueName, FSQueue queue) {
    if (queue instanceof FSLeafQueue) {
      String policy = queue.getPolicy().getName();

      switch (policy) {
      case DominantResourceFairnessPolicy.NAME:
        capacitySchedulerConfig.set(PREFIX + queueName
            + ".ordering-policy", FAIR_POLICY);
        break;
      case FairSharePolicy.NAME:
        capacitySchedulerConfig.set(PREFIX + queueName
            + ".ordering-policy", FAIR_POLICY);
        if (drfUsed) {
          ruleHandler.handleFairAsDrf(queueName);
        }
        break;
      case FifoPolicy.NAME:
        capacitySchedulerConfig.set(PREFIX + queueName
            + ".ordering-policy", FIFO_POLICY);
        break;
      default:
        String msg = String.format("Unexpected ordering policy " +
            "on queue %s: %s", queue, policy);
        conversionOptions.handleConversionError(msg);
      }
    }
  }

  /**
   * weight + minResources
   * ==> yarn.scheduler.capacity.&lt;queue-name&gt;.capacity.
   * @param queue
   */
  private void emitChildCapacity(FSQueue queue) {
    CapacityConverter converter =
        CapacityConverterFactory.getConverter(usePercentages);

    converter.convertWeightsForChildQueues(queue,
        capacitySchedulerConfig);

    if (Resources.none().compareTo(queue.getMinShare()) != 0) {
      ruleHandler.handleMinResources();
    }
  }

  /**
   * Missing feature, "leaf-queue-template.capacity" only accepts a single
   * pct value.
   * @param queue
   */
  private void checkMaxChildCapacitySetting(FSQueue queue) {
    if (queue.getMaxChildQueueResource() != null) {
      Resource resource = queue.getMaxChildQueueResource().getResource();

      if ((resource != null && isNotUnboundedResource(resource))
          || queue.getMaxChildQueueResource().getPercentages() != null) {
        // Maximum child resource is defined
        ruleHandler.handleMaxChildCapacity();
      }
    }
  }

  private String getQueueShortName(String queueName) {
    int lastDot = queueName.lastIndexOf(".");
    return queueName.substring(lastDot + 1);
  }

  private boolean isNotUnboundedResource(Resource res) {
    return Resources.unbounded().compareTo(res) != 0;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ConversionException 源码

hadoop ConversionOptions 源码

hadoop ConvertedConfigValidator 源码

hadoop DryRunResultHolder 源码

hadoop FSConfigToCSConfigArgumentHandler 源码

hadoop FSConfigToCSConfigConverter 源码

hadoop FSConfigToCSConfigConverterMain 源码

hadoop FSConfigToCSConfigConverterParams 源码

hadoop FSConfigToCSConfigRuleHandler 源码

hadoop FSQueueConverterBuilder 源码

0  赞