hadoop IterativePlanner 源码

  • 2022-10-20
  • 浏览 (192)

haddop IterativePlanner 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;

import java.util.HashSet;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
 * A planning algorithm consisting of two main phases. The algorithm iterates
 * over the job stages in ascending/descending order, depending on the flag
 * allocateLeft. For each stage, the algorithm: 1. Determines an interval
 * [stageArrival, stageDeadline) in which the stage is allocated. 2. Computes an
 * allocation for the stage inside the interval. For ANY and ALL jobs, phase 1
 * sets the allocation window of each stage to be [jobArrival, jobDeadline]. For
 * ORDER and ORDER_NO_GAP jobs, the deadline of each stage is set as
 * succcessorStartTime - the starting time of its succeeding stage (or
 * jobDeadline if it is the last stage). The phases are set using the two
 * functions: 1. setAlgStageExecutionInterval 2.setAlgStageAllocator
 */
public class IterativePlanner extends PlanningAlgorithm {

  // Modifications performed by the algorithm that are not been reflected in the
  // actual plan while a request is still pending.
  private RLESparseResourceAllocation planModifications;

  // Data extracted from plan
  private RLESparseResourceAllocation planLoads;
  private Resource capacity;
  private long step;

  // Job parameters
  private ReservationRequestInterpreter jobType;
  private long jobArrival;
  private long jobDeadline;

  // Phase algorithms
  private StageExecutionInterval algStageExecutionInterval = null;
  private StageAllocator algStageAllocator = null;
  private final boolean allocateLeft;

  // Constructor
  public IterativePlanner(StageExecutionInterval algStageExecutionInterval,
      StageAllocator algStageAllocator, boolean allocateLeft) {

    this.allocateLeft = allocateLeft;
    setAlgStageExecutionInterval(algStageExecutionInterval);
    setAlgStageAllocator(algStageAllocator);

  }

  @Override
  public RLESparseResourceAllocation computeJobAllocation(Plan plan,
      ReservationId reservationId, ReservationDefinition reservation,
      String user) throws PlanningException {

    // Initialize
    initialize(plan, reservationId, reservation);

    // Create the allocations data structure
    RLESparseResourceAllocation allocations =
        new RLESparseResourceAllocation(plan.getResourceCalculator());

    StageProvider stageProvider = new StageProvider(allocateLeft, reservation);

    // Current stage
    ReservationRequest currentReservationStage;

    // initialize periodicity
    long period = 0;
    if(reservation.getRecurrenceExpression() != null){
      period = Long.parseLong(reservation.getRecurrenceExpression());
    }

    // Iterate the stages in reverse order
    while (stageProvider.hasNext()) {

      // Get current stage
      currentReservationStage = stageProvider.next();

      // Validate that the ReservationRequest respects basic constraints
      validateInputStage(plan, currentReservationStage);

      // Set the stageArrival and stageDeadline
      ReservationInterval stageInterval =
          setStageExecutionInterval(plan, reservation, currentReservationStage,
              allocations);
      Long stageArrival = stageInterval.getStartTime();
      Long stageDeadline = stageInterval.getEndTime();

      // Compute stage allocation
      Map<ReservationInterval, Resource> curAlloc =
          computeStageAllocation(plan, currentReservationStage, stageArrival,
              stageDeadline, period, user, reservationId);

      // If we did not find an allocation, return NULL
      // (unless it's an ANY job, then we simply continue).
      if (curAlloc == null) {

        // If it's an ANY job, we can move to the next possible request
        if (jobType == ReservationRequestInterpreter.R_ANY) {
          continue;
        }

        // Otherwise, the job cannot be allocated
        throw new PlanningException("The request cannot be satisfied");

      }

      // Validate ORDER_NO_GAP
      if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
        if (!validateOrderNoGap(allocations, curAlloc, allocateLeft)) {
          throw new PlanningException(
              "The allocation found does not respect ORDER_NO_GAP");
        }
      }

      // If we did find an allocation for the stage, add it
      for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
        allocations.addInterval(entry.getKey(), entry.getValue());
      }

