hadoop DominantResourceFairnessPolicy 源码

  • 2022-10-20
  • 浏览 (180)

hadoop DominantResourceFairnessPolicy 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;

import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;

/**
 * Makes scheduling decisions by trying to equalize dominant resource usage.
 * A schedulable's dominant resource usage is the largest ratio of resource
 * usage to capacity among the resource types it is using.
 */
@Private
@Unstable
public class DominantResourceFairnessPolicy extends SchedulingPolicy {

  /** Name of this policy, as returned by {@link #getName()}. */
  public static final String NAME = "DRF";

  /**
   * Number of countable resource types, read once from
   * {@link ResourceUtils} when this class is initialized. It sizes the
   * ratio arrays of the generic comparator and selects which comparator
   * {@link #getComparator()} hands out.
   */
  private static final int NUM_RESOURCES =
      ResourceUtils.getNumberOfCountableResourceTypes();
  /** Generic comparator, returned when the resource type count is not 2. */
  private static final DominantResourceFairnessComparator COMPARATORN =
      new DominantResourceFairnessComparatorN();
  /** Specialized comparator, returned when the resource type count is 2. */
  private static final DominantResourceFairnessComparator COMPARATOR2 =
      new DominantResourceFairnessComparator2();
  /** Shared calculator returned by {@link #getResourceCalculator()}. */
  private static final DominantResourceCalculator CALCULATOR =
      new DominantResourceCalculator();

  /**
   * Return the name of this scheduling policy.
   *
   * @return the value of {@link #NAME}
   */
  @Override
  public String getName() {
    return DominantResourceFairnessPolicy.NAME;
  }

  /**
   * Return the comparator used to order {@link Schedulable} instances
   * under this policy.
   *
   * @return the two-resource comparator when exactly two countable resource
   * types are configured, otherwise the generic comparator
   */
  @Override
  public Comparator<Schedulable> getComparator() {
    // Only CPU and memory is the common case, so it is handled explicitly
    // by a dedicated comparator for performance; every other resource count
    // goes the generic way.
    return NUM_RESOURCES == 2 ? COMPARATOR2 : COMPARATORN;
  }

  /**
   * Return the resource calculator associated with this policy.
   *
   * @return the shared {@link DominantResourceCalculator} instance
   */
  @Override
  public ResourceCalculator getResourceCalculator() {
    return DominantResourceFairnessPolicy.CALCULATOR;
  }

  /**
   * Compute shares for the given schedulables by delegating to
   * {@link ComputeFairShares#computeShares} once for every resource type
   * reported by {@link ResourceUtils#getResourceTypesArray()}.
   *
   * @param schedulables the schedulables whose shares are computed
   * @param totalResources the total resources to divide
   */
  @Override
  public void computeShares(Collection<? extends Schedulable> schedulables,
      Resource totalResources) {
    final ResourceInformation[] resourceTypes =
        ResourceUtils.getResourceTypesArray();

    for (int i = 0; i < resourceTypes.length; i++) {
      final String typeName = resourceTypes[i].getName();
      ComputeFairShares.computeShares(schedulables, totalResources, typeName);
    }
  }

  /**
   * Compute steady shares for the given queues by delegating to
   * {@link ComputeFairShares#computeSteadyShares} once for every resource
   * type reported by {@link ResourceUtils#getResourceTypesArray()}.
   *
   * @param queues the queues whose steady shares are computed
   * @param totalResources the total resources to divide
   */
  @Override
  public void computeSteadyShares(Collection<? extends FSQueue> queues,
      Resource totalResources) {
    final ResourceInformation[] resourceTypes =
        ResourceUtils.getResourceTypesArray();

    for (int i = 0; i < resourceTypes.length; i++) {
      final String typeName = resourceTypes[i].getName();
      ComputeFairShares.computeSteadyShares(queues, totalResources, typeName);
    }
  }

