hadoop AbstractSchedulerPlanFollower 源码

  • 2022-10-20
  • 浏览 (199)

hadoop AbstractSchedulerPlanFollower 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.reservation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link PlanFollower} that keeps a scheduler's reservation queues in
 * sync with the reservations recorded in one or more {@link Plan}s.
 *
 * <p>For every plan, {@link #synchronizePlan(Plan, boolean)} creates queues for
 * newly active reservations, cleans up queues whose reservation is no longer
 * active, sets the capacity / max-capacity entitlement of each active
 * reservation queue, and assigns any remaining capacity to the plan's default
 * reservation queue. Scheduler-specific details (queue lookup, queue creation,
 * queue naming) are delegated to subclasses.
 */
public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
  private static final Logger LOG =
      LoggerFactory.getLogger(AbstractSchedulerPlanFollower.class);

  /** Plans that {@link #run()} synchronizes with the scheduler. */
  protected Collection<Plan> plans = new ArrayList<Plan>();
  /** Scheduler whose reservation queues are created, resized and removed. */
  protected YarnScheduler scheduler;
  /** Source of the current time used to select the active plan step. */
  protected Clock clock;

  /**
   * Initializes this follower. The given plans are appended to any plans
   * already held.
   *
   * @param clock clock used to determine the current time
   * @param sched scheduler whose reservation queues are managed
   * @param plans plans to follow
   */
  @Override
  public void init(Clock clock, ResourceScheduler sched,
      Collection<Plan> plans) {
    this.clock = clock;
    this.scheduler = sched;
    this.plans.addAll(plans);
  }

  /**
   * Synchronizes every followed plan, allowing replanning when a plan's
   * resources have shrunk below what is reserved.
   */
  @Override
  public synchronized void run() {
    for (Plan plan : plans) {
      synchronizePlan(plan, true);
    }
  }

  /**
   * Replaces the set of followed plans.
   *
   * @param plans the plans to follow from now on
   */
  @Override
  public synchronized void setPlans(Collection<Plan> plans) {
    this.plans.clear();
    this.plans.addAll(plans);
  }

  /**
   * Aligns the scheduler's reservation queues under the plan queue with the
   * reservations active in {@code plan} at the current time. The current time
   * is rounded up to the next multiple of the plan step.
   *
   * <p>Does nothing if {@link #getPlanQueue(String)} returns {@code null} for
   * the plan's queue.
   *
   * @param plan the plan to synchronize
   * @param shouldReplan when the plan's resources are less than the total
   *          reserved resources: if {@code true}, invoke the plan's replanner;
   *          if {@code false}, scale each reservation's share proportionally
   *          to the available plan resources instead
   */
  @Override
  public synchronized void synchronizePlan(Plan plan, boolean shouldReplan) {
    String planQueueName = plan.getQueueName();
    LOG.debug("Running plan follower edit policy for plan: {}", planQueueName);
    // align with plan step (round the current time up to a step boundary)
    long step = plan.getStep();
    long now = clock.getTime();
    if (now % step != 0) {
      now += step - (now % step);
    }
    Queue planQueue = getPlanQueue(planQueueName);
    if (planQueue == null) {
      return;
    }

    // first we publish to the plan the current availability of resources
    Resource clusterResources = scheduler.getClusterResource();
    Resource planResources =
        getPlanResources(plan, planQueue, clusterResources);
    Set<ReservationAllocation> currentReservations =
        plan.getReservationsAtTime(now);
    Set<String> curReservationNames = new HashSet<String>();
    Resource reservedResources = Resource.newInstance(0, 0);
    int numRes = getReservedResources(now, currentReservations,
        curReservationNames, reservedResources);
    // create the default reservation queue if it doesn't exist
    String defReservationId = getReservationIdFromQueueName(planQueueName)
        + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
    String defReservationQueue =
        getReservationQueueName(planQueueName, defReservationId);
    createDefaultReservationQueue(planQueueName, planQueue, defReservationId);
    curReservationNames.add(defReservationId);
    // if the resources dedicated to this plan have shrunk, invoke the
    // replanner (or, if replanning is not allowed, resize proportionally)
    boolean shouldResize = false;
    if (arePlanResourcesLessThanReservations(plan.getResourceCalculator(),
        clusterResources, planResources, reservedResources)) {
      if (shouldReplan) {
        try {
          plan.getReplanner().plan(plan, null);
        } catch (PlanningException e) {
          LOG.warn("Exception while trying to replan: {}", planQueueName, e);
        }
      } else {
        shouldResize = true;
      }
    }
    // identify the reservations that have expired and new reservations that
    // have to be activated
    List<? extends Queue> resQueues = getChildReservationQueues(planQueue);
    Set<String> expired = new HashSet<String>();
    for (Queue resQueue : resQueues) {
      String resQueueName = resQueue.getQueueName();
      String reservationId = getReservationIdFromQueueName(resQueueName);
      if (curReservationNames.contains(reservationId)) {
        // the reservation queue already exists, so there is no need to
        // create it; what remains in curReservationNames needs a new queue
        curReservationNames.remove(reservationId);
      } else {
        // the reservation has terminated, mark its queue for cleanup
        expired.add(reservationId);
      }
    }
    // garbage collect expired reservations
    cleanupExpiredQueues(planQueueName, plan.getMoveOnExpiry(), expired,
        defReservationQueue);
    // Add new reservations and update existing ones
    float totalAssignedCapacity = 0f;
    if (currentReservations != null) {
      // first release all excess capacity in default queue
      try {
        setQueueEntitlement(planQueueName, defReservationQueue, 0f, 1.0f);
      } catch (YarnException e) {
        LOG.warn(
            "Exception while trying to release default queue capacity for plan: {}",
            planQueueName, e);
      }
      // sort allocations from the one giving up the most resources to the
      // one asking for the most, to avoid order-of-operation errors that
      // temporarily violate the 100% capacity bound
      List<ReservationAllocation> sortedAllocations = sortByDelta(
          new ArrayList<ReservationAllocation>(currentReservations), now, plan);
      for (ReservationAllocation res : sortedAllocations) {
        String currResId = res.getReservationId().toString();
        if (curReservationNames.contains(currResId)) {
          addReservationQueue(planQueueName, planQueue, currResId);
        }
        Resource capToAssign = res.getResourcesAtTime(now);
        float targetCapacity = 0f;
        if (planResources.getMemorySize() > 0
            && planResources.getVirtualCores() > 0) {
          if (shouldResize) {
            capToAssign = calculateReservationToPlanProportion(
                plan.getResourceCalculator(), planResources, reservedResources,
                capToAssign);
          }
          targetCapacity =
              calculateReservationToPlanRatio(plan.getResourceCalculator(),
                  clusterResources, planResources, capToAssign);
        }
        LOG.debug(
              "Assigning capacity of {} to queue {} with target capacity {}",
              capToAssign, currResId, targetCapacity);
        // set maxCapacity to 100% unless the job requires gang, in which
        // case we stick to capacity (as running early/before is likely a
        // waste of resources)
        float maxCapacity = 1.0f;
        if (res.containsGangs()) {
          maxCapacity = targetCapacity;
        }
        try {
          setQueueEntitlement(planQueueName, currResId, targetCapacity,
              maxCapacity);
        } catch (YarnException e) {
          // Two placeholders: previously planQueueName was silently dropped
          // because the message only had one.
          LOG.warn(
              "Exception while trying to size reservation: {} for plan: {}",
              currResId, planQueueName, e);
        }
        totalAssignedCapacity += targetCapacity;
      }
    }
    // compute the default queue capacity
    float defQCap = 1.0f - totalAssignedCapacity;
    LOG.debug(
          "PlanFollowerEditPolicyTask: total Plan Capacity: {} "
              + "currReservation: {} default-queue capacity: {}",
          planResources, numRes, defQCap);
    // set the default queue to eat-up all remaining capacity
    try {
      setQueueEntitlement(planQueueName, defReservationQueue, defQCap, 1.0f);
    } catch (YarnException e) {
      LOG.warn(
          "Exception while trying to reclaim default queue capacity for plan: {}",
          planQueueName, e);
    }
    // garbage collect finished reservations from plan
    try {
      plan.archiveCompletedReservations(now);
    } catch (PlanningException e) {
      LOG.error("Exception in archiving completed reservations: ", e);
    }
    LOG.info("Finished iteration of plan follower edit policy for plan: {}",
        planQueueName);
    // Extension: update plan with app states,
    // useful to support smart replanning
  }

  /**
   * Maps a reservation queue name to the reservation id it represents.
   * The default implementation returns the name unchanged; subclasses may
   * override it when queue names differ from reservation ids.
   *
   * @param resQueueName the reservation queue name
   * @return the reservation id for that queue
   */
  protected String getReservationIdFromQueueName(String resQueueName) {
    return resQueueName;
  }

  /**
   * Sets the capacity and max-capacity entitlement of a reservation queue.
   *
   * @param planQueueName name of the plan queue
   * @param currResId reservation id (or reservation queue name) to resize;
   *          resolved to a queue name via
   *          {@link #getReservationQueueName(String, String)}
   * @param targetCapacity capacity to assign
   * @param maxCapacity maximum capacity to assign
   * @throws YarnException if the scheduler rejects the entitlement
   */
  protected void setQueueEntitlement(String planQueueName, String currResId,
      float targetCapacity, float maxCapacity) throws YarnException {
    String reservationQueueName =
        getReservationQueueName(planQueueName, currResId);
    scheduler.setEntitlement(reservationQueueName,
        new QueueEntitlement(targetCapacity, maxCapacity));
  }

  /**
   * Resolves the scheduler queue name used for a reservation. The default
   * implementation returns the reservation id unchanged.
   * Schedulers have different ways of naming queues. See YARN-2773.
   *
   * @param planQueueName name of the plan queue
   * @param reservationId the reservation id
   * @return the queue name used by the scheduler for this reservation
   */
  protected String getReservationQueueName(String planQueueName,
      String reservationId) {
    return reservationId;
  }

  /**
   * Cleans up the queues of expired reservations. For each one, the method
   * first sets the queue's entitlement to zero to prevent new app submission.
   * If {@code shouldMove} is set, it then moves the queue's apps to the plan's
   * default reservation queue. Finally, it kills any apps still in the queue
   * (move disabled or move failed), or removes the queue if it is empty.
   *
   * @param planQueueName the name of {@code PlanQueue}
   * @param shouldMove flag to indicate if any running apps should be moved or
   *          killed
   * @param toRemove the ids of the expired reservations whose queues should
   *          be cleaned up
   * @param defReservationQueue the default {@code ReservationQueue} of the
   *          {@link Plan}
   */
  protected void cleanupExpiredQueues(String planQueueName, boolean shouldMove,
      Set<String> toRemove, String defReservationQueue) {
    for (String expiredReservationId : toRemove) {
      try {
        // reduce entitlement to 0
        String expiredReservation =
            getReservationQueueName(planQueueName, expiredReservationId);
        setQueueEntitlement(planQueueName, expiredReservation, 0.0f, 0.0f);
        if (shouldMove) {
          moveAppsInQueueSync(expiredReservation, defReservationQueue);
        }
        List<ApplicationAttemptId> appsInQueue = scheduler.
              getAppsInQueue(expiredReservation);
        int size = (appsInQueue == null ? 0 : appsInQueue.size());
        if (size > 0) {
          scheduler.killAllAppsInQueue(expiredReservation);
          LOG.info("Killing applications in queue: {}", expiredReservation);
        } else {
          scheduler.removeQueue(expiredReservation);
          LOG.info("Queue: {} removed", expiredReservation);
        }
      } catch (YarnException e) {
        LOG.warn("Exception while trying to expire reservation: {}",
            expiredReservationId, e);
      }
    }
  }

  /**
   * Synchronously moves every app in the given expired reservation queue to
   * the plan's default reservation queue. Failures for individual apps are
   * logged and do not stop the remaining moves.
   *
   * @param expiredReservation name of the expired reservation queue
   * @param defReservationQueue name of the default reservation queue
   */
  private void moveAppsInQueueSync(String expiredReservation,
      String defReservationQueue) {
    List<ApplicationAttemptId> activeApps =
        scheduler.getAppsInQueue(expiredReservation);
    // getAppsInQueue may return null (cleanupExpiredQueues guards for it too)
    if (activeApps == null || activeApps.isEmpty()) {
      return;
    }
    for (ApplicationAttemptId app : activeApps) {
      // fallback to parent's default queue
      try {
        scheduler.moveApplication(app.getApplicationId(), defReservationQueue);
      } catch (YarnException e) {
        LOG.warn(
            "Encountered unexpected error during migration of application: {}"
                + " from reservation: {}",
            app, expiredReservation, e);
      }
    }
  }

  /**
   * Collects the ids of the given reservations into
   * {@code curReservationNames} and accumulates their resources at
   * {@code now} into {@code reservedResources}.
   *
   * @param now the (step-aligned) current time
   * @param currentReservations active reservations; may be {@code null}
   * @param curReservationNames output set receiving reservation ids
   * @param reservedResources output accumulator, modified in place
   * @return the number of reservations, or 0 if none were given
   */
  protected int getReservedResources(long now,
      Set<ReservationAllocation> currentReservations,
      Set<String> curReservationNames, Resource reservedResources) {
    int numRes = 0;
    if (currentReservations != null) {
      numRes = currentReservations.size();
      for (ReservationAllocation reservation : currentReservations) {
        curReservationNames.add(reservation.getReservationId().toString());
        Resources.addTo(reservedResources, reservation.getResourcesAtTime(now));
      }
    }
    return numRes;
  }

  /**
   * Sort in the order from the least new amount of resources asked (likely
   * negative) to the highest. This prevents "order-of-operation" errors related
   * to exceeding 100% capacity temporarily. The given list is sorted in place
   * and returned.
   *
   * @param currentReservations the currently active reservations
   * @param now the current time
   * @param plan the {@link Plan} that is being considered
   *
   * @return the sorted list of {@link ReservationAllocation}s
   */
  protected List<ReservationAllocation> sortByDelta(
      List<ReservationAllocation> currentReservations, long now, Plan plan) {
    Collections.sort(currentReservations,
        new ReservationAllocationComparator(now, this, plan));
    return currentReservations;
  }

  /**
   * Get queue associated with reservable queue named.
   *
   * @param planQueueName name of the reservable queue
   * @return queue associated with the reservable queue, or {@code null} if
   *         none; {@link #synchronizePlan(Plan, boolean)} skips the plan then
   */
  protected abstract Queue getPlanQueue(String planQueueName);

  /**
   * Scales {@code availablePlanResources} by the ratio of
   * {@code reservationResources} to {@code totalReservationResources}, as
   * computed by {@code rescCalculator}. Used to shrink reservations
   * proportionally when the plan cannot satisfy all of them.
   */
  private Resource calculateReservationToPlanProportion(
      ResourceCalculator rescCalculator, Resource availablePlanResources,
      Resource totalReservationResources, Resource reservationResources) {
    return Resources.multiply(availablePlanResources, Resources.ratio(
        rescCalculator, reservationResources, totalReservationResources));
  }

  /**
   * Calculates the ratio of {@code reservationResources} to
   * {@code planResources}.
   */
  private float calculateReservationToPlanRatio(
      ResourceCalculator rescCalculator, Resource clusterResources,
      Resource planResources, Resource reservationResources) {
    return Resources.divide(rescCalculator, clusterResources,
        reservationResources, planResources);
  }

  /**
   * Returns {@code true} if the reserved resources are greater than the plan
   * resources, i.e. the plan has shrunk below what has been reserved.
   */
  private boolean arePlanResourcesLessThanReservations(
      ResourceCalculator rescCalculator, Resource clusterResources,
      Resource planResources, Resource reservedResources) {
    return Resources.greaterThan(rescCalculator, clusterResources,
        reservedResources, planResources);
  }

  /**
   * Get a list of reservation queues for this planQueue.
   *
   * @param planQueue the queue for the current {@link Plan}
   *
   * @return the queues corresponding to the reservations
   */
  protected abstract List<? extends Queue> getChildReservationQueues(
      Queue planQueue);

  /**
   * Add a new reservation queue for reservation currResId for this planQueue.
   *
   * @param planQueueName name of the plan queue
   * @param queue the plan queue
   * @param currResId id of the reservation needing a queue
   */
  protected abstract void addReservationQueue(String planQueueName, Queue queue,
      String currResId);

  /**
   * Creates the default reservation queue for use when no reservation is used
   * for applications submitted to this planQueue.
   *
   * @param planQueueName name of the reservable queue
   * @param queue the queue for the current {@link Plan}
   * @param defReservationQueue id of the default reservation (the plan's
   *          reservation id followed by the default queue suffix)
   */
  protected abstract void createDefaultReservationQueue(String planQueueName,
      Queue queue, String defReservationQueue);

  /**
   * Get plan resources for this planQueue.
   *
   * @param plan the current {@link Plan} being considered
   * @param queue the queue for the current {@link Plan}
   * @param clusterResources the resources available in the cluster
   *
   * @return the resources allocated to the specified {@link Plan}
   */
  protected abstract Resource getPlanResources(Plan plan, Queue queue,
      Resource clusterResources);

  /**
   * Get reservation queue resources if it exists otherwise return null.
   *
   * @param plan the current {@link Plan} being considered
   * @param reservationId the identifier of the reservation
   *
   * @return the resources allocated to the specified reservation
   */
  protected abstract Resource getReservationQueueResourceIfExists(Plan plan,
      ReservationId reservationId);

  /**
   * Orders reservations by the resources they still need at {@code now}. That
   * is the reservation's resources at {@code now}, minus the resources of its
   * existing queue when one exists. The ordering is ascending, using
   * {@link Resource#compareTo}.
   */
  private static class ReservationAllocationComparator
      implements Comparator<ReservationAllocation> {
    private final AbstractSchedulerPlanFollower planFollower;
    private final long now;
    private final Plan plan;

    ReservationAllocationComparator(long now,
        AbstractSchedulerPlanFollower planFollower, Plan plan) {
      this.now = now;
      this.planFollower = planFollower;
      this.plan = plan;
    }

    /**
     * Returns the reservation's resources at {@code now} minus what its queue
     * currently holds, or the full reservation if it has no queue yet.
     */
    private Resource getUnallocatedReservedResources(
        ReservationAllocation reservation) {
      Resource resResource;
      Resource reservationResource =
          planFollower.getReservationQueueResourceIfExists(plan,
              reservation.getReservationId());
      if (reservationResource != null) {
        resResource = Resources.subtract(reservation.getResourcesAtTime(now),
            reservationResource);
      } else {
        resResource = reservation.getResourcesAtTime(now);
      }
      return resResource;
    }

    @Override
    public int compare(ReservationAllocation lhs, ReservationAllocation rhs) {
      // compute delta between current and previous reservation, and compare
      // based on that
      Resource lhsRes = getUnallocatedReservedResources(lhs);
      Resource rhsRes = getUnallocatedReservedResources(rhs);
      return lhsRes.compareTo(rhsRes);
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractReservationSystem 源码

hadoop CapacityOverTimePolicy 源码

hadoop CapacityReservationSystem 源码

hadoop CapacitySchedulerPlanFollower 源码

hadoop FairReservationSystem 源码

hadoop FairSchedulerPlanFollower 源码

hadoop InMemoryPlan 源码

hadoop InMemoryReservationAllocation 源码

hadoop NoOverCommitPolicy 源码

hadoop PeriodicRLESparseResourceAllocation 源码

0  赞