      // If this is an ANY clause, we have finished
      if (jobType == ReservationRequestInterpreter.R_ANY) {
        break;
      }
    }

    // If the allocation is empty, return an error
    if (allocations.isEmpty()) {
      throw new PlanningException("The request cannot be satisfied");
    }

    return allocations;
  }

  protected static boolean validateOrderNoGap(
      RLESparseResourceAllocation allocations,
      Map<ReservationInterval, Resource> curAlloc, boolean allocateLeft) {

    // Left to right
    if (allocateLeft) {
      Long stageStartTime = findEarliestTime(curAlloc);
      Long allocationEndTime = allocations.getLatestNonNullTime();

      // Check that there is no gap between stages
      if ((allocationEndTime != -1) && (allocationEndTime < stageStartTime)) {
        return false;
      }
      // Right to left
    } else {
      Long stageEndTime = findLatestTime(curAlloc);
      Long allocationStartTime = allocations.getEarliestStartTime();

      // Check that there is no gap between stages
      if ((allocationStartTime != -1) && (stageEndTime < allocationStartTime)) {
        return false;
      }
    }

    // Check that the stage allocation does not violate ORDER_NO_GAP
    if (!isNonPreemptiveAllocation(curAlloc)) {
      return false;
    }

    // The allocation is legal
    return true;
  }

  protected void initialize(Plan plan, ReservationId reservationId,
      ReservationDefinition reservation) throws PlanningException {

    // Get plan step & capacity
    capacity = plan.getTotalCapacity();
    step = plan.getStep();

    // Get job parameters (type, arrival time & deadline)
    jobType = reservation.getReservationRequests().getInterpreter();
    jobArrival = stepRoundUp(reservation.getArrival(), step);
    jobDeadline = stepRoundDown(reservation.getDeadline(), step);

    // Initialize the plan modifications
    planModifications =
        new RLESparseResourceAllocation(plan.getResourceCalculator());

    // Dirty read of plan load

    // planLoads are not used by other StageAllocators... and don't deal
    // well with huge reservation ranges
    planLoads = plan.getCumulativeLoadOverTime(jobArrival, jobDeadline);
    ReservationAllocation oldRes = plan.getReservationById(reservationId);
    if (oldRes != null) {
      planLoads = RLESparseResourceAllocation.merge(
          plan.getResourceCalculator(), plan.getTotalCapacity(), planLoads,
          oldRes.getResourcesOverTime(jobArrival, jobDeadline),
          RLEOperator.subtract, jobArrival, jobDeadline);
    }
  }

  private void validateInputStage(Plan plan, ReservationRequest rr)
      throws ContractValidationException {

    // Validate concurrency
    if (rr.getConcurrency() < 1) {
      throw new ContractValidationException("Gang Size should be >= 1");
    }

    // Validate number of containers
    if (rr.getNumContainers() <= 0) {
      throw new ContractValidationException("Num containers should be > 0");
    }

    // Check that gangSize and numContainers are compatible
    if (rr.getNumContainers() % rr.getConcurrency() != 0) {
      throw new ContractValidationException(
          "Parallelism must be an exact multiple of gang size");
    }

    // Check that the largest container request does not exceed the cluster-wide
    // limit for container sizes
    if (Resources.greaterThan(plan.getResourceCalculator(), capacity,
        rr.getCapability(), plan.getMaximumAllocation())) {

      throw new ContractValidationException(
          "Individual capability requests should not exceed cluster's "
              + "maxAlloc");

    }

  }

  private static boolean isNonPreemptiveAllocation(
      Map<ReservationInterval, Resource> curAlloc) {

    // Checks whether a stage allocation is non preemptive or not.
    // Assumption: the intervals are non-intersecting (as returned by
    // computeStageAllocation()).
    // For a non-preemptive allocation, only two end points appear exactly once

    Set<Long> endPoints = new HashSet<Long>(2 * curAlloc.size());
    for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {

      ReservationInterval interval = entry.getKey();
      Resource resource = entry.getValue();

      // Ignore intervals with no allocation
      if (Resources.equals(resource, Resource.newInstance(0, 0))) {
        continue;
      }

      // Get endpoints
      Long left = interval.getStartTime();
      Long right = interval.getEndTime();

      // Add left endpoint if we haven't seen it before, remove otherwise
      if (!endPoints.contains(left)) {
        endPoints.add(left);
      } else {
        endPoints.remove(left);
      }

      // Add right endpoint if we haven't seen it before, remove otherwise
      if (!endPoints.contains(right)) {
        endPoints.add(right);
      } else {
        endPoints.remove(right);
      }
    }

    // Non-preemptive only if endPoints is of size 2
    return (endPoints.size() == 2);

  }

  // Call setStageExecutionInterval()
  protected ReservationInterval setStageExecutionInterval(Plan plan,
      ReservationDefinition reservation,
      ReservationRequest currentReservationStage,
      RLESparseResourceAllocation allocations) {
    return algStageExecutionInterval.computeExecutionInterval(plan,
        reservation, currentReservationStage, allocateLeft, allocations);
  }

  // Call algStageAllocator
  protected Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
      ReservationRequest rr, long stageArrivalTime, long stageDeadline,
      long period, String user, ReservationId oldId) throws PlanningException {

    return algStageAllocator.computeStageAllocation(plan, planLoads,
        planModifications, rr, stageArrivalTime, stageDeadline, period, user,
        oldId);

  }

  // Set the algorithm: algStageExecutionInterval
  public IterativePlanner setAlgStageExecutionInterval(
      StageExecutionInterval alg) {

    this.algStageExecutionInterval = alg;
    return this; // To allow concatenation of setAlg() functions

  }

  // Set the algorithm: algStageAllocator
  public IterativePlanner setAlgStageAllocator(StageAllocator alg) {

    this.algStageAllocator = alg;
    return this; // To allow concatenation of setAlg() functions

  }

  /**
   * Helper class that provide a list of ReservationRequests and iterates
   * forward or backward depending whether we are allocating left-to-right or
   * right-to-left.
   */
  public static class StageProvider {

    private final boolean allocateLeft;

    private final ListIterator<ReservationRequest> li;

    public StageProvider(boolean allocateLeft,
        ReservationDefinition reservation) {

      this.allocateLeft = allocateLeft;
      int startingIndex;
      if (allocateLeft) {
        startingIndex = 0;
      } else {
        startingIndex =
            reservation.getReservationRequests().getReservationResources()
                .size();
      }
      // Get a reverse iterator for the set of stages
      li =
          reservation.getReservationRequests().getReservationResources()
              .listIterator(startingIndex);

    }

    public boolean hasNext() {
      if (allocateLeft) {
        return li.hasNext();
      } else {
        return li.hasPrevious();
      }
    }

    public ReservationRequest next() {
      if (allocateLeft) {
        return li.next();
      } else {
        return li.previous();
      }
    }

    public int getCurrentIndex() {
      if (allocateLeft) {
        return li.nextIndex() - 1;
      } else {
        return li.previousIndex() + 1;
      }
    }

  }

}

相关信息

hadoop 源码目录

相关文章

hadoop AlignedPlannerWithGreedy 源码

hadoop GreedyReservationAgent 源码

hadoop Planner 源码

hadoop PlanningAlgorithm 源码

hadoop ReservationAgent 源码

hadoop SimpleCapacityReplanner 源码

hadoop StageAllocator 源码

hadoop StageAllocatorGreedy 源码

hadoop StageAllocatorGreedyRLE 源码

hadoop StageAllocatorLowCostAligned 源码

0  赞