hadoop StageAllocatorLowCostAligned 源码

  • 2022-10-20
  • 浏览 (223)

hadoop StageAllocatorLowCostAligned 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeSet;

import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
 * A stage allocator that iteratively allocates containers in the
 * {@link DurationInterval} with lowest overall cost. The algorithm only
 * considers non-overlapping intervals of length 'duration' (rounded up to a
 * multiple of the plan step), which guarantees that the allocations are
 * aligned. If 'allocateLeft == true', the intervals considered by the
 * algorithm are aligned to stageArrival; otherwise, they are aligned to
 * stageDeadline. The smoothnessFactor parameter controls the number of gangs
 * that are simultaneously allocated in each iteration of the algorithm.
 */
public class StageAllocatorLowCostAligned implements StageAllocator {

  /** Smoothness factor used when none is supplied to the constructor. */
  private static final int DEFAULT_SMOOTHNESS_FACTOR = 10;

  /**
   * If true, candidate intervals are aligned to the stage arrival and cost
   * ties are broken in favor of earlier intervals; otherwise intervals are
   * aligned to the stage deadline and ties favor later intervals.
   */
  private final boolean allocateLeft;

  /**
   * The per-iteration gang quota is divided by this value, so larger values
   * place fewer gangs per iteration. Must be non-zero (it is used as an
   * integer divisor during allocation).
   */
  private final int smoothnessFactor;

  /**
   * Creates an allocator with the default smoothness factor (10).
   *
   * @param allocateLeft true to align intervals to the stage arrival, false
   *          to align them to the stage deadline
   */
  public StageAllocatorLowCostAligned(boolean allocateLeft) {
    this(DEFAULT_SMOOTHNESS_FACTOR, allocateLeft);
  }

  /**
   * Creates an allocator with an explicit smoothness factor.
   *
   * @param smoothnessFactor divisor applied to the per-iteration gang quota;
   *          expected to be positive (zero causes an ArithmeticException
   *          when allocating)
   * @param allocateLeft true to align intervals to the stage arrival, false
   *          to align them to the stage deadline
   */
  public StageAllocatorLowCostAligned(int smoothnessFactor,
      boolean allocateLeft) {
    this.allocateLeft = allocateLeft;
    this.smoothnessFactor = smoothnessFactor;
  }

