hadoop FifoOrderingPolicyWithExclusivePartitions 源码

  • 2022-10-20
  • 浏览 (215)

haddop FifoOrderingPolicyWithExclusivePartitions 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;


/**
 * Similar to {@link FifoOrderingPolicy}, but with separate ordering policies
 * for each partition in
 * {@code yarn.scheduler.capacity.<queue-path>.ordering-policy.exclusive-enforced-partitions}.
 */
public class FifoOrderingPolicyWithExclusivePartitions<S extends SchedulableEntity>
    implements OrderingPolicy<S> {

  private static final String DEFAULT_PARTITION = "DEFAULT_PARTITION";

  private Map<String, OrderingPolicy<S>> orderingPolicies;

  public FifoOrderingPolicyWithExclusivePartitions() {
    this.orderingPolicies = new HashMap<>();
    this.orderingPolicies.put(DEFAULT_PARTITION, new FifoOrderingPolicy());
  }

  public Collection<S> getSchedulableEntities() {
    return unionOrderingPolicies().getSchedulableEntities();
  }

  public Iterator<S> getAssignmentIterator(IteratorSelector sel) {
    // Return schedulable entities only from filtered partition
    return getPartitionOrderingPolicy(sel.getPartition())
        .getAssignmentIterator(sel);
  }

  public Iterator<S> getPreemptionIterator() {
    // Entities from all partitions should be preemptible
    return unionOrderingPolicies().getPreemptionIterator();
  }

  /**
   * Union all schedulable entities from all ordering policies.
   * @return ordering policy containing all schedulable entities
   */
  private OrderingPolicy<S> unionOrderingPolicies() {
    OrderingPolicy<S> ret = new FifoOrderingPolicy<>();
    for (Map.Entry<String, OrderingPolicy<S>> entry
        : orderingPolicies.entrySet()) {
      ret.addAllSchedulableEntities(entry.getValue().getSchedulableEntities());
    }
    return ret;
  }

  public void addSchedulableEntity(S s) {
    getPartitionOrderingPolicy(s.getPartition()).addSchedulableEntity(s);
  }

  public boolean removeSchedulableEntity(S s) {
    return getPartitionOrderingPolicy(s.getPartition())
        .removeSchedulableEntity(s);
  }

  public void addAllSchedulableEntities(Collection<S> sc) {
    for (S entity : sc) {
      getPartitionOrderingPolicy(entity.getPartition())
          .addSchedulableEntity(entity);
    }
  }

  public int getNumSchedulableEntities() {
    // Return total number of schedulable entities, to maintain parity with
    // existing FifoOrderingPolicy e.g. when determining if queue has reached
    // its max app limit
    int ret = 0;
    for (Map.Entry<String, OrderingPolicy<S>> entry
        : orderingPolicies.entrySet()) {
      ret += entry.getValue().getNumSchedulableEntities();
    }
    return ret;
  }

  public void containerAllocated(S schedulableEntity, RMContainer r) {
    getPartitionOrderingPolicy(schedulableEntity.getPartition())
        .containerAllocated(schedulableEntity, r);
  }

  public void containerReleased(S schedulableEntity, RMContainer r) {
    getPartitionOrderingPolicy(schedulableEntity.getPartition())
        .containerReleased(schedulableEntity, r);
  }

  public void demandUpdated(S schedulableEntity) {
    getPartitionOrderingPolicy(schedulableEntity.getPartition())
        .demandUpdated(schedulableEntity);
  }

  @Override
  public void configure(Map<String, String> conf) {
    if (conf == null) {
      return;
    }
    String partitions =
        conf.get(YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX);
    if (partitions != null) {
      for (String partition : partitions.split(",")) {
        partition = partition.trim();
        if (!partition.isEmpty()) {
          this.orderingPolicies.put(partition, new FifoOrderingPolicy());
        }
      }
    }
  }

  @Override
  public String getInfo() {
    return "FifoOrderingPolicyWithExclusivePartitions";
  }

  @Override
  public String getConfigName() {
    return CapacitySchedulerConfiguration
        .FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY;
  }

  private OrderingPolicy<S> getPartitionOrderingPolicy(String partition) {
    String keyPartition = orderingPolicies.containsKey(partition) ?
        partition : DEFAULT_PARTITION;
    return orderingPolicies.get(keyPartition);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractComparatorOrderingPolicy 源码

hadoop CompoundComparator 源码

hadoop FairOrderingPolicy 源码

hadoop FifoComparator 源码

hadoop FifoOrderingPolicy 源码

hadoop FifoOrderingPolicyForPendingApps 源码

hadoop IteratorSelector 源码

hadoop OrderingPolicy 源码

hadoop PriorityComparator 源码

hadoop RecoveryComparator 源码

0  赞