hadoop AbstractReservationSystem 源码

  • 2022-10-20
  • 浏览 (176)

hadoop AbstractReservationSystem 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.reservation;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.CapacityReservationsACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.FairReservationsACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This is the implementation of {@link ReservationSystem} based on the
 * {@link ResourceScheduler}
 */
@LimitedPrivate("yarn")
@Unstable
public abstract class AbstractReservationSystem extends AbstractService
    implements ReservationSystem {

  private static final Logger LOG =
      LoggerFactory.getLogger(AbstractReservationSystem.class);

  // private static final String DEFAULT_CAPACITY_SCHEDULER_PLAN

  // Read/write lock (constructed in fair mode) taken by the accessors and
  // mutators of this class.
  private final ReentrantReadWriteLock readWriteLock =
      new ReentrantReadWriteLock(true);
  private final Lock readLock = readWriteLock.readLock();
  private final Lock writeLock = readWriteLock.writeLock();

  // Set to true after the first successful reinitialize(); later calls only
  // pick up new plan queues.
  private boolean initialized = false;

  // Clock handed to the plan follower when it is initialized.
  private final Clock clock = new UTCClock();

  // Monotonic counter used as the second component of new reservation ids.
  private AtomicLong resCounter = new AtomicLong();

  // Plans keyed by the name of the plan queue they were created for.
  private Map<String, Plan> plans = new HashMap<String, Plan>();

  // Maps a reservation id to the name of the queue/plan it was recorded under.
  private Map<ReservationId, String> resQMap =
      new HashMap<ReservationId, String>();

  // Supplied through setRMContext(); source of the scheduler and passed on to
  // every plan that is created.
  private RMContext rmContext;

  // Scheduler obtained from rmContext during initialize().
  private ResourceScheduler scheduler;

  // Executor that periodically runs the plan follower; created lazily by
  // startPlanFollower().
  private ScheduledExecutorService scheduledExecutorService;

  // Configuration captured during initialize().
  protected Configuration conf;

  // Plan follower time step, read from configuration in milliseconds.
  protected long planStepSize;

  // May be null when no plan follower class can be determined.
  private PlanFollower planFollower;

  // Only created when both reservation ACLs and YARN ACLs are enabled and the
  // scheduler is a CapacityScheduler or FairScheduler; null otherwise.
  private ReservationsACLsManager reservationsACLsManager;

  // When true, the plan follower is started from recover() rather than from
  // serviceStart().
  private boolean isRecoveryEnabled = false;

  // Maximum periodicity read from configuration and passed to each plan.
  private long maxPeriodicity;

  /**
   * Construct the service.
   *
   * <p>Only the service name is recorded here; plans, the scheduler reference
   * and the plan follower are set up later during service initialization.
   *
   * @param name service name
   */
  public AbstractReservationSystem(String name) {
    super(name);
  }

  /**
   * Stores the resource manager context used later to obtain the scheduler
   * and to construct plans. The assignment is made under the write lock.
   *
   * @param rmContext the resource manager context to use
   */
  @Override
  public void setRMContext(RMContext rmContext) {
    writeLock.lock();
    try {
      this.rmContext = rmContext;
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Initializes the reservation system on the first call and, on every later
   * call, only creates plans for reservable queues that do not have one yet.
   *
   * <p>The {@code rmContext} argument is not read here; the context stored by
   * {@link #setRMContext(RMContext)} is the one that is used.
   *
   * @param conf configuration to initialize or refresh from
   * @param rmContext resource manager context (unused by this implementation)
   * @throws YarnException if the first-time initialization fails
   */
  @Override
  public void reinitialize(Configuration conf, RMContext rmContext)
      throws YarnException {
    writeLock.lock();
    try {
      if (initialized) {
        // Already set up once: just look for newly added reservable queues.
        initializeNewPlans(conf);
      } else {
        initialize(conf);
        // Only flagged after initialize() returned without throwing.
        initialized = true;
      }
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Performs the first-time setup: captures the configuration and scheduler,
   * reads the plan step size and maximum periodicity, creates one plan per
   * reservable queue and, when ACLs are enabled, the scheduler-specific
   * reservation ACL manager.
   *
   * @param conf the configuration to read settings from
   * @throws YarnException if a plan cannot be initialized
   */
  private void initialize(Configuration conf) throws YarnException {
    LOG.info("Initializing Reservation system");
    this.conf = conf;
    scheduler = rmContext.getScheduler();
    // Get the plan step size
    planStepSize = conf.getTimeDuration(
        YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
        YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
        TimeUnit.MILLISECONDS);
    // A zero step is as unusable as a negative one: it is later used as the
    // delay of scheduleWithFixedDelay, which rejects non-positive delays.
    if (planStepSize <= 0) {
      planStepSize =
          YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP;
    }
    maxPeriodicity =
        conf.getLong(YarnConfiguration.RM_RESERVATION_SYSTEM_MAX_PERIODICITY,
            YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY);
    if (maxPeriodicity <= 0) {
      maxPeriodicity =
          YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY;
    }
    // Create a plan corresponding to every reservable queue
    Set<String> planQueueNames = scheduler.getPlanQueues();
    for (String planQueueName : planQueueNames) {
      Plan plan = initializePlan(planQueueName);
      plans.put(planQueueName, plan);
    }
    isRecoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
        YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);

    // The ACL manager is only created when both reservation ACLs and YARN
    // ACLs are switched on; for any other scheduler type it stays null.
    if (conf.getBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE,
        YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE)
        && conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
            YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) {
      if (scheduler instanceof CapacityScheduler) {
        reservationsACLsManager = new CapacityReservationsACLsManager(scheduler,
            conf);
      } else if (scheduler instanceof FairScheduler) {
        reservationsACLsManager = new FairReservationsACLsManager(scheduler,
            conf);
      }
    }
  }

  /**
   * Re-adds previously stored reservations to the named plan and records each
   * reservation id against that plan name.
   *
   * @param planName name of the plan to load the reservations into
   * @param reservations stored reservation state keyed by reservation id
   * @throws PlanningException if the plan rejects one of the reservations
   */
  private void loadPlan(String planName,
      Map<ReservationId, ReservationAllocationStateProto> reservations)
      throws PlanningException {
    final Plan targetPlan = plans.get(planName);
    final Resource minAlloc = getMinAllocation();
    final ResourceCalculator calculator = getResourceCalculator();
    for (Entry<ReservationId, ReservationAllocationStateProto> stored
        : reservations.entrySet()) {
      final ReservationId reservationId = stored.getKey();
      // NOTE(review): the trailing 'true' is presumably the "recovering"
      // flag of Plan.addReservation - confirm against the Plan interface.
      targetPlan.addReservation(
          ReservationSystemUtil.toInMemoryAllocation(planName, reservationId,
              stored.getValue(), minAlloc, calculator),
          true);
      resQMap.put(reservationId, planName);
    }
    LOG.info("Recovered reservations for Plan: {}", planName);
  }

  /**
   * Restores reservations from the state store into the known plans,
   * synchronizes each plan once without replanning and then starts the plan
   * follower. Nothing is restored when no plan follower exists.
   *
   * @param state recovered resource manager state
   * @throws Exception if loading a plan's reservations fails
   */
  @Override
  public void recover(RMState state) throws Exception {
    LOG.info("Recovering Reservation system");
    writeLock.lock();
    try {
      Map<String, Map<ReservationId, ReservationAllocationStateProto>> storedState =
          state.getReservationState();
      if (planFollower == null) {
        return;
      }
      for (String planName : plans.keySet()) {
        // recover reservations if any from state store
        if (storedState.containsKey(planName)) {
          loadPlan(planName, storedState.get(planName));
        }
        synchronizePlan(planName, false);
      }
      // The follower's first run is delayed by the work-preserving recovery
      // scheduling wait.
      long initialDelay = conf.getLong(
          YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
          YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
      startPlanFollower(initialDelay);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Creates a plan for every reservable queue that does not have one yet and
   * hands the resulting set of plans to the plan follower, if any.
   *
   * <p>A {@link YarnException} raised while creating a plan is logged and
   * ends the refresh; in that case the plan follower is not updated. The
   * {@code conf} argument is not read by this method.
   *
   * @param conf refreshed configuration (unused)
   */
  private void initializeNewPlans(Configuration conf) {
    LOG.info("Refreshing Reservation system");
    writeLock.lock();
    try {
      // Create a plan corresponding to every new reservable queue
      for (String queueName : scheduler.getPlanQueues()) {
        if (plans.containsKey(queueName)) {
          LOG.warn("Plan based on reservation queue {} already exists.",
              queueName);
          continue;
        }
        plans.put(queueName, initializePlan(queueName));
      }
      // Update the plan follower with the active plans
      if (planFollower != null) {
        planFollower.setPlans(plans.values());
      }
    } catch (YarnException e) {
      LOG.warn("Exception while trying to refresh reservable queues", e);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Instantiates the configured plan follower, falling back to the
   * scheduler-specific default class name when none is configured.
   *
   * @return the new plan follower, or {@code null} when no class name could
   *         be determined
   * @throws YarnRuntimeException if the class cannot be found or does not
   *         implement {@link PlanFollower}
   */
  private PlanFollower createPlanFollower() {
    final String followerClassName = conf.get(
        YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER,
        getDefaultPlanFollower());
    if (followerClassName == null) {
      return null;
    }
    LOG.info("Using PlanFollowerPolicy: " + followerClassName);
    try {
      final Class<?> followerClass = conf.getClassByName(followerClassName);
      // Reject classes of the wrong type before attempting to instantiate.
      if (!PlanFollower.class.isAssignableFrom(followerClass)) {
        throw new YarnRuntimeException("Class: " + followerClassName
            + " not instance of " + PlanFollower.class.getCanonicalName());
      }
      return (PlanFollower) ReflectionUtils.newInstance(followerClass, conf);
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException(
          "Could not instantiate PlanFollowerPolicy: " + followerClassName, e);
    }
  }

  /**
   * Picks the default plan follower class name for the current scheduler.
   *
   * @return the class name of the capacity or fair scheduler plan follower,
   *         or {@code null} for any other scheduler type
   */
  private String getDefaultPlanFollower() {
    if (scheduler instanceof CapacityScheduler) {
      return CapacitySchedulerPlanFollower.class.getName();
    }
    if (scheduler instanceof FairScheduler) {
      return FairSchedulerPlanFollower.class.getName();
    }
    // No default plan follower for other scheduler types.
    return null;
  }

  /**
   * Looks up a plan by name under the read lock.
   *
   * @param planName name of the plan queue
   * @return the matching plan, or {@code null} if none is registered
   */
  @Override
  public Plan getPlan(String planName) {
    final Plan found;
    readLock.lock();
    try {
      found = plans.get(planName);
    } finally {
      readLock.unlock();
    }
    return found;
  }

  /**
   * Reads the plan follower time step under the read lock.
   *
   * @return the planStepSize, in milliseconds
   */
  @Override
  public long getPlanFollowerTimeStep() {
    final long stepSize;
    readLock.lock();
    try {
      stepSize = planStepSize;
    } finally {
      readLock.unlock();
    }
    return stepSize;
  }

  /**
   * Asks the plan follower to synchronize the named plan with the scheduler.
   *
   * <p>The call is a no-op when the plan is unknown. It is also skipped, with
   * a warning, when no plan follower exists (the follower may be
   * {@code null} when no follower class could be determined), instead of
   * failing with a {@link NullPointerException}.
   *
   * @param planName name of the plan to synchronize
   * @param shouldReplan whether the follower should replan while synchronizing
   */
  @Override
  public void synchronizePlan(String planName, boolean shouldReplan) {
    writeLock.lock();
    try {
      Plan plan = plans.get(planName);
      if (plan == null) {
        return;
      }
      if (planFollower == null) {
        LOG.warn("Cannot synchronize plan {}: no plan follower is available",
            planName);
        return;
      }
      planFollower.synchronizePlan(plan, shouldReplan);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Schedules the plan follower on a fresh single-threaded executor, running
   * it repeatedly with {@code planStepSize} milliseconds between runs. Does
   * nothing when there is no plan follower.
   *
   * @param initialDelay delay before the first run, in milliseconds
   */
  private void startPlanFollower(long initialDelay) {
    if (planFollower == null) {
      return;
    }
    final ScheduledExecutorService executor =
        new ScheduledThreadPoolExecutor(1);
    scheduledExecutorService = executor;
    executor.scheduleWithFixedDelay(planFollower, initialDelay, planStepSize,
        TimeUnit.MILLISECONDS);
  }

  /**
   * Initializes the reservation system from a private copy of the given
   * configuration, then creates and initializes the plan follower.
   *
   * @param conf service configuration
   * @throws Exception if initialization fails
   */
  @Override
  public void serviceInit(Configuration conf) throws Exception {
    // Work on a copy so the caller's configuration object is not retained.
    final Configuration confCopy = new Configuration(conf);
    reinitialize(confCopy, rmContext);

    // Create the plan follower with the active plans
    planFollower = createPlanFollower();
    if (planFollower != null) {
      planFollower.init(clock, scheduler, plans.values());
    }

    super.serviceInit(conf);
  }

  /**
   * Starts the plan follower immediately unless recovery is enabled; with
   * recovery enabled the follower is started from
   * {@link #recover(RMState)} instead.
   *
   * @throws Exception if the parent service fails to start
   */
  @Override
  public void serviceStart() throws Exception {
    if (!isRecoveryEnabled) {
      // The first run is delayed by one plan step.
      startPlanFollower(planStepSize);
    }
    super.serviceStart();
  }

  /**
   * Stops the periodic plan follower (if it was started) and discards all
   * plans.
   *
   * <p>The plan map is cleared under the write lock, like every other
   * mutation of it, so that concurrent readers going through the read lock
   * never observe the map while it is being modified.
   */
  @Override
  public void serviceStop() {
    // Stop the plan follower
    if (scheduledExecutorService != null
        && !scheduledExecutorService.isShutdown()) {
      scheduledExecutorService.shutdown();
    }
    // Clear the plans
    writeLock.lock();
    try {
      plans.clear();
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Looks up, under the read lock, the queue recorded for a reservation.
   *
   * @param reservationId id of the reservation
   * @return the recorded queue name, or {@code null} if none is recorded
   */
  @Override
  public String getQueueForReservation(ReservationId reservationId) {
    final String queueName;
    readLock.lock();
    try {
      queueName = resQMap.get(reservationId);
    } finally {
      readLock.unlock();
    }
    return queueName;
  }

  /**
   * Records, under the write lock, the queue a reservation belongs to. An
   * existing mapping for the same reservation id is replaced.
   *
   * @param reservationId id of the reservation
   * @param queueName name of the queue to associate with the reservation
   */
  @Override
  public void setQueueForReservation(ReservationId reservationId,
      String queueName) {
    writeLock.lock();
    try {
      resQMap.put(reservationId, queueName);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Allocates a new reservation id made of the cluster timestamp and the next
   * value of the internal counter.
   *
   * @return the newly allocated reservation id
   */
  @Override
  public ReservationId getNewReservationId() {
    writeLock.lock();
    try {
      final long clusterTimestamp = ResourceManager.getClusterTimeStamp();
      final long sequenceNumber = resCounter.incrementAndGet();
      final ReservationId newId =
          ReservationId.newInstance(clusterTimestamp, sequenceNumber);
      LOG.info("Allocated new reservationId: " + newId);
      return newId;
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Returns the plans managed by this reservation system.
   *
   * @return the internal plan map itself (not a copy), keyed by plan queue
   *         name; it is returned without taking the read lock
   */
  @Override
  public Map<String, Plan> getAllPlans() {
    return plans;
  }

  /**
   * Get the class name of the default reservation system corresponding to the
   * scheduler.
   *
   * @param scheduler the scheduler for which the reservation system is required
   *
   * @return the class name of the {@link ReservationSystem} implementation
   *         matching the scheduler, or {@code null} if the scheduler is
   *         neither a capacity nor a fair scheduler
   */
  public static String getDefaultReservationSystem(
      ResourceScheduler scheduler) {
    if (scheduler instanceof CapacityScheduler) {
      return CapacityReservationSystem.class.getName();
    }
    return scheduler instanceof FairScheduler
        ? FairReservationSystem.class.getName()
        : null;
  }

  /**
   * Builds the in-memory plan for a reservable queue: resolves the queue
   * path, creates and initializes the admission policy, and wires the agent,
   * replanner, capacity and allocation limits into a new
   * {@link InMemoryPlan}.
   *
   * @param planQueueName name of the reservable queue
   * @return the newly created plan
   * @throws YarnException if the plan cannot be created
   */
  protected Plan initializePlan(String planQueueName) throws YarnException {
    final String queuePath = getPlanQueuePath(planQueueName);

    final SharingPolicy admissionPolicy = getAdmissionPolicy(queuePath);
    admissionPolicy.init(queuePath, getReservationSchedulerConfiguration());

    // Calculate the max plan capacity
    final Resource minAlloc = getMinAllocation();
    final Resource maxAlloc = getMaxAllocation();
    final ResourceCalculator calculator = getResourceCalculator();
    final Resource totalCapacity = getPlanQueueCapacity(planQueueName);

    // Collaborators are resolved in the order they are passed below.
    final QueueMetrics rootMetrics = getRootQueueMetrics();
    final ReservationAgent agent = getAgent(queuePath);
    final Planner replanner = getReplanner(queuePath);

    Plan plan = new InMemoryPlan(rootMetrics, admissionPolicy, agent,
        totalCapacity, planStepSize, calculator, minAlloc, maxAlloc,
        planQueueName, replanner,
        getReservationSchedulerConfiguration().getMoveOnExpiry(queuePath),
        maxPeriodicity, rmContext);
    LOG.info("Initialized plan {} based on reservable queue {}",
        plan.toString(), planQueueName);
    return plan;
  }

  /**
   * Instantiates and initializes the replanner configured for a queue.
   *
   * @param planQueueName queue identifier used to look up the replanner class
   *        and passed to the planner's {@code init}
   * @return the initialized planner
   * @throws YarnRuntimeException if the class cannot be found or does not
   *         implement {@link Planner}
   */
  protected Planner getReplanner(String planQueueName) {
    final ReservationSchedulerConfiguration schedulerConf =
        getReservationSchedulerConfiguration();
    final String plannerClassName = schedulerConf.getReplanner(planQueueName);
    LOG.info("Using Replanner: " + plannerClassName + " for queue: "
        + planQueueName);
    try {
      final Class<?> plannerClazz = conf.getClassByName(plannerClassName);
      // Reject classes of the wrong type before attempting to instantiate.
      if (!Planner.class.isAssignableFrom(plannerClazz)) {
        throw new YarnRuntimeException("Class: " + plannerClazz
            + " not instance of " + Planner.class.getCanonicalName());
      }
      final Planner replanner =
          (Planner) ReflectionUtils.newInstance(plannerClazz, conf);
      replanner.init(planQueueName, schedulerConf);
      return replanner;
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException("Could not instantiate Planner: "
          + plannerClassName + " for queue: " + planQueueName, e);
    }
  }

  /**
   * Instantiates and initializes the reservation agent configured for a
   * queue.
   *
   * <p>The agent is created through its no-argument constructor. Any
   * reflective failure, including an exception thrown by the constructor
   * itself, is reported as a {@link YarnRuntimeException} carrying the
   * original cause.
   *
   * @param queueName queue identifier used to look up the agent class
   * @return the initialized reservation agent
   * @throws YarnRuntimeException if the class cannot be found, does not
   *         implement {@link ReservationAgent}, or cannot be instantiated
   */
  protected ReservationAgent getAgent(String queueName) {
    ReservationSchedulerConfiguration reservationConfig =
        getReservationSchedulerConfiguration();
    String agentClassName = reservationConfig.getReservationAgent(queueName);
    LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName);
    try {
      Class<?> agentClazz = conf.getClassByName(agentClassName);
      if (!ReservationAgent.class.isAssignableFrom(agentClazz)) {
        throw new YarnRuntimeException("Class: " + agentClassName
            + " not instance of " + ReservationAgent.class.getCanonicalName());
      }
      // Class.newInstance() is deprecated and lets checked exceptions from
      // the constructor escape undeclared; go through the Constructor so
      // they are wrapped and handled below.
      ReservationAgent reservationAgent =
          (ReservationAgent) agentClazz.getDeclaredConstructor().newInstance();
      reservationAgent.init(conf);
      return reservationAgent;
    } catch (ReflectiveOperationException e) {
      throw new YarnRuntimeException("Could not instantiate Agent: "
          + agentClassName + " for queue: " + queueName, e);
    }
  }

  /**
   * Instantiates the admission (sharing) policy configured for a queue. The
   * returned policy is not initialized here.
   *
   * @param queueName queue identifier used to look up the policy class
   * @return a new instance of the configured sharing policy
   * @throws YarnRuntimeException if the class cannot be found or does not
   *         implement {@link SharingPolicy}
   */
  protected SharingPolicy getAdmissionPolicy(String queueName) {
    final ReservationSchedulerConfiguration schedulerConf =
        getReservationSchedulerConfiguration();
    final String policyClassName =
        schedulerConf.getReservationAdmissionPolicy(queueName);
    LOG.info("Using AdmissionPolicy: " + policyClassName
        + " for queue: " + queueName);
    try {
      final Class<?> policyClass = conf.getClassByName(policyClassName);
      // Reject classes of the wrong type before attempting to instantiate.
      if (!SharingPolicy.class.isAssignableFrom(policyClass)) {
        throw new YarnRuntimeException("Class: " + policyClassName
            + " not instance of " + SharingPolicy.class.getCanonicalName());
      }
      return (SharingPolicy) ReflectionUtils.newInstance(policyClass, conf);
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: "
          + policyClassName + " for queue: " + queueName, e);
    }
  }

  /**
   * Returns the reservation ACL manager created during initialization.
   *
   * @return the ACL manager, or {@code null} when it was not created (it is
   *         only created when both reservation ACLs and YARN ACLs are enabled
   *         and the scheduler is a capacity or fair scheduler)
   */
  public ReservationsACLsManager getReservationsACLsManager() {
    return this.reservationsACLsManager;
  }

  /**
   * @return the scheduler-specific reservation configuration; it is consulted
   *         for the agent, replanner, admission policy and move-on-expiry
   *         setting of each plan queue
   */
  protected abstract ReservationSchedulerConfiguration getReservationSchedulerConfiguration();

  /**
   * @param planQueueName name of the plan queue
   * @return the queue path for the given plan queue; this path is what the
   *         policy, agent and replanner lookups are keyed by
   */
  protected abstract String getPlanQueuePath(String planQueueName);

  /**
   * @param planQueueName name of the plan queue
   * @return the capacity of the plan queue, used as the total capacity of the
   *         plan created for it
   */
  protected abstract Resource getPlanQueueCapacity(String planQueueName);

  /**
   * @return the minimum allocation passed to plans and to recovered
   *         reservation allocations
   */
  protected abstract Resource getMinAllocation();

  /**
   * @return the maximum allocation passed to plans
   */
  protected abstract Resource getMaxAllocation();

  /**
   * @return the resource calculator passed to plans and to recovered
   *         reservation allocations
   */
  protected abstract ResourceCalculator getResourceCalculator();

  /**
   * @return the queue metrics handed to every plan (presumably those of the
   *         scheduler's root queue, as the name suggests - confirm in
   *         subclasses)
   */
  protected abstract QueueMetrics getRootQueueMetrics();
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractSchedulerPlanFollower 源码

hadoop CapacityOverTimePolicy 源码

hadoop CapacityReservationSystem 源码

hadoop CapacitySchedulerPlanFollower 源码

hadoop FairReservationSystem 源码

hadoop FairSchedulerPlanFollower 源码

hadoop InMemoryPlan 源码

hadoop InMemoryReservationAllocation 源码

hadoop NoOverCommitPolicy 源码

hadoop PeriodicRLESparseResourceAllocation 源码

0  赞