hadoop AbstractCSQueue 源码

  • 2022-10-20
  • 浏览 (191)

haddop AbstractCSQueue 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueConfigurations;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessRequest;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AbsoluteResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;

/**
 * Provides implementation of {@code CSQueue} methods common for every queue class in Capacity
 * Scheduler.
 */
public abstract class AbstractCSQueue implements CSQueue {
  private static final Logger LOG =
      LoggerFactory.getLogger(AbstractCSQueue.class);
  protected final QueueAllocationSettings queueAllocationSettings;
  volatile CSQueue parent;
  protected final QueuePath queuePath;
  protected QueueNodeLabelsSettings queueNodeLabelsSettings;
  private volatile QueueAppLifetimeAndLimitSettings queueAppLifetimeSettings;
  private CSQueuePreemptionSettings preemptionSettings;

  private volatile QueueState state = null;
  protected final PrivilegedEntity queueEntity;

  final ResourceCalculator resourceCalculator;
  Set<String> resourceTypes;
  final RMNodeLabelsManager labelManager;
  private String multiNodeSortingPolicyName = null;

  Map<AccessType, AccessControlList> acls =
      new HashMap<AccessType, AccessControlList>();
  volatile boolean reservationsContinueLooking;

  // Track capacities like
  // used-capacity/abs-used-capacity/capacity/abs-capacity,
  // etc.
  QueueCapacities queueCapacities;
  CSQueueUsageTracker usageTracker;

  public enum CapacityConfigType {
    NONE, PERCENTAGE, ABSOLUTE_RESOURCE
  };

  protected CapacityConfigType capacityConfigType =
      CapacityConfigType.NONE;

  protected Map<String, QueueCapacityVector> configuredCapacityVectors;

