hadoop FSAppAttempt 源码

  • 2022-10-20
  • 浏览 (208)

hadoop FSAppAttempt 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
 * Represents an application attempt from the viewpoint of the Fair Scheduler.
 */
@Private
@Unstable
public class FSAppAttempt extends SchedulerApplicationAttempt
    implements Schedulable {

  // Logger shared by all FSAppAttempt instances.
  private static final Logger LOG =
      LoggerFactory.getLogger(FSAppAttempt.class);
  // Shared calculator instance; its use is not visible in this part of the
  // file.
  private static final DefaultResourceCalculator RESOURCE_CALCULATOR
      = new DefaultResourceCalculator();

  // Scheduler clock time captured when this attempt object was constructed.
  private final long startTime;
  // Set once in the constructor to Priority.newInstance(1).
  private final Priority appPriority;
  // Starts out as an empty resource.
  private Resource demand = Resources.createResource(0);
  // The Fair Scheduler instance that owns this attempt.
  private final FairScheduler scheduler;
  // Starts out as an empty resource.
  private Resource fairShare = Resources.createResource(0, 0);

  // Preemption related variables
  // Guards containersToBePreempted and resourcesToBePreempted.
  private final Object preemptionVariablesLock = new Object();
  // Containers of this attempt that have been marked for preemption.
  private final Set<RMContainer> containersToBePreempted = new HashSet<>();
  // Running sum of the allocated resources of containersToBePreempted.
  private final Resource resourcesToBePreempted =
      Resources.clone(Resources.none());

  // Last computed fairshare starvation of this application.
  private Resource fairshareStarvation = Resources.none();
  // Initialized to startTime in the constructor.
  private long lastTimeAtFairShare;
  // NOTE(review): presumably the next time starvation should be evaluated;
  // not used in the visible part of the file - confirm.
  private long nextStarvationCheck;

  // minShareStarvation attributed to this application by the leaf queue
  private Resource minshareStarvation = Resources.none();

  // Used to record node reservation by an app.
  // Key = RackName, Value = Set of Nodes reserved by app on rack
  // Nodes are stored by node name; a null rack name is stored under the
  // literal key "NULL".
  private final Map<String, Set<String>> reservations = new HashMap<>();

  // Cached list of blacklisted nodes; rebuilt from the scheduler whenever
  // appSchedulingInfo reports that the blacklist changed.
  private final List<FSSchedulerNode> blacklistNodeIds = new ArrayList<>();

  // Read from the scheduler configuration for this attempt's queue; when
  // false, AM containers are never reported as preemptable.
  private boolean enableAMPreemption;

  /**
   * Delay scheduling: We often want to prioritize scheduling of node-local
   * containers over rack-local or off-switch containers. To achieve this
   * we first only allow node-local assignments for a given priority level,
   * then relax the locality threshold once we've had a long enough period
   * without successfully scheduling. We measure both the number of "missed"
   * scheduling opportunities since the last container was scheduled
   * at the current allowed level and the time since the last container
   * was scheduled. Currently we use only the former.
   */
  private final Map<SchedulerRequestKey, NodeType> allowedLocalityLevel =
      new HashMap<>();

  public FSAppAttempt(FairScheduler scheduler,
      ApplicationAttemptId applicationAttemptId, String user, FSLeafQueue queue,
      ActiveUsersManager activeUsersManager, RMContext rmContext) {
    super(applicationAttemptId, user, queue, activeUsersManager, rmContext);

    this.scheduler = scheduler;
    this.startTime = scheduler.getClock().getTime();
    this.lastTimeAtFairShare = this.startTime;
    this.appPriority = Priority.newInstance(1);
    this.enableAMPreemption = scheduler.getConf()
            .getAMPreemptionEnabled(getQueue().getQueueName());
  }

  /**
   * Returns the metrics object of the queue this attempt belongs to.
   *
   * @return the {@link QueueMetrics} exposed by the containing queue
   */
  public QueueMetrics getMetrics() {
    final QueueMetrics queueMetrics = queue.getMetrics();
    return queueMetrics;
  }

  /**
   * Processes the completion of one of this attempt's containers.
   *
   * <p>Under the write lock the container is removed from the live and
   * newly-allocated collections, the {@link RMContainer} is sent a finished
   * event, preemption tracking is cleared, and the attempt's and queue's
   * usage accounting is decremented. If the container is not in the live
   * set, the call is logged and ignored.
   *
   * @param rmContainer the container that completed
   * @param containerStatus final status reported for the container
   * @param event the event type to forward with the finished event
   */
  void containerCompleted(RMContainer rmContainer,
      ContainerStatus containerStatus, RMContainerEventType event) {
    writeLock.lock();
    try {
      Container container = rmContainer.getContainer();
      ContainerId containerId = container.getId();

      // Remove from the list of containers
      if (liveContainers.remove(containerId) == null) {
        LOG.info("Additional complete request on completed container " +
            rmContainer.getContainerId());
        return;
      }

      // Remove from the list of newly allocated containers if found
      newlyAllocatedContainers.remove(rmContainer);

      // Inform the container
      rmContainer.handle(
          new RMContainerFinishedEvent(containerId, containerStatus, event));
      LOG.debug("Completed container: {} in state: {} event:{}",
          rmContainer.getContainerId(), rmContainer.getState(), event);

      untrackContainerForPreemption(rmContainer);
      // Compare constant-first: a status without diagnostics must not throw
      // here, because the container has already been removed from the live
      // set and the usage accounting below still has to run.
      if (SchedulerUtils.PREEMPTED_CONTAINER.equals(
          containerStatus.getDiagnostics())) {
        queue.getMetrics().preemptContainer();
      }

      Resource containerResource = rmContainer.getContainer().getResource();
      RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
          "SchedulerApp", getApplicationId(), containerId, containerResource,
          rmContainer.getQueueName(), null);

      // Update usage metrics
      queue.getMetrics().releaseResources(
          rmContainer.getNodeLabelExpression(),
          getUser(), 1, containerResource);
      this.attemptResourceUsage.decUsed(containerResource);
      getQueue().decUsedResource(containerResource);

      // Clear resource utilization metrics cache.
      lastMemoryAggregateAllocationUpdateTime = -1;
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Drops this attempt's reservation bookkeeping for {@code node} at the
   * given scheduler key: removes the reserved container from the per-key
   * map (and the per-key map itself once empty), resets the re-reservation
   * counter and decrements the attempt's reserved-resource usage.
   *
   * @param schedulerKey key the reservation was made under
   * @param node node the reservation was made on
   */
  private void unreserveInternal(
      SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
    writeLock.lock();
    try {
      final Map<NodeId, RMContainer> reservedForKey =
          this.reservedContainers.get(schedulerKey);
      final RMContainer released = reservedForKey.remove(node.getNodeID());
      if (reservedForKey.isEmpty()) {
        // Nothing left reserved at this key.
        this.reservedContainers.remove(schedulerKey);
      }

      // Reset the re-reservation count
      resetReReservations(schedulerKey);

      this.attemptResourceUsage.decReserved(
          released.getContainer().getResource());

      final String summary =
          "Application " + getApplicationId() + " unreserved " + " on node "
              + node + ", currently has " + reservedForKey.size()
              + " at priority " + schedulerKey.getPriority()
              + "; currentReservation "
              + this.attemptResourceUsage.getReserved();
      LOG.info(summary);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Reduces {@code availableResources} in place by the unallocated resources
   * of every node this application has blacklisted. The cached blacklist is
   * rebuilt first if the application's blacklist changed since the last call.
   *
   * @param availableResources resource to subtract from (modified in place)
   */
  private void subtractResourcesOnBlacklistedNodes(
      Resource availableResources) {
    final boolean blacklistChanged =
        appSchedulingInfo.getAndResetBlacklistChanged();
    if (blacklistChanged) {
      blacklistNodeIds.clear();
      blacklistNodeIds.addAll(scheduler.getBlacklistedNodes(this));
    }
    blacklistNodeIds.forEach(blacklisted ->
        Resources.subtractFromNonNegative(availableResources,
            blacklisted.getUnallocatedResource()));
  }

  /**
   * Computes the headroom for this attempt from the free cluster resources
   * (minus what sits on blacklisted nodes), the queue's usage, fair share and
   * max share, delegating the final formula to the queue's scheduling policy.
   *
   * @return the headroom as computed by the queue's policy
   */
  @Override
  public Resource getHeadroom() {
    final FSQueue leafQueue = getQueue();
    final SchedulingPolicy queuePolicy = leafQueue.getPolicy();

    final Resource shareOfQueue = leafQueue.getFairShare();
    final Resource usedByQueue = leafQueue.getResourceUsage();

    // What the cluster still has free, excluding blacklisted nodes.
    final Resource freeInCluster = Resources.subtract(
        this.scheduler.getClusterResource(),
        this.scheduler.getRootQueueMetrics().getAllocatedResources());
    subtractResourcesOnBlacklistedNodes(freeInCluster);

    // What the queue may still take before hitting its max share.
    final Resource freeUnderQueueMax =
        Resources.subtract(leafQueue.getMaxShare(), usedByQueue);

    final Resource upperBound =
        Resources.componentwiseMin(freeInCluster, freeUnderQueueMax);
    final Resource result =
        queuePolicy.getHeadroom(shareOfQueue, usedByQueue, upperBound);

    LOG.debug("Headroom calculation for {}:Min((queueFairShare={} -"
        + " queueUsage={}), maxAvailableResource={} Headroom={}",
        this.getName(), shareOfQueue, usedByQueue, upperBound,
        result);

    return result;
  }

  /**
   * Determines the locality level at which containers may currently be
   * scheduled for {@code schedulerKey}, based on missed scheduling
   * opportunities.
   *
   * <p>Both thresholds are fractions of the cluster size and are capped at
   * 1.0; a negative threshold disables delay scheduling. A key seen for the
   * first time starts at NODE_LOCAL. When the number of scheduling
   * opportunities exceeds {@code numNodes * threshold} for the current level,
   * the level is relaxed by one step and the opportunity counter is reset.
   *
   * @param schedulerKey SchedulerRequestKey
   * @param numNodes number of nodes the thresholds are scaled by
   * @param nodeLocalityThreshold fraction applied while at NODE_LOCAL
   * @param rackLocalityThreshold fraction applied while at RACK_LOCAL
   * @return the allowed NodeType after any relaxation
   */
  NodeType getAllowedLocalityLevel(
      SchedulerRequestKey schedulerKey, int numNodes,
      double nodeLocalityThreshold, double rackLocalityThreshold) {
    // upper limit on threshold
    final double nodeFraction = Math.min(nodeLocalityThreshold, 1.0);
    final double rackFraction = Math.min(rackLocalityThreshold, 1.0);

    // If delay scheduling is not being used, can schedule anywhere
    if (nodeFraction < 0.0 || rackFraction < 0.0) {
      return NodeType.OFF_SWITCH;
    }

    writeLock.lock();
    try {
      // Default level is NODE_LOCAL
      if (!allowedLocalityLevel.containsKey(schedulerKey)) {
        allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
        return NodeType.NODE_LOCAL;
      }

      final NodeType current = allowedLocalityLevel.get(schedulerKey);

      // If level is already most liberal, we're done
      if (current.equals(NodeType.OFF_SWITCH)) {
        return NodeType.OFF_SWITCH;
      }

      final boolean atNodeLocal = current.equals(NodeType.NODE_LOCAL);
      final double fraction = atNodeLocal ? nodeFraction : rackFraction;

      final int missed = getSchedulingOpportunities(schedulerKey);
      final double limit = numNodes * fraction;

      // Not past the threshold yet: the level stays as it is.
      if (!(missed > limit)) {
        return allowedLocalityLevel.get(schedulerKey);
      }

      // Relax locality constraints once we've surpassed threshold.
      if (atNodeLocal) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("SchedulingOpportunities: " + missed
              + ", nodeLocalityThreshold: " + limit
              + ", change allowedLocality from NODE_LOCAL to RACK_LOCAL"
              + ", priority: " + schedulerKey.getPriority()
              + ", app attempt id: " + this.attemptId);
        }
        allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
        resetSchedulingOpportunities(schedulerKey);
      } else if (current.equals(NodeType.RACK_LOCAL)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("SchedulingOpportunities: " + missed
              + ", rackLocalityThreshold: " + limit
              + ", change allowedLocality from RACK_LOCAL to OFF_SWITCH"
              + ", priority: " + schedulerKey.getPriority()
              + ", app attempt id: " + this.attemptId);
        }
        allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
        resetSchedulingOpportunities(schedulerKey);
      }
      return allowedLocalityLevel.get(schedulerKey);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Return the level at which we are allowed to schedule containers, given
   * how long we must wait without scheduling a container before relaxing
   * the locality constraint.
   *
   * <p>A negative delay disables delay scheduling and yields OFF_SWITCH. A
   * key seen for the first time starts at NODE_LOCAL and records
   * {@code currentTimeMs} as its last-scheduled time. Otherwise the wait
   * time is measured from the key's last-scheduled time (or from the
   * attempt's start time if none is recorded); once it exceeds the delay
   * for the current level, the level is relaxed by one step.
   *
   * @param schedulerKey SchedulerRequestKey
   * @param nodeLocalityDelayMs wait time in ms after which NODE_LOCAL is
   *     relaxed to RACK_LOCAL
   * @param rackLocalityDelayMs wait time in ms after which RACK_LOCAL is
   *     relaxed to OFF_SWITCH
   * @param currentTimeMs current time in ms
   * @return NodeType
   */
  NodeType getAllowedLocalityLevelByTime(
      SchedulerRequestKey schedulerKey, long nodeLocalityDelayMs,
      long rackLocalityDelayMs, long currentTimeMs) {
    // if not being used, can schedule anywhere
    if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
      return NodeType.OFF_SWITCH;
    }

    writeLock.lock();
    try {

      // default level is NODE_LOCAL
      if (!allowedLocalityLevel.containsKey(schedulerKey)) {
        // add the initial time of priority to prevent comparing with FsApp
        // startTime and allowedLocalityLevel degrade
        lastScheduledContainer.put(schedulerKey, currentTimeMs);
        LOG.debug("Init the lastScheduledContainer time, priority: {},"
            + " time: {}", schedulerKey.getPriority(), currentTimeMs);
        allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
        return NodeType.NODE_LOCAL;
      }

      NodeType allowed = allowedLocalityLevel.get(schedulerKey);

      // if level is already most liberal, we're done
      if (allowed.equals(NodeType.OFF_SWITCH)) {
        return NodeType.OFF_SWITCH;
      }

      // check waiting time
      long waitTime = currentTimeMs;
      if (lastScheduledContainer.containsKey(schedulerKey)) {
        waitTime -= lastScheduledContainer.get(schedulerKey);
      } else {
        waitTime -= getStartTime();
      }

      long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
          nodeLocalityDelayMs :
          rackLocalityDelayMs;

      if (waitTime > thresholdTime) {
        if (allowed.equals(NodeType.NODE_LOCAL)) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Waiting time: " + waitTime
                + " ms, nodeLocalityDelay time: " + nodeLocalityDelayMs + " ms"
                + ", change allowedLocality from NODE_LOCAL to RACK_LOCAL"
                + ", priority: " + schedulerKey.getPriority()
                + ", app attempt id: " + this.attemptId);
          }
          allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
          resetSchedulingOpportunities(schedulerKey, currentTimeMs);
        } else if (allowed.equals(NodeType.RACK_LOCAL)) {
          if (LOG.isTraceEnabled()) {
            // Report the rack delay: that is the threshold compared against
            // on this branch.
            LOG.trace("Waiting time: " + waitTime
                + " ms, rackLocalityDelay time: " + rackLocalityDelayMs + " ms"
                + ", change allowedLocality from RACK_LOCAL to OFF_SWITCH"
                + ", priority: " + schedulerKey.getPriority()
                + ", app attempt id: " + this.attemptId);
          }
          allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
          resetSchedulingOpportunities(schedulerKey, currentTimeMs);
        }
      }
      return allowedLocalityLevel.get(schedulerKey);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Allocates a container for this attempt on {@code node} and records it in
   * the attempt's bookkeeping, all under the write lock.
   *
   * <p>If the allocation is more local than the currently allowed locality
   * level for the key, the allowed level is first reset to {@code type}.
   *
   * @param type locality of the allocation being made
   * @param node node the container is placed on
   * @param schedulerKey key of the request being satisfied
   * @param pendingAsk the ask; supplies the per-allocation resource when a
   *     new container has to be created
   * @param reservedContainer container to reuse, or {@code null} to create
   *     a new one
   * @return the new {@link RMContainer}, or {@code null} if there are no
   *     outstanding asks left for {@code schedulerKey}
   */
  public RMContainer allocate(NodeType type, FSSchedulerNode node,
      SchedulerRequestKey schedulerKey, PendingAsk pendingAsk,
      Container reservedContainer) {
    RMContainer rmContainer;
    Container container;

    writeLock.lock();
    try {
      // Update allowed locality level
      // (only ever tightened here: OFF_SWITCH -> NODE_LOCAL/RACK_LOCAL, or
      // RACK_LOCAL -> NODE_LOCAL)
      NodeType allowed = allowedLocalityLevel.get(schedulerKey);
      if (allowed != null) {
        if (allowed.equals(NodeType.OFF_SWITCH) && (type.equals(
            NodeType.NODE_LOCAL) || type.equals(NodeType.RACK_LOCAL))) {
          this.resetAllowedLocalityLevel(schedulerKey, type);
        } else if (allowed.equals(NodeType.RACK_LOCAL) && type.equals(
            NodeType.NODE_LOCAL)) {
          this.resetAllowedLocalityLevel(schedulerKey, type);
        }
      }

      // Required sanity check - AM can call 'allocate' to update resource
      // request without locking the scheduler, hence we need to check
      if (getOutstandingAsksCount(schedulerKey) <= 0) {
        return null;
      }

      // Reuse the reserved container if one was handed in, otherwise build
      // a fresh one sized by the ask.
      container = reservedContainer;
      if (container == null) {
        container = createContainer(node, pendingAsk.getPerAllocationResource(),
            schedulerKey);
      }

      // Create RMContainer
      rmContainer = new RMContainerImpl(container, schedulerKey,
          getApplicationAttemptId(), node.getNodeID(),
          appSchedulingInfo.getUser(), rmContext);
      ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());

      // Add it to allContainers list.
      addToNewlyAllocatedContainers(node, rmContainer);
      liveContainers.put(container.getId(), rmContainer);
      // Update consumption and track allocations
      ContainerRequest containerRequest = appSchedulingInfo.allocate(
            type, node, schedulerKey, rmContainer);
      this.attemptResourceUsage.incUsed(container.getResource());
      getQueue().incUsedResource(container.getResource());

      // Update resource requests related to "request" and store in RMContainer
      ((RMContainerImpl) rmContainer).setContainerRequest(containerRequest);

      // Inform the container
      rmContainer.handle(
          new RMContainerEvent(container.getId(), RMContainerEventType.START));

      if (LOG.isDebugEnabled()) {
        LOG.debug("allocate: applicationAttemptId=" + container.getId()
            .getApplicationAttemptId() + " container=" + container.getId()
            + " host=" + container.getNodeId().getHost() + " type=" + type);
      }
      RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
          "SchedulerApp", getApplicationId(), container.getId(),
          container.getResource(), getQueueName(), null);
    } finally {
      writeLock.unlock();
    }

    return rmContainer;
  }

  /**
   * Overwrites the allowed locality level stored for {@code schedulerKey}.
   * Meant to be called when the scheduler assigns a container at a higher
   * degree of locality than the current threshold. The map update happens
   * under the write lock; the change is logged afterwards.
   *
   * @param schedulerKey Scheduler Key
   * @param level the NodeType to store as the new allowed level
   */
  void resetAllowedLocalityLevel(
      SchedulerRequestKey schedulerKey, NodeType level) {
    final NodeType previous;
    writeLock.lock();
    try {
      previous = allowedLocalityLevel.put(schedulerKey, level);
    } finally {
      writeLock.unlock();
    }

    final String message =
        "Raising locality level from " + previous + " to " + level + " at "
            + " priority " + schedulerKey.getPriority();
    LOG.info(message);
  }

  /**
   * Returns the queue of this attempt, narrowed to a leaf queue.
   *
   * @return the queue field cast to {@link FSLeafQueue}
   */
  @Override
  public FSLeafQueue getQueue() {
    final FSLeafQueue leafQueue = (FSLeafQueue) queue;
    return leafQueue;
  }

  // Preemption related methods

  /**
   * Returns the overall starvation of this application: the sum of its
   * fairshare starvation and the minshare starvation attributed to it.
   *
   * @return total starvation attributed to this application
   */
  Resource getStarvation() {
    final Resource totalStarvation =
        Resources.add(this.fairshareStarvation, this.minshareStarvation);
    return totalStarvation;
  }

  /**
   * Returns the fairshare starvation value currently stored on this attempt.
   *
   * @return last computed fairshare starvation
   */
  Resource getFairshareStarvation() {
    return this.fairshareStarvation;
  }

  /**
   * Stores the minshare starvation attributed to this application. Intended
   * to be called only from {@link FSLeafQueue#updateStarvedApps}.
   *
   * @param starvation minshare starvation attributed to this app
   */
  void setMinshareStarvation(Resource starvation) {
    minshareStarvation = starvation;
  }

  /**
   * Clears the minshare starvation attributed to this application by
   * setting it back to {@code Resources.none()}. Intended to be called only
   * from {@link FSLeafQueue#updateStarvedApps}.
   */
  void resetMinshareStarvation() {
    minshareStarvation = Resources.none();
  }

  /**
   * Returns the minshare starvation value currently stored on this attempt.
   *
   * @return last computed minshare starvation
   */
  Resource getMinshareStarvation() {
    return this.minshareStarvation;
  }

  /**
   * Marks {@code container} as to-be-preempted. If it was not already
   * tracked, its allocated resource is added to the running total of
   * resources to be preempted.
   *
   * @param container container to start tracking
   */
  void trackContainerForPreemption(RMContainer container) {
    synchronized (preemptionVariablesLock) {
      final boolean newlyTracked = containersToBePreempted.add(container);
      if (!newlyTracked) {
        // Already counted; do not add its resources twice.
        return;
      }
      Resources.addTo(resourcesToBePreempted,
          container.getAllocatedResource());
    }
  }

  /**
   * Stops tracking {@code container} for preemption. If it was tracked, its
   * allocated resource is subtracted from the running total of resources to
   * be preempted.
   *
   * @param container container to stop tracking
   */
  private void untrackContainerForPreemption(RMContainer container) {
    synchronized (preemptionVariablesLock) {
      final boolean wasTracked = containersToBePreempted.remove(container);
      if (!wasTracked) {
        // Never counted, so there is nothing to give back.
        return;
      }
      Resources.subtractFrom(resourcesToBePreempted,
          container.getAllocatedResource());
    }
  }

  /**
   * Returns a snapshot of the ids of all containers currently tracked for
   * preemption.
   *
   * @return a new set holding the ids; never {@code null}
   */
  Set<ContainerId> getPreemptionContainerIds() {
    final Set<ContainerId> ids = new HashSet<>();
    synchronized (preemptionVariablesLock) {
      containersToBePreempted.forEach(
          tracked -> ids.add(tracked.getContainerId()));
    }
    return ids;
  }

  /**
   * Decides whether {@code container} may be preempted from this app.
   *
   * <p>The answer is {@code false} when the app is not preemptable, when the
   * container is the AM and AM preemption is disabled, when the container
   * does not belong to this app, or when it is already tracked for
   * preemption. Otherwise it is preemptable only if the app's usage would
   * still not be below its fair share after removing this container, the
   * containers already tracked for preemption, and
   * {@code alreadyConsideringForPreemption}.
   *
   * @param container candidate container
   * @param alreadyConsideringForPreemption resources of this app's other
   *     containers the caller is already considering for preemption
   * @return whether the container can be preempted
   */
  boolean canContainerBePreempted(RMContainer container,
                                  Resource alreadyConsideringForPreemption) {
    if (!isPreemptable()
        || (container.isAMContainer() && !enableAMPreemption)) {
      return false;
    }

    // Sanity check that the app owns this container
    final boolean ownedByThisApp =
        getLiveContainersMap().containsKey(container.getContainerId())
            || newlyAllocatedContainers.contains(container);
    if (!ownedByThisApp) {
      LOG.error("Looking to preempt container " + container +
          ". Container does not belong to app " + getApplicationId());
      return false;
    }

    final boolean alreadyTracked;
    synchronized (preemptionVariablesLock) {
      alreadyTracked = containersToBePreempted.contains(container);
    }
    if (alreadyTracked) {
      // The container is already under consideration for preemption
      return false;
    }

    // Check if the app's allocation will be over its fairshare even
    // after preempting this container
    final Resource remainingUsage = getUsageAfterPreemptingContainer(
        container.getAllocatedResource(),
        alreadyConsideringForPreemption);
    final boolean dropsBelowShare =
        isUsageBelowShare(remainingUsage, getFairShare());
    return !dropsBelowShare;
  }

  /**
   * Computes what this app's resource usage would be after preempting a
   * container of the given size, on top of everything already slated for
   * preemption.
   *
   * @param containerResources resources of the candidate container
   * @param alreadyConsideringForPreemption resources of other containers of
   *     this app that are already being considered for preemption
   * @return a fresh Resource holding the projected usage
   */
  private Resource getUsageAfterPreemptingContainer(Resource containerResources,
          Resource alreadyConsideringForPreemption) {
    // Work on a copy so the live usage figure is never modified.
    final Resource projected = Resources.clone(getResourceUsage());

    synchronized (preemptionVariablesLock) {
      // Containers already queued for preemption.
      Resources.subtractFrom(projected, resourcesToBePreempted);
    }

    // The candidate itself ...
    Resources.subtractFrom(projected, containerResources);
    // ... and the app's other containers the preemption thread is weighing.
    Resources.subtractFrom(projected, alreadyConsideringForPreemption);

    return projected;
  }

  /**
   * Builds a new {@link Container} record for this attempt on {@code node}
   * with a freshly generated container id, the given capability, and the
   * priority and allocation request id taken from {@code schedulerKey}.
   *
   * @param node node the container is placed on
   * @param capability resources the container is given
   * @param schedulerKey supplies priority and allocation request id
   * @return the newly built Container
   */
  private Container createContainer(FSSchedulerNode node, Resource capability,
      SchedulerRequestKey schedulerKey) {

    final NodeId hostNodeId = node.getRMNode().getNodeID();
    final ContainerId freshId = BuilderUtils.newContainerId(
        getApplicationAttemptId(), getNewContainerId());

    // Create the container
    final Container created = BuilderUtils.newContainer(
        freshId,
        hostNodeId,
        node.getRMNode().getHttpAddress(),
        capability,
        schedulerKey.getPriority(),
        null,
        schedulerKey.getAllocationRequestId());
    return created;
  }

  /**
   * Recovers a container for this attempt. Delegates to the superclass,
   * then adds the container's resource to the queue's usage unless the
   * container is in the COMPLETED state. If no AM is recorded as running
   * and the AM is not unmanaged, the container is treated as the AM: its
   * allocated resource becomes the AM resource and is added to the leaf
   * queue's AM usage.
   *
   * @param node node the container runs on
   * @param rmContainer container being recovered
   * @return the result of the superclass recovery
   */
  @Override
  public synchronized boolean recoverContainer(SchedulerNode node,
      RMContainer rmContainer) {
    writeLock.lock();
    try {
      final boolean result = super.recoverContainer(node, rmContainer);

      final boolean alreadyCompleted =
          rmContainer.getState().equals(RMContainerState.COMPLETED);
      if (!alreadyCompleted) {
        getQueue().incUsedResource(rmContainer.getContainer().getResource());
      }

      // If not running unmanaged, the first container we recover is always
      // the AM. Set the amResource for this app and update the leaf queue's AM
      // usage
      final boolean amNotYetRecorded = !isAmRunning() && !getUnmanagedAM();
      if (amNotYetRecorded) {
        final Resource amResource = rmContainer.getAllocatedResource();
        setAMResource(amResource);
        getQueue().addAMResourceUsage(amResource);
        setAmRunning(true);
      }

      return result;
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Reserves a spot for a container on {@code node}. If a
   * {@code reservedContainer} is passed in, the container is already
   * reserved on the node and only the bookkeeping is updated; otherwise a
   * new container is created and the reservation is recorded in the queue
   * metrics. Either way the relevant handlers on the superclass and on
   * {@link FSSchedulerNode} are invoked.
   *
   * @param perAllocationResource size of the container to create when no
   *     reserved container is supplied
   * @param node node to reserve on
   * @param reservedContainer existing reserved container, or {@code null}
   * @param type locality of the reservation
   * @param schedulerKey key of the request
   * @return whether reservation was possible with the current threshold
   *     limits
   */
  private boolean reserve(Resource perAllocationResource, FSSchedulerNode node,
      Container reservedContainer, NodeType type,
      SchedulerRequestKey schedulerKey) {

    final RMContainer heldOnNode = node.getReservedContainer();
    // The node must be unreserved, or reserved by this very attempt.
    final boolean freeOrOurs = heldOnNode == null
        || heldOnNode.getApplicationAttemptId()
            .equals(getApplicationAttemptId());
    if (!freeOrOurs || reservationExceedsThreshold(node, type)) {
      return false;
    }

    LOG.info("Making reservation: node=" + node.getNodeName() +
            " app_id=" + getApplicationId());

    final RMContainer rmContainer;
    if (reservedContainer == null) {
      final Container created =
          createContainer(node, perAllocationResource, schedulerKey);
      getMetrics().reserveResource(node.getPartition(), getUser(),
          created.getResource());
      rmContainer = super.reserve(node, schedulerKey, null, created);
    } else {
      rmContainer = node.getReservedContainer();
      super.reserve(node, schedulerKey, rmContainer, reservedContainer);
    }

    // Shared tail of both paths: tell the node and remember the reservation.
    node.reserveResource(this, schedulerKey, rmContainer);
    setReservation(node);
    return true;
  }

  /**
   * Checks whether making another reservation of the given locality type
   * would exceed the allowed number of reserved nodes.
   *
   * <p>Node-local reservations are never limited. For off-switch
   * reservations the limit is based on the number of cluster nodes and all
   * of this app's reservations; otherwise it is based on the nodes and
   * reservations in {@code node}'s rack. The allowed number is the ceiling
   * of the node count times the scheduler's reservable-nodes ratio.
   *
   * @param node node the reservation would be made on
   * @param type locality of the reservation
   * @return {@code true} if the existing reservations already reach the
   *     allowed number
   */
  private boolean reservationExceedsThreshold(FSSchedulerNode node,
                                                 NodeType type) {
    // Only if not node-local
    if (type != NodeType.NODE_LOCAL) {
      int existingReservations = getNumReservations(node.getRackName(),
              type == NodeType.OFF_SWITCH);
      int totalAvailNodes =
              (type == NodeType.OFF_SWITCH) ? scheduler.getNumClusterNodes() :
                      scheduler.getNumNodesInRack(node.getRackName());
      int numAllowedReservations =
              (int)Math.ceil(
                      totalAvailNodes * scheduler.getReservableNodesRatio());
      if (existingReservations >= numAllowedReservations) {
        if (LOG.isDebugEnabled()) {
          // The formatter is only needed for the debug message, so build it
          // only when that message is actually emitted.
          DecimalFormat df = new DecimalFormat();
          df.setMaximumFractionDigits(2);
          LOG.debug("Reservation Exceeds Allowed number of nodes:" +
                  " app_id=" + getApplicationId() +
                  " existingReservations=" + existingReservations +
                  " totalAvailableNodes=" + totalAvailNodes +
                  " reservableNodesRatio=" + df.format(
                                          scheduler.getReservableNodesRatio()) +
                  " numAllowedReservations=" + numAllowedReservations);
        }
        return true;
      }
    }
    return false;
  }

  /**
   * Removes the reservation on {@code node} at the given scheduler key:
   * clears the attempt's own bookkeeping, tells the node to drop the
   * reservation, and reports the released resource to the queue metrics.
   *
   * @param schedulerKey Scheduler Key
   * @param node Node
   */
  public void unreserve(SchedulerRequestKey schedulerKey,
      FSSchedulerNode node) {
    // Grab the reserved container before the node is told to drop it.
    final RMContainer reservedOnNode = node.getReservedContainer();

    unreserveInternal(schedulerKey, node);
    node.unreserveResource(this);
    clearReservation(node);

    final Resource released = reservedOnNode.getContainer().getResource();
    getMetrics().unreserveResource(node.getPartition(), getUser(), released);
  }

  /**
   * Records that this app holds a reservation on {@code node}, filed under
   * the node's rack name (or the literal key "NULL" when the node has no
   * rack name).
   *
   * @param node node that was reserved
   */
  private void setReservation(SchedulerNode node) {
    final String rackKey =
        node.getRackName() != null ? node.getRackName() : "NULL";

    writeLock.lock();
    try {
      // Create the per-rack set on first use, then add the node's name.
      reservations
          .computeIfAbsent(rackKey, unused -> new HashSet<>())
          .add(node.getNodeName());
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Forgets this app's reservation on {@code node}. The node's name is
   * removed from the set filed under its rack name (or under the literal
   * key "NULL" when the node has no rack name); a rack with no recorded
   * reservations is left untouched.
   *
   * @param node node whose reservation was released
   */
  private void clearReservation(SchedulerNode node) {
    final String rackKey =
        node.getRackName() != null ? node.getRackName() : "NULL";

    writeLock.lock();
    try {
      final Set<String> reservedOnRack = reservations.get(rackKey);
      if (reservedOnRack == null) {
        return;
      }
      reservedOnRack.remove(node.getNodeName());
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Counts the nodes this app has reservations on.
   *
   * @param rackName rack to count for when {@code isAny} is false; a
   *     {@code null} name is looked up under the literal key "NULL"
   * @param isAny when true, count reservations across all racks and ignore
   *     {@code rackName}
   * @return number of reserved nodes
   */
  int getNumReservations(String rackName, boolean isAny) {
    if (!isAny) {
      // Single rack: look it up directly.
      final Set<String> onRack =
          reservations.get(rackName == null ? "NULL" : rackName);
      return onRack == null ? 0 : onRack.size();
    }

    int total = 0;
    for (Set<String> onRack : reservations.values()) {
      if (onRack == null) {
        continue;
      }
      total += onRack.size();
    }
    return total;
  }

  /**
   * Assign a container on this node to satisfy {@code pendingAsk}. If the
   * node's unallocated resources cannot fit the ask, try to create a
   * reservation instead. This is called once we are sure the particular ask
   * should be facilitated by this node.
   *
   * @param node
   *     The node to try placing the container on.
   * @param pendingAsk
   *     The {@link PendingAsk} we're trying to satisfy.
   * @param type
   *     The locality of the assignment.
   * @param reserved
   *     Whether there's already a container reserved for this app on the node.
   * @param schedulerKey
   *     The scheduler key the ask belongs to; used for the allocation, for
   *     releasing an existing reservation and for creating a new one.
   * @return
   *     If an assignment was made, returns the resources allocated to the
   *     container.  If a reservation was made, returns
   *     FairScheduler.CONTAINER_RESERVED.  If no assignment or reservation was
   *     made, returns an empty resource.
   */
  private Resource assignContainer(
      FSSchedulerNode node, PendingAsk pendingAsk, NodeType type,
      boolean reserved, SchedulerRequestKey schedulerKey) {

    // How much does this request need?
    Resource capability = pendingAsk.getPerAllocationResource();

    // How much does the node have?
    Resource available = node.getUnallocatedResource();

    // When fulfilling a reservation, reuse the container that was reserved.
    Container reservedContainer = null;
    if (reserved) {
      reservedContainer = node.getReservedContainer().getContainer();
    }

    // Can we allocate a container on this node?
    if (Resources.fitsIn(capability, available)) {
      // Inform the application of the new container for this request
      RMContainer allocatedContainer =
          allocate(type, node, schedulerKey, pendingAsk,
              reservedContainer);
      if (allocatedContainer == null) {
        // The application did not take the resource; drop any reservation
        // so the node is not held for nothing.
        if (reserved) {
          unreserve(schedulerKey, node);
        }
        LOG.debug("Resource ask {} fits in available node resources {},"
            + " but no container was allocated", capability, available);
        return Resources.none();
      }

      // If we had previously made a reservation, delete it
      if (reserved) {
        unreserve(schedulerKey, node);
      }

      // Inform the node
      node.allocateContainer(allocatedContainer);

      // If not running unmanaged, the first container we allocate is always
      // the AM. Set the amResource for this app and update the leaf queue's AM
      // usage
      if (!isAmRunning() && !getUnmanagedAM()) {
        setAMResource(capability);
        getQueue().addAMResourceUsage(capability);
        setAmRunning(true);
      }

      return capability;
    }

    LOG.debug("Resource request: {} exceeds the available"
          + " resources of the node.", capability);

    // The desired container won't fit here, so reserve
    // Reserve only, if app does not wait for preempted resources on the node,
    // otherwise we may end up with duplicate reservations
    if (isReservable(capability) &&
        !node.isPreemptedForApp(this) &&
        reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer,
            type, schedulerKey)) {
      updateAMDiagnosticMsg(capability, " exceeds the available resources of "
          + "the node and the request is reserved)");
      LOG.debug("{}'s resource request is reserved.", getName());
      return FairScheduler.CONTAINER_RESERVED;
    }

    updateAMDiagnosticMsg(capability, " exceeds the available resources of "
        + "the node and the request cannot be reserved)");
    // Parameterized like the other debug statements in this method; the
    // rendered message text is unchanged.
    LOG.debug("Couldn't create reservation for app:  {}, at priority {}",
        getName(), schedulerKey.getPriority());
    return Resources.none();
  }

  /**
   * Decides whether a reservation may be made for a container of the given
   * size: the app must be starved and the size must reach the scheduler's
   * reservation threshold.
   *
   * @param capacity size of the requested container
   * @return true if both conditions hold
   */
  private boolean isReservable(Resource capacity) {
    if (!isStarved()) {
      // Apps that are not starved never reserve.
      return false;
    }
    return scheduler.isAtLeastReservationThreshold(
        getQueue().getPolicy().getResourceCalculator(), capacity);
  }

  /**
   * Whether the AM container for this app is over maxAMShare limit.
   *
   * @return false when the AM is already running, the AM is unmanaged, or
   *     there is no next pending ask; otherwise true when that ask has a
   *     count of zero or the queue cannot run an AM of the asked size
   */
  private boolean isOverAMShareLimit() {
    // Only an app whose managed AM has not started yet can be over the limit.
    if (isAmRunning() || getUnmanagedAM()) {
      return false;
    }

    PendingAsk nextAsk = appSchedulingInfo.getNextPendingAsk();
    if (nextAsk == null) {
      return false;
    }

    // Over the limit if nothing is being asked for, or if the leaf queue
    // is not able to run the app's AM.
    return nextAsk.getCount() == 0
        || !getQueue().canRunAppAM(nextAsk.getPerAllocationResource());
  }

  /**
   * Offers {@code node} to this app and tries to place (or reserve) a single
   * container on it. Scheduler keys are tried in iteration order; for each
   * key a node-local, then rack-local, then off-switch assignment is
   * considered, and the first one that applies is delegated to the
   * five-argument {@code assignContainer} overload, whose result is returned.
   * The per-key loop runs under the write lock.
   *
   * @param node
   *     The node being offered.
   * @param reserved
   *     Whether this app already has a container reserved on the node. If
   *     so, only the scheduler key of that reserved container is tried.
   * @return
   *     The result of the delegated assignment, or an empty resource if no
   *     scheduler key led to an assignment attempt.
   */
  @SuppressWarnings("deprecation")
  private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Node offered to app: " + getName() + " reserved: " + reserved);
    }

    // With an existing reservation only the reserved container's key is
    // tried; otherwise every scheduler key of the app is a candidate.
    Collection<SchedulerRequestKey> keysToTry = (reserved) ?
        Collections.singletonList(
            node.getReservedContainer().getReservedSchedulerKey()) :
        getSchedulerKeys();

    // For each priority, see if we can schedule a node local, rack local
    // or off-switch request. Rack or off-switch requests may be delayed
    // (not scheduled) in order to promote better locality.
    writeLock.lock();
    try {

      // TODO (wandga): All logics in this method should be added to
      // SchedulerPlacement#canDelayTo which is independent from scheduler.
      // Scheduler can choose to use various/pluggable delay-scheduling
      // implementation.
      for (SchedulerRequestKey schedulerKey : keysToTry) {
        // Skip it for reserved container, since
        // we already check it in isValidReservation.
        if (!reserved && !hasContainerForNode(schedulerKey, node)) {
          continue;
        }

        // Count this offer as a scheduling opportunity for the key.
        addSchedulingOpportunity(schedulerKey);

        PendingAsk rackLocalPendingAsk = getPendingAsk(schedulerKey,
            node.getRackName());
        PendingAsk nodeLocalPendingAsk = getPendingAsk(schedulerKey,
            node.getNodeName());

        // Only a warning: processing of the key continues below.
        if (nodeLocalPendingAsk.getCount() > 0
            && !appSchedulingInfo.canDelayTo(schedulerKey,
            node.getNodeName())) {
          LOG.warn("Relax locality off is not supported on local request: "
              + nodeLocalPendingAsk);
        }

        // The allowed locality level is derived from elapsed time when
        // continuous scheduling is enabled, and from the node/rack locality
        // thresholds and the cluster node count otherwise.
        NodeType allowedLocality;
        if (scheduler.isContinuousSchedulingEnabled()) {
          allowedLocality = getAllowedLocalityLevelByTime(schedulerKey,
              scheduler.getNodeLocalityDelayMs(),
              scheduler.getRackLocalityDelayMs(),
              scheduler.getClock().getTime());
        } else {
          allowedLocality = getAllowedLocalityLevel(schedulerKey,
              scheduler.getNumClusterNodes(),
              scheduler.getNodeLocalityThreshold(),
              scheduler.getRackLocalityThreshold());
        }

        // Node-local: taken when both the rack-level and the node-level asks
        // are outstanding, regardless of allowedLocality.
        if (rackLocalPendingAsk.getCount() > 0
            && nodeLocalPendingAsk.getCount() > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Assign container on " + node.getNodeName()
                + " node, assignType: NODE_LOCAL" + ", allowedLocality: "
                + allowedLocality + ", priority: " + schedulerKey.getPriority()
                + ", app attempt id: " + this.attemptId);
          }
          return assignContainer(node, nodeLocalPendingAsk, NodeType.NODE_LOCAL,
              reserved, schedulerKey);
        }

        // The request may not be delayed to this node's rack: try next key.
        if (!appSchedulingInfo.canDelayTo(schedulerKey, node.getRackName())) {
          continue;
        }

        // Rack-local: needs an outstanding rack-level ask and an allowed
        // locality of RACK_LOCAL or OFF_SWITCH.
        if (rackLocalPendingAsk.getCount() > 0
            && (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality
            .equals(NodeType.OFF_SWITCH))) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Assign container on " + node.getNodeName()
                + " node, assignType: RACK_LOCAL" + ", allowedLocality: "
                + allowedLocality + ", priority: " + schedulerKey.getPriority()
                + ", app attempt id: " + this.attemptId);
          }
          return assignContainer(node, rackLocalPendingAsk, NodeType.RACK_LOCAL,
              reserved, schedulerKey);
        }

        PendingAsk offswitchAsk = getPendingAsk(schedulerKey,
            ResourceRequest.ANY);
        // The request may not be delayed to ANY location: try next key.
        if (!appSchedulingInfo.canDelayTo(schedulerKey, ResourceRequest.ANY)) {
          continue;
        }

        // Off-switch: needs an outstanding ANY ask and either at most one
        // unique location ask or an allowed locality of OFF_SWITCH.
        if (offswitchAsk.getCount() > 0) {
          if (getAppPlacementAllocator(schedulerKey).getUniqueLocationAsks()
              <= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("Assign container on " + node.getNodeName()
                  + " node, assignType: OFF_SWITCH" + ", allowedLocality: "
                  + allowedLocality + ", priority: "
                  + schedulerKey.getPriority()
                  + ", app attempt id: " + this.attemptId);
            }
            return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH,
                reserved, schedulerKey);
          }
        }

        if (LOG.isTraceEnabled()) {
          LOG.trace("Can't assign container on " + node.getNodeName()
              + " node, allowedLocality: " + allowedLocality + ", priority: "
              + schedulerKey.getPriority() + ", app attempt id: "
              + this.attemptId);
        }
      }
    } finally {
      writeLock.unlock();
    }

    // No scheduler key produced an assignment attempt.
    return Resources.none();
  }

  /**
   * Whether this app has container requests under {@code key} that could be
   * satisfied on the given node, judged against the node's total capability
   * rather than its currently unallocated resources.
   *
   * @param key scheduler key whose asks are examined
   * @param node candidate node
   * @return true if a request under {@code key} could be placed on
   *     {@code node} and also fits in the queue's maximum share
   */
  private boolean hasContainerForNode(SchedulerRequestKey key,
      FSSchedulerNode node) {
    PendingAsk anyAsk = getPendingAsk(key, ResourceRequest.ANY);
    Resource askSize = anyAsk.getPerAllocationResource();
    boolean wantsOffSwitch = anyAsk.getCount() > 0;
    boolean wantsRack =
        getOutstandingAsksCount(key, node.getRackName()) > 0;
    boolean wantsNode =
        getOutstandingAsksCount(key, node.getNodeName()) > 0;

    // There must be outstanding requests at the given priority.
    if (!wantsOffSwitch) {
      return false;
    }

    // If locality relaxation is turned off at *-level, there must be a
    // non-zero request for the node's rack.
    if (!appSchedulingInfo.canDelayTo(key, ResourceRequest.ANY)
        && !wantsRack) {
      return false;
    }

    // If locality relaxation is turned off at rack-level, there must be a
    // non-zero request at the node.
    if (wantsRack
        && !appSchedulingInfo.canDelayTo(key, node.getRackName())
        && !wantsNode) {
      return false;
    }

    // The requested container must be able to fit on the node.
    if (!Resources.fitsIn(askSize, node.getRMNode().getTotalCapability())) {
      return false;
    }

    // The requested container must fit in queue maximum share.
    if (!getQueue().fitsInMaxShare(askSize)) {
      updateAMDiagnosticMsg(askSize,
          " exceeds current queue or its parents maximum resource allowed). " +
                  "Max share of queue: " + getQueue().getMaxShare());
      return false;
    }

    return true;
  }

  /**
   * Checks whether the reservation held on {@code node} is still usable:
   * the app must still have a matching request for the reserved scheduler
   * key and must not be over the AM share limit.
   *
   * @param node node holding the reserved container
   * @return true if the reservation is still valid
   */
  private boolean isValidReservation(FSSchedulerNode node) {
    SchedulerRequestKey reservedKey =
        node.getReservedContainer().getReservedSchedulerKey();
    if (!hasContainerForNode(reservedKey, node)) {
      return false;
    }
    return !isOverAMShareLimit();
  }

  /**
   * Handles a node on which this application already holds a reservation.
   * If the reservation is no longer valid it is released; otherwise, when
   * the reserved resource fits in the node's unallocated resources, an
   * assignment is attempted to turn the reservation into an allocation.
   *
   * @param node
   *     Node that the application has an existing reservation on
   * @return whether the reservation on the given node is valid.
   */
  boolean assignReservedContainer(FSSchedulerNode node) {
    RMContainer reservedContainer = node.getReservedContainer();
    SchedulerRequestKey reservedKey =
        reservedContainer.getReservedSchedulerKey();

    if (isValidReservation(node)) {
      // Reservation valid; try to fulfill the reservation
      if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to fulfill reservation for application "
            + getApplicationAttemptId() + " on node: " + node);
      }

      // Only attempt the assignment when the reserved container fits.
      // Note that we have an assumption here that
      // there's only one container size per priority.
      boolean fits = Resources.fitsIn(
          node.getReservedContainer().getReservedResource(),
          node.getUnallocatedResource());
      if (fits) {
        assignContainer(node, true);
      }
      return true;
    }

    // Don't hold the reservation if app can no longer use it
    LOG.info("Releasing reservation that cannot be satisfied for " +
        "application " + getApplicationAttemptId() + " on node " + node);
    unreserve(reservedKey, node);
    return false;
  }

  /**
   * Recomputes and stores the extent of fairshare starvation.
   * The reference amount is the component-wise minimum of the demand and the
   * fair share scaled by the queue's fairshare preemption threshold. When
   * usage is not below that amount, the last-time-at-fair-share timestamp is
   * refreshed.
   *
   * @return freshly computed fairshare starvation; an empty resource unless
   *     usage has been below the reference amount for at least the queue's
   *     fairshare preemption timeout
   */
  Resource fairShareStarvation() {
    long now = scheduler.getClock().getTime();
    Resource scaledShare = Resources.multiply(
        getFairShare(), getQueue().getFairSharePreemptionThreshold());
    Resource fairDemand = Resources.componentwiseMin(scaledShare, demand);

    if (!isUsageBelowShare(getResourceUsage(), fairDemand)) {
      // Not starved for fairshare: remember when that was last the case.
      lastTimeAtFairShare = now;
      fairshareStarvation = Resources.none();
    } else if (now - lastTimeAtFairShare
        < getQueue().getFairSharePreemptionTimeout()) {
      // Starved, but not yet for as long as the preemption timeout.
      fairshareStarvation = Resources.none();
    } else {
      // The app has been starved for longer than preemption-timeout.
      fairshareStarvation =
          Resources.subtractFromNonNegative(fairDemand, getResourceUsage());
    }
    return fairshareStarvation;
  }

  /**
   * Helper method that checks if {@code usage} is strictly less than
   * {@code share}, as ordered by the resource calculator of the queue's
   * policy relative to the cluster resource.
   *
   * @param usage the amount in use
   * @param share the amount to compare against
   * @return true if the comparison result is negative
   */
  private boolean isUsageBelowShare(Resource usage, Resource share) {
    boolean below = getQueue().getPolicy().getResourceCalculator()
        .compare(scheduler.getClusterResource(), usage, share, true) < 0;
    return below;
  }

  /**
   * Helper method that captures if this app is identified to be starved,
   * i.e. its current resource usage is below its fair share.
   *
   * @return true if the app is starved for fairshare, false otherwise
   */
  boolean isStarvedForFairShare() {
    Resource usage = getResourceUsage();
    Resource share = getFairShare();
    return isUsageBelowShare(usage, share);
  }

  /**
   * Is application starved for fairshare or minshare.
   *
   * @return true if the app is starved for fairshare or its recorded
   *     minshare starvation is non-empty
   */
  boolean isStarved() {
    if (isStarvedForFairShare()) {
      return true;
    }
    return !Resources.isNone(minshareStarvation);
  }

  /**
   * Fetch a list of RRs corresponding to the extent the app is starved
   * (fairshare and minshare). This method considers the number of containers
   * in a RR and also only one locality-level (the first encountered
   * resourceName).
   *
   * @return list of {@link ResourceRequest}s corresponding to the amount of
   * starvation.
   */
  List<ResourceRequest> getStarvedResourceRequests() {
    // Requests selected so far
    List<ResourceRequest> selectedRequests = new ArrayList<>();

    // Remembers visited RRs so the same RR is not picked at several
    // locality levels
    VisitedResourceRequestTracker seenRequests =
        new VisitedResourceRequestTracker(scheduler.getNodeTracker());

    // Starvation still to be covered; reduced as requests are selected
    Resource remaining = getStarvation();
    for (ResourceRequest candidate
        : appSchedulingInfo.getAllResourceRequests()) {
      if (Resources.isNone(remaining)) {
        // The starvation is fully covered
        break;
      }

      if (!seenRequests.visit(candidate)) {
        // Already seen this RR
        continue;
      }

      // A RR can ask for several containers of one capability; work out
      // how many of them fit in what remains.
      int fitCount = (int) Math.floor(
          Resources.ratio(scheduler.getResourceCalculator(),
              remaining, candidate.getCapability()));
      if (fitCount == 0) {
        // Not even one container of this capability fits
        continue;
      }

      // When only part of the RR fits, select a copy limited to the number
      // of containers that do fit.
      ResourceRequest selected = candidate;
      if (fitCount < candidate.getNumContainers()) {
        selected = ResourceRequest.newInstance(candidate.getPriority(),
            candidate.getResourceName(), candidate.getCapability(),
            fitCount);
      }

      selectedRequests.add(selected);
      Resources.subtractFromNonNegative(remaining,
          Resources.multiply(
              selected.getCapability(), selected.getNumContainers()));
    }

    return selectedRequests;
  }

  /**
   * Notify this app that preemption has been triggered to make room for
   * outstanding demand. Sets the time of the next starvation check to the
   * scheduler clock's current time plus the given delay.
   *
   * @param delayBeforeNextStarvationCheck duration to wait
   */
  void preemptionTriggered(long delayBeforeNextStarvationCheck) {
    long now = scheduler.getClock().getTime();
    nextStarvationCheck = now + delayBeforeNextStarvationCheck;
  }

  /**
   * Whether this app's starvation should be considered.
   *
   * @return true once the scheduler clock has reached the time of the next
   *     starvation check
   */
  boolean shouldCheckForStarvation() {
    long now = scheduler.getClock().getTime();
    return now >= nextStarvationCheck;
  }

  /* Schedulable methods implementation */

  /**
   * @return the string form of this app's application id
   */
  @Override
  public String getName() {
    final String appIdText = getApplicationId().toString();
    return appIdText;
  }

  /**
   * @return the demand most recently stored for this app
   */
  @Override
  public Resource getDemand() {
    return this.demand;
  }

  /**
   * Get the current app's unsatisfied demand.
   *
   * @return the stored demand minus the current resource usage
   */
  Resource getPendingDemand() {
    Resource totalDemand = demand;
    Resource currentUsage = getResourceUsage();
    return Resources.subtract(totalDemand, currentUsage);
  }

  /**
   * @return the start time recorded for this app attempt
   */
  @Override
  public long getStartTime() {
    return this.startTime;
  }

  /**
   * @return always {@code Resources.none()} for an app attempt
   */
  @Override
  public Resource getMinShare() {
    return Resources.none();
  }

  /**
   * @return always {@code Resources.unbounded()} for an app attempt
   */
  @Override
  public Resource getMaxShare() {
    return Resources.unbounded();
  }

  /**
   * @return the app's current consumption, as reported by
   *     {@code getCurrentConsumption()}
   */
  @Override
  public Resource getResourceUsage() {
    return getCurrentConsumption();
  }

  /**
   * Computes the weight of this app. The base weight is 1.0, or the base-2
   * logarithm of (1 + demanded memory size) when the scheduler uses
   * size-based weights; the base is then multiplied by the app priority.
   *
   * @return the weight of this app
   */
  @Override
  public float getWeight() {
    // Size-based weight follows the current memory demand
    float baseWeight = scheduler.isSizeBasedWeight()
        ? (float)(Math.log1p(demand.getMemorySize()) / Math.log(2))
        : 1.0F;

    return baseWeight * appPriority.getPriority();
  }

  /**
   * @return the priority stored for this app
   */
  @Override
  public Priority getPriority() {
    // Right now per-app priorities are not passed to scheduler,
    // so everyone has the same priority.
    return this.appPriority;
  }

  /**
   * @return the fair share most recently set on this app
   */
  @Override
  public Resource getFairShare() {
    return fairShare;
  }

  /**
   * Stores the given fair share for this app. The reference is kept as is;
   * no copy is made.
   *
   * @param fairShare the fair share to store
   */
  @Override
  public void setFairShare(Resource fairShare) {
    this.fairShare = fairShare;
  }

  /**
   * Recomputes the demand of this app as its current consumption plus all
   * outstanding ANY-level asks across its scheduler keys, then publishes
   * the result in a single assignment.
   */
  @Override
  public void updateDemand() {
    // Start from a copy of what the app is consuming right now
    Resource total = Resources.clone(getCurrentConsumption());

    // Add the outstanding asks of every scheduler key
    for (SchedulerRequestKey key : getSchedulerKeys()) {
      PendingAsk anyAsk = getPendingAsk(key, ResourceRequest.ANY);
      if (anyAsk.getCount() <= 0) {
        continue;
      }
      Resources.multiplyAndAddTo(total,
          anyAsk.getPerAllocationResource(),
          anyAsk.getCount());
    }

    demand = total;
  }

  /**
   * Offers {@code node} to this app. When the app's AM is over the AM share
   * limit nothing is assigned and the AM diagnostics are updated instead.
   *
   * @param node the node being offered
   * @return the result of the assignment attempt, or an empty resource when
   *     the AM share limit blocks the app
   */
  @Override
  public Resource assignContainer(FSSchedulerNode node) {
    if (!isOverAMShareLimit()) {
      return assignContainer(node, false);
    }

    PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk();
    Resource amRequest = amAsk.getPerAllocationResource();
    updateAMDiagnosticMsg(amRequest,
        " exceeds maximum AM resource allowed).");
    if (LOG.isDebugEnabled()) {
      LOG.debug("AM resource request: " + amRequest
          + " exceeds maximum AM resource allowed, "
          + getQueue().dumpState());
    }
    return Resources.none();
  }

  /**
   * Build the diagnostic message and update it. Does nothing unless the app
   * is still waiting for its AM container.
   *
   * @param resource resource request
   * @param reason the reason why AM doesn't get the resource
   */
  private void updateAMDiagnosticMsg(Resource resource, String reason) {
    if (isWaitingForAMContainer()) {
      String message = " (Resource request: " + resource + reason;
      updateAMContainerDiagnostics(AMState.INACTIVATED, message);
    }
  }

  /*
   * Overriding to appease findbugs. Adds no behavior of its own: the call
   * is delegated unchanged to the superclass implementation.
   */
  @Override
  public int hashCode() {
    return super.hashCode();
  }

  /*
   * Overriding to appease findbugs. Adds no behavior of its own: the call
   * is delegated unchanged to the superclass implementation.
   */
  @Override
  public boolean equals(Object o) {
    return super.equals(o);
  }

  /**
   * @return the application attempt id followed by " Alloc: " and the
   *     current consumption
   */
  @Override
  public String toString() {
    return new StringBuilder()
        .append(getApplicationAttemptId())
        .append(" Alloc: ")
        .append(getCurrentConsumption())
        .toString();
  }

  /**
   * @return whether the queue this app belongs to is preemptable; the app
   *     itself adds no condition of its own
   */
  @Override
  public boolean isPreemptable() {
    return getQueue().isPreemptable();
  }

  /**
   * Sets the {@code enableAMPreemption} flag of this app. Exposed for tests.
   *
   * @param enableAMPreemption the new value of the flag
   */
  @VisibleForTesting
  public void setEnableAMPreemption(boolean enableAMPreemption) {
    this.enableAMPreemption = enableAMPreemption;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AllocationConfiguration 源码

hadoop AllocationConfigurationException 源码

hadoop AllocationFileLoaderService 源码

hadoop ConfigurableResource 源码

hadoop FSContext 源码

hadoop FSLeafQueue 源码

hadoop FSOpDurations 源码

hadoop FSParentQueue 源码

hadoop FSPreemptionThread 源码

hadoop FSQueue 源码

0  赞