hadoop WeightToPercentConverter source code

  • 2022-10-20

hadoop WeightToPercentConverter code

File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/weightconversion/WeightToPercentConverter.java
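This class converts the relative weights of a FairScheduler queue's children into absolute CapacityScheduler percentages: it writes one <queue>.capacity property per child and corrects rounding so that siblings always sum to exactly 100.000. As a small hypothetical example (queue names invented for illustration), a parent root with children a (weight 1) and b (weight 3) would produce:

yarn.scheduler.capacity.root.a.capacity = 25.000
yarn.scheduler.capacity.root.b.capacity = 75.000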

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;

public class WeightToPercentConverter
    implements CapacityConverter {

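  // Both constants are created at scale 3 so that BigDecimal.equals(),
  // which compares scale as well as value, can match computed capacities
  // against exactly "100.000" and "0.000".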
  private static final BigDecimal HUNDRED = new BigDecimal(100).setScale(3);
  private static final BigDecimal ZERO = new BigDecimal(0).setScale(3);

  @Override
  public void convertWeightsForChildQueues(FSQueue queue,
      Configuration csConfig) {
    List<FSQueue> children = queue.getChildQueues();

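    // Sum the children's weights, then express each child's weight as a
    // percentage share of that total.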
    int totalWeight = getTotalWeight(children);
    Pair<Map<String, BigDecimal>, Boolean> result =
        getCapacities(totalWeight, children);

    Map<String, BigDecimal> capacities = result.getLeft();
    boolean shouldAllowZeroSumCapacity = result.getRight();

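    // Write one capacity property per child, e.g.
    // yarn.scheduler.capacity.root.a.capacity = 25.000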
    capacities
        .forEach((key, value) -> csConfig.set(PREFIX + key +
                ".capacity", value.toString()));

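    // If every child has weight 0, the capacities sum to 0.000 instead of
    // 100.000, so the parent queue must be flagged to allow that.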
    if (shouldAllowZeroSumCapacity) {
      String queueName = queue.getName();
      csConfig.setBoolean(
          PREFIX + queueName + ".allow-zero-capacity-sum", true);
    }
  }

  private Pair<Map<String, BigDecimal>, Boolean> getCapacities(int totalWeight,
      List<FSQueue> children) {

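    // Trivial cases: no children at all, or a single child that simply
    // receives the full 100.000.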
    if (children.size() == 0) {
      return Pair.of(new HashMap<>(), false);
    } else if (children.size() == 1) {
      Map<String, BigDecimal> capacity = new HashMap<>();
      String queueName = children.get(0).getName();
      capacity.put(queueName, HUNDRED);

      return Pair.of(capacity, false);
    } else {
      Map<String, BigDecimal> capacities = new HashMap<>();

      children
          .stream()
          .forEach(queue -> {
            BigDecimal pct;

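            // pct = weight / totalWeight * 100, computed at scale 5 with
            // half-up rounding, then trimmed back to 3 decimal places.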
            if (totalWeight == 0) {
              pct = ZERO;
            } else {
              BigDecimal total = new BigDecimal(totalWeight);
              BigDecimal weight = new BigDecimal(queue.getWeight());
              pct = weight
                  .setScale(5)
                  .divide(total, RoundingMode.HALF_UP)
                  .multiply(HUNDRED)
                  .setScale(3);
            }

            capacities.put(queue.getName(), pct);
          });

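      // Rounding at scale 3 can leave the sum slightly off 100.000
      // (e.g. three equal weights produce 3 x 33.333 = 99.999).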
      BigDecimal totalPct = ZERO;
      for (Map.Entry<String, BigDecimal> entry : capacities.entrySet()) {
        totalPct = totalPct.add(entry.getValue());
      }

      // fix capacities if total != 100.000
      boolean shouldAllowZeroSumCapacity = false;
      if (!totalPct.equals(HUNDRED)) {
        shouldAllowZeroSumCapacity = fixCapacities(capacities, totalPct);
      }

      return Pair.of(capacities, shouldAllowZeroSumCapacity);
    }
  }

  @VisibleForTesting
  boolean fixCapacities(Map<String, BigDecimal> capacities,
      BigDecimal totalPct) {
    boolean shouldAllowZeroSumCapacity = false;

    // Sort the list so we'll adjust the highest capacity value,
    // because that will be affected less by a small change.
    // Also, it's legal to have weight = 0 and we have to avoid picking
    // that value as well.
    List<Map.Entry<String, BigDecimal>> sortedEntries = capacities
        .entrySet()
        .stream()
        .sorted(new Comparator<Map.Entry<String, BigDecimal>>() {
          @Override
          public int compare(Map.Entry<String, BigDecimal> e1,
              Map.Entry<String, BigDecimal> e2) {
            return e2.getValue().compareTo(e1.getValue());
          }
        })
        .collect(Collectors.toList());

    String highestCapacityQueue = sortedEntries.get(0).getKey();
    BigDecimal highestCapacity = sortedEntries.get(0).getValue();

    if (highestCapacity.equals(ZERO)) {
      // need to set allow-zero-capacity-sum on this queue
      // because we have zero weights on this level
      shouldAllowZeroSumCapacity = true;
    } else {
      BigDecimal diff = HUNDRED.subtract(totalPct);
      BigDecimal correctedHighest = highestCapacity.add(diff);
      capacities.put(highestCapacityQueue, correctedHighest);
    }

    return shouldAllowZeroSumCapacity;
  }

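  // Note: the weights are summed as doubles and then truncated to an int,
  // so any fractional part of the total is dropped.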
  private int getTotalWeight(List<FSQueue> children) {
    double sum = children
                  .stream()
                  .mapToDouble(c -> c.getWeight())
                  .sum();
    return (int) sum;
  }
}
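
To see the rounding fix in action, the following is a minimal, self-contained sketch (not part of the Hadoop source; class and variable names are invented for illustration) that reproduces the same BigDecimal chain as getCapacities() for three equal-weight children:

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;

public class WeightToPercentDemo {

  private static final BigDecimal HUNDRED = new BigDecimal(100).setScale(3);

  public static void main(String[] args) {
    int totalWeight = 3; // three children with weight 1 each
    BigDecimal[] pcts = new BigDecimal[3];
    BigDecimal totalPct = new BigDecimal(0).setScale(3);

    for (int i = 0; i < 3; i++) {
      // Same chain as in getCapacities(): scale-5 division with half-up
      // rounding, multiplied by 100.000, trimmed to 3 decimal places.
      pcts[i] = new BigDecimal(1)
          .setScale(5)
          .divide(new BigDecimal(totalWeight), RoundingMode.HALF_UP)
          .multiply(HUNDRED)
          .setScale(3);
      totalPct = totalPct.add(pcts[i]);
    }

    System.out.println(Arrays.toString(pcts)); // [33.333, 33.333, 33.333]
    System.out.println(totalPct);              // 99.999, not 100.000

    // fixCapacities() compensates by adding the remainder to the highest
    // capacity, turning one 33.333 into 33.334 so the sum is 100.000.
    BigDecimal diff = HUNDRED.subtract(totalPct);
    pcts[0] = pcts[0].add(diff);
    System.out.println(pcts[0]); // 33.334
  }
}

The only case this adjustment cannot repair is when every sibling has weight 0: there the highest capacity is itself 0.000, and the converter sets allow-zero-capacity-sum on the parent instead.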

Related information

hadoop source directory

Related articles

hadoop CapacityConverter source code

hadoop CapacityConverterFactory source code

hadoop WeightToWeightConverter source code
