hadoop SingleConstraintAppPlacementAllocator 源码

  • 2022-10-20
  • 浏览 (200)

hadoop SingleConstraintAppPlacementAllocator 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION;

/**
 * This is a simple implementation to do affinity or anti-affinity for
 * inter/intra apps.
 *
 * <p>An instance tracks at most one {@link SchedulingRequest}. The first
 * request handed to
 * {@link #updatePendingAsk(SchedulerRequestKey, SchedulingRequest, boolean)}
 * is validated and copied; later requests may only change
 * {@code numAllocations}. {@link ResourceRequest}s are rejected.
 *
 * <p>Locking: methods that modify the tracked request
 * ({@code updatePendingAsk}, {@code allocate}) hold the write lock, most
 * readers hold the read lock. {@link #getSchedulingRequest()},
 * {@link #getPrimaryRequestedNodePartition()} and
 * {@code getTargetNodePartition()} read their field without taking a lock.
 *
 * @param <N> type of scheduler node this allocator is used with
 */
public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
    extends AppPlacementAllocator<N> {
  private static final Logger LOG =
      LoggerFactory.getLogger(SingleConstraintAppPlacementAllocator.class);

  // Both views of a single ReentrantReadWriteLock created in the constructor.
  private final ReentrantReadWriteLock.ReadLock readLock;
  private final ReentrantReadWriteLock.WriteLock writeLock;

  // Private copy of the accepted request; null until one has been accepted.
  private SchedulingRequest schedulingRequest = null;
  // Partition derived from the accepted request; assigned together with
  // schedulingRequest in validateAndSetSchedulingRequest.
  private String targetNodePartition;
  private AllocationTagsManager allocationTagsManager;
  private PlacementConstraintManager placementConstraintManager;

  /**
   * Creates an allocator without any tracked scheduling request.
   */
  public SingleConstraintAppPlacementAllocator() {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    readLock = lock.readLock();
    writeLock = lock.writeLock();
  }

  /**
   * {@link ResourceRequest}s are not supported by this allocator.
   *
   * @param requests resource requests; must be null or empty
   * @param recoverPreemptedRequestForAContainer ignored
   * @return always {@code null} when {@code requests} is null or empty
   * @throws SchedulerInvalidResoureRequestException if {@code requests}
   *         contains at least one element
   */
  @Override
  public PendingAskUpdateResult updatePendingAsk(
      Collection<ResourceRequest> requests,
      boolean recoverPreemptedRequestForAContainer) {
    if (requests != null && !requests.isEmpty()) {
      throw new SchedulerInvalidResoureRequestException(
          this.getClass().getName()
              + " not be able to handle ResourceRequest, there exists a "
              + "SchedulingRequest with the same scheduler key="
              + SchedulerRequestKey.create(requests.iterator().next())
              + ", please send ResourceRequest with a different allocationId and "
              + "priority");
    }

    // Do nothing
    return null;
  }

  /**
   * Applies a scheduling request; the caller must hold the write lock.
   *
   * @param newSchedulingRequest the request to add or to update from
   * @param recoverContainer whether the request recovers one container of
   *        the already tracked request
   * @return the pending-ask change, or {@code null} if nothing changed
   * @throws SchedulerInvalidResoureRequestException if the request is not
   *         acceptable
   */
  private PendingAskUpdateResult internalUpdatePendingAsk(
      SchedulingRequest newSchedulingRequest, boolean recoverContainer) {
    if (schedulingRequest != null) {
      return updateExistingSchedulingRequest(newSchedulingRequest,
          recoverContainer);
    }

    // When it is a recover container, there must exist a schedulingRequest.
    if (recoverContainer) {
      throw new SchedulerInvalidResoureRequestException("Trying to recover a "
          + "container request=" + newSchedulingRequest + ", however "
          + "there's no existing scheduling request, this should not happen.");
    }

    // For a new schedulingRequest, we need to validate if we support its asks.
    // This will update internal partitions, etc. after the SchedulingRequest is
    // valid.
    validateAndSetSchedulingRequest(newSchedulingRequest);

    return new PendingAskUpdateResult(null,
        new PendingAsk(newSchedulingRequest.getResourceSizing()), null,
        targetNodePartition);
  }

  /**
   * Updates {@code numAllocations} of the tracked request after checking that
   * the new request differs from it in nothing else.
   *
   * <p>To avoid copying the request, the check temporarily writes the tracked
   * {@code numAllocations} into the new request's sizing and compares the two
   * requests. Afterwards the sizing of {@code newSchedulingRequest} carries
   * the effective new count (for a recovered container that is the tracked
   * count plus one).
   *
   * @param newSchedulingRequest the updated request
   * @param recoverContainer whether one container is being recovered
   * @return the pending-ask change, or {@code null} if the count is unchanged
   * @throws SchedulerInvalidResoureRequestException if the sizing is missing,
   *         anything but {@code numAllocations} changed, or the new count is
   *         negative
   */
  private PendingAskUpdateResult updateExistingSchedulingRequest(
      SchedulingRequest newSchedulingRequest, boolean recoverContainer) {
    ResourceSizing sizing = newSchedulingRequest.getResourceSizing();
    if (sizing == null) {
      // Reject like the first-request validation does instead of failing
      // with a NullPointerException below.
      throwExceptionWithMetaInfo(
          "No ResourceSizing found in the scheduling request, please double "
              + "check");
    }

    int existingNumAllocations =
        schedulingRequest.getResourceSizing().getNumAllocations();

    // When it is a recovered container request, just set
    // #newAllocations = #existingAllocations + 1;
    int newNumAllocations = recoverContainer
        ? existingNumAllocations + 1
        : sizing.getNumAllocations();

    boolean sameExceptSizing;
    sizing.setNumAllocations(existingNumAllocations);
    try {
      sameExceptSizing = schedulingRequest.equals(newSchedulingRequest);
    } finally {
      // Always put the effective count back, even if the comparison throws,
      // so the caller's request is never left with the temporary value.
      sizing.setNumAllocations(newNumAllocations);
    }

    if (!sameExceptSizing) {
      throw new SchedulerInvalidResoureRequestException(
          "Invalid updated SchedulingRequest added to scheduler, "
              + " we only allows changing numAllocations for the updated "
              + "SchedulingRequest. Old=" + schedulingRequest.toString()
              + " new=" + newSchedulingRequest.toString()
              + ", if any fields need to be updated, please cancel the "
              + "old request (by setting numAllocations to 0) and send a "
              + "SchedulingRequest with different combination of "
              + "priority/allocationId");
    }

    if (newNumAllocations == existingNumAllocations) {
      // No update on pending asks, return null.
      return null;
    }

    // Basic sanity check
    if (newNumAllocations < 0) {
      throw new SchedulerInvalidResoureRequestException(
          "numAllocation in ResourceSizing field must be >= 0, "
              + "updating schedulingRequest failed.");
    }

    // Build the result before the tracked sizing is modified below; the
    // first PendingAsk is created from the still-unmodified tracked sizing.
    PendingAskUpdateResult updateResult = new PendingAskUpdateResult(
        new PendingAsk(schedulingRequest.getResourceSizing()),
        new PendingAsk(newSchedulingRequest.getResourceSizing()),
        targetNodePartition, targetNodePartition);

    // Ok, now everything is same except numAllocation, update numAllocation.
    schedulingRequest.getResourceSizing().setNumAllocations(newNumAllocations);
    LOG.info("Update numAllocation from old={} to new={}",
        existingNumAllocations, newNumAllocations);

    return updateResult;
  }

  /**
   * Adds a new scheduling request or updates {@code numAllocations} of the
   * tracked one, under the write lock.
   *
   * @param schedulerRequestKey not used by this implementation
   * @param newSchedulingRequest the request to add or to update from
   * @param recoverPreemptedRequestForAContainer whether one container of the
   *        tracked request is being recovered
   * @return the pending-ask change, or {@code null} if nothing changed
   * @throws SchedulerInvalidResoureRequestException if the request is not
   *         acceptable
   */
  @Override
  public PendingAskUpdateResult updatePendingAsk(
      SchedulerRequestKey schedulerRequestKey,
      SchedulingRequest newSchedulingRequest,
      boolean recoverPreemptedRequestForAContainer) {
    writeLock.lock();
    try {
      return internalUpdatePendingAsk(newSchedulingRequest,
          recoverPreemptedRequestForAContainer);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Always throws; the message is prefixed with the application id and the
   * scheduler request key of this allocator.
   *
   * @param message description of the problem
   * @throws SchedulerInvalidResoureRequestException always
   */
  private void throwExceptionWithMetaInfo(String message) {
    StringBuilder sb = new StringBuilder();
    sb.append("AppId=").append(appSchedulingInfo.getApplicationId())
        .append(" Key=").append(this.schedulerRequestKey)
        .append(". Exception message:").append(message);
    throw new SchedulerInvalidResoureRequestException(sb.toString());
  }

  /**
   * Validates a first scheduling request and, if valid, stores a copy of it
   * together with its target node partition.
   *
   * @param newSchedulingRequest the request to validate; it is cast to
   *        {@link SchedulingRequestPBImpl} to create the copy
   * @throws SchedulerInvalidResoureRequestException if the sizing or its
   *         resources are missing, the execution type is set to something
   *         other than GUARANTEED, or the node partition is not supported
   */
  private void validateAndSetSchedulingRequest(SchedulingRequest
      newSchedulingRequest)
      throws SchedulerInvalidResoureRequestException {
    // Check sizing exists
    ResourceSizing sizing = newSchedulingRequest.getResourceSizing();
    if (sizing == null || sizing.getResources() == null) {
      throwExceptionWithMetaInfo(
          "No ResourceSizing found in the scheduling request, please double "
              + "check");
    }

    // Check execution type == GUARANTEED
    if (newSchedulingRequest.getExecutionType() != null
        && newSchedulingRequest.getExecutionType().getExecutionType()
        != ExecutionType.GUARANTEED) {
      throwExceptionWithMetaInfo(
          "Only GUARANTEED execution type is supported.");
    }

    this.targetNodePartition = validateAndGetTargetNodePartition(
        newSchedulingRequest.getPlacementConstraint());
    // Keep a separate object built from the request's proto rather than the
    // caller's instance.
    this.schedulingRequest = new SchedulingRequestPBImpl(
        ((SchedulingRequestPBImpl) newSchedulingRequest).getProto());

    LOG.info("Successfully added SchedulingRequest to app={}"
            + " placementConstraint=[{}]. nodePartition={}",
        appSchedulingInfo.getApplicationAttemptId(),
        schedulingRequest.getPlacementConstraint(), targetNodePartition);
  }

  /**
   * Tentatively finds a node partition in the placement constraint to use as
   * the app's primary node partition. Currently only a single constraint is
   * handled.
   *
   * @param placementConstraint constraint of the request, may be null
   * @return the first non-null partition named by a node-partition target
   *         expression; otherwise the app's default node label expression,
   *         or {@link RMNodeLabelsManager#NO_LABEL} if that is null
   * @throws SchedulerInvalidResoureRequestException if one target expression
   *         names more than one partition
   */
  private String validateAndGetTargetNodePartition(
      PlacementConstraint placementConstraint) {
    String defaultNodeLabelExpression =
        appSchedulingInfo.getDefaultNodeLabelExpression();
    String nodePartition = defaultNodeLabelExpression == null ?
        RMNodeLabelsManager.NO_LABEL : defaultNodeLabelExpression;

    if (placementConstraint == null) {
      return nodePartition;
    }
    PlacementConstraint.AbstractConstraint ac =
        placementConstraint.getConstraintExpr();
    // instanceof is false for null, so no separate null check is needed.
    if (!(ac instanceof PlacementConstraint.SingleConstraint)) {
      return nodePartition;
    }

    PlacementConstraint.SingleConstraint singleConstraint =
        (PlacementConstraint.SingleConstraint) ac;
    for (PlacementConstraint.TargetExpression targetExpression :
        singleConstraint.getTargetExpressions()) {
      // Handle node partition
      if (!NODE_ATTRIBUTE.equals(targetExpression.getTargetType())
          || !NODE_PARTITION.equals(targetExpression.getTargetKey())) {
        continue;
      }
      Set<String> values = targetExpression.getTargetValues();
      if (values == null || values.isEmpty()) {
        continue;
      }
      if (values.size() > 1) {
        throwExceptionWithMetaInfo(
            "Inside one targetExpression, we only support"
                + " affinity to at most one node partition now");
      }
      // Only a non-null value may replace the default; a null element must
      // not leak out as the target partition.
      String requestedPartition = values.iterator().next();
      if (requestedPartition != null) {
        nodePartition = requestedPartition;
        break;
      }
    }
    return nodePartition;
  }

  /**
   * @return always an empty map; this allocator keeps no ResourceRequests
   */
  @Override
  public Map<String, ResourceRequest> getResourceRequests() {
    return Collections.emptyMap();
  }

  /**
   * @param resourceName resource name to query; only {@code "*"} can yield a
   *        non-zero result
   * @return a {@link PendingAsk} built from the tracked request's sizing when
   *         {@code resourceName} is {@code "*"} and a request is tracked,
   *         otherwise {@link PendingAsk#ZERO}
   */
  @Override
  public PendingAsk getPendingAsk(String resourceName) {
    readLock.lock();
    try {
      if (resourceName.equals("*") && schedulingRequest != null) {
        return new PendingAsk(schedulingRequest.getResourceSizing());
      }
      return PendingAsk.ZERO;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @param resourceName resource name to query; only {@code "*"} can yield a
   *        non-zero result
   * @return {@code numAllocations} of the tracked request when
   *         {@code resourceName} is {@code "*"} and a request is tracked,
   *         otherwise 0
   */
  @Override
  public int getOutstandingAsksCount(String resourceName) {
    readLock.lock();
    try {
      if (resourceName.equals("*") && schedulingRequest != null) {
        return schedulingRequest.getResourceSizing().getNumAllocations();
      }
      return 0;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Deducts the pending number of allocations of the tracked request by one.
   */
  private void decreasePendingNumAllocation() {
    ResourceSizing sizing = schedulingRequest.getResourceSizing();
    sizing.setNumAllocations(sizing.getNumAllocations() - 1);
  }

  /**
   * Creates the request for one container and deducts one pending allocation
   * from the tracked request, under the write lock.
   *
   * @param schedulerKey not used by this implementation
   * @param type not used by this implementation
   * @param node not used by this implementation
   * @return a {@link ContainerRequest} wrapping a copy of the tracked request
   *         with {@code numAllocations} set to 1
   */
  @Override
  public ContainerRequest allocate(SchedulerRequestKey schedulerKey,
      NodeType type, SchedulerNode node) {
    writeLock.lock();
    try {
      // Per container scheduling request, it is just a copy of existing
      // scheduling request with #allocations=1
      SchedulingRequest containerSchedulingRequest = new SchedulingRequestPBImpl(
          ((SchedulingRequestPBImpl) schedulingRequest).getProto());
      containerSchedulingRequest.getResourceSizing().setNumAllocations(1);

      // Deduct sizing
      decreasePendingNumAllocation();

      return new ContainerRequest(containerSchedulingRequest);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Checks that allocations are still pending and that the node satisfies the
   * placement constraints of the tracked request.
   *
   * @param node node to check
   * @param dcOpt optional diagnostics collector, passed through to the
   *        constraint check
   * @return {@code false} if no request is tracked, nothing is pending, the
   *         constraints are not satisfied, or the allocation tags query is
   *         invalid (a placement attempt is then counted as well)
   */
  private boolean checkCardinalityAndPending(SchedulerNode node,
      Optional<DiagnosticsCollector> dcOpt) {
    // Without an accepted request there is nothing pending; mirrors the null
    // checks in getPendingAsk/getOutstandingAsksCount.
    if (schedulingRequest == null) {
      return false;
    }

    // Do we still have pending resource?
    if (schedulingRequest.getResourceSizing().getNumAllocations() <= 0) {
      return false;
    }

    // node type will be ignored.
    try {
      return PlacementConstraintsUtil.canSatisfyConstraints(
          appSchedulingInfo.getApplicationId(), schedulingRequest, node,
          placementConstraintManager, allocationTagsManager, dcOpt);
    } catch (InvalidAllocationTagsQueryException e) {
      LOG.warn("Failed to query node cardinality:", e);
      this.incrementPlacementAttempt();
      return false;
    }
  }

  /**
   * @param type ignored
   * @param node node to check
   * @return whether allocations are pending and {@code node} satisfies the
   *         placement constraints
   */
  @Override
  public boolean canAllocate(NodeType type, SchedulerNode node) {
    readLock.lock();
    try {
      return checkCardinalityAndPending(node, Optional.empty());
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @param resourceName ignored
   * @return always {@code true}
   */
  @Override
  public boolean canDelayTo(String resourceName) {
    return true;
  }

  /**
   * Same as
   * {@link #precheckNode(SchedulerNode, SchedulingMode, Optional)} without a
   * diagnostics collector.
   */
  @Override
  public boolean precheckNode(SchedulerNode schedulerNode,
      SchedulingMode schedulingMode) {
    return precheckNode(schedulerNode, schedulingMode, Optional.empty());
  }

  /**
   * Checks the node partition as well as cardinality/pending resources.
   *
   * @param schedulerNode node to check
   * @param schedulingMode with
   *        {@link SchedulingMode#RESPECT_PARTITION_EXCLUSIVITY} the node's
   *        own partition is compared with the target partition, otherwise
   *        {@link RMNodeLabelsManager#NO_LABEL} is
   * @param dcOpt optional collector that receives partition diagnostics on a
   *        partition mismatch
   * @return {@code true} only if a request is tracked, the partition matches,
   *         allocations are pending and the placement constraints are
   *         satisfied
   */
  @Override
  public boolean precheckNode(SchedulerNode schedulerNode,
      SchedulingMode schedulingMode,
      Optional<DiagnosticsCollector> dcOpt) {
    // We will only look at node label = nodeLabelToLookAt according to
    // schedulingMode and partition of node.
    final String nodePartitionToLookAt =
        schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
            ? schedulerNode.getPartition()
            : RMNodeLabelsManager.NO_LABEL;

    readLock.lock();
    try {
      // No request accepted yet: there is no target partition to compare
      // against and nothing pending.
      if (schedulingRequest == null || targetNodePartition == null) {
        return false;
      }

      if (!targetNodePartition.equals(nodePartitionToLookAt)) {
        dcOpt.ifPresent(dc -> dc.collectPartitionDiagnostics(
            targetNodePartition, nodePartitionToLookAt));
        return false;
      }
      return checkCardinalityAndPending(schedulerNode, dcOpt);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @return the target node partition of the tracked request, or
   *         {@code null} if no request has been accepted; read without
   *         taking the lock
   */
  @Override
  public String getPrimaryRequestedNodePartition() {
    return targetNodePartition;
  }

  /**
   * @return always 1
   */
  @Override
  public int getUniqueLocationAsks() {
    return 1;
  }

  /**
   * Logs the tracked scheduling request at INFO level, if there is one.
   */
  @Override
  public void showRequests() {
    readLock.lock();
    try {
      if (schedulingRequest != null) {
        LOG.info("{}", schedulingRequest);
      }
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @return the tracked scheduling request itself (not a copy), or
   *         {@code null} if none has been accepted; read without taking the
   *         lock
   */
  @Override
  public SchedulingRequest getSchedulingRequest() {
    return schedulingRequest;
  }

  @VisibleForTesting
  String getTargetNodePartition() {
    return targetNodePartition;
  }

  /**
   * Initializes the superclass and picks up the allocation tags manager and
   * the placement constraint manager from {@code rmContext}.
   *
   * @param appSchedulingInfo passed to the superclass
   * @param schedulerRequestKey passed to the superclass
   * @param rmContext source of the two managers; also passed to the
   *        superclass
   */
  @Override
  public void initialize(AppSchedulingInfo appSchedulingInfo,
      SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
    super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
    this.allocationTagsManager = rmContext.getAllocationTagsManager();
    this.placementConstraintManager = rmContext.getPlacementConstraintManager();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AppPlacementAllocator 源码

hadoop CandidateNodeSet 源码

hadoop CandidateNodeSetUtils 源码

hadoop LocalityAppPlacementAllocator 源码

hadoop MultiNodeLookupPolicy 源码

hadoop MultiNodePolicySpec 源码

hadoop MultiNodeSorter 源码

hadoop MultiNodeSortingManager 源码

hadoop PendingAskUpdateResult 源码

hadoop ResourceUsageMultiNodeLookupPolicy 源码

0  赞