hadoop FSQueue 源码

  • 2022-10-20
  • 浏览 (218)

haddop FSQueue 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessRequest;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.resource.Resources;

import org.apache.hadoop.classification.VisibleForTesting;

@Private
@Unstable
public abstract class FSQueue implements Queue, Schedulable {
  private static final Logger LOG = LoggerFactory.getLogger(
      FSQueue.class.getName());

  private Resource fairShare = Resources.createResource(0, 0);
  private Resource steadyFairShare = Resources.createResource(0, 0);
  private Resource reservedResource = Resources.createResource(0, 0);
  private final Resource resourceUsage = Resource.newInstance(0, 0);
  private final String name;
  protected final FairScheduler scheduler;
  private final YarnAuthorizationProvider authorizer;
  private final PrivilegedEntity queueEntity;
  private final FSQueueMetrics metrics;
  
  protected final FSParentQueue parent;
  protected final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);
  
  protected SchedulingPolicy policy = SchedulingPolicy.DEFAULT_POLICY;

  protected float weights;
  protected Resource minShare;
  private ConfigurableResource maxShare;
  protected int maxRunningApps;
  private ConfigurableResource maxChildQueueResource;

  // maxAMShare is a value between 0 and 1.
  protected float maxAMShare;

  private long fairSharePreemptionTimeout = Long.MAX_VALUE;
  private long minSharePreemptionTimeout = Long.MAX_VALUE;
  private float fairSharePreemptionThreshold = 0.5f;
  private boolean preemptable = true;
  private boolean isDynamic = true;
  protected Resource maxContainerAllocation;

  public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
    this.name = name;
    this.scheduler = scheduler;
    this.authorizer =
        YarnAuthorizationProvider.getInstance(scheduler.getConf());
    this.queueEntity = new PrivilegedEntity(EntityType.QUEUE, name);
    this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
    this.parent = parent;
    setPolicy(scheduler.getAllocationConfiguration().getSchedulingPolicy(name));
    reinit(false);
  }

  /**
   * Initialize a queue by setting its queue-specific properties and its
   * metrics. This method is invoked when creating a new queue or reloading
   * the allocation file.
   * This method does not set policies for queues when reloading the allocation
   * file since we need to either set all new policies or nothing, which is
   * handled by method {@link #verifyAndSetPolicyFromConf}.
   *
   * @param recursive whether child queues should be reinitialized recursively
   */
  public final void reinit(boolean recursive) {
    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
    allocConf.initFSQueue(this);
    updatePreemptionVariables();

    if (recursive) {
      for (FSQueue child : getChildQueues()) {
        child.reinit(recursive);
      }
    }
  }

  public String getName() {
    return name;
  }

  @Override
  public String getQueueName() {
    return name;
  }

  public SchedulingPolicy getPolicy() {
    return policy;
  }

  public FSParentQueue getParent() {
    return parent;
  }

  public void setPolicy(SchedulingPolicy policy) {
    policy.initialize(scheduler.getContext());
    this.policy = policy;
  }

  public void setWeights(float weights) {
    this.weights = weights;
  }

  @Override
  public float getWeight() {
    return weights;
  }

  public void setMinShare(Resource minShare){
    this.minShare = minShare;
  }

  @Override
  public Resource getMinShare() {
    return minShare;
  }

  public void setMaxShare(ConfigurableResource maxShare){
    this.maxShare = maxShare;
  }

  public void setMaxContainerAllocation(Resource maxContainerAllocation){
    this.maxContainerAllocation = maxContainerAllocation;
  }

  public abstract Resource getMaximumContainerAllocation();

  @Override
  public Resource getMaxShare() {
    Resource maxResource = maxShare.getResource(scheduler.getClusterResource());

    // Max resource should be greater than or equal to min resource
    Resource result = Resources.componentwiseMax(maxResource, minShare);

    if (!Resources.equals(maxResource, result)) {
      LOG.warn(String.format("Queue %s has max resources %s less than "
          + "min resources %s", getName(), maxResource, minShare));
    }
    return result;
  }

  public ConfigurableResource getRawMaxShare() {
    return maxShare;
  }

  public Resource getReservedResource() {
    reservedResource.setMemorySize(metrics.getReservedMB());
    reservedResource.setVirtualCores(metrics.getReservedVirtualCores());
    return reservedResource;
  }

  public void setMaxChildQueueResource(ConfigurableResource maxChildShare){
    this.maxChildQueueResource = maxChildShare;
  }

  public ConfigurableResource getMaxChildQueueResource() {
    return maxChildQueueResource;
  }

  public void setMaxRunningApps(int maxRunningApps){
    this.maxRunningApps = maxRunningApps;
  }

  public int getMaxRunningApps() {
    return maxRunningApps;
  }

  @VisibleForTesting
  public float getMaxAMShare() {
    return maxAMShare;
  }

  public void setMaxAMShare(float maxAMShare){
    this.maxAMShare = maxAMShare;
  }

  @Override
  public long getStartTime() {
    return 0;
  }

  @Override
  public Priority getPriority() {
    Priority p = recordFactory.newRecordInstance(Priority.class);
    p.setPriority(1);
    return p;
  }
  
  @Override
  public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
    QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
    queueInfo.setQueueName(getQueueName());

    if (scheduler.getClusterResource().getMemorySize() == 0) {
      queueInfo.setCapacity(0.0f);
    } else {
      queueInfo.setCapacity((float) getFairShare().getMemorySize() /
          scheduler.getClusterResource().getMemorySize());
    }

    if (getFairShare().getMemorySize() == 0) {
      queueInfo.setCurrentCapacity(0.0f);
    } else {
      queueInfo.setCurrentCapacity((float) getResourceUsage().getMemorySize() /
          getFairShare().getMemorySize());
    }

    ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>();
    if (includeChildQueues) {
      Collection<FSQueue> childQueues = getChildQueues();
      for (FSQueue child : childQueues) {
        childQueueInfos.add(child.getQueueInfo(recursive, recursive));
      }
    }
    queueInfo.setChildQueues(childQueueInfos);
    queueInfo.setQueueState(QueueState.RUNNING);
    queueInfo.setQueueStatistics(getQueueStatistics());
    return queueInfo;
  }

  public QueueStatistics getQueueStatistics() {
    QueueStatistics stats =
        recordFactory.newRecordInstance(QueueStatistics.class);
    stats.setNumAppsSubmitted(getMetrics().getAppsSubmitted());
    stats.setNumAppsRunning(getMetrics().getAppsRunning());
    stats.setNumAppsPending(getMetrics().getAppsPending());
    stats.setNumAppsCompleted(getMetrics().getAppsCompleted());
    stats.setNumAppsKilled(getMetrics().getAppsKilled());
    stats.setNumAppsFailed(getMetrics().getAppsFailed());
    stats.setNumActiveUsers(getMetrics().getActiveUsers());
    stats.setAvailableMemoryMB(getMetrics().getAvailableMB());
    stats.setAllocatedMemoryMB(getMetrics().getAllocatedMB());
    stats.setPendingMemoryMB(getMetrics().getPendingMB());
    stats.setReservedMemoryMB(getMetrics().getReservedMB());
    stats.setAvailableVCores(getMetrics().getAvailableVirtualCores());
    stats.setAllocatedVCores(getMetrics().getAllocatedVirtualCores());
    stats.setPendingVCores(getMetrics().getPendingVirtualCores());
    stats.setReservedVCores(getMetrics().getReservedVirtualCores());
    stats.setAllocatedContainers(getMetrics().getAllocatedContainers());
    stats.setPendingContainers(getMetrics().getPendingContainers());
    stats.setReservedContainers(getMetrics().getReservedContainers());
    return stats;
  }
  
  @Override
  public FSQueueMetrics getMetrics() {
    return metrics;
  }

  /** Get the fair share assigned to this Schedulable. */
  public Resource getFairShare() {
    return fairShare;
  }

  @Override
  public void setFairShare(Resource fairShare) {
    this.fairShare = fairShare;
    metrics.setFairShare(fairShare);
    LOG.debug("The updated fairShare for {} is {}", getName(), fairShare);
  }

  /** Get the steady fair share assigned to this Schedulable. */
  public Resource getSteadyFairShare() {
    return steadyFairShare;
  }

  void setSteadyFairShare(Resource steadyFairShare) {
    this.steadyFairShare = steadyFairShare;
    metrics.setSteadyFairShare(steadyFairShare);
  }

  public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
    return authorizer.checkPermission(
        new AccessRequest(queueEntity, user,
            SchedulerUtils.toAccessType(acl), null, null,
            Server.getRemoteAddress(), null));
  }

  long getFairSharePreemptionTimeout() {
    return fairSharePreemptionTimeout;
  }

  void setFairSharePreemptionTimeout(long fairSharePreemptionTimeout) {
    this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
  }

  long getMinSharePreemptionTimeout() {
    return minSharePreemptionTimeout;
  }

  void setMinSharePreemptionTimeout(long minSharePreemptionTimeout) {
    this.minSharePreemptionTimeout = minSharePreemptionTimeout;
  }

  float getFairSharePreemptionThreshold() {
    return fairSharePreemptionThreshold;
  }

  void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) {
    this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
  }

  @Override
  public boolean isPreemptable() {
    return preemptable;
  }

  /**
   * Recomputes the shares for all child queues and applications based on this
   * queue's current share.
   *
   * To be called holding the scheduler writelock.
   */
  abstract void updateInternal();

  /**
   * Set the queue's fairshare and update the demand/fairshare of child
   * queues/applications.
   *
   * To be called holding the scheduler writelock.
   *
   * @param fairShare
   */
  public void update(Resource fairShare) {
    setFairShare(fairShare);
    updateInternal();
  }

  /**
   * Update the min/fair share preemption timeouts, threshold and preemption
   * disabled flag for this queue.
   */
  private void updatePreemptionVariables() {
    // For min share timeout
    minSharePreemptionTimeout = scheduler.getAllocationConfiguration()
        .getMinSharePreemptionTimeout(getName());
    if (minSharePreemptionTimeout == -1 && parent != null) {
      minSharePreemptionTimeout = parent.getMinSharePreemptionTimeout();
    }
    // For fair share timeout
    fairSharePreemptionTimeout = scheduler.getAllocationConfiguration()
        .getFairSharePreemptionTimeout(getName());
    if (fairSharePreemptionTimeout == -1 && parent != null) {
      fairSharePreemptionTimeout = parent.getFairSharePreemptionTimeout();
    }
    // For fair share preemption threshold
    fairSharePreemptionThreshold = scheduler.getAllocationConfiguration()
        .getFairSharePreemptionThreshold(getName());
    if (fairSharePreemptionThreshold < 0 && parent != null) {
      fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold();
    }
    // For option whether allow preemption from this queue.
    // If the parent is non-preemptable, this queue is non-preemptable as well,
    // otherwise get the value from the allocation file.
    if (parent != null && !parent.isPreemptable()) {
      preemptable = false;
    } else {
      preemptable = scheduler.getAllocationConfiguration()
          .isPreemptable(getName());
    }
  }

  /**
   * Gets the children of this queue, if any.
   */
  public abstract List<FSQueue> getChildQueues();
  
  /**
   * Adds all applications in the queue and its subqueues to the given collection.
   * @param apps the collection to add the applications to
   */
  public abstract void collectSchedulerApplications(
      Collection<ApplicationAttemptId> apps);
  
  /**
   * Return the number of apps for which containers can be allocated.
   * Includes apps in subqueues.
   */
  public abstract int getNumRunnableApps();
  
  /**
   * Helper method to check if the queue should attempt assigning resources
   * 
   * @return true if check passes (can assign) or false otherwise
   */
  boolean assignContainerPreCheck(FSSchedulerNode node) {
    if (node.getReservedContainer() != null) {
      LOG.debug("Assigning container failed on node '{}' because it has"
          + " reserved containers.", node.getNodeName());
      return false;
    } else if (!Resources.fitsIn(getResourceUsage(), getMaxShare())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Assigning container failed on node '" + node.getNodeName()
            + " because queue resource usage is larger than MaxShare: "
            + dumpState());
      }
      return false;
    } else {
      return true;
    }
  }

  /**
   * Returns true if queue has at least one app running.
   */
  public boolean isActive() {
    return getNumRunnableApps() > 0;
  }

  /** Convenient toString implementation for debugging. */
  @Override
  public String toString() {
    return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
        getName(), getDemand(), getResourceUsage(), fairShare, getWeight());
  }
  
  @Override
  public Set<String> getAccessibleNodeLabels() {
    // TODO, add implementation for FS
    return null;
  }
  
  @Override
  public String getDefaultNodeLabelExpression() {
    // TODO, add implementation for FS
    return null;
  }
  
  @Override
  public void incPendingResource(String nodeLabel, Resource resourceToInc) {
  }
  
  @Override
  public void decPendingResource(String nodeLabel, Resource resourceToDec) {
  }

  @Override
  public void incReservedResource(String nodeLabel, Resource resourceToInc) {
  }

  @Override
  public void decReservedResource(String nodeLabel, Resource resourceToDec) {
  }

  @Override
  public Resource getResourceUsage() {
    return resourceUsage;
  }

  /**
   * Increase resource usage for this queue and all parent queues.
   *
   * @param res the resource to increase
   */
  protected void incUsedResource(Resource res) {
    synchronized (resourceUsage) {
      Resources.addTo(resourceUsage, res);
      if (parent != null) {
        parent.incUsedResource(res);
      }
    }
  }

  /**
   * Decrease resource usage for this queue and all parent queues.
   *
   * @param res the resource to decrease
   */
  protected void decUsedResource(Resource res) {
    synchronized (resourceUsage) {
      Resources.subtractFrom(resourceUsage, res);
      if (parent != null) {
        parent.decUsedResource(res);
      }
    }
  }

  @Override
  public Priority getDefaultApplicationPriority() {
    // TODO add implementation for FSParentQueue
    return null;
  }

  boolean fitsInMaxShare(Resource additionalResource) {
    Resource usagePlusAddition =
        Resources.add(getResourceUsage(), additionalResource);

    if (!Resources.fitsIn(usagePlusAddition, getMaxShare())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Resource usage plus resource request: " + usagePlusAddition
            + " exceeds maximum resource allowed:" + getMaxShare()
            + " in queue " + getName());
      }
      return false;
    }

    FSQueue parentQueue = getParent();
    if (parentQueue != null) {
      return parentQueue.fitsInMaxShare(additionalResource);
    }
    return true;
  }

  /**
   * Recursively check policies for queues in pre-order. Get queue policies
   * from the allocation file instead of properties of {@link FSQueue} objects.
   * Set the policy for current queue if there is no policy violation for its
   * children. This method is invoked while reloading the allocation file.
   *
   * @param queueConf allocation configuration
   * @return true if no policy violation and successfully set polices
   *         for queues; false otherwise
   */
  public boolean verifyAndSetPolicyFromConf(AllocationConfiguration queueConf) {
    SchedulingPolicy queuePolicy = queueConf.getSchedulingPolicy(getName());

    for (FSQueue child : getChildQueues()) {
      if (!queuePolicy.isChildPolicyAllowed(
          queueConf.getSchedulingPolicy(child.getName()))) {
        return false;
      }
      boolean success = child.verifyAndSetPolicyFromConf(queueConf);
      if (!success) {
        return false;
      }
    }

    // Set the policy if no policy violation for all children
    setPolicy(queuePolicy);
    return true;
  }

  /**
   * Recursively dump states of all queues.
   *
   * @return a string which holds all queue states
   */
  public String dumpState() {
    StringBuilder sb = new StringBuilder();
    dumpStateInternal(sb);
    return sb.toString();
  }


  /**
   * Recursively dump states of all queues.
   *
   * @param sb the {code StringBuilder} which holds queue states
   */
  protected abstract void dumpStateInternal(StringBuilder sb);

  public boolean isDynamic() {
    return isDynamic;
  }

  public void setDynamic(boolean dynamic) {
    this.isDynamic = dynamic;
  }

  public abstract boolean isEmpty();
}

相关信息

hadoop 源码目录

相关文章

hadoop AllocationConfiguration 源码

hadoop AllocationConfigurationException 源码

hadoop AllocationFileLoaderService 源码

hadoop ConfigurableResource 源码

hadoop FSAppAttempt 源码

hadoop FSContext 源码

hadoop FSLeafQueue 源码

hadoop FSOpDurations 源码

hadoop FSParentQueue 源码

hadoop FSPreemptionThread 源码

0  赞