hadoop CapacitySchedulerConfigValidator 源码

  • 2022-10-20
  • 浏览 (206)

haddop CapacitySchedulerConfigValidator 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

public final class CapacitySchedulerConfigValidator {
  private static final Logger LOG = LoggerFactory.getLogger(
          CapacitySchedulerConfigValidator.class);

  private CapacitySchedulerConfigValidator() {
    throw new IllegalStateException("Utility class");
  }

  public static boolean validateCSConfiguration(
          final Configuration oldConf, final Configuration newConf,
          final RMContext rmContext) throws IOException {
    CapacityScheduler liveScheduler = (CapacityScheduler) rmContext.getScheduler();
    CapacityScheduler newCs = new CapacityScheduler();
    try {
      //TODO: extract all the validation steps and replace reinitialize with
      //the specific validation steps
      newCs.setConf(oldConf);
      newCs.setRMContext(rmContext);
      newCs.init(oldConf);
      newCs.addNodes(liveScheduler.getAllNodes());
      newCs.reinitialize(newConf, rmContext, true);
      return true;
    } finally {
      newCs.stop();
      QueueMetrics.clearQueueMetrics();
      liveScheduler.resetSchedulerMetrics();
    }
  }

  public static Set<String> validatePlacementRules(
          Collection<String> placementRuleStrs) throws IOException {
    Set<String> distinguishRuleSet = new LinkedHashSet<>();
    // fail the case if we get duplicate placementRule add in
    for (String pls : placementRuleStrs) {
      if (!distinguishRuleSet.add(pls)) {
        throw new IOException("Invalid PlacementRule inputs which "
                + "contains duplicate rule strings");
      }
    }
    return distinguishRuleSet;
  }