  private final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);
  protected CapacitySchedulerQueueContext queueContext;
  protected YarnAuthorizationProvider authorizer = null;

  protected ActivitiesManager activitiesManager;

  protected ReentrantReadWriteLock.ReadLock readLock;
  protected ReentrantReadWriteLock.WriteLock writeLock;

  volatile Priority priority = Priority.newInstance(0);
  private UserWeights userWeights = UserWeights.createEmpty();

  // is it a dynamic queue?
  private boolean dynamicQueue = false;

  public AbstractCSQueue(CapacitySchedulerQueueContext queueContext, String queueName,
      CSQueue parent, CSQueue old) {
    this.parent = parent;
    this.queuePath = createQueuePath(parent, queueName);

    this.queueContext = queueContext;
    this.resourceCalculator = queueContext.getResourceCalculator();
    this.activitiesManager = queueContext.getActivitiesManager();
    this.labelManager = queueContext.getLabelManager();

    // must be called after parent and queueName is set
    CSQueueMetrics metrics = old != null ?
        (CSQueueMetrics) old.getMetrics() :
        CSQueueMetrics.forQueue(getQueuePath(), parent,
            queueContext.getConfiguration().getEnableUserMetrics(),
            queueContext.getConfiguration());
    this.usageTracker = new CSQueueUsageTracker(metrics);

    this.queueCapacities = new QueueCapacities(parent == null);
    this.queueAllocationSettings = new QueueAllocationSettings(queueContext.getMinimumAllocation());

    this.queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath());

    this.resourceTypes = new HashSet<>();
    for (AbsoluteResourceType type : AbsoluteResourceType.values()) {
      this.resourceTypes.add(type.toString().toLowerCase());
    }

    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.readLock = lock.readLock();
    this.writeLock = lock.writeLock();

    LOG.debug("Initialized {}: name={}, fullname={}", this.getClass().getSimpleName(),
        queueName, getQueuePath());
  }

  private static QueuePath createQueuePath(CSQueue parent, String queueName) {
    if (parent == null) {
      return new QueuePath(null, queueName);
    }
    return new QueuePath(parent.getQueuePath(), queueName);
  }

  /**
   * Sets up capacity and weight values from configuration.
   */
  protected void setupConfigurableCapacities() {
    CSQueueUtils.loadCapacitiesByLabelsFromConf(queuePath, queueCapacities,
        queueContext.getConfiguration(), this.queueNodeLabelsSettings.getConfiguredNodeLabels());
  }

  @Override
  public String getQueuePath() {
    return queuePath.getFullPath();
  }

  @Override
  public QueuePath getQueuePathObject() {
    return this.queuePath;
  }

  @Override
  public float getCapacity() {
    return queueCapacities.getCapacity();
  }

  @Override
  public float getAbsoluteCapacity() {
    return queueCapacities.getAbsoluteCapacity();
  }

  @Override
  public float getAbsoluteMaximumCapacity() {
    return queueCapacities.getAbsoluteMaximumCapacity();
  }

  @Override
  public float getAbsoluteUsedCapacity() {
    return queueCapacities.getAbsoluteUsedCapacity();
  }

  @Override
  public float getMaximumCapacity() {
    return queueCapacities.getMaximumCapacity();
  }

  @Override
  public float getUsedCapacity() {
    return queueCapacities.getUsedCapacity();
  }

  @Override
  public Resource getUsedResources() {
    return usageTracker.getQueueUsage().getUsed();
  }

  public int getNumContainers() {
    return usageTracker.getNumContainers();
  }

  @Override
  public QueueState getState() {
    return state;
  }

  @Override
  public CSQueueMetrics getMetrics() {
    return usageTracker.getMetrics();
  }

  @Override
  public String getQueueShortName() {
    return queuePath.getLeafName();
  }

  @Override
  public String getQueueName() {
    return this.queuePath.getLeafName();
  }

  @Override
  public CSQueue getParent() {
    return parent;
  }

  @Override
  public void setParent(CSQueue newParentQueue) {
    this.parent = newParentQueue;
    getMetrics().setParentQueue(newParentQueue);
  }

  @Override
  public PrivilegedEntity getPrivilegedEntity() {
    return queueEntity;
  }

  public CapacitySchedulerQueueContext getQueueContext() {
    return queueContext;
  }

  public Set<String> getAccessibleNodeLabels() {
    return queueNodeLabelsSettings.getAccessibleNodeLabels();
  }

  /**
   * Checks whether the user has the required permission to execute the action of {@code QueueACL}.
   * @param acl the access type the user is checked for
   * @param user UGI of the user
   * @return true, if the user has permission, false otherwise
   */
  @Override
  public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
    return authorizer.checkPermission(
        new AccessRequest(queueEntity, user, SchedulerUtils.toAccessType(acl),
            null, null, Server.getRemoteAddress(), null));
  }

  /**
   * Set maximum capacity for empty node label.
   * @param maximumCapacity new max capacity
   */
  @VisibleForTesting
  void setMaxCapacity(float maximumCapacity) {
    internalSetMaximumCapacity(maximumCapacity, NO_LABEL);
  }

  /**
   * Set maximum capacity.
   * @param maximumCapacity new max capacity
   */
  void setMaxCapacity(String nodeLabel, float maximumCapacity) {
    internalSetMaximumCapacity(maximumCapacity, nodeLabel);
  }

  private void internalSetMaximumCapacity(float maximumCapacity, String nodeLabel) {
    writeLock.lock();
    try {
      // Sanity check
      CSQueueUtils.checkMaxCapacity(this.queuePath,
          queueCapacities.getCapacity(nodeLabel), maximumCapacity);
      float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(
          maximumCapacity, parent);
      CSQueueUtils.checkAbsoluteCapacity(this.queuePath,
          queueCapacities.getAbsoluteCapacity(nodeLabel), absMaxCapacity);

      queueCapacities.setMaximumCapacity(maximumCapacity);
      queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity);
    } finally {
      writeLock.unlock();
    }
  }

  @Override
  public String getDefaultNodeLabelExpression() {
    return this.queueNodeLabelsSettings.getDefaultLabelExpression();
  }

  /**
   * Initialize queue properties that are based on configuration.
   * @param clusterResource overall resource of the cluster
   * @throws IOException if configuration is set in a way that is inconsistent
   */
  protected void setupQueueConfigs(Resource clusterResource) throws
      IOException {

    writeLock.lock();
    try {
      CapacitySchedulerConfiguration configuration = queueContext.getConfiguration();
      this.acls = configuration.getAcls(getQueuePath());

      if (isDynamicQueue() || this instanceof AbstractAutoCreatedLeafQueue) {
        setDynamicQueueProperties();
        setDynamicQueueACLProperties();
      }

      // Collect and set the Node label configuration
      this.queueNodeLabelsSettings = new QueueNodeLabelsSettings(configuration, parent,
          queuePath, queueContext.getQueueManager().getConfiguredNodeLabelsForAllQueues());

      // Initialize the queue capacities
      setupConfigurableCapacities();
      updateAbsoluteCapacities();
      updateCapacityConfigType();

      // Fetch minimum/maximum resource limits for this queue if
      // configured
      updateConfigurableResourceLimits(clusterResource);

      // Setup queue's maximumAllocation respecting the global
      // and the queue settings
      this.queueAllocationSettings.setupMaximumAllocation(configuration, getQueuePath(),
          parent);

      // Initialize the queue state based on previous state, configured state
      // and its parent state
      QueueStateHelper.setQueueState(this);

      authorizer = YarnAuthorizationProvider.getInstance(configuration);

      this.userWeights = getUserWeightsFromHierarchy();

      this.reservationsContinueLooking =
          configuration.getReservationContinueLook();

      this.configuredCapacityVectors = configuration
          .parseConfiguredResourceVector(queuePath.getFullPath(),
              this.queueNodeLabelsSettings.getConfiguredNodeLabels());

      // Update metrics
      CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
          this, labelManager, null);

      // Store preemption settings
      this.preemptionSettings = new CSQueuePreemptionSettings(this, configuration);
      this.priority = configuration.getQueuePriority(
          getQueuePath());

      // Update multi-node sorting algorithm for scheduling as configured.
      setMultiNodeSortingPolicyName(
          configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePath()));

      // Setup application related limits
      this.queueAppLifetimeSettings = new QueueAppLifetimeAndLimitSettings(configuration,
          this, queuePath);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Set properties specific to dynamic queues.
   */
  protected void setDynamicQueueProperties() {
    // Set properties from parent template
    if (parent instanceof ParentQueue) {
      ((ParentQueue) parent).getAutoCreatedQueueTemplate()
          .setTemplateEntriesForChild(queueContext.getConfiguration(), getQueuePath());

      String parentTemplate = String.format("%s.%s", parent.getQueuePath(),
          AutoCreatedQueueTemplate.AUTO_QUEUE_TEMPLATE_PREFIX);
      parentTemplate = parentTemplate.substring(0, parentTemplate.lastIndexOf(
          DOT));
      Set<String> parentNodeLabels = queueContext.getQueueManager()
          .getConfiguredNodeLabelsForAllQueues()
          .getLabelsByQueue(parentTemplate);

      if (parentNodeLabels != null && parentNodeLabels.size() > 1) {
        queueContext.getQueueManager()
            .getConfiguredNodeLabelsForAllQueues()
            .setLabelsByQueue(getQueuePath(), new HashSet<>(parentNodeLabels));
      }
    }
  }

  protected void setDynamicQueueACLProperties() {
  }

  private UserWeights getUserWeightsFromHierarchy() {
    UserWeights unionInheritedWeights = UserWeights.createEmpty();
    CSQueue parentQ = parent;
    if (parentQ != null) {
      // Inherit all of parent's userWeights
      unionInheritedWeights.addFrom(parentQ.getUserWeights());
    }
    // Insert this queue's userWeights, overriding parent's userWeights if
    // there is an overlap.
    unionInheritedWeights.addFrom(
        queueContext.getConfiguration().getAllUserWeightsForQueue(getQueuePath()));
    return unionInheritedWeights;
  }

  protected Resource getMinimumAbsoluteResource(String queuePath, String label) {
    return queueContext.getConfiguration()
        .getMinimumResourceRequirement(label, queuePath, resourceTypes);
  }

  protected Resource getMaximumAbsoluteResource(String queuePath, String label) {
    return queueContext.getConfiguration()
        .getMaximumResourceRequirement(label, queuePath, resourceTypes);
  }

  protected boolean checkConfigTypeIsAbsoluteResource(String queuePath,
      String label) {
    return queueContext.getConfiguration().checkConfigTypeIsAbsoluteResource(label,
        queuePath, resourceTypes);
  }

  protected void updateCapacityConfigType() {
    this.capacityConfigType = CapacityConfigType.NONE;
    for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
      LOG.debug("capacityConfigType is '{}' for queue {}",
          capacityConfigType, getQueuePath());

      CapacityConfigType localType = checkConfigTypeIsAbsoluteResource(
          getQueuePath(), label) ? CapacityConfigType.ABSOLUTE_RESOURCE
          : CapacityConfigType.PERCENTAGE;

      if (this.capacityConfigType.equals(CapacityConfigType.NONE)) {
        this.capacityConfigType = localType;
        LOG.debug("capacityConfigType is updated as '{}' for queue {}",
            capacityConfigType, getQueuePath());
      } else {
        validateAbsoluteVsPercentageCapacityConfig(localType);
      }
    }
  }

  /**
   * Initializes configured minimum and maximum capacity from configuration, if capacity is defined
   * in ABSOLUTE node.
   * @param clusterResource overall resource of the cluster
   */
  protected void updateConfigurableResourceLimits(Resource clusterResource) {
    for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
      final Resource minResource = getMinimumAbsoluteResource(getQueuePath(), label);
      Resource maxResource = getMaximumAbsoluteResource(getQueuePath(), label);

      if (parent != null) {
        final Resource parentMax = parent.getQueueResourceQuotas()
            .getConfiguredMaxResource(label);
        validateMinResourceIsNotGreaterThanMaxResource(maxResource, parentMax, clusterResource,
            "Max resource configuration "
                + maxResource + " is greater than parents max value:"
                + parentMax + " in queue:" + getQueuePath());

        // If child's max resource is not set, but its parent max resource is
        // set, we must set child max resource to its parent's.
        if (maxResource.equals(Resources.none()) &&
            !minResource.equals(Resources.none()) &&
            !parentMax.equals(Resources.none())) {
          maxResource = Resources.clone(parentMax);
        }
      }

      validateMinResourceIsNotGreaterThanMaxResource(minResource, maxResource, clusterResource,
          "Min resource configuration "
              + minResource + " is greater than its max value:" + maxResource
              + " in queue:" + getQueuePath());

      LOG.debug("Updating absolute resource configuration for queue:{} as"
              + " minResource={} and maxResource={}", getQueuePath(), minResource,
          maxResource);

      usageTracker.getQueueResourceQuotas().setConfiguredMinResource(label, minResource);
      usageTracker.getQueueResourceQuotas().setConfiguredMaxResource(label, maxResource);
    }
  }

  private void validateMinResourceIsNotGreaterThanMaxResource(Resource minResource,
                                                              Resource maxResource,
                                                              Resource clusterResource,
                                                              String validationError) {
    if (!maxResource.equals(Resources.none()) && Resources.greaterThan(
        resourceCalculator, clusterResource, minResource, maxResource)) {
      throw new IllegalArgumentException(validationError);
    }
  }

  private void validateAbsoluteVsPercentageCapacityConfig(
      CapacityConfigType localType) {
    if (!queuePath.isRoot()
        && !this.capacityConfigType.equals(localType)) {
      throw new IllegalArgumentException("Queue '" + getQueuePath()
          + "' should use either percentage based capacity"
          + " configuration or absolute resource.");
    }
  }

  @Override
  public CapacityConfigType getCapacityConfigType() {
    return capacityConfigType;
  }

  @Override
  public Resource getEffectiveCapacity(String label) {
    return Resources
        .clone(getQueueResourceQuotas().getEffectiveMinResource(label));
  }

  @Override
  public Resource getEffectiveCapacityDown(String label, Resource factor) {
    return Resources.normalizeDown(resourceCalculator,
        getQueueResourceQuotas().getEffectiveMinResource(label),
        queueAllocationSettings.getMinimumAllocation());
  }

  @Override
  public Resource getEffectiveMaxCapacity(String label) {
    return Resources
        .clone(getQueueResourceQuotas().getEffectiveMaxResource(label));
  }

  @Override
  public Resource getEffectiveMaxCapacityDown(String label, Resource factor) {
    return Resources.normalizeDown(resourceCalculator,
        getQueueResourceQuotas().getEffectiveMaxResource(label),
        queueAllocationSettings.getMinimumAllocation());
  }

  @Override
  public QueueCapacityVector getConfiguredCapacityVector(
      String label) {
    return configuredCapacityVectors.get(label);
  }

  protected QueueInfo getQueueInfo() {
    // Deliberately doesn't use lock here, because this method will be invoked
    // from schedulerApplicationAttempt, to avoid deadlock, sacrifice
    // consistency here.
    // TODO, improve this
    QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
    queueInfo.setQueueName(queuePath.getLeafName());
    queueInfo.setQueuePath(queuePath.getFullPath());
    queueInfo.setAccessibleNodeLabels(queueNodeLabelsSettings.getAccessibleNodeLabels());
    queueInfo.setCapacity(queueCapacities.getCapacity());
    queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity());
    queueInfo.setQueueState(getState());
    queueInfo.setDefaultNodeLabelExpression(queueNodeLabelsSettings.getDefaultLabelExpression());
    queueInfo.setCurrentCapacity(getUsedCapacity());
    queueInfo.setQueueStatistics(getQueueStatistics());
    queueInfo.setPreemptionDisabled(preemptionSettings.isPreemptionDisabled());
    queueInfo.setIntraQueuePreemptionDisabled(
        getIntraQueuePreemptionDisabled());
    queueInfo.setQueueConfigurations(getQueueConfigurations());
    queueInfo.setWeight(queueCapacities.getWeight());
    queueInfo.setMaxParallelApps(queueAppLifetimeSettings.getMaxParallelApps());
    return queueInfo;
  }

  public QueueStatistics getQueueStatistics() {
    // Deliberately doesn't use lock here, because this method will be invoked
    // from schedulerApplicationAttempt, to avoid deadlock, sacrifice
    // consistency here.
    // TODO, improve this
    QueueStatistics stats = recordFactory.newRecordInstance(
        QueueStatistics.class);
    stats.setNumAppsSubmitted(getMetrics().getAppsSubmitted());
    stats.setNumAppsRunning(getMetrics().getAppsRunning());
    stats.setNumAppsPending(getMetrics().getAppsPending());
    stats.setNumAppsCompleted(getMetrics().getAppsCompleted());
    stats.setNumAppsKilled(getMetrics().getAppsKilled());
    stats.setNumAppsFailed(getMetrics().getAppsFailed());
    stats.setNumActiveUsers(getMetrics().getActiveUsers());
    stats.setAvailableMemoryMB(getMetrics().getAvailableMB());
    stats.setAllocatedMemoryMB(getMetrics().getAllocatedMB());
    stats.setPendingMemoryMB(getMetrics().getPendingMB());
    stats.setReservedMemoryMB(getMetrics().getReservedMB());
    stats.setAvailableVCores(getMetrics().getAvailableVirtualCores());
    stats.setAllocatedVCores(getMetrics().getAllocatedVirtualCores());
    stats.setPendingVCores(getMetrics().getPendingVirtualCores());
    stats.setReservedVCores(getMetrics().getReservedVirtualCores());
    stats.setPendingContainers(getMetrics().getPendingContainers());
    stats.setAllocatedContainers(getMetrics().getAllocatedContainers());
    stats.setReservedContainers(getMetrics().getReservedContainers());
    return stats;
  }

  public Map<String, QueueConfigurations> getQueueConfigurations() {
    Map<String, QueueConfigurations> queueConfigurations = new HashMap<>();
    Set<String> nodeLabels = getNodeLabelsForQueue();
    QueueResourceQuotas queueResourceQuotas = usageTracker.getQueueResourceQuotas();
    for (String nodeLabel : nodeLabels) {
      QueueConfigurations queueConfiguration =
          recordFactory.newRecordInstance(QueueConfigurations.class);
      float capacity = queueCapacities.getCapacity(nodeLabel);
      float absoluteCapacity = queueCapacities.getAbsoluteCapacity(nodeLabel);
      float maxCapacity = queueCapacities.getMaximumCapacity(nodeLabel);
      float absMaxCapacity =
          queueCapacities.getAbsoluteMaximumCapacity(nodeLabel);
      float maxAMPercentage =
          queueCapacities.getMaxAMResourcePercentage(nodeLabel);
      queueConfiguration.setCapacity(capacity);
      queueConfiguration.setAbsoluteCapacity(absoluteCapacity);
      queueConfiguration.setMaxCapacity(maxCapacity);
      queueConfiguration.setAbsoluteMaxCapacity(absMaxCapacity);
      queueConfiguration.setMaxAMPercentage(maxAMPercentage);
      queueConfiguration.setConfiguredMinCapacity(
          queueResourceQuotas.getConfiguredMinResource(nodeLabel));
      queueConfiguration.setConfiguredMaxCapacity(
          queueResourceQuotas.getConfiguredMaxResource(nodeLabel));
      queueConfiguration.setEffectiveMinCapacity(
          queueResourceQuotas.getEffectiveMinResource(nodeLabel));
      queueConfiguration.setEffectiveMaxCapacity(
          queueResourceQuotas.getEffectiveMaxResource(nodeLabel));
      queueConfigurations.put(nodeLabel, queueConfiguration);
    }
    return queueConfigurations;
  }

  @Private
  public Resource getMaximumAllocation() {
    return queueAllocationSettings.getMaximumAllocation();
  }

  @Private
  public Resource getMinimumAllocation() {
    return queueAllocationSettings.getMinimumAllocation();
  }

  /**
   * Increments resource usage of the queue and all related statistics and metrics that depends on
   * it.
   * @param clusterResource overall cluster resource
   * @param resource resource amount to increment
   * @param nodePartition node label
   */
  void allocateResource(Resource clusterResource,
      Resource resource, String nodePartition) {
    writeLock.lock();
    try {
      usageTracker.getQueueUsage().incUsed(nodePartition, resource);
      usageTracker.increaseNumContainers();
      CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
          this, labelManager, nodePartition);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Decrements resource usage of the queue and all related statistics and metrics that depends on
   * it.
   * @param clusterResource overall cluster resource
   * @param resource resource amount to decrement
   * @param nodePartition node label
   */
  protected void releaseResource(Resource clusterResource,
      Resource resource, String nodePartition) {
    writeLock.lock();
    try {
      usageTracker.getQueueUsage().decUsed(nodePartition, resource);

      CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
          this, labelManager, nodePartition);

      usageTracker.decreaseNumContainers();
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Returns whether we should continue to look at all heart beating nodes even
   * after the reservation limit was hit.
   */
  @Private
  public boolean isReservationsContinueLooking() {
    return reservationsContinueLooking;
  }

  @Private
  public Map<AccessType, AccessControlList> getACLs() {
    readLock.lock();
    try {
      return acls;
    } finally {
      readLock.unlock();
    }
  }

  @Private
  public boolean getPreemptionDisabled() {
    return preemptionSettings.isPreemptionDisabled();
  }

  @Private
  public boolean getIntraQueuePreemptionDisabled() {
    return preemptionSettings.isIntraQueuePreemptionDisabled();
  }

  @Private
  public boolean getIntraQueuePreemptionDisabledInHierarchy() {
    return preemptionSettings.isIntraQueuePreemptionDisabledInHierarchy();
  }

  @Private
  public QueueCapacities getQueueCapacities() {
    return queueCapacities;
  }

  @Private
  public ResourceUsage getQueueResourceUsage() {
    return usageTracker.getQueueUsage();
  }

  @Override
  public QueueResourceQuotas getQueueResourceQuotas() {
    return usageTracker.getQueueResourceQuotas();
  }

  @Override
  public ReentrantReadWriteLock.ReadLock getReadLock() {
    return readLock;
  }

  private Resource getCurrentLimitResource(String nodePartition,
      Resource clusterResource, ResourceLimits currentResourceLimits,
      SchedulingMode schedulingMode) {
    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
      /*
       * Current limit resource: For labeled resource: limit = queue-max-resource
       * (TODO, this part need update when we support labeled-limit) For
       * non-labeled resource: limit = min(queue-max-resource,
       * limit-set-by-parent)
       */
      Resource queueMaxResource =
          getQueueMaxResource(nodePartition);

      return Resources.min(resourceCalculator, clusterResource,
          queueMaxResource, currentResourceLimits.getLimit());
    } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
      // When we doing non-exclusive resource allocation, maximum capacity of
      // all queues on this label equals to total resource with the label.
      return labelManager.getResourceByLabel(nodePartition, clusterResource);
    }

    return Resources.none();
  }

  Resource getQueueMaxResource(String nodePartition) {
    return getEffectiveMaxCapacity(nodePartition);
  }

  @VisibleForTesting
  boolean hasChildQueues() {
    List<CSQueue> childQueues = getChildQueues();
    return childQueues != null && !childQueues.isEmpty();
  }

  /**
   * Checks whether this queue has remaining resources left for further container assigment.
   * @param clusterResource overall cluster resource
   * @param nodePartition node label
   * @param currentResourceLimits limit of the queue imposed by its maximum capacity
   * @param resourceCouldBeUnreserved reserved resource that could potentially be unreserved
   * @param schedulingMode scheduling strategy to handle node labels
   * @return true if queue has remaining free resource, false otherwise
   */
  boolean canAssignToThisQueue(Resource clusterResource,
      String nodePartition, ResourceLimits currentResourceLimits,
      Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) {
    readLock.lock();
    try {
      // Get current limited resource:
      // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
      // queues' max capacity.
      // - When doing IGNORE_PARTITION_EXCLUSIVITY allocation, we will not respect
      // queue's max capacity, queue's max capacity on the partition will be
      // considered to be 100%. Which is a queue can use all resource in the
      // partition.
      // Doing this because: for non-exclusive allocation, we make sure there's
      // idle resource on the partition, to avoid wastage, such resource will be
      // leveraged as much as we can, and preemption policy will reclaim it back
      // when partitioned-resource-request comes back.
      Resource currentLimitResource = getCurrentLimitResource(nodePartition,
          clusterResource, currentResourceLimits, schedulingMode);

      Resource nowTotalUsed = usageTracker.getQueueUsage().getUsed(nodePartition);

      // Set headroom for currentResourceLimits:
      // When queue is a parent queue: Headroom = limit - used + killable
      // When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself)
      Resource usedExceptKillable = nowTotalUsed;
      if (hasChildQueues()) {
        usedExceptKillable = Resources.subtract(nowTotalUsed,
            getTotalKillableResource(nodePartition));
      }
      currentResourceLimits.setHeadroom(
          Resources.subtract(currentLimitResource, usedExceptKillable));

      if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
          usedExceptKillable, currentLimitResource)) {

        // if reservation continue looking enabled, check to see if could we
        // potentially use this node instead of a reserved node if the application
        // has reserved containers.
        if (this.reservationsContinueLooking
            && Resources.greaterThan(resourceCalculator, clusterResource,
            resourceCouldBeUnreserved, Resources.none())) {
          // resource-without-reserved = used - reserved
          Resource newTotalWithoutReservedResource = Resources.subtract(
              usedExceptKillable, resourceCouldBeUnreserved);

          // when total-used-without-reserved-resource < currentLimit, we still
          // have chance to allocate on this node by unreserving some containers
          if (Resources.lessThan(resourceCalculator, clusterResource,
              newTotalWithoutReservedResource, currentLimitResource)) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("try to use reserved: " + getQueuePath()
                  + " usedResources: " + usageTracker.getQueueUsage().getUsed()
                  + ", clusterResources: " + clusterResource
                  + ", reservedResources: " + resourceCouldBeUnreserved
                  + ", capacity-without-reserved: "
                  + newTotalWithoutReservedResource
                  + ", maxLimitCapacity: " + currentLimitResource);
            }
            return true;
          }
        }

        // Can not assign to this queue
        if (LOG.isDebugEnabled()) {
          LOG.debug("Failed to assign to queue: " + getQueuePath()
              + " nodePartition: " + nodePartition
              + ", usedResources: " + usageTracker.getQueueUsage().getUsed(nodePartition)
              + ", clusterResources: " + clusterResource
              + ", reservedResources: " + resourceCouldBeUnreserved
              + ", maxLimitCapacity: " + currentLimitResource
              + ", currTotalUsed:" + usedExceptKillable);
        }
        return false;
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Check assign to queue: " + getQueuePath()
            + " nodePartition: " + nodePartition
            + ", usedResources: " + usageTracker.getQueueUsage().getUsed(nodePartition)
            + ", clusterResources: " + clusterResource
            + ", currentUsedCapacity: " + Resources
            .divide(resourceCalculator, clusterResource,
                usageTracker.getQueueUsage().getUsed(nodePartition), labelManager
                    .getResourceByLabel(nodePartition, clusterResource))
            + ", max-capacity: " + queueCapacities
            .getAbsoluteMaximumCapacity(nodePartition));
      }
      return true;
    } finally {
      readLock.unlock();
    }

  }

  private static String ensurePartition(String partition) {
    return Optional.ofNullable(partition).orElse(NO_LABEL);
  }

  @FunctionalInterface
  interface Counter {
    void count(String partition, Resource resource);
  }

  @FunctionalInterface
  interface CounterWithApp {
    void count(String partition, Resource reservedRes, SchedulerApplicationAttempt application);
  }

  private void count(String partition, Resource resource, Counter counter, Counter parentCounter) {
    final String checkedPartition = ensurePartition(partition);
    counter.count(checkedPartition, resource);
    Optional.ofNullable(parentCounter).ifPresent(c -> c.count(checkedPartition, resource));
  }

  private void countAndUpdate(String partition, Resource resource,
                              Counter counter, CounterWithApp parentCounter) {
    final String checkedPartition = ensurePartition(partition);
    counter.count(checkedPartition, resource);
    CSQueueUtils.updateUsedCapacity(resourceCalculator,
        labelManager.getResourceByLabel(checkedPartition, Resources.none()),
        checkedPartition, this);
    Optional.ofNullable(parentCounter).ifPresent(c -> c.count(checkedPartition, resource, null));
  }

  @Override
  public void incReservedResource(String partition, Resource reservedRes) {
    count(partition, reservedRes, usageTracker.getQueueUsage()::incReserved,
        parent == null ? null : parent::incReservedResource);
  }

  @Override
  public void decReservedResource(String partition, Resource reservedRes) {
    count(partition, reservedRes, usageTracker.getQueueUsage()::decReserved,
        parent == null ? null : parent::decReservedResource);
  }

  @Override
  public void incPendingResource(String nodeLabel, Resource resourceToInc) {
    count(nodeLabel, resourceToInc, usageTracker.getQueueUsage()::incPending,
        parent == null ? null : parent::incPendingResource);
  }

  @Override
  public void decPendingResource(String nodeLabel, Resource resourceToDec) {
    count(nodeLabel, resourceToDec, usageTracker.getQueueUsage()::decPending,
        parent == null ? null : parent::decPendingResource);
  }

  @Override
  public void incUsedResource(String nodeLabel, Resource resourceToInc,
      SchedulerApplicationAttempt application) {
    countAndUpdate(nodeLabel, resourceToInc, usageTracker.getQueueUsage()::incUsed,
        parent == null ? null : parent::incUsedResource);
  }

  @Override
  public void decUsedResource(String nodeLabel, Resource resourceToDec,
      SchedulerApplicationAttempt application) {
    countAndUpdate(nodeLabel, resourceToDec, usageTracker.getQueueUsage()::decUsed,
        parent == null ? null : parent::decUsedResource);
  }

  /**
   * Return if the queue has pending resource on given nodePartition and
   * schedulingMode.
   */
  boolean hasPendingResourceRequest(String nodePartition,
      Resource cluster, SchedulingMode schedulingMode) {
    return SchedulerUtils.hasPendingResourceRequest(resourceCalculator,
        usageTracker.getQueueUsage(), nodePartition, cluster, schedulingMode);
  }

  @Override
  public Priority getDefaultApplicationPriority() {
    return null;
  }

  /**
   * Returns the union of all node labels that could be accessed by this queue based on accessible
   * node labels and configured node labels properties.
   * @return node labels this queue has access to
   */
  @Override
  public Set<String> getNodeLabelsForQueue() {
    // if queue's label is *, queue can access any labels. Instead of
    // considering all labels in cluster, only those labels which are
    // use some resource of this queue can be considered.
    Set<String> nodeLabels = new HashSet<String>();
    if (this.getAccessibleNodeLabels() != null && this.getAccessibleNodeLabels()
        .contains(RMNodeLabelsManager.ANY)) {
      nodeLabels.addAll(Sets.union(this.getQueueCapacities().getExistingNodeLabels(),
          this.getQueueResourceUsage().getExistingNodeLabels()));
    } else {
      nodeLabels.addAll(this.getAccessibleNodeLabels());
    }

    // Add NO_LABEL also to this list as NO_LABEL also can be granted with
    // resource in many general cases.
    if (!nodeLabels.contains(NO_LABEL)) {
      nodeLabels.add(NO_LABEL);
    }
    return nodeLabels;
  }

  public Resource getTotalKillableResource(String partition) {
    return queueContext.getPreemptionManager().getKillableResource(getQueuePath(),
        partition);
  }

  public Iterator<RMContainer> getKillableContainers(String partition) {
    return queueContext.getPreemptionManager().getKillableContainers(
        getQueuePath(),
        partition);
  }

  @VisibleForTesting
  @Override
  public CSAssignment assignContainers(Resource clusterResource,
      FiCaSchedulerNode node, ResourceLimits resourceLimits,
      SchedulingMode schedulingMode) {
    return assignContainers(clusterResource, new SimpleCandidateNodeSet<>(node),
        resourceLimits, schedulingMode);
  }

  /**
   * Checks whether this queue could accept the container allocation request.
   * @param cluster overall cluster resource
   * @param request container allocation request
   * @return true if queue could accept the container allocation request, false otherwise
   */
  @Override
  public boolean accept(Resource cluster,
      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
    // Do we need to check parent queue before making this decision?
    boolean checkParentQueue = false;

    ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation =
        request.getFirstAllocatedOrReservedContainer();
    SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer =
        allocation.getAllocatedOrReservedContainer();

    // Do not check when allocating new container from a reserved container
    if (allocation.getAllocateFromReservedContainer() == null) {
      Resource required = allocation.getAllocatedOrReservedResource();
      Resource netAllocated = Resources.subtract(required,
          request.getTotalReleasedResource());

      readLock.lock();
      try {
        String partition = schedulerContainer.getNodePartition();
        Resource maxResourceLimit;
        if (allocation.getSchedulingMode()
            == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
          maxResourceLimit = getQueueMaxResource(partition);
        } else{
          maxResourceLimit = labelManager.getResourceByLabel(
              schedulerContainer.getNodePartition(), cluster);
        }
        if (!Resources.fitsIn(resourceCalculator,
            Resources.add(usageTracker.getQueueUsage().getUsed(partition), netAllocated),
            maxResourceLimit)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Used resource=" + usageTracker.getQueueUsage().getUsed(partition)
                + " exceeded maxResourceLimit of the queue ="
                + maxResourceLimit);
          }
          return false;
        }
      } finally {
        readLock.unlock();
      }

      // Only check parent queue when something new allocated or reserved.
      checkParentQueue = true;
    }


    if (parent != null && checkParentQueue) {
      return parent.accept(cluster, request);
    }

    return true;
  }

  @Override
  public void validateSubmitApplication(ApplicationId applicationId,
      String userName, String queue) throws AccessControlException {
    // Dummy implementation
  }

  @Override
  public void updateQueueState(QueueState queueState) {
    this.state = queueState;
  }

  /**
   * Sets the state of this queue to RUNNING.
   * @throws YarnException if its parent queue is not in RUNNING state
   */
  @Override
  public void activateQueue() throws YarnException {
    this.writeLock.lock();
    try {
      if (getState() == QueueState.RUNNING) {
        LOG.info("The specified queue:" + getQueuePath()
            + " is already in the RUNNING state.");
      } else {
        CSQueue parentQueue = parent;
        if (parentQueue == null || parentQueue.getState() == QueueState.RUNNING) {
          updateQueueState(QueueState.RUNNING);
        } else {
          throw new YarnException("The parent Queue:" + parentQueue.getQueuePath()
              + " is not running. Please activate the parent queue first");
        }
      }
    } finally {
      this.writeLock.unlock();
    }
  }

  /**
   * Stops this queue if no application is currently running on the queue.
   */
  protected void appFinished() {
    this.writeLock.lock();
    try {
      if (getState() == QueueState.DRAINING) {
        if (getNumApplications() == 0) {
          updateQueueState(QueueState.STOPPED);
        }
      }
    } finally {
      this.writeLock.unlock();
    }
  }

  @Override
  public Priority getPriority() {
    return this.priority;
  }

  @Override
  public UserWeights getUserWeights() {
    return userWeights;
  }

  /**
   * Recursively sets the state of this queue and the state of its parent to DRAINING.
   */
  public void recoverDrainingState() {
    this.writeLock.lock();
    try {
      if (getState() == QueueState.STOPPED) {
        updateQueueState(QueueState.DRAINING);
      }
      LOG.info("Recover draining state for queue " + this.getQueuePath());
      if (parent != null && parent.getState() == QueueState.STOPPED) {
        ((AbstractCSQueue) parent).recoverDrainingState();
      }
    } finally {
      this.writeLock.unlock();
    }
  }

  @Override
  public String getMultiNodeSortingPolicyName() {
    return this.multiNodeSortingPolicyName;
  }

  public void setMultiNodeSortingPolicyName(String policyName) {
    this.multiNodeSortingPolicyName = policyName;
  }

  public long getMaximumApplicationLifetime() {
    return queueAppLifetimeSettings.getMaxApplicationLifetime();
  }

  public long getDefaultApplicationLifetime() {
    return queueAppLifetimeSettings.getDefaultApplicationLifetime();
  }

  public boolean getDefaultAppLifetimeWasSpecifiedInConfig() {
    return queueAppLifetimeSettings.isDefaultAppLifetimeWasSpecifiedInConfig();
  }

  public void setMaxParallelApps(int maxParallelApps) {
    this.queueAppLifetimeSettings.setMaxParallelApps(maxParallelApps);
  }

  @Override
  public int getMaxParallelApps() {
    return this.queueAppLifetimeSettings.getMaxParallelApps();
  }

  abstract int getNumRunnableApps();

  protected void updateAbsoluteCapacities() {
    QueueCapacities parentQueueCapacities = null;
    if (parent != null) {
      parentQueueCapacities = parent.getQueueCapacities();
    }

    CSQueueUtils.updateAbsoluteCapacitiesByNodeLabels(queueCapacities,
        parentQueueCapacities, queueCapacities.getExistingNodeLabels());
  }

  private Resource createNormalizedMinResource(Resource minResource,
      Map<String, Float> effectiveMinRatio) {
    Resource ret = Resource.newInstance(minResource);
    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
    for (int i = 0; i < maxLength; i++) {
      ResourceInformation nResourceInformation =
          minResource.getResourceInformation(i);

      Float ratio = effectiveMinRatio.get(nResourceInformation.getName());
      if (ratio != null) {
        ret.setResourceValue(i,
            (long) (nResourceInformation.getValue() * ratio));
        if (LOG.isDebugEnabled()) {
          LOG.debug("Updating min resource for Queue: " + getQueuePath() + " as " + ret
              .getResourceInformation(i) + ", Actual resource: "
              + nResourceInformation.getValue() + ", ratio: " + ratio);
        }
      }
    }
    return ret;
  }

  private Resource getOrInheritMaxResource(Resource resourceByLabel, String label) {
    Resource parentMaxResource =
        parent.getQueueResourceQuotas().getConfiguredMaxResource(label);
    if (parentMaxResource.equals(Resources.none())) {
      parentMaxResource =
          parent.getQueueResourceQuotas().getEffectiveMaxResource(label);
    }

    Resource configuredMaxResource =
        getQueueResourceQuotas().getConfiguredMaxResource(label);
    if (configuredMaxResource.equals(Resources.none())) {
      return Resources.clone(parentMaxResource);
    }

    return Resources.clone(Resources.min(resourceCalculator, resourceByLabel,
        configuredMaxResource, parentMaxResource));
  }

  void deriveCapacityFromAbsoluteConfigurations(String label,
      Resource clusterResource) {
    // Update capacity with a float calculated from the parent's minResources
    // and the recently changed queue minResources.
    // capacity = effectiveMinResource / {parent's effectiveMinResource}
    float result = resourceCalculator.divide(clusterResource,
        usageTracker.getQueueResourceQuotas().getEffectiveMinResource(label),
        parent.getQueueResourceQuotas().getEffectiveMinResource(label));
    queueCapacities.setCapacity(label,
        Float.isInfinite(result) ? 0 : result);

    // Update maxCapacity with a float calculated from the parent's maxResources
    // and the recently changed queue maxResources.
    // maxCapacity = effectiveMaxResource / parent's effectiveMaxResource
    result = resourceCalculator.divide(clusterResource,
        usageTracker.getQueueResourceQuotas().getEffectiveMaxResource(label),
        parent.getQueueResourceQuotas().getEffectiveMaxResource(label));
    queueCapacities.setMaximumCapacity(label,
        Float.isInfinite(result) ? 0 : result);

    // Update absolute capacity (as in fraction of the
    // whole cluster's resources) with a float calculated from the queue's
    // capacity and the parent's absoluteCapacity.
    // absoluteCapacity = capacity * parent's absoluteCapacity
    queueCapacities.setAbsoluteCapacity(label,
        queueCapacities.getCapacity(label) * parent.getQueueCapacities()
            .getAbsoluteCapacity(label));

    // Update absolute maxCapacity (as in fraction of the
    // whole cluster's resources) with a float calculated from the queue's
    // maxCapacity and the parent's absoluteMaxCapacity.
    // absoluteMaxCapacity = maxCapacity * parent's absoluteMaxCapacity
    queueCapacities.setAbsoluteMaximumCapacity(label,
        queueCapacities.getMaximumCapacity(label) *
            parent.getQueueCapacities()
                .getAbsoluteMaximumCapacity(label));
  }

  void updateEffectiveResources(Resource clusterResource) {
    for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
      Resource resourceByLabel = labelManager.getResourceByLabel(label,
          clusterResource);
      Resource newEffectiveMinResource;
      Resource newEffectiveMaxResource;

      // Absolute and relative/weight mode needs different handling.
      if (getCapacityConfigType().equals(
          CapacityConfigType.ABSOLUTE_RESOURCE)) {
        newEffectiveMinResource = createNormalizedMinResource(
            usageTracker.getQueueResourceQuotas().getConfiguredMinResource(label),
            ((ParentQueue) parent).getEffectiveMinRatio(label));

        // Max resource of a queue should be the minimum of {parent's maxResources,
        // this queue's maxResources}. Both parent's maxResources and this queue's
        // maxResources can be configured. If this queue's maxResources is not
        // configured, inherit the value from the parent. If parent's
        // maxResources is not configured its inherited value must be collected.
        newEffectiveMaxResource =
            getOrInheritMaxResource(resourceByLabel, label);
      } else {
        newEffectiveMinResource = Resources
            .multiply(resourceByLabel,
                queueCapacities.getAbsoluteCapacity(label));
        newEffectiveMaxResource = Resources
            .multiply(resourceByLabel,
                queueCapacities.getAbsoluteMaximumCapacity(label));
      }

      // Update the effective min
      usageTracker.getQueueResourceQuotas().setEffectiveMinResource(label,
          newEffectiveMinResource);
      usageTracker.getQueueResourceQuotas().setEffectiveMaxResource(label,
          newEffectiveMaxResource);

      if (LOG.isDebugEnabled()) {
        LOG.debug("Updating queue:" + getQueuePath()
            + " with effective minimum resource=" + newEffectiveMinResource
            + "and effective maximum resource="
            + newEffectiveMaxResource);
      }

      if (getCapacityConfigType().equals(
          CapacityConfigType.ABSOLUTE_RESOURCE)) {
        /*
         * If the queues are configured with absolute resources, it is advised
         * to update capacity/max-capacity/etc. based on the newly calculated
         * resource values. These values won't be used for actual resource
         * distribution, however, for accurate metrics and the UI
         * they should be re-calculated.
         */
        deriveCapacityFromAbsoluteConfigurations(label, clusterResource);
      }
    }
  }

  public boolean isDynamicQueue() {
    readLock.lock();

    try {
      return dynamicQueue;
    } finally {
      readLock.unlock();
    }
  }

  public void setDynamicQueue(boolean dynamicQueue) {
    writeLock.lock();

    try {
      this.dynamicQueue = dynamicQueue;
    } finally {
      writeLock.unlock();
    }
  }

  protected String getCapacityOrWeightString() {
    if (queueCapacities.getWeight() != -1) {
      return "weight=" + queueCapacities.getWeight() + ", " +
          "normalizedWeight=" + queueCapacities.getNormalizedWeight();
    } else {
      return "capacity=" + queueCapacities.getCapacity();
    }
  }

  /**
   * Checks whether this queue is a dynamic queue and could be deleted.
   * @return true if the dynamic queue could be deleted, false otherwise
   */
  public boolean isEligibleForAutoDeletion() {
    return false;
  }

  /**
   * Checks whether this queue is a dynamic queue and there has not been an application submission
   * on it for a configured period of time.
   * @return true if queue has been idle for a configured period of time, false otherwise
   */
  public boolean isInactiveDynamicQueue() {
    long idleDurationSeconds =
        (Time.monotonicNow() - getLastSubmittedTimestamp())/1000;
    return isDynamicQueue() && isEligibleForAutoDeletion() &&
        (idleDurationSeconds > queueContext.getConfiguration().
            getAutoExpiredDeletionTime());
  }

  void updateLastSubmittedTimeStamp() {
    writeLock.lock();
    try {
      usageTracker.setLastSubmittedTimestamp(Time.monotonicNow());
    } finally {
      writeLock.unlock();
    }
  }

  @VisibleForTesting
  long getLastSubmittedTimestamp() {
    readLock.lock();

    try {
      return usageTracker.getLastSubmittedTimestamp();
    } finally {
      readLock.unlock();
    }
  }

  @VisibleForTesting
  void setLastSubmittedTimestamp(long lastSubmittedTimestamp) {
    writeLock.lock();
    try {
      usageTracker.setLastSubmittedTimestamp(lastSubmittedTimestamp);
    } finally {
      writeLock.unlock();
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractAutoCreatedLeafQueue 源码

hadoop AbstractLeafQueue 源码

hadoop AbstractManagedParentQueue 源码

hadoop AppPriorityACLConfigurationParser 源码

hadoop AppPriorityACLGroup 源码

hadoop AutoCreatedLeafQueue 源码

hadoop AutoCreatedLeafQueueConfig 源码

hadoop AutoCreatedQueueDeletionPolicy 源码

hadoop AutoCreatedQueueManagementPolicy 源码

hadoop AutoCreatedQueueTemplate 源码

0  赞