hadoop IntraQueueCandidatesSelector 源码

  • 2022-10-20
  • 浏览 (211)

hadoop IntraQueueCandidatesSelector 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.AbstractComparatorOrderingPolicy;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Identifies over utilized resources within a queue and tries to normalize
 * them to resolve resource allocation anomalies w.r.t priority and user-limit.
 */
public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {

  /**
   * Orders {@link TempAppPerPartition} instances by priority, using the
   * ordering defined by {@link Priority#compareTo}. When both priorities are
   * equal, the application id decides the order.
   */
  @SuppressWarnings("serial")
  static class TAPriorityComparator
      implements Serializable, Comparator<TempAppPerPartition> {

    @Override
    public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) {
      Priority first = Priority.newInstance(ta1.getPriority());
      Priority second = Priority.newInstance(ta2.getPriority());

      // Equal priorities: fall back to the application id as tie-breaker.
      if (first.equals(second)) {
        return ta1.getApplicationId().compareTo(ta2.getApplicationId());
      }

      return first.compareTo(second);
    }
  }

  /**
   * Orders {@link TempAppPerPartition} instances for fair intra-queue
   * preemption.
   * <p>
   * Apps that belong to the same user are ordered by the comparator of the
   * ordering policy of the first app's leaf queue. Apps of different users are
   * ordered by the value returned by {@code getUsedDeductAM()} of their
   * respective users, from least to most. When neither usage is less than the
   * other according to the resource calculator, the application id decides.
   * <p>
   * The tie-break on application id keeps the comparator sign-symmetric
   * ({@code sgn(compare(a, b)) == -sgn(compare(b, a))}) even when two usages
   * differ as {@link Resource} objects but compare as equal under the
   * configured {@link ResourceCalculator}.
   */
  static class TAFairOrderingComparator
      implements Comparator<TempAppPerPartition> {

    private final ResourceCalculator rc;
    private final Resource clusterRes;

    /**
     * @param rc calculator used to compare the usage of two users
     * @param clusterRes cluster resource handed to the calculator
     */
    TAFairOrderingComparator(ResourceCalculator rc, Resource clusterRes) {
      this.rc = rc;
      this.clusterRes = clusterRes;
    }

    @Override
    public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) {
      if (ta1.getUser().equals(ta2.getUser())) {
        // NOTE(review): this cast assumes the leaf queue's ordering policy is
        // an AbstractComparatorOrderingPolicy; any other policy type would
        // fail here with a ClassCastException - confirm against the queue
        // configuration.
        AbstractComparatorOrderingPolicy<FiCaSchedulerApp> acop =
            (AbstractComparatorOrderingPolicy<FiCaSchedulerApp>)
            ta1.getFiCaSchedulerApp().getCSLeafQueue().getOrderingPolicy();
        return acop.getComparator()
                  .compare(ta1.getFiCaSchedulerApp(), ta2.getFiCaSchedulerApp());
      }

      Resource usedByUser1 = ta1.getTempUserPerPartition().getUsedDeductAM();
      Resource usedByUser2 = ta2.getTempUserPerPartition().getUsedDeductAM();
      if (!Resources.equals(usedByUser1, usedByUser2)) {
        if (Resources.lessThan(rc, clusterRes, usedByUser1, usedByUser2)) {
          return -1;
        }
        // Check the opposite direction explicitly instead of assuming
        // "not less" means "greater": returning 1 for both argument orders
        // would break the Comparator contract.
        if (Resources.lessThan(rc, clusterRes, usedByUser2, usedByUser1)) {
          return 1;
        }
      }

      // Usages are identical, or equal as far as the calculator can tell.
      return ta1.getApplicationId().compareTo(ta2.getApplicationId());
    }
  }

  private static final Logger LOG =
      LoggerFactory.getLogger(IntraQueueCandidatesSelector.class);

  // Plugin used by this selector to compute intra-queue demand and to list
  // preemptable apps; assigned in the constructor.
  IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin;

  // The preemption context this selector was created with.
  final CapacitySchedulerPreemptionContext context;

  /**
   * Creates a selector bound to the given preemption context and backed by a
   * {@link FifoIntraQueuePreemptionPlugin}.
   *
   * @param preemptionContext context passed to the superclass and to the
   *          plugin
   */
  IntraQueueCandidatesSelector(
      CapacitySchedulerPreemptionContext preemptionContext) {
    super(preemptionContext);
    this.context = preemptionContext;
    // "rc" is not declared in this class; presumably it is inherited from
    // PreemptionCandidatesSelector.
    this.fifoPreemptionComputePlugin =
        new FifoIntraQueuePreemptionPlugin(rc, preemptionContext);
  }

  /**
   * Selects containers for intra-queue preemption.
   * <p>
   * The intra-queue demand of every under-served leaf queue is computed
   * first, then resources already covered by {@code selectedCandidates} are
   * deducted. Finally the preemptable apps of each such queue are visited,
   * under the queue's read lock, to pick containers.
   *
   * @param selectedCandidates containers already chosen by earlier selectors
   * @param clusterResource total cluster resource
   * @param totalPreemptedResourceAllowed resource budget for this round
   * @return the candidate map created by this call; it is handed to
   *         {@code preemptFromLeastStarvedApp}, which passes it on to the
   *         container selection utility
   */
  @Override
  public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
      Resource clusterResource, Resource totalPreemptedResourceAllowed) {
    Map<ApplicationAttemptId, Set<RMContainer>> curCandidates = new HashMap<>();

    // Step 1: work out the abnormality inside each queue.
    computeIntraQueuePreemptionDemand(
        clusterResource, totalPreemptedResourceAllowed, selectedCandidates);

    // Step 2: selectors that ran earlier (with higher priority) may already
    // have picked containers, so deduct pre-emptable resources accordingly.
    CapacitySchedulerPreemptionUtils
        .deductPreemptableResourcesBasedSelectedCandidates(preemptionContext,
            selectedCandidates);

    // Step 3: walk every partition and select containers for preemption.
    for (String partition : preemptionContext.getAllPartitions()) {
      LinkedHashSet<String> underServedQueues = preemptionContext
          .getUnderServedQueuesPerPartition(partition);

      // A label that is not mapped to any queue yields no queue set.
      if (underServedQueues == null) {
        continue;
      }

      // Step 4: visit the queues in the order the context supplies them
      // (most under-served first).
      for (String queueName : underServedQueues) {
        AbstractLeafQueue leafQueue = preemptionContext
            .getQueueByPartition(queueName, RMNodeLabelsManager.NO_LABEL)
            .leafQueue;

        // Ignore non-leaf queues and queues for which intra-queue
        // preemption has been switched off.
        if (leafQueue == null || leafQueue.getIntraQueuePreemptionDisabled()) {
          continue;
        }

        // Step 5: resource still to obtain, keyed by partition.
        Map<String, Resource> resToObtainByPartition =
            fifoPreemptionComputePlugin.getResourceDemandFromAppsPerQueue(
                queueName, partition);

        // The default preemption iterator only considers FIFO+priority. For
        // user-limit preemption a lower priority app may need resources from
        // a higher priority app of another user, hence the plugin supplies
        // the apps ordered by user-limit starvation as well.
        Collection<FiCaSchedulerApp> preemptableApps =
            fifoPreemptionComputePlugin.getPreemptableApps(queueName,
                partition);

        // Step 6: per-user usage, so that preemption does not push a user's
        // resources below its user-limit.
        Map<String, Resource> rollingResourceUsagePerUser = new HashMap<>();
        initializeUsageAndUserLimitForCompute(clusterResource, partition,
            leafQueue, rollingResourceUsagePerUser);

        // Step 7: pick containers for the computed demand while holding the
        // queue's read lock.
        leafQueue.getReadLock().lock();
        try {
          for (FiCaSchedulerApp app : preemptableApps) {
            preemptFromLeastStarvedApp(app, selectedCandidates,
                curCandidates, clusterResource, totalPreemptedResourceAllowed,
                resToObtainByPartition, rollingResourceUsagePerUser);
          }
        } finally {
          leafQueue.getReadLock().unlock();
        }
      }
    }

    return curCandidates;
  }

  /**
   * Fills {@code rollingResourceUsagePerUser} with a clone of the resource
   * each user of the leaf queue currently uses in the given partition.
   *
   * @param clusterResource not read by this method
   * @param partition partition whose usage is recorded
   * @param leafQueue queue whose users are enumerated
   * @param rollingResourceUsagePerUser output map, keyed by user name
   */
  private void initializeUsageAndUserLimitForCompute(Resource clusterResource,
      String partition, AbstractLeafQueue leafQueue,
      Map<String, Resource> rollingResourceUsagePerUser) {
    for (String user : leafQueue.getAllUsers()) {
      // Clone the usage so that the rolling computation works on its own
      // copy.
      Resource usedByUser = Resources.clone(
          leafQueue.getUser(user).getResourceUsage().getUsed(partition));
      rollingResourceUsagePerUser.put(user, usedByUser);
      LOG.debug("Rolling resource usage for user:{} is : {}", user,
          usedByUser);
    }
  }

  /**
   * Walks the sorted live containers of one app and tries to select them for
   * preemption until no demand is left or the intra-queue policy asks to
   * skip a container.
   *
   * @param app app whose live containers are considered
   * @param selectedCandidates containers already selected; these are skipped
   * @param curCandidates candidate map of the current selector, passed on to
   *          the selection utility
   * @param clusterResource total cluster resource
   * @param totalPreemptedResourceAllowed resource budget for this round
   * @param resToObtainByPartition outstanding demand keyed by partition;
   *          the method returns as soon as this map is empty
   * @param rollingResourceUsagePerUser per-user usage; the entry of the
   *          app's user is reduced when a container is selected under the
   *          USERLIMIT_FIRST policy
   */
  private void preemptFromLeastStarvedApp(FiCaSchedulerApp app,
      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
      Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
      Resource clusterResource, Resource totalPreemptedResourceAllowed,
      Map<String, Resource> resToObtainByPartition,
      Map<String, Resource> rollingResourceUsagePerUser) {

    // ToDo: Reuse reservation selector here.

    List<RMContainer> sortedContainers =
        new ArrayList<>(app.getLiveContainers());
    sortContainers(sortedContainers);
    LOG.debug("totalPreemptedResourceAllowed for preemption at this"
        + " round is :{}", totalPreemptedResourceAllowed);

    Resource rollingUsedResourcePerUser =
        rollingResourceUsagePerUser.get(app.getUser());

    for (RMContainer c : sortedContainers) {

      // Nothing left to obtain: done with this app.
      if (resToObtainByPartition.isEmpty()) {
        return;
      }

      // Leave out containers that were preselected, that are already marked
      // as killable, or that host the AM (AM containers are not preempted
      // for now).
      if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
              selectedCandidates)
          || (null != preemptionContext.getKillableContainers()
              && preemptionContext.getKillableContainers()
                  .contains(c.getContainerId()))
          || c.isAMContainer()) {
        continue;
      }

      // When the policy reports that this container must be skipped (taking
      // it would bring the user's usage to or under its UserLimit), the scan
      // of this app's containers ends here.
      if (fifoPreemptionComputePlugin.skipContainerBasedOnIntraQueuePolicy(app,
          clusterResource, rollingUsedResourcePerUser, c)) {
        LOG.debug("Skipping container: {} with resource:{} as UserLimit for"
            + " user:{} with resource usage: {} is going under UL",
            c.getContainerId(), c.getAllocatedResource(), app.getUser(),
            rollingUsedResourcePerUser);

        break;
      }

      // Attempt the actual selection of this container.
      boolean selected = CapacitySchedulerPreemptionUtils
          .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
              resToObtainByPartition, c, clusterResource, selectedCandidates,
              curCandidates, totalPreemptedResourceAllowed,
              preemptionContext.getInQueuePreemptionConservativeDRF());

      // Once a container is selected, take its resource off the user's
      // rolling usage - only under the USERLIMIT_FIRST order policy.
      if (selected) {
        if (preemptionContext.getIntraQueuePreemptionOrderPolicy()
            .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) {
          Resources.subtractFrom(rollingUsedResourcePerUser,
              c.getAllocatedResource());
        }
      }
    }
  }

  /**
   * Asks the compute plugin to calculate the ideal allocation of the apps of
   * every under-served leaf queue, per partition. Queues whose used capacity
   * in the partition is below the configured minimum threshold are left out.
   *
   * @param clusterResource total cluster resource
   * @param totalPreemptedResourceAllowed resource budget for this round
   * @param selectedCandidates containers already selected by earlier
   *          selectors
   */
  private void computeIntraQueuePreemptionDemand(Resource clusterResource,
      Resource totalPreemptedResourceAllowed,
      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {

    // Demand is calculated partition by partition.
    for (String partition : context.getAllPartitions()) {
      LinkedHashSet<String> underServedQueues =
          context.getUnderServedQueuesPerPartition(partition);
      if (underServedQueues == null) {
        continue;
      }

      // Visit each queue registered for this partition.
      for (String queueName : underServedQueues) {
        TempQueuePerPartition tq =
            context.getQueueByPartition(queueName, partition);
        AbstractLeafQueue leafQueue = tq.leafQueue;

        // No leaf queue attached means this is a parent queue.
        if (leafQueue == null) {
          continue;
        }

        // reassignableResource = used - actuallyToBePreempted. It serves as
        // the upper limit when splitting the apps' quota within the queue.
        Resource queueReassignableResource = Resources.subtract(tq.getUsed(),
            tq.getActuallyToBePreempted());

        // Only queues whose used capacity reaches the configured minimum
        // are considered for intra-queue preemption.
        boolean belowThreshold =
            leafQueue.getQueueCapacities().getUsedCapacity(partition)
                < context.getMinimumThresholdForIntraQueuePreemption();
        if (belowThreshold) {
          continue;
        }

        // Compute the allocation of all apps based on the queue's
        // unallocated capacity.
        fifoPreemptionComputePlugin.computeAppsIdealAllocation(clusterResource,
            tq, selectedCandidates, totalPreemptedResourceAllowed,
            queueReassignableResource,
            context.getMaxAllowableLimitForIntraQueuePreemption());
      }
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractPreemptableResourceCalculator 源码

hadoop AbstractPreemptionEntity 源码

hadoop CapacitySchedulerPreemptionContext 源码

hadoop CapacitySchedulerPreemptionUtils 源码

hadoop FifoCandidatesSelector 源码

hadoop FifoIntraQueuePreemptionPlugin 源码

hadoop IntraQueuePreemptionComputePlugin 源码

hadoop PreemptableResourceCalculator 源码

hadoop PreemptionCandidatesSelector 源码

hadoop ProportionalCapacityPreemptionPolicy 源码

0  赞