hadoop PlanningAlgorithm 源码

  • 2022-10-20
  • 浏览 (201)

haddop PlanningAlgorithm 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;

import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.PeriodicRLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;

/**
 * An abstract class that follows the general behavior of planning algorithms.
 */
public abstract class PlanningAlgorithm implements ReservationAgent {

  /**
   * Performs the actual allocation for a ReservationDefinition within a Plan.
   *
   * @param reservationId the identifier of the reservation
   * @param user the user who owns the reservation
   * @param plan the Plan to which the reservation must be fitted
   * @param contract encapsulates the resources required by the user for his
   *          session
   * @param oldReservation the existing reservation (null if none)
   * @return whether the allocateUser function was successful or not
   *
   * @throws PlanningException if the session cannot be fitted into the plan
   * @throws ContractValidationException if validation fails
   */
  protected boolean allocateUser(ReservationId reservationId, String user,
      Plan plan, ReservationDefinition contract,
      ReservationAllocation oldReservation) throws PlanningException,
      ContractValidationException {

    // Adjust the ResourceDefinition to account for system "imperfections"
    // (e.g., scheduling delays for large containers).
    ReservationDefinition adjustedContract = adjustContract(plan, contract);

    // Compute the job allocation
    RLESparseResourceAllocation allocation =
            computeJobAllocation(plan, reservationId, adjustedContract, user);

    long period = Long.parseLong(contract.getRecurrenceExpression());

    // Make allocation periodic if request is periodic
    if (contract.getRecurrenceExpression() != null) {
      if (period > 0) {
        allocation =
            new PeriodicRLESparseResourceAllocation(allocation, period);
      }
    }

    // If no job allocation was found, fail
    if (allocation == null) {
      throw new PlanningException(
              "The planning algorithm could not find a valid allocation"
                      + " for your request");
    }

    // Translate the allocation to a map (with zero paddings)
    long step = plan.getStep();

    long jobArrival = stepRoundUp(adjustedContract.getArrival(), step);
    long jobDeadline = stepRoundUp(adjustedContract.getDeadline(), step);

    Map<ReservationInterval, Resource> mapAllocations =
        allocationsToPaddedMap(allocation, jobArrival, jobDeadline, period);

    // Create the reservation
    ReservationAllocation capReservation =
        new InMemoryReservationAllocation(reservationId, // ID
            adjustedContract, // Contract
            user, // User name
            plan.getQueueName(), // Queue name
            adjustedContract.getArrival(), adjustedContract.getDeadline(),
            mapAllocations, // Allocations
            plan.getResourceCalculator(), // Resource calculator
            plan.getMinimumAllocation()); // Minimum allocation

    // Add (or update) the reservation allocation
    if (oldReservation != null) {
      return plan.updateReservation(capReservation);
    } else {
      return plan.addReservation(capReservation, false);
    }

  }

  private Map<ReservationInterval, Resource> allocationsToPaddedMap(
      RLESparseResourceAllocation allocation, long jobArrival, long jobDeadline,
      long period) {

    // Zero allocation
    Resource zeroResource = Resource.newInstance(0, 0);

    if (period > 0) {
      if ((jobDeadline - jobArrival) >= period) {
        allocation.addInterval(new ReservationInterval(0L, period),
            zeroResource);
      }
      jobArrival = jobArrival % period;
      jobDeadline = jobDeadline % period;

      if (jobArrival <= jobDeadline) {
        allocation.addInterval(new ReservationInterval(0, jobArrival),
            zeroResource);
        allocation.addInterval(new ReservationInterval(jobDeadline, period),
            zeroResource);
      } else {
        allocation.addInterval(new ReservationInterval(jobDeadline, jobArrival),
            zeroResource);
      }
    } else {
      // Pad at the beginning
      long earliestStart = findEarliestTime(allocation.toIntervalMap());
      if (jobArrival < earliestStart) {
        allocation.addInterval(
            new ReservationInterval(jobArrival, earliestStart), zeroResource);
      }

      // Pad at the beginning
      long latestEnd = findLatestTime(allocation.toIntervalMap());
      if (latestEnd < jobDeadline) {
        allocation.addInterval(new ReservationInterval(latestEnd, jobDeadline),
            zeroResource);
      }
    }
    return allocation.toIntervalMap();
  }

  public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan,
      ReservationId reservationId, ReservationDefinition reservation,
      String user) throws PlanningException, ContractValidationException;

  @Override
  public boolean createReservation(ReservationId reservationId, String user,
      Plan plan, ReservationDefinition contract) throws PlanningException {

    // Allocate
    return allocateUser(reservationId, user, plan, contract, null);

  }

  @Override
  public boolean updateReservation(ReservationId reservationId, String user,
      Plan plan, ReservationDefinition contract) throws PlanningException {

    // Get the old allocation
    ReservationAllocation oldAlloc = plan.getReservationById(reservationId);

    // Allocate (ignores the old allocation)
    return allocateUser(reservationId, user, plan, contract, oldAlloc);

  }

  @Override
  public boolean deleteReservation(ReservationId reservationId, String user,
      Plan plan) throws PlanningException {

    // Delete the existing reservation
    return plan.deleteReservation(reservationId);

  }

  protected static long findEarliestTime(
      Map<ReservationInterval, Resource> sesInt) {

    long ret = Long.MAX_VALUE;
    for (Entry<ReservationInterval, Resource> s : sesInt.entrySet()) {
      if (s.getKey().getStartTime() < ret && s.getValue() != null) {
        ret = s.getKey().getStartTime();
      }
    }
    return ret;

  }

  protected static long findLatestTime(Map<ReservationInterval,
      Resource> sesInt) {

    long ret = Long.MIN_VALUE;
    for (Entry<ReservationInterval, Resource> s : sesInt.entrySet()) {
      if (s.getKey().getEndTime() > ret && s.getValue() != null) {
        ret = s.getKey().getEndTime();
      }
    }
    return ret;

  }

  protected static long stepRoundDown(long t, long step) {
    return (t / step) * step;
  }

  protected static long stepRoundUp(long t, long step) {
    return ((t + step - 1) / step) * step;
  }

  private ReservationDefinition adjustContract(Plan plan,
      ReservationDefinition originalContract) {

    // Place here adjustment. For example using QueueMetrics we can track
    // large container delays per YARN-YARN-1990

    return originalContract;

  }

  @Override
  public void init(Configuration conf) {
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AlignedPlannerWithGreedy 源码

hadoop GreedyReservationAgent 源码

hadoop IterativePlanner 源码

hadoop Planner 源码

hadoop ReservationAgent 源码

hadoop SimpleCapacityReplanner 源码

hadoop StageAllocator 源码

hadoop StageAllocatorGreedy 源码

hadoop StageAllocatorGreedyRLE 源码

hadoop StageAllocatorLowCostAligned 源码

0  赞