  /**
   * Report whether the usage exceeds the fair share.
   *
   * @param usage the current resource usage
   * @param fairShare the fair share to check against
   * @return {@code true} when {@code usage} does not fit in
   * {@code fairShare} according to {@link Resources#fitsIn}
   */
  @Override
  public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
    final boolean withinFairShare = Resources.fitsIn(usage, fairShare);
    return !withinFairShare;
  }

  /**
   * Compute the headroom of a queue. Only memory and virtual cores are
   * considered: for each of the two, the headroom is the unused part of the
   * fair share (never negative), capped by what is available at most.
   *
   * @param queueFairShare the fair share of the queue
   * @param queueUsage the current usage of the queue
   * @param maxAvailable the upper bound on the available resources
   * @return a new resource holding the memory and vcore headroom
   */
  @Override
  public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
                              Resource maxAvailable) {
    // Unused portion of the fair share, clamped so an over-share queue
    // reports zero instead of a negative amount.
    long unusedMemory =
        queueFairShare.getMemorySize() - queueUsage.getMemorySize();
    if (unusedMemory < 0) {
      unusedMemory = 0;
    }

    int unusedVcores =
        queueFairShare.getVirtualCores() - queueUsage.getVirtualCores();
    if (unusedVcores < 0) {
      unusedVcores = 0;
    }

    // The headroom can never exceed what is actually available.
    final long memoryHeadroom =
        Math.min(maxAvailable.getMemorySize(), unusedMemory);
    final int vcoreHeadroom =
        Math.min(maxAvailable.getVirtualCores(), unusedVcores);

    return Resources.createResource(memoryHeadroom, vcoreHeadroom);
  }

  /**
   * Hand the scheduler context to both comparator instances. Both are
   * updated regardless of which one {@link #getComparator()} returns.
   *
   * @param fsContext the context to pass to the comparators
   */
  @Override
  public void initialize(FSContext fsContext) {
    // The two assignments are independent of each other.
    COMPARATOR2.setFSContext(fsContext);
    COMPARATORN.setFSContext(fsContext);
  }

  /**
   * This class compares two {@link Schedulable} instances according to the
   * DRF policy. If neither instance is below min share, approximate fair share
   * ratios are compared. Subclasses of this class will do the actual work of
   * the comparison, specialized for the number of configured resource types.
   */
  public abstract static class DominantResourceFairnessComparator
      implements Comparator<Schedulable> {
    /** Scheduler context; subclasses read the cluster resource from it. */
    protected FSContext fsContext;

    /**
     * Set the scheduler context used by this comparator.
     *
     * @param fsContext the context to store
     */
    public void setFSContext(FSContext fsContext) {
      this.fsContext = fsContext;
    }

    /**
     * This method is used when apps are tied in fairness ratio. It breaks
     * the tie by start time and then by name to get a deterministic
     * ordering, which is useful for unit tests.
     *
     * @param s1 the first item to compare
     * @param s2 the second item to compare
     * @return &lt; 0, 0, or &gt; 0 if the first item is less than, equal to,
     * or greater than the second item, respectively
     */
    protected int compareAttributes(Schedulable s1, Schedulable s2) {
      // Compare the start times directly rather than taking the sign of
      // their difference: the subtraction can overflow and flip the order.
      int res = Long.compare(s1.getStartTime(), s2.getStartTime());

      if (res == 0) {
        res = s1.getName().compareTo(s2.getName());
      }

      return res;
    }
  }

  /**
   * This class compares two {@link Schedulable} instances according to the
   * DRF policy. If neither instance is below min share, approximate fair share
   * ratios are compared. This class makes no assumptions about the number of
   * resource types.
   */
  @VisibleForTesting
  static class DominantResourceFairnessComparatorN
      extends DominantResourceFairnessComparator {
    /**
     * Compare two schedulables under the DRF policy for an arbitrary number
     * of resource types. A schedulable that is below its min share on its
     * dominant resource sorts before one that is not. If both are below,
     * min share ratios are compared; if neither is, approximate fair share
     * ratios are compared. Remaining ties are broken by
     * {@link #compareAttributes}.
     *
     * @param s1 the first schedulable
     * @param s2 the second schedulable
     * @return a negative value, 0, or a positive value if {@code s1} sorts
     * before, equal to, or after {@code s2}, respectively
     */
    @Override
    public int compare(Schedulable s1, Schedulable s2) {
      final Resource firstUsage = s1.getResourceUsage();
      final Resource secondUsage = s2.getResourceUsage();
      final Resource firstMinShare = s1.getMinShare();
      final Resource secondMinShare = s2.getMinShare();
      final Resource cluster = fsContext.getClusterResource();

      // One row per resource type. Within row x, ratios[x][0] is the usage
      // ratio, ratios[x][1] is the approximate fair share ratio, and
      // ratios[x][2] is the min share ratio.
      final float[][] firstRatios = new float[NUM_RESOURCES][3];
      final float[][] secondRatios = new float[NUM_RESOURCES][3];

      // Fill in the usage and approximate fair share columns and learn
      // which resource type dominates for each schedulable.
      final int firstDominant = calculateClusterAndFairRatios(firstUsage,
          cluster, firstRatios, s1.getWeight());
      final int secondDominant = calculateClusterAndFairRatios(secondUsage,
          cluster, secondRatios, s2.getWeight());

      // A schedulable is needy when the usage of its dominant resource
      // (with respect to the cluster capacity) is below the min share
      // configured for that same resource.
      final boolean firstNeedy =
          firstUsage.getResources()[firstDominant].getValue() <
          firstMinShare.getResources()[firstDominant].getValue();
      final boolean secondNeedy =
          secondUsage.getResources()[secondDominant].getValue() <
          secondMinShare.getResources()[secondDominant].getValue();

      int res;

      if (firstNeedy != secondNeedy) {
        // Exactly one of the two is needy, and the needy one goes first.
        res = firstNeedy ? -1 : 1;
      } else {
        final int column;

        if (firstNeedy) {
          // Both are below min share: compare by min share ratio, which
          // has to be computed first.
          calculateMinShareRatios(firstUsage, firstMinShare, firstRatios);
          calculateMinShareRatios(secondUsage, secondMinShare, secondRatios);
          column = 2;
        } else {
          // Neither is below min share: compare by approximate fair share.
          column = 1;
        }

        // In both cases the rows are ordered by usage ratio before the
        // selected column is compared.
        sortRatios(firstRatios, secondRatios);
        res = compareRatios(firstRatios, secondRatios, column);
      }

      if (res == 0) {
        res = compareAttributes(s1, s2);
      }

      return res;
    }

    /**
     * Sort both ratios arrays in descending order of the usage ratios (the
     * first index of the inner arrays, e.g. {@code ratios1[x][0]}). Rows
     * with equal usage ratios keep their relative order.
     *
     * The ordering is the total order of {@link Float#compare}, so a
     * {@code NaN} usage ratio (for example 0 usage over 0 capacity) is
     * placed deterministically ahead of every other value instead of
     * comparing as equal to all of them.
     *
     * @param ratios1 the first ratios array
     * @param ratios2 the second ratios array
     */
    @VisibleForTesting
    void sortRatios(float[][] ratios1, float[][]ratios2) {
      // Taking the sign of a float difference yields 0 whenever NaN is
      // involved, which is not transitive and therefore violates the
      // comparator contract that Arrays.sort depends on.
      Comparator<float[]> byUsageDescending =
          (float[] o1, float[] o2) -> Float.compare(o2[0], o1[0]);

      Arrays.sort(ratios1, byUsageDescending);
      Arrays.sort(ratios2, byUsageDescending);
    }

    /**
     * Populate the usage ratio and the approximate fair share ratio of
     * every resource type and report which resource type dominates.
     *
     * For each resource type {@code x}, {@code ratios[x][0]} receives the
     * usage ratio, i.e. {@code resource} divided by {@code cluster}, and
     * {@code ratios[x][1]} receives the approximate fair share ratio, i.e.
     * the usage ratio divided by {@code weight}. With a cluster of 100 MB
     * and 10 vcores, a usage of 10 MB and 5 vcores, and a weight of 2, the
     * usage ratios are 0.1 and 0.5 and the fair share ratios are 0.05 and
     * 0.25.
     *
     * The approximate fair share (cluster resources times weight) stands in
     * for the real fair share because a schedulable with a higher weight is
     * assigned a proportionally higher fair share by the scheduler.
     *
     * The {@code ratios} array needs one row per entry of the cluster's
     * resource array, and every row needs at least two columns. Only
     * columns 0 and 1 are written.
     *
     * @param resource the resource for which to calculate ratios
     * @param cluster the total cluster resources
     * @param ratios the share ratios array to populate
     * @param weight the resource weight
     * @return the index of the resource type with the largest usage ratio;
     * on ties the lowest index wins
     */
    @VisibleForTesting
    int calculateClusterAndFairRatios(Resource resource, Resource cluster,
        float[][] ratios, float weight) {
      final ResourceInformation[] used = resource.getResources();
      final ResourceInformation[] total = cluster.getResources();
      int dominant = 0;

      for (int type = 0; type < total.length; type++) {
        final float[] row = ratios[type];

        // Share of the cluster consumed for this resource type.
        row[0] = used[type].getValue() / (float) total[type].getValue();

        // Track the largest cluster share seen so far; the comparison is
        // strict, so an earlier type keeps the lead on a tie.
        if (row[0] > ratios[dominant][0]) {
          dominant = type;
        }

        // A zero weight is fine here: floating point division yields
        // Infinity, so this Schedulable loses out to any other Schedulable
        // with a non-zero weight.
        row[1] = row[0] / weight;
      }

      return dominant;
    }
    
    /**
     * Populate the min share ratio of every resource type. For each
     * resource type {@code x}, {@code ratios[x][2]} receives
     * {@code resource} divided by {@code minShare}. With min shares of 5 MB
     * and 10 vcores and a usage of 10 MB and 5 vcores, the ratios are 2 and
     * 0.5.
     *
     * The {@code ratios} array needs one row per entry of the min share's
     * resource array, and every row needs at least three columns. Only
     * column 2 is written.
     *
     * @param resource the resource for which to calculate min shares
     * @param minShare the min share
     * @param ratios the share ratios array to populate
     */
    @VisibleForTesting
    void calculateMinShareRatios(Resource resource, Resource minShare,
        float[][] ratios) {
      final ResourceInformation[] used = resource.getResources();
      final ResourceInformation[] floor = minShare.getResources();

      int type = 0;
      while (type < floor.length) {
        final float[] row = ratios[type];
        row[2] = used[type].getValue() / (float) floor[type].getValue();
        type++;
      }
    }

    /**
     * Compare two ratios arrays row by row on a single column and return
     * the sign of the first difference found. Both arrays are expected to
     * be sorted in descending order by usage ratio already, so the rows are
     * visited from the most used resource type to the least used one.
     *
     * @param ratios1 the first ratios array
     * @param ratios2 the second ratios array
     * @param index the column of the inner arrays to compare. 0 is for
     * usage ratios, 1 is for approximate fair share ratios, and 2 is for
     * min share ratios
     * @return -1, 0, or 1 if the first array is less than, equal to, or
     * greater than the second array, respectively
     */
    @VisibleForTesting
    int compareRatios(float[][] ratios1, float[][] ratios2, int index) {
      for (int row = 0; row < ratios1.length; row++) {
        final float difference = ratios1[row][index] - ratios2[row][index];
        final int sign = (int) Math.signum(difference);

        // The first row that differs decides the outcome.
        if (sign != 0) {
          return sign;
        }
      }

      return 0;
    }
  }

  /**
   * This class compares two {@link Schedulable} instances according to the
   * DRF policy in the special case that only CPU and memory are configured.
   * If neither instance is below min share, approximate fair share
   * ratios are compared.
   */
  @VisibleForTesting
  static class DominantResourceFairnessComparator2
      extends DominantResourceFairnessComparator {
    /**
     * Compare two schedulables under the DRF policy when memory and CPU are
     * the only resource types. A schedulable that is below its min share on
     * its dominant resource sorts before one that is not. If both are
     * below, min share ratios are compared; if neither is, approximate fair
     * share ratios are compared. In either case the dominant resource is
     * compared first and the other resource second. Remaining ties are
     * broken by {@link #compareAttributes}.
     *
     * @param s1 the first schedulable
     * @param s2 the second schedulable
     * @return a negative value, 0, or a positive value if {@code s1} sorts
     * before, equal to, or after {@code s2}, respectively
     */
    @Override
    public int compare(Schedulable s1, Schedulable s2) {
      ResourceInformation[] resourceInfo1 =
          s1.getResourceUsage().getResources();
      ResourceInformation[] resourceInfo2 =
          s2.getResourceUsage().getResources();
      ResourceInformation[] minShareInfo1 = s1.getMinShare().getResources();
      ResourceInformation[] minShareInfo2 = s2.getMinShare().getResources();
      ResourceInformation[] clusterInfo =
          fsContext.getClusterResource().getResources();
      double[] shares1 = new double[2];
      double[] shares2 = new double[2];

      int dominant1 = calculateClusterAndFairRatios(resourceInfo1,
          s1.getWeight(), clusterInfo, shares1);
      int dominant2 = calculateClusterAndFairRatios(resourceInfo2,
          s2.getWeight(), clusterInfo, shares2);

      // A queue is needy for its min share if its dominant resource
      // (with respect to the cluster capacity) is below its configured min
      // share for that resource. Each schedulable is judged on its OWN
      // usage: s2 must be checked against resourceInfo2, not resourceInfo1.
      boolean s1Needy = resourceInfo1[dominant1].getValue() <
          minShareInfo1[dominant1].getValue();
      boolean s2Needy = resourceInfo2[dominant2].getValue() <
          minShareInfo2[dominant2].getValue();

      int res;

      if (!s2Needy && !s1Needy) {
        // Neither is below min share: compare approximate fair share
        // ratios, dominant resource first.
        res = (int) Math.signum(shares1[dominant1] - shares2[dominant2]);

        if (res == 0) {
          // Because memory and CPU are indices 0 and 1, we can find the
          // non-dominant index by subtracting the dominant index from 1.
          res = (int) Math.signum(shares1[1 - dominant1] -
              shares2[1 - dominant2]);
        }
      } else if (s1Needy && !s2Needy) {
        res = -1;
      } else if (s2Needy && !s1Needy) {
        res = 1;
      } else {
        // Both are below min share: compare min share ratios, dominant
        // resource first.
        double[] minShares1 =
            calculateMinShareRatios(resourceInfo1, minShareInfo1);
        double[] minShares2 =
            calculateMinShareRatios(resourceInfo2, minShareInfo2);

        res = (int) Math.signum(minShares1[dominant1] - minShares2[dominant2]);

        if (res == 0) {
          res = (int) Math.signum(minShares1[1 - dominant1] -
              minShares2[1 - dominant2]);
        }
      }

      if (res == 0) {
        res = compareAttributes(s1, s2);
      }

      return res;
    }

    /**
     * Populate the approximate fair share ratios for memory and CPU and
     * report which of the two dominates, assuming that CPU and memory are
     * the only configured resource types.
     *
     * Each entry of {@code shares} receives {@code resourceInfo} divided by
     * {@code clusterInfo} and by {@code weight}. With a cluster of 100 MB
     * and 10 vcores, a usage of 10 MB and 5 vcores, and a weight of 2, the
     * fair share ratios are 0.05 and 0.25.
     *
     * The approximate fair share (cluster resources times weight) stands in
     * for the real fair share because a schedulable with a higher weight is
     * assigned a proportionally higher fair share by the scheduler.
     *
     * The dominant resource is decided on the usage ratios, i.e. before the
     * division by {@code weight}. The length of the {@code shares} array
     * must be at least 2.
     *
     * @param resourceInfo the resource for which to calculate ratios
     * @param weight the resource weight
     * @param clusterInfo the total cluster resources
     * @param shares the share ratios array to populate
     * @return the index of the resource type with the largest usage ratio;
     * memory wins when the two usage ratios are not strictly ordered in
     * favor of CPU
     */
    @VisibleForTesting
    int calculateClusterAndFairRatios(ResourceInformation[] resourceInfo,
        float weight, ResourceInformation[] clusterInfo, double[] shares) {
      final int memory = Resource.MEMORY_INDEX;
      final int vcores = Resource.VCORES_INDEX;

      // Start with the plain usage ratios, i.e. the shares of the cluster.
      shares[memory] = ((double) resourceInfo[memory].getValue()) /
          clusterInfo[memory].getValue();
      shares[vcores] = ((double) resourceInfo[vcores].getValue()) /
          clusterInfo[vcores].getValue();

      // Pick the dominant resource while the entries are still unweighted.
      int dominant = memory;
      if (shares[vcores] > shares[memory]) {
        dominant = vcores;
      }

      // Turn the usage ratios into approximate fair share ratios.
      shares[memory] /= weight;
      shares[vcores] /= weight;

      return dominant;
    }

    /**
     * Calculate the min share ratios for memory and CPU, assuming that CPU
     * and memory are the only configured resource types. Each entry of the
     * returned array is {@code resourceInfo} divided by
     * {@code minShareInfo} for that resource type. With min shares of 5 MB
     * and 10 vcores and a usage of 10 MB and 5 vcores, the ratios are 2 and
     * 0.5.
     *
     * @param resourceInfo the resource for which to calculate min shares
     * @param minShareInfo the min share
     * @return a new array of length 2 holding the ratios, indexed by
     * {@code Resource.MEMORY_INDEX} and {@code Resource.VCORES_INDEX}
     */
    @VisibleForTesting
    double[] calculateMinShareRatios(ResourceInformation[] resourceInfo,
        ResourceInformation[] minShareInfo) {
      final int memory = Resource.MEMORY_INDEX;
      final int vcores = Resource.VCORES_INDEX;
      final double[] ratios = new double[2];

      ratios[memory] = ((double) resourceInfo[memory].getValue()) /
          minShareInfo[memory].getValue();
      ratios[vcores] = ((double) resourceInfo[vcores].getValue()) /
          minShareInfo[vcores].getValue();

      return ratios;
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ComputeFairShares 源码

hadoop FairSharePolicy 源码

hadoop FifoPolicy 源码

0  赞