  public static void validateMemoryAllocation(Configuration conf) {
    int minMem = conf.getInt(
            YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
    int maxMem = conf.getInt(
            YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);

    if (minMem <= 0 || minMem > maxMem) {
      throw new YarnRuntimeException("Invalid resource scheduler memory"
              + " allocation configuration"
              + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
              + "=" + minMem
              + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
              + "=" + maxMem + ", min and max should be greater than 0"
              + ", max should be no smaller than min.");
    }
  }
  public static void validateVCores(Configuration conf) {
    int minVcores = conf.getInt(
            YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
    int maxVcores = conf.getInt(
            YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);

    if (minVcores <= 0 || minVcores > maxVcores) {
      throw new YarnRuntimeException("Invalid resource scheduler vcores"
              + " allocation configuration"
              + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
              + "=" + minVcores
              + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
              + "=" + maxVcores + ", min and max should be greater than 0"
              + ", max should be no smaller than min.");
    }
  }

  /**
   * Ensure all existing queues are present. Queues cannot be deleted if it's not
   * in Stopped state, Queue's cannot be moved from one hierarchy to other also.
   * Previous child queue could be converted into parent queue if it is in
   * STOPPED state.
   *
   * @param queues existing queues
   * @param newQueues new queues
   */
  public static void validateQueueHierarchy(
      CSQueueStore queues,
      CSQueueStore newQueues,
      CapacitySchedulerConfiguration newConf) throws IOException {
    // check that all static queues are included in the newQueues list
    for (CSQueue oldQueue : queues.getQueues()) {
      if (AbstractAutoCreatedLeafQueue.class.isAssignableFrom(oldQueue.getClass())) {
        continue;
      }

      final String queuePath = oldQueue.getQueuePath();
      final String configPrefix = CapacitySchedulerConfiguration.getQueuePrefix(
          oldQueue.getQueuePath());
      final QueueState newQueueState = createQueueState(newConf.get(configPrefix + "state"),
          queuePath);
      final CSQueue newQueue = newQueues.get(queuePath);

      if (null == newQueue) {
        // old queue doesn't exist in the new XML
        if (isEitherQueueStopped(oldQueue.getState(), newQueueState)) {
          LOG.info("Deleting Queue {}, as it is not present in the modified capacity " +
              "configuration xml", queuePath);
        } else {
          if (!isDynamicQueue(oldQueue)) {
            throw new IOException(oldQueue.getQueuePath() + " cannot be"
                + " deleted from the capacity scheduler configuration, as the"
                + " queue is not yet in stopped state. Current State : "
                + oldQueue.getState());
          }
        }
      } else {
        validateSameQueuePath(oldQueue, newQueue);
        validateParentQueueConversion(oldQueue, newQueue);
        validateLeafQueueConversion(oldQueue, newQueue);
      }
    }
  }

  private static void validateSameQueuePath(CSQueue oldQueue, CSQueue newQueue) throws IOException {
    if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
      // Queues cannot be moved from one hierarchy to another
      throw new IOException(
          oldQueue.getQueuePath() + " is moved from:" + oldQueue.getQueuePath() + " to:"
              + newQueue.getQueuePath()
              + " after refresh, which is not allowed.");
    }
  }

  private static void validateParentQueueConversion(CSQueue oldQueue,
                                                    CSQueue newQueue) throws IOException {
    if (oldQueue instanceof ParentQueue) {
      if (!(oldQueue instanceof ManagedParentQueue) && newQueue instanceof ManagedParentQueue) {
        throw new IOException(
            "Can not convert parent queue: " + oldQueue.getQueuePath()
                + " to auto create enabled parent queue since "
                + "it could have other pre-configured queues which is not "
                + "supported");
      }

      if (oldQueue instanceof ManagedParentQueue
          && !(newQueue instanceof ManagedParentQueue)) {
        throw new IOException(
            "Cannot convert auto create enabled parent queue: "
                + oldQueue.getQueuePath() + " to leaf queue. Please check "
                + " parent queue's configuration "
                + CapacitySchedulerConfiguration.AUTO_CREATE_CHILD_QUEUE_ENABLED
                + " is set to true");
      }

      if (newQueue instanceof AbstractLeafQueue) {
        LOG.info("Converting the parent queue: {} to leaf queue.", oldQueue.getQueuePath());
      }
    }
  }

  private static void validateLeafQueueConversion(CSQueue oldQueue,
                                                  CSQueue newQueue) throws IOException {
    if (oldQueue instanceof AbstractLeafQueue && newQueue instanceof ParentQueue) {
      if (isEitherQueueStopped(oldQueue.getState(), newQueue.getState())) {
        LOG.info("Converting the leaf queue: {} to parent queue.", oldQueue.getQueuePath());
      } else {
        throw new IOException(
            "Can not convert the leaf queue: " + oldQueue.getQueuePath()
                + " to parent queue since "
                + "it is not yet in stopped state. Current State : "
                + oldQueue.getState());
      }
    }
  }

  private static QueueState createQueueState(String state, String queuePath) {
    if (state != null) {
      try {
        return QueueState.valueOf(state);
      } catch (Exception ex) {
        LOG.warn("Not a valid queue state for queue: {}, state: {}", queuePath, state);
      }
    }
    return null;
  }

  private static boolean isDynamicQueue(CSQueue csQueue) {
    return ((AbstractCSQueue)csQueue).isDynamicQueue();
  }

  private static boolean isEitherQueueStopped(QueueState a, QueueState b) {
    return a == QueueState.STOPPED || b == QueueState.STOPPED;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractAutoCreatedLeafQueue 源码

hadoop AbstractCSQueue 源码

hadoop AbstractLeafQueue 源码

hadoop AbstractManagedParentQueue 源码

hadoop AppPriorityACLConfigurationParser 源码

hadoop AppPriorityACLGroup 源码

hadoop AutoCreatedLeafQueue 源码

hadoop AutoCreatedLeafQueueConfig 源码

hadoop AutoCreatedQueueDeletionPolicy 源码

hadoop AutoCreatedQueueManagementPolicy 源码

0  赞