hadoop AppPlacementAllocator 源码

  • 2022-10-20
  • 浏览 (251)

hadoop AppPlacementAllocator 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;

import org.apache.commons.collections.IteratorUtils;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * <p>
 * This class has the following functionality:
 * 1) Keeps track of pending resource requests when following events happen:
 * - New ResourceRequests are added to scheduler.
 * - New containers get allocated.
 *
 * 2) Determines the order that the nodes given in the {@link CandidateNodeSet}
 * will be used for allocating containers.
 * </p>
 *
 * <p>
 * And different set of resource requests (E.g., resource requests with the
 * same schedulerKey) can have one instance of AppPlacementAllocator, each
 * AppPlacementAllocator can have different ways to order nodes depends on
 * requests.
 * </p>
 */
public abstract class AppPlacementAllocator<N extends SchedulerNode> {
  protected AppSchedulingInfo appSchedulingInfo;
  protected SchedulerRequestKey schedulerRequestKey;
  protected RMContext rmContext;
  // Counter of placement attempts; only ever read or incremented, never replaced.
  private final AtomicInteger placementAttempt = new AtomicInteger(0);
  // Assigned in initialize(); null until then.
  private MultiNodeSortingManager<N> multiNodeSortingManager;
  // Sorting policy name read from the application's scheduling envs; may be null.
  private String multiNodeSortPolicyName;

  private static final Logger LOG =
      LoggerFactory.getLogger(AppPlacementAllocator.class);

  /**
   * Get an iterator over the preferred nodes, depending on requirement
   * and/or availability.
   *
   * <p>If the candidate set resolves to a single node, an iterator over just
   * that node is returned. Otherwise, all nodes of the candidate set are
   * handed to the multi-node sorting manager together with the configured
   * sort policy name. The sorting manager is only set by
   * {@link #initialize(AppSchedulingInfo, SchedulerRequestKey, RMContext)},
   * so that method must have been called before the multi-node path runs.</p>
   *
   * @param candidateNodeSet input CandidateNodeSet
   * @return iterator of preferred nodes
   */
  public Iterator<N> getPreferredNodeIterator(
      CandidateNodeSet<N> candidateNodeSet) {
    // Now only handle the case that single node in the candidateNodeSet
    // TODO, Add support to multi-hosts inside candidateNodeSet which is passed
    // in.

    N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet);
    if (singleNode != null) {
      return IteratorUtils.singletonIterator(singleNode);
    }

    // singleNode will be null if Multi-node placement lookup is enabled, and
    // hence could consider sorting policies.
    return multiNodeSortingManager.getMultiNodeSortIterator(
        candidateNodeSet.getAllNodes().values(),
        candidateNodeSet.getPartition(),
        multiNodeSortPolicyName);
  }

  /**
   * Replace existing pending asks by the new requests.
   *
   * @param requests new asks
   * @param recoverPreemptedRequestForAContainer if we're recovering resource
   * requests for preempted container
   * @return a {@link PendingAskUpdateResult} describing how the pending ask
   *         changed. NOTE(review): confirm in subclasses whether null is
   *         returned when nothing changed.
   */
  public abstract PendingAskUpdateResult updatePendingAsk(
      Collection<ResourceRequest> requests,
      boolean recoverPreemptedRequestForAContainer);

  /**
   * Replace existing pending asks by the new SchedulingRequest.
   *
   * @param schedulerRequestKey                  scheduler request key
   * @param schedulingRequest                    new asks
   * @param recoverPreemptedRequestForAContainer if we're recovering resource
   *                                             requests for preempted container
   * @return a {@link PendingAskUpdateResult} describing how the pending ask
   *         changed. NOTE(review): confirm in subclasses whether null is
   *         returned when nothing changed.
   */
  public abstract PendingAskUpdateResult updatePendingAsk(
      SchedulerRequestKey schedulerRequestKey,
      SchedulingRequest schedulingRequest,
      boolean recoverPreemptedRequestForAContainer);

  /**
   * Get the pending ResourceRequests tracked by this allocator.
   *
   * @return Map of resourceName to ResourceRequest
   */
  public abstract Map<String, ResourceRequest> getResourceRequests();

  /**
   * Get pending ask for given resourceName. If there's no such pendingAsk,
   * returns {@link PendingAsk#ZERO}.
   *
   * @param resourceName resourceName
   * @return PendingAsk
   */
  public abstract PendingAsk getPendingAsk(String resourceName);

  /**
   * Get #pending-allocations for given resourceName. If there's no such
   * pendingAsk, returns 0.
   *
   * @param resourceName resourceName
   * @return #pending-allocations
   */
  public abstract int getOutstandingAsksCount(String resourceName);

  /**
   * Notify container allocated.
   * @param schedulerKey SchedulerRequestKey for this ResourceRequest
   * @param type Type of the allocation
   * @param node Which node this container allocated on
   * @return ContainerRequest which include resource requests associated with
   *         the container. This will be used by scheduler to recover requests.
   *         Please refer to {@link ContainerRequest} for more details.
   */
  public abstract ContainerRequest allocate(SchedulerRequestKey schedulerKey,
      NodeType type, SchedulerNode node);

  /**
   * Check whether there is still a pending requirement for the given
   * NodeType and node.
   *
   * @param type Locality Type
   * @param node which node we will allocate on
   * @return true if there is a pending requirement
   */
  public abstract boolean canAllocate(NodeType type, SchedulerNode node);

  /**
   * Can delay to give locality?
   * TODO: This should be moved out of AppPlacementAllocator
   * and should belong to specific delay scheduling policy impl.
   * See YARN-7457 for more details.
   *
   * @param resourceName resourceName
   * @return can/cannot
   */
  public abstract boolean canDelayTo(String resourceName);

  /**
   * Does this {@link AppPlacementAllocator} accept resources on given node?
   *
   * @param schedulerNode schedulerNode
   * @param schedulingMode schedulingMode
   * @param dcOpt optional diagnostics collector
   * @return accepted/not
   */
  public abstract boolean precheckNode(SchedulerNode schedulerNode,
      SchedulingMode schedulingMode,
      Optional<DiagnosticsCollector> dcOpt);

  /**
   * Variant of
   * {@link #precheckNode(SchedulerNode, SchedulingMode, Optional)} that takes
   * no diagnostics collector.
   *
   * @param schedulerNode schedulerNode
   * @param schedulingMode schedulingMode
   * @return accepted/not
   */
  public abstract boolean precheckNode(SchedulerNode schedulerNode,
      SchedulingMode schedulingMode);

  /**
   * It is possible that one request can accept multiple node partitions,
   * so this method returns the primary node partition for pending resource /
   * headroom calculation.
   *
   * @return primary requested node partition
   */
  public abstract String getPrimaryRequestedNodePartition();

  /**
   * @return number of unique location asks with #pending greater than 0,
   * (like /rack1, host1, etc.).
   *
   * TODO: This should be moved out of AppPlacementAllocator
   * and should belong to specific delay scheduling policy impl.
   * See YARN-7457 for more details.
   */
  public abstract int getUniqueLocationAsks();

  /**
   * Print human-readable requests to LOG debug.
   */
  public abstract void showRequests();

  /**
   * Initialize this allocator, this will be called by Factory automatically.
   *
   * <p>Stores the given context objects, reads the multi-node sorting policy
   * name from the application's scheduling envs, and obtains the
   * multi-node sorting manager from the RM context.</p>
   *
   * @param appSchedulingInfo appSchedulingInfo
   * @param schedulerRequestKey schedulerRequestKey
   * @param rmContext rmContext
   */
  public void initialize(AppSchedulingInfo appSchedulingInfo,
      SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
    this.appSchedulingInfo = appSchedulingInfo;
    this.rmContext = rmContext;
    this.schedulerRequestKey = schedulerRequestKey;
    multiNodeSortPolicyName = appSchedulingInfo
        .getApplicationSchedulingEnvs().get(
        ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS);

    // The RM context hands out a manager typed for SchedulerNode; narrowing it
    // to this allocator's node type N cannot be checked at runtime.
    @SuppressWarnings("unchecked")
    MultiNodeSortingManager<N> sortingManager =
        (MultiNodeSortingManager<N>) rmContext.getMultiNodeSortingManager();
    multiNodeSortingManager = sortingManager;

    // Parameterized logging defers message formatting until debug is enabled.
    LOG.debug("nodeLookupPolicy used for {} is {}",
        appSchedulingInfo.getApplicationId(),
        (multiNodeSortPolicyName != null) ? multiNodeSortPolicyName : "");
  }

  /**
   * Get pending SchedulingRequest.
   * @return SchedulingRequest
   */
  public abstract SchedulingRequest getSchedulingRequest();

  /**
   * @return the current number of placement attempts recorded
   */
  public int getPlacementAttempt() {
    return placementAttempt.get();
  }

  /**
   * Atomically increment the placement attempt counter by one.
   */
  public void incrementPlacementAttempt() {
    placementAttempt.getAndIncrement();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop CandidateNodeSet 源码

hadoop CandidateNodeSetUtils 源码

hadoop LocalityAppPlacementAllocator 源码

hadoop MultiNodeLookupPolicy 源码

hadoop MultiNodePolicySpec 源码

hadoop MultiNodeSorter 源码

hadoop MultiNodeSortingManager 源码

hadoop PendingAskUpdateResult 源码

hadoop ResourceUsageMultiNodeLookupPolicy 源码

hadoop SimpleCandidateNodeSet 源码

0  赞