hadoop AbstractPreemptableResourceCalculator source code

  • 2022-10-20

hadoop AbstractPreemptableResourceCalculator code

File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Calculates how many resources need to be preempted for each queue; this is
 * used by {@link PreemptionCandidatesSelector}.
 */
public class AbstractPreemptableResourceCalculator {
  private static final Logger LOG = LoggerFactory.getLogger(
      AbstractPreemptableResourceCalculator.class);

  protected final CapacitySchedulerPreemptionContext context;
  protected final ResourceCalculator rc;
  protected boolean isReservedPreemptionCandidatesSelector;
  private Resource stepFactor;
  private boolean allowQueuesBalanceAfterAllQueuesSatisfied;

  static class TQComparator implements Comparator<TempQueuePerPartition> {
    private ResourceCalculator rc;
    private Resource clusterRes;

    TQComparator(ResourceCalculator rc, Resource clusterRes) {
      this.rc = rc;
      this.clusterRes = clusterRes;
    }

    @Override
    public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
      double assigned1 = getIdealPctOfGuaranteed(tq1);
      double assigned2 = getIdealPctOfGuaranteed(tq2);

      return PriorityUtilizationQueueOrderingPolicy.compare(assigned1,
          assigned2, tq1.relativePriority, tq2.relativePriority);
    }

    // Calculates idealAssigned / guaranteed
    // TempQueues with 0 guarantees are always considered the most over
    // capacity and therefore considered last for resources.
    private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
      double pctOver = Integer.MAX_VALUE;
      if (q != null && Resources.greaterThan(rc, clusterRes, q.getGuaranteed(),
          Resources.none())) {
        pctOver = Resources.divide(rc, clusterRes, q.idealAssigned,
            q.getGuaranteed());
      }
      return (pctOver);
    }
  }
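
  // Worked illustration (hypothetical numbers): on a 100 GB cluster, a queue
  // guaranteed 20 GB with idealAssigned 5 GB has an idealAssigned/guaranteed
  // ratio of 0.25, while a queue guaranteed 20 GB with idealAssigned 30 GB has
  // a ratio of 1.5. Assuming equal relative priority, the comparator orders
  // the first queue ahead of the second. A queue with a zero guarantee gets
  // Integer.MAX_VALUE and is therefore considered last for resources.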

  private static class NormalizationTuple {
    private Resource numerator;
    private Resource denominator;

    NormalizationTuple(Resource numer, Resource denom) {
      this.numerator = numer;
      this.denominator = denom;
    }

    long getNumeratorValue(int i) {
      return numerator.getResourceInformation(i).getValue();
    }

    long getDenominatorValue(int i) {
      String nUnits = numerator.getResourceInformation(i).getUnits();
      ResourceInformation dResourceInformation = denominator
          .getResourceInformation(i);
      return UnitsConversionUtil.convert(
          dResourceInformation.getUnits(), nUnits, dResourceInformation.getValue());
    }

    float getNormalizedValue(int i) {
      long nValue = getNumeratorValue(i);
      long dValue = getDenominatorValue(i);
      return dValue == 0 ? 0.0f : (float) nValue / dValue;
    }
  }
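
  // Worked illustration (hypothetical values): with a numerator of
  // <memory: 4096 Mi> and a denominator of <memory: 16 Gi>,
  // getDenominatorValue(0) converts 16 Gi into 16384 Mi, so that
  // getNormalizedValue(0) returns 4096f / 16384 = 0.25.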

  /**
   * AbstractPreemptableResourceCalculator constructor.
   *
   * @param preemptionContext context
   * @param isReservedPreemptionCandidatesSelector
   *          this will be set by different implementation of candidate
   *          selectors, please refer to TempQueuePerPartition#offer for
   *          details.
   * @param allowQueuesBalanceAfterAllQueuesSatisfied
   *          Should resources be preempted from an over-served queue when the
   *          requesting queues are all at or over their guarantees?
   *          For example, there are 10 queues under root, each guaranteed 10%
   *          of the cluster. Assume only two queues are using resources:
   *          queueA uses 10% and queueB uses 90%. Every queue has its
   *          guarantee satisfied, but this is not fair to queueA.
   *          This behavior is therefore configurable; by default it is not
   *          allowed.
   *
   */
  public AbstractPreemptableResourceCalculator(
      CapacitySchedulerPreemptionContext preemptionContext,
      boolean isReservedPreemptionCandidatesSelector,
      boolean allowQueuesBalanceAfterAllQueuesSatisfied) {
    context = preemptionContext;
    rc = preemptionContext.getResourceCalculator();
    this.isReservedPreemptionCandidatesSelector =
        isReservedPreemptionCandidatesSelector;
    this.allowQueuesBalanceAfterAllQueuesSatisfied =
        allowQueuesBalanceAfterAllQueuesSatisfied;
    stepFactor = Resource.newInstance(0, 0);
    for (ResourceInformation ri : stepFactor.getResources()) {
      ri.setValue(1);
    }
  }

  /**
   * Given a set of queues compute the fix-point distribution of unassigned
   * resources among them. As the pending requests of a queue are exhausted, the
   * queue is removed from the set and remaining capacity redistributed among
   * remaining queues. The distribution is weighted based on guaranteed
   * capacity, unless asked to ignoreGuarantee, in which case resources are
   * distributed uniformly.
   *
   * @param totGuarant
   *          total guaranteed resource
   * @param qAlloc
   *          List of child queues
   * @param unassigned
   *          Unassigned resource per queue
   * @param ignoreGuarantee
   *          ignore guarantee per queue.
   */
  protected void computeFixpointAllocation(Resource totGuarant,
      Collection<TempQueuePerPartition> qAlloc, Resource unassigned,
      boolean ignoreGuarantee) {
    // Prior to assigning the unused resources, process each queue as follows:
    // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
    // Else idealAssigned = current;
    // Subtract idealAssigned resources from unassigned.
    // If the queue has all of its needs met (that is, if
    // idealAssigned >= current + pending), remove it from consideration.
    // Sort queues from most under-guaranteed to most over-guaranteed.
    TQComparator tqComparator = new TQComparator(rc, totGuarant);
    PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
        tqComparator);
    for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext(); ) {
      TempQueuePerPartition q = i.next();
      Resource used = q.getUsed();

      Resource initIdealAssigned;
      if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
        initIdealAssigned = Resources.add(
            Resources.componentwiseMin(q.getGuaranteed(), q.getUsed()),
            q.untouchableExtra);
      } else {
        initIdealAssigned = Resources.clone(used);
      }

      // perform initial assignment
      initIdealAssignment(totGuarant, q, initIdealAssigned);

      Resources.subtractFrom(unassigned, q.idealAssigned);

      // If idealAssigned < (used + pending), q needs more resources, so add
      // it to the list of underserved queues, ordered by need.
      Resource curPlusPend = Resources.add(q.getUsed(), q.pending);
      if (Resources.lessThan(rc, totGuarant, q.idealAssigned, curPlusPend)) {
        orderedByNeed.add(q);
      }
    }

    // assign all cluster resources until no more demand, or no resources are
    // left
    while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,
        unassigned, Resources.none())) {
      // we compute normalizedGuarantees capacity based on currently active
      // queues
      resetCapacity(orderedByNeed, ignoreGuarantee);

      // For each underserved queue (or set of queues if multiple are equally
      // underserved), offer its share of the unassigned resources based on its
      // normalized guarantee. After the offer, if the queue is not satisfied,
      // place it back in the ordered list of queues, recalculating its place
      // in the order of most under-guaranteed to most over-guaranteed. In this
      // way, the most underserved queue(s) are always given resources first.
      Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(
          orderedByNeed, tqComparator);

      // This value will be used in every round to calculate the ideal
      // allocation, so make a copy to avoid it being changed during the
      // calculation.
      Resource dupUnassignedForTheRound = Resources.clone(unassigned);

      for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
          .hasNext();) {
        if (!rc.isAnyMajorResourceAboveZero(unassigned)) {
          break;
        }

        TempQueuePerPartition sub = i.next();

        // How much resource we offer to the queue (to increase its ideal_alloc).
        Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
            dupUnassignedForTheRound,
            sub.normalizedGuarantee, this.stepFactor);

        // Make sure it is not beyond unassigned
        wQavail = Resources.componentwiseMin(wQavail, unassigned);

        Resource wQidle = sub.offer(wQavail, rc, totGuarant,
            isReservedPreemptionCandidatesSelector,
            allowQueuesBalanceAfterAllQueuesSatisfied);
        Resource wQdone = Resources.subtract(wQavail, wQidle);

        if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {
          // The queue is still asking for more. Put it back in the priority
          // queue, recalculating its order based on need.
          orderedByNeed.add(sub);
        }

        Resources.subtractFrom(unassigned, wQdone);

        // Make sure unassigned never goes negative
        unassigned = Resources.componentwiseMax(unassigned, Resources.none());
      }
    }

    // Sometimes it is possible that all queues are properly served, so
    // intra-queue preemption will not attempt any preemption. However, there
    // may still be imbalances within a queue. Hence make sure all queues are
    // added to the under-served list.
    while (!orderedByNeed.isEmpty()) {
      TempQueuePerPartition q1 = orderedByNeed.remove();
      context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
    }
  }
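
  // Walk-through (hypothetical numbers): suppose two under-served queues
  // remain with normalized guarantees of 0.25 and 0.75 while 40 GB is still
  // unassigned. In one round queueA is offered 0.25 * 40 GB = 10 GB and
  // queueB 0.75 * 40 GB = 30 GB; each queue keeps what it can use via
  // offer(), the idle remainder (wQidle) stays in unassigned, and any queue
  // that accepted a non-zero amount (wQdone > 0) re-enters orderedByNeed.
  // The loop repeats until either no queue is left in orderedByNeed or
  // unassigned is exhausted.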


  /**
   * This method is visible to allow sub-classes to override the initialization
   * behavior.
   *
   * @param totGuarant total resources (useful for {@code ResourceCalculator}
   *          operations)
   * @param q the {@code TempQueuePerPartition} being initialized
   * @param initIdealAssigned the proposed initialization value.
   */
  protected void initIdealAssignment(Resource totGuarant,
      TempQueuePerPartition q, Resource initIdealAssigned) {
    q.idealAssigned = initIdealAssigned;
  }

  /**
   * Computes a normalizedGuaranteed capacity based on active queues.
   *
   * @param queues
   *          the list of queues to consider
   * @param ignoreGuar
   *          ignore guarantee.
   */
  private void resetCapacity(Collection<TempQueuePerPartition> queues,
                             boolean ignoreGuar) {
    Resource activeCap = Resource.newInstance(0, 0);
    float activeTotalAbsCap = 0.0f;
    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();

    if (ignoreGuar) {
      for (int i = 0; i < maxLength; i++) {
        for (TempQueuePerPartition q : queues) {
          computeNormGuarEvenly(q, queues.size(), i);
        }
      }
    } else {
      for (TempQueuePerPartition q : queues) {
        Resources.addTo(activeCap, q.getGuaranteed());
        activeTotalAbsCap += q.getAbsCapacity();
      }

      // loop through all resource types and normalize guaranteed capacity for all queues
      for (int i = 0; i < maxLength; i++) {
        boolean useAbsCapBasedNorm = false;
        // if the sum of absolute capacity of all queues involved is 0,
        // we should normalize evenly
        boolean useEvenlyDistNorm = activeTotalAbsCap == 0;

        // loop through all the queues once to determine the
        // right normalization strategy for current processing resource type
        for (TempQueuePerPartition q : queues) {
          NormalizationTuple normTuple = new NormalizationTuple(
              q.getGuaranteed(), activeCap);
          long queueGuaranValue = normTuple.getNumeratorValue(i);
          long totalActiveGuaranValue = normTuple.getDenominatorValue(i);

          if (queueGuaranValue == 0 && q.getAbsCapacity() != 0 && totalActiveGuaranValue != 0) {
            // when the rounded value of a resource type is 0 but its absolute capacity is not 0,
            // we should consider taking the normalized guarantee based on absolute capacity
            useAbsCapBasedNorm = true;
            break;
          }

          if (totalActiveGuaranValue == 0) {
            // If totalActiveGuaranValue from activeCap is zero, that means the guaranteed capacity
            // of this resource dimension for all active queues is tiny (close to 0).
            // For example, if a queue has 1% of minCapacity on a cluster with a totalVcores of 48,
            // then the idealAssigned Vcores for this queue is (48 * 0.01)=0.48, which then
            // gets rounded/cast to 0 (double -> long).
            // In this scenario where the denominator is 0, we can just spread resources across
            // all tiny queues evenly since their absoluteCapacity values are roughly the same.
            useEvenlyDistNorm = true;
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("Queue normalization strategy: " +
              "absoluteCapacityBasedNormalization(" + useAbsCapBasedNorm +
              "), evenlyDistributedNormalization(" + useEvenlyDistNorm +
              "), defaultNormalization(" + !(useAbsCapBasedNorm || useEvenlyDistNorm) + ")");
        }

        // loop through all the queues again to apply normalization strategy
        for (TempQueuePerPartition q : queues) {
          if (useAbsCapBasedNorm) {
            computeNormGuarFromAbsCapacity(q, activeTotalAbsCap, i);
          } else if (useEvenlyDistNorm) {
            computeNormGuarEvenly(q, queues.size(), i);
          } else {
            computeDefaultNormGuar(q, activeCap, i);
          }
        }
      }
    }
  }

  /**
   * Computes the normalized guaranteed capacity based on the weight of a queue's abs capacity.
   *
   * Example:
   *  There are two active queues: queueA & queueB, and
   *  their configured absolute minimum capacity is 1% and 3% respectively.
   *
   *  Then their normalized guaranteed capacity are:
   *    normalized_guar_queueA = 0.01 / (0.01 + 0.03) = 0.25
   *    normalized_guar_queueB = 0.03 / (0.01 + 0.03) = 0.75
   *
   * @param q
   *          the queue to consider
   * @param activeTotalAbsCap
   *          the sum of absolute capacity of all active queues
   * @param resourceTypeIdx
   *          index of the processing resource type
   */
  private static void computeNormGuarFromAbsCapacity(TempQueuePerPartition q,
                                                     float activeTotalAbsCap,
                                                     int resourceTypeIdx) {
    if (activeTotalAbsCap != 0) {
      q.normalizedGuarantee[resourceTypeIdx] = q.getAbsCapacity() / activeTotalAbsCap;
    }
  }

  /**
   * Computes the normalized guaranteed capacity evenly based on num of active queues.
   *
   * @param q
   *          the queue to consider
   * @param numOfActiveQueues
   *          number of active queues
   * @param resourceTypeIdx
   *          index of the processing resource type
   */
  private static void computeNormGuarEvenly(TempQueuePerPartition q,
                                            int numOfActiveQueues,
                                            int resourceTypeIdx) {
    q.normalizedGuarantee[resourceTypeIdx] = 1.0f / numOfActiveQueues;
  }

  /**
   * The default way to compute a queue's normalized guaranteed capacity.
   *
   * For each resource type, divide a queue's configured guaranteed amount (MBs/Vcores) by
   * the total amount of guaranteed resource of all active queues
   *
   * @param q
   *          the queue to consider
   * @param activeCap
   *          total guaranteed resources of all active queues
   * @param resourceTypeIdx
   *          index of the processing resource type
   */
  private static void computeDefaultNormGuar(TempQueuePerPartition q,
                                             Resource activeCap,
                                             int resourceTypeIdx) {
    NormalizationTuple normTuple = new NormalizationTuple(q.getGuaranteed(), activeCap);
    q.normalizedGuarantee[resourceTypeIdx] = normTuple.getNormalizedValue(resourceTypeIdx);
  }
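
  // Example (hypothetical values): a queue guaranteed 20 GB out of 80 GB of
  // total active guarantees gets a normalized guarantee of 20 / 80 = 0.25 for
  // the memory dimension.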

  // Take the most underserved TempQueue (the one on the head). Collect and
  // return the list of all queues that have the same idealAssigned
  // percentage of guaranteed.
  private Collection<TempQueuePerPartition> getMostUnderservedQueues(
      PriorityQueue<TempQueuePerPartition> orderedByNeed,
      TQComparator tqComparator) {
    ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
    while (!orderedByNeed.isEmpty()) {
      TempQueuePerPartition q1 = orderedByNeed.remove();
      underserved.add(q1);

      // Add underserved queues in order for later use
      context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
      TempQueuePerPartition q2 = orderedByNeed.peek();
      // q1's pct of guaranteed won't be larger than q2's. If it's less, then
      // return what has already been collected. Otherwise, q1's pct of
      // guaranteed == that of q2, so add q2 to underserved list during the
      // next pass.
      if (q2 == null || tqComparator.compare(q1, q2) < 0) {
        if (null != q2) {
          context.addPartitionToUnderServedQueues(q2.queueName, q2.partition);
        }
        return underserved;
      }
    }
    return underserved;
  }
}
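
To make the normalization strategy selection in resetCapacity easier to follow outside the ResourceManager, below is a minimal, dependency-free sketch. The class name NormalizedGuaranteeSketch, the queue numbers, and the plain arrays standing in for the YARN Resource and TempQueuePerPartition types are all hypothetical; the sketch only mirrors the per-resource-dimension choice between absolute-capacity based, evenly distributed, and default normalization, not the actual implementation.

public class NormalizedGuaranteeSketch {

  /**
   * guaranteed[q][dim] is the rounded guarantee of queue q for resource
   * dimension dim; absCapacity[q] is the queue's configured absolute capacity.
   * Returns norm[q][dim], the normalized guarantee per queue and dimension.
   */
  static float[][] normalize(long[][] guaranteed, float[] absCapacity) {
    int numQueues = guaranteed.length;
    int numDims = guaranteed[0].length;
    float[][] norm = new float[numQueues][numDims];

    float totalAbsCap = 0f;
    for (float c : absCapacity) {
      totalAbsCap += c;
    }

    for (int dim = 0; dim < numDims; dim++) {
      long totalGuaranteed = 0;
      for (long[] queue : guaranteed) {
        totalGuaranteed += queue[dim];
      }

      // Pick one strategy per resource dimension, as resetCapacity does.
      boolean useAbsCapBased = false;
      boolean useEvenly = totalAbsCap == 0 || totalGuaranteed == 0;
      for (int q = 0; q < numQueues; q++) {
        if (guaranteed[q][dim] == 0 && absCapacity[q] != 0 && totalGuaranteed != 0) {
          useAbsCapBased = true;
          break;
        }
      }

      // Apply the chosen strategy to every queue for this dimension.
      for (int q = 0; q < numQueues; q++) {
        if (useAbsCapBased) {
          norm[q][dim] = absCapacity[q] / totalAbsCap;
        } else if (useEvenly) {
          norm[q][dim] = 1.0f / numQueues;
        } else {
          norm[q][dim] = (float) guaranteed[q][dim] / totalGuaranteed;
        }
      }
    }
    return norm;
  }

  public static void main(String[] args) {
    // Two hypothetical queues; dimensions are (memory in MB, vcores).
    // queue0's vcore guarantee has rounded down to 0, so the vcore dimension
    // falls back to absolute-capacity based normalization.
    long[][] guaranteed = {{4096, 0}, {12288, 12}};
    float[] absCapacity = {0.01f, 0.03f};

    float[][] norm = normalize(guaranteed, absCapacity);
    for (int q = 0; q < norm.length; q++) {
      System.out.printf("queue%d -> memory=%.2f, vcores=%.2f%n",
          q, norm[q][0], norm[q][1]);
    }
    // Prints: queue0 -> memory=0.25, vcores=0.25
    //         queue1 -> memory=0.75, vcores=0.75
  }
}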

Related information

hadoop source code directory

Related articles

hadoop AbstractPreemptionEntity source code

hadoop CapacitySchedulerPreemptionContext source code

hadoop CapacitySchedulerPreemptionUtils source code

hadoop FifoCandidatesSelector source code

hadoop FifoIntraQueuePreemptionPlugin source code

hadoop IntraQueueCandidatesSelector source code

hadoop IntraQueuePreemptionComputePlugin source code

hadoop PreemptableResourceCalculator source code

hadoop PreemptionCandidatesSelector source code

hadoop ProportionalCapacityPreemptionPolicy source code