  /**
   * Allocates all gangs of {@code rr} into aligned, non-overlapping intervals
   * of the stage window, repeatedly choosing the cheapest interval that can
   * still fit a gang.
   *
   * <p>Every allocation made is added to {@code planModifications}. If the
   * request cannot be satisfied completely, those additions are removed
   * again before returning.
   *
   * @return the allocation per interval, or {@code null} if the stage window
   *         is shorter than one duration or not all gangs could be placed
   * @throws PlanningException if the underlying plan computations fail
   */
  @Override
  public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
      RLESparseResourceAllocation planLoads,
      RLESparseResourceAllocation planModifications, ReservationRequest rr,
      long stageArrival, long stageDeadline, long period, String user,
      ReservationId oldId) throws PlanningException {

    // Initialize
    ResourceCalculator resCalc = plan.getResourceCalculator();
    Resource capacity = plan.getTotalCapacity();

    RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
        user, oldId, stageArrival, stageDeadline, period);

    long step = plan.getStep();

    // Accumulates the allocations made for this stage
    RLESparseResourceAllocation allocationRequests =
        new RLESparseResourceAllocation(plan.getResourceCalculator());

    // Each candidate interval is exactly one step-aligned duration long
    long duration = stepRoundUp(rr.getDuration(), step);
    int windowSizeInDurations =
        (int) ((stageDeadline - stageArrival) / duration);

    // If not even a single duration fits into the stage window, fail now,
    // before windowSizeInDurations is used as a divisor below.
    if (windowSizeInDurations <= 0) {
      return null;
    }

    int totalGangs = rr.getNumContainers() / rr.getConcurrency();
    int numContainersPerGang = rr.getConcurrency();
    Resource gang =
        Resources.multiply(rr.getCapability(), numContainersPerGang);

    // Per-iteration quota: an even spread of the gangs over the windows,
    // divided by the smoothness factor, and never less than one gang.
    int maxGangsPerUnit = (int) Math
        .max(Math.floor(((double) totalGangs) / windowSizeInDurations), 1);
    maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1);

    final int preferLeft = allocateLeft ? 1 : -1;

    // Candidate intervals ordered by increasing total cost; ties are broken
    // by end time (earliest first if allocateLeft, latest first otherwise).
    // Candidate end times are distinct, so no two intervals compare equal.
    TreeSet<DurationInterval> durationIntervalsSortedByCost =
        new TreeSet<DurationInterval>(new Comparator<DurationInterval>() {
          @Override
          public int compare(DurationInterval val1, DurationInterval val2) {

            int cmp = Double.compare(val1.getTotalCost(), val2.getTotalCost());
            if (cmp != 0) {
              return cmp;
            }

            return preferLeft
                * Long.compare(val1.getEndTime(), val2.getEndTime());
          }
        });

    List<Long> intervalEndTimes =
        computeIntervalEndTimes(stageArrival, stageDeadline, duration);

    // Seed the tree with every aligned interval that can fit at least one
    // gang.
    for (long intervalEnd : intervalEndTimes) {

      long intervalStart = intervalEnd - duration;

      // Get duration interval [intervalStart,intervalEnd)
      DurationInterval durationInterval =
          getDurationInterval(intervalStart, intervalEnd, planLoads,
              planModifications, capacity, netRLERes, resCalc, step, gang);

      if (durationInterval.canAllocate()) {
        durationIntervalsSortedByCost.add(durationInterval);
      }
    }

    // Allocate
    int remainingGangs = totalGangs;
    while (remainingGangs > 0) {

      // Take (and remove) the cheapest interval; if no interval can fit a
      // gang any more, stop and report failure below.
      DurationInterval bestDurationInterval =
          durationIntervalsSortedByCost.pollFirst();
      if (bestDurationInterval == null) {
        break;
      }

      int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs);
      numGangsToAllocate =
          Math.min(numGangsToAllocate, bestDurationInterval.numCanFit());
      remainingGangs -= numGangsToAllocate;

      ReservationInterval reservationInt =
          new ReservationInterval(bestDurationInterval.getStartTime(),
              bestDurationInterval.getEndTime());

      Resource reservationRes = Resources.multiply(rr.getCapability(),
          rr.getConcurrency() * numGangsToAllocate);

      planModifications.addInterval(reservationInt, reservationRes);
      allocationRequests.addInterval(reservationInt, reservationRes);

      // Re-evaluate the interval against the updated planModifications and
      // put it back only if it can still fit a gang.
      DurationInterval updatedDurationInterval =
          getDurationInterval(bestDurationInterval.getStartTime(),
              bestDurationInterval.getEndTime(), planLoads,
              planModifications, capacity, netRLERes, resCalc, step, gang);

      if (updatedDurationInterval.canAllocate()) {
        durationIntervalsSortedByCost.add(updatedDurationInterval);
      }
    }

    // Get the final allocation
    Map<ReservationInterval, Resource> allocations =
        allocationRequests.toIntervalMap();

    // If no gangs are left to place we succeed and return the allocation
    if (remainingGangs <= 0) {
      return allocations;
    }

    // The request could not be satisfied: undo the side effects on
    // planModifications (needed for ANY) and signal failure with null.
    for (Map.Entry<ReservationInterval, Resource> tempAllocation : allocations
        .entrySet()) {
      planModifications.removeInterval(tempAllocation.getKey(),
          tempAllocation.getValue());
    }
    return null;
  }

  /**
   * Lists the end times of the aligned candidate intervals of length
   * {@code duration} that lie entirely within
   * [stageEarliestStart, stageDeadline].
   *
   * <p>When allocating left, intervals are laid out forward from
   * {@code stageEarliestStart} and the list is in increasing order;
   * otherwise they are laid out backward from {@code stageDeadline} and the
   * list is in decreasing order.
   */
  private List<Long> computeIntervalEndTimes(long stageEarliestStart,
      long stageDeadline, long duration) {

    List<Long> intervalEndTimes = new ArrayList<Long>();
    if (!allocateLeft) {
      for (long intervalEnd = stageDeadline;
          intervalEnd >= stageEarliestStart + duration;
          intervalEnd -= duration) {
        intervalEndTimes.add(intervalEnd);
      }
    } else {
      for (long intervalStart = stageEarliestStart;
          intervalStart <= stageDeadline - duration;
          intervalStart += duration) {
        intervalEndTimes.add(intervalStart + duration);
      }
    }

    return intervalEndTimes;
  }

  /**
   * Builds a {@link DurationInterval} for [startTime, endTime), computing
   * its total cost and how many copies of {@code requestedResources} (one
   * gang) fit into it.
   *
   * @throws PlanningException if merging the resource allocations fails
   */
  protected static DurationInterval getDurationInterval(long startTime,
      long endTime, RLESparseResourceAllocation planLoads,
      RLESparseResourceAllocation planModifications, Resource capacity,
      RLESparseResourceAllocation netRLERes, ResourceCalculator resCalc,
      long step, Resource requestedResources) throws PlanningException {

    // Get the total cost associated with the duration interval
    double totalCost = getDurationIntervalTotalCost(startTime, endTime,
        planLoads, planModifications, capacity, resCalc, step);

    // Calculate how many gangs can fit, i.e., how many times
    // 'requestedResources' can be allocated within [startTime, endTime)
    int gangsCanFit = getDurationIntervalGangsCanFit(startTime, endTime,
        planModifications, capacity, netRLERes, resCalc, requestedResources);

    return new DurationInterval(startTime, endTime, totalCost, gangsCanFit);
  }

  /**
   * Computes the cost of [startTime, endTime) as the sum, over each run of
   * constant load, of calcCostOfLoad(load) times the run length in steps.
   * The load is planLoads (existing load) plus planModifications (load of
   * the current job).
   *
   * @throws PlanningException if merging the resource allocations fails
   */
  protected static double getDurationIntervalTotalCost(long startTime,
      long endTime, RLESparseResourceAllocation planLoads,
      RLESparseResourceAllocation planModifications, Resource capacity,
      ResourceCalculator resCalc, long step) throws PlanningException {

    RLESparseResourceAllocation currentLoad =
        RLESparseResourceAllocation.merge(resCalc, capacity, planLoads,
            planModifications, RLEOperator.add, startTime, endTime);

    NavigableMap<Long, Resource> mapCurrentLoad = currentLoad.getCumulative();

    double totalCost = 0.0;
    // -1 marks "no previous time point seen yet"
    long tPrev = -1L;
    Resource loadPrev = Resources.none();

    // For each point 't', accumulate the cost of [tPrev, t). The load is
    // constant within that run, so the per-step cost is multiplied by
    // (t - tPrev) / step.
    for (Entry<Long, Resource> e : mapCurrentLoad.entrySet()) {
      long t = e.getKey();
      Resource load = e.getValue();
      if (tPrev != -1L) {
        tPrev = Math.max(tPrev, startTime);
        double cost = calcCostOfLoad(loadPrev, capacity, resCalc);
        totalCost = totalCost + cost * (t - tPrev) / step;
      }

      tPrev = t;
      loadPrev = load;
    }

    // The loop never accounts for the last run [tPrev, endTime); add it here.
    // This also covers the corner case of a single entry.
    if (loadPrev != null) {
      tPrev = Math.max(tPrev, startTime);
      double cost = calcCostOfLoad(loadPrev, capacity, resCalc);
      totalCost = totalCost + cost * (endTime - tPrev) / step;
    }

    return totalCost;
  }

  /**
   * Returns how many copies of {@code requestedResources} fit at every time
   * point of [startTime, endTime), i.e. the minimum over the time points of
   * (netRLERes - planModifications) / requestedResources.
   *
   * <p>NOTE(review): if the merged map has no entry before {@code endTime},
   * this returns {@code Integer.MAX_VALUE}; confirm that
   * {@code RLESparseResourceAllocation.merge} always yields an entry in that
   * range, otherwise an interval with no recorded availability would be
   * treated as unbounded.
   *
   * @throws PlanningException if the subtraction would go negative, as
   *           enforced by {@code RLEOperator.subtractTestNonNegative}
   */
  protected static int getDurationIntervalGangsCanFit(long startTime,
      long endTime, RLESparseResourceAllocation planModifications,
      Resource capacity, RLESparseResourceAllocation netRLERes,
      ResourceCalculator resCalc, Resource requestedResources)
      throws PlanningException {

    int gangsCanFit = Integer.MAX_VALUE;

    // Available resources over time = netRLERes - planModifications
    RLESparseResourceAllocation netAvailableResources =
        RLESparseResourceAllocation.merge(resCalc, capacity, netRLERes,
            planModifications, RLEOperator.subtractTestNonNegative, startTime,
            endTime);

    NavigableMap<Long, Resource> mapAvailableCapacity =
        netAvailableResources.getCumulative();

    // The result is the minimum over all time points before endTime.
    for (Entry<Long, Resource> e : mapAvailableCapacity.entrySet()) {
      long t = e.getKey();
      Resource curAvailable = e.getValue();
      if (t >= endTime) {
        break;
      }

      if (curAvailable == null) {
        gangsCanFit = 0;
      } else {
        int curGangsCanFit = (int) Math.floor(Resources.divide(resCalc,
            capacity, curAvailable, requestedResources));
        if (curGangsCanFit < gangsCanFit) {
          gangsCanFit = curGangsCanFit;
        }
      }
    }
    return gangsCanFit;
  }

  /**
   * Sums {@link #calcCostOfTimeSlot} over every step of [startTime, endTime),
   * sampling the load at each step boundary.
   */
  protected double calcCostOfInterval(long startTime, long endTime,
      RLESparseResourceAllocation planLoads,
      RLESparseResourceAllocation planModifications, Resource capacity,
      ResourceCalculator resCalc, long step) {

    double totalCost = 0.0;
    for (long t = startTime; t < endTime; t += step) {
      totalCost += calcCostOfTimeSlot(t, planLoads, planModifications, capacity,
          resCalc);
    }
    return totalCost;
  }

  /** Returns the cost of the combined load at time {@code t}. */
  protected double calcCostOfTimeSlot(long t,
      RLESparseResourceAllocation planLoads,
      RLESparseResourceAllocation planModifications, Resource capacity,
      ResourceCalculator resCalc) {

    Resource load = getLoadAtTime(t, planLoads, planModifications);
    return calcCostOfLoad(load, capacity, resCalc);
  }

  /** Returns planLoads plus planModifications at time {@code t}. */
  protected Resource getLoadAtTime(long t,
      RLESparseResourceAllocation planLoads,
      RLESparseResourceAllocation planModifications) {

    Resource planLoad = planLoads.getCapacityAtTime(t);
    return Resources.add(planLoad, planModifications.getCapacityAtTime(t));
  }

  /**
   * Returns the cost of a load: its ratio to {@code capacity} as computed by
   * {@code resCalc}.
   */
  protected static double calcCostOfLoad(Resource load, Resource capacity,
      ResourceCalculator resCalc) {

    return resCalc.ratio(load, capacity);
  }

  /** Rounds {@code t} down to a multiple of {@code step} (for t >= 0). */
  protected static long stepRoundDown(long t, long step) {
    return (t / step) * step;
  }

  /** Rounds {@code t} up to a multiple of {@code step} (for t >= 0). */
  protected static long stepRoundUp(long t, long step) {
    return ((t + step - 1) / step) * step;
  }

  /**
   * A candidate interval [startTime, endTime), typically of length duration.
   * It holds the interval's total cost and the number of gangs that can
   * still be allocated inside it, both calculated externally.
   */
  protected static class DurationInterval {

    private long startTime;
    private long endTime;
    private double cost;
    private final int gangsCanFit;

    /**
     * @param startTime inclusive start of the interval
     * @param endTime exclusive end of the interval
     * @param cost total cost of the interval
     * @param gangsCanfit number of gangs that fit in the interval
     */
    public DurationInterval(long startTime, long endTime, double cost,
        int gangsCanfit) {
      this.startTime = startTime;
      this.endTime = endTime;
      this.cost = cost;
      this.gangsCanFit = gangsCanfit;
    }

    /**
     * Returns whether at least one gang can be allocated in this interval
     * without violating capacity constraints.
     */
    public boolean canAllocate() {
      return (gangsCanFit > 0);
    }

    /**
     * Returns the maximal number of gangs that can be allocated in this
     * interval without violating capacity constraints.
     */
    public int numCanFit() {
      return gangsCanFit;
    }

    public long getStartTime() {
      return this.startTime;
    }

    public void setStartTime(long value) {
      this.startTime = value;
    }

    public long getEndTime() {
      return this.endTime;
    }

    public void setEndTime(long value) {
      this.endTime = value;
    }

    public double getTotalCost() {
      return this.cost;
    }

    public void setTotalCost(double value) {
      this.cost = value;
    }

    @Override
    public String toString() {

      StringBuilder sb = new StringBuilder();

      sb.append(" start: " + startTime).append(" end: " + endTime)
          .append(" cost: " + cost).append(" gangsCanFit: " + gangsCanFit);

      return sb.toString();
    }

  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AlignedPlannerWithGreedy 源码

hadoop GreedyReservationAgent 源码

hadoop IterativePlanner 源码

hadoop Planner 源码

hadoop PlanningAlgorithm 源码

hadoop ReservationAgent 源码

hadoop SimpleCapacityReplanner 源码

hadoop StageAllocator 源码

hadoop StageAllocatorGreedy 源码

hadoop StageAllocatorGreedyRLE 源码

0  赞