hadoop FSConfigToCSConfigRuleHandler 源码

  • 2022-10-20
  • 浏览 (225)

hadoop FSConfigToCSConfigRuleHandler 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigRuleHandler.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;

import static java.lang.String.format;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.lang3.StringUtils;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule.Policy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class that determines what should happen if the FS->CS converter
 * encounters a property that is currently not supported.
 *
 * Acceptable values are either "abort" or "warning" (case-insensitive,
 * surrounding whitespace is ignored). A missing or unrecognized value
 * falls back to "warning".
 *
 * Rules are read from a {@link Properties} object that is either handed
 * to the constructor or loaded with {@link #loadRulesFromFile(String)}.
 * The per-rule actions are resolved by {@link #initPropertyActions()}.
 */
public class FSConfigToCSConfigRuleHandler {
  private static final Logger LOG =
      LoggerFactory.getLogger(FSConfigToCSConfigRuleHandler.class);

  /** Receives the warnings and errors produced by the rule handlers. */
  private final ConversionOptions conversionOptions;

  public static final String MAX_CHILD_QUEUE_LIMIT =
      "maxChildQueue.limit";

  public static final String MAX_CAPACITY_PERCENTAGE =
      "maxCapacityPercentage.action";

  public static final String MAX_CHILD_CAPACITY =
      "maxChildCapacity.action";

  public static final String MAX_RESOURCES =
      "maxResources.action";

  public static final String MIN_RESOURCES =
      "minResources.action";

  public static final String DYNAMIC_MAX_ASSIGN =
      "dynamicMaxAssign.action";

  public static final String RESERVATION_SYSTEM =
      "reservationSystem.action";

  public static final String QUEUE_AUTO_CREATE =
      "queueAutoCreate.action";

  public static final String FAIR_AS_DRF =
      "fairAsDrf.action";

  public static final String QUEUE_DYNAMIC_CREATE =
      "queueDynamicCreate.action";

  public static final String PARENT_DYNAMIC_CREATE =
      "parentDynamicCreate.action";

  public static final String CHILD_STATIC_DYNAMIC_CONFLICT =
      "childStaticDynamicConflict.action";

  public static final String PARENT_CHILD_CREATE_DIFFERS =
      "parentChildCreateDiff.action";

  /** What to do when a rule is triggered. */
  @VisibleForTesting
  enum RuleAction {
    WARNING,
    ABORT
  }

  /** Resolved action per rule key; filled by initPropertyActions(). */
  private Map<String, RuleAction> actions;
  /** Raw rule settings as read from the rule file or the constructor. */
  private Properties properties;

  /**
   * Replaces the current rule settings with the contents of a properties
   * file and clears the resolved actions. Until
   * {@link #initPropertyActions()} is called again, no action is
   * registered and the action-based handlers do nothing.
   *
   * @param ruleFile path of the properties file to read
   * @throws IllegalArgumentException if {@code ruleFile} is null
   * @throws IOException if the file cannot be opened or read; in that
   *     case the previously loaded settings and actions are left untouched
   */
  void loadRulesFromFile(String ruleFile) throws IOException {
    if (ruleFile == null) {
      throw new IllegalArgumentException("Rule file cannot be null!");
    }

    // Load into a local first so a failed read cannot leave this object
    // with half-loaded properties paired with stale actions.
    Properties loaded = new Properties();
    try (InputStream is = new FileInputStream(new File(ruleFile))) {
      loaded.load(is);
    }
    properties = loaded;
    actions = new HashMap<>();
  }

  /**
   * Creates a handler with no rule settings and no resolved actions.
   *
   * @param conversionOptions sink for warnings and errors
   */
  public FSConfigToCSConfigRuleHandler(ConversionOptions conversionOptions) {
    this.properties = new Properties();
    this.actions = new HashMap<>();
    this.conversionOptions = conversionOptions;
  }

  /**
   * Creates a handler from the given rule settings and immediately
   * resolves the action of every rule.
   *
   * @param props rule settings
   * @param conversionOptions sink for warnings and errors
   */
  @VisibleForTesting
  FSConfigToCSConfigRuleHandler(Properties props,
      ConversionOptions conversionOptions) {
    this.properties = props;
    this.actions = new HashMap<>();
    this.conversionOptions = conversionOptions;
    initPropertyActions();
  }

  /**
   * Resolves the action of every known "*.action" rule from the current
   * rule settings.
   */
  public void initPropertyActions() {
    setActionForProperty(MAX_CAPACITY_PERCENTAGE);
    setActionForProperty(MAX_CHILD_CAPACITY);
    setActionForProperty(MAX_RESOURCES);
    setActionForProperty(MIN_RESOURCES);
    setActionForProperty(DYNAMIC_MAX_ASSIGN);
    setActionForProperty(RESERVATION_SYSTEM);
    setActionForProperty(QUEUE_AUTO_CREATE);
    setActionForProperty(FAIR_AS_DRF);
    setActionForProperty(QUEUE_DYNAMIC_CREATE);
    setActionForProperty(PARENT_DYNAMIC_CREATE);
    setActionForProperty(CHILD_STATIC_DYNAMIC_CONFLICT);
    setActionForProperty(PARENT_CHILD_CREATE_DIFFERS);
  }

  /**
   * Applies the {@link #MAX_CAPACITY_PERCENTAGE} rule.
   *
   * @param queueName queue named in the reported message
   */
  public void handleMaxCapacityPercentage(String queueName) {
    handle(MAX_CAPACITY_PERCENTAGE, null,
        format("<maxResources> defined in percentages for queue %s",
            queueName));
  }

  /** Applies the {@link #MAX_CHILD_CAPACITY} rule. */
  public void handleMaxChildCapacity() {
    handle(MAX_CHILD_CAPACITY, "<maxChildResources>", null);
  }

  /** Applies the {@link #MAX_RESOURCES} rule. */
  public void handleMaxResources() {
    handle(MAX_RESOURCES, "<maxResources>", null);
  }

  /** Applies the {@link #MIN_RESOURCES} rule. */
  public void handleMinResources() {
    handle(MIN_RESOURCES, "<minResources>", null);
  }

  /**
   * Checks a queue's child count against the optional
   * {@link #MAX_CHILD_QUEUE_LIMIT} setting. Does nothing when the setting
   * is absent.
   *
   * @param queue queue named in the error message
   * @param count number of children of that queue
   * @throws ConversionException if the limit is not a non-negative
   *     integer that fits into an {@code int}, or if {@code count}
   *     exceeds the limit
   */
  public void handleChildQueueCount(String queue, int count) {
    String value = properties.getProperty(MAX_CHILD_QUEUE_LIMIT);
    if (value == null) {
      return;
    }

    // Properties.load keeps trailing whitespace in values, so "5 " must
    // still be read as 5.
    String limitText = value.trim();
    if (!StringUtils.isNumeric(limitText)) {
      throw new ConversionException(
          "Rule setting: maxChildQueue.limit is not an integer");
    }

    int maxChildQueue;
    try {
      maxChildQueue = Integer.parseInt(limitText);
    } catch (NumberFormatException e) {
      // A digits-only value can still be too large for an int; report it
      // the same way as any other unusable limit instead of leaking a
      // NumberFormatException to the caller.
      throw new ConversionException(
          "Rule setting: maxChildQueue.limit is not an integer");
    }

    if (count > maxChildQueue) {
      throw new ConversionException(
          format("Queue %s has too many children: %d", queue, count));
    }
  }

  /** Applies the {@link #DYNAMIC_MAX_ASSIGN} rule. */
  public void handleDynamicMaxAssign() {
    handle(DYNAMIC_MAX_ASSIGN,
        FairSchedulerConfiguration.DYNAMIC_MAX_ASSIGN, null);
  }

  /** Applies the {@link #RESERVATION_SYSTEM} rule. */
  public void handleReservationSystem() {
    handle(RESERVATION_SYSTEM,
        null,
        "Conversion of reservation system is not supported");
  }

  /**
   * Applies the {@link #FAIR_AS_DRF} rule.
   *
   * @param queueName queue named in the reported message
   */
  public void handleFairAsDrf(String queueName) {
    handle(FAIR_AS_DRF,
        null,
        format(
            "Queue %s will use DRF policy instead of Fair",
            queueName));
  }

  /**
   * Applies the {@link #QUEUE_DYNAMIC_CREATE} rule.
   *
   * @param queue queue path named in the reported message
   */
  public void handleRuleAutoCreateFlag(String queue) {
    String msg = format("Placement rules: create=true is enabled for"
        + " path %s - you have to make sure that these queues are"
        + " managed queues and set auto-create-child-queues=true."
        + " Other queues cannot statically exist under this path!", queue);

    handle(QUEUE_DYNAMIC_CREATE, null, msg);
  }

  /**
   * Applies the {@link #PARENT_DYNAMIC_CREATE} rule.
   *
   * @param parentPath parent queue path named in the reported message
   */
  public void handleFSParentCreateFlag(String parentPath) {
    String msg = format("Placement rules: create=true is enabled for parent"
        + " path %s - this is not supported in Capacity Scheduler."
        + " The parent must exist as a static queue and cannot be"
        + " created automatically", parentPath);

    handle(PARENT_DYNAMIC_CREATE, null, msg);
  }

  /**
   * Applies the {@link #CHILD_STATIC_DYNAMIC_CONFLICT} rule.
   *
   * @param parentPath queue path named in the reported message
   */
  public void handleChildStaticDynamicConflict(String parentPath) {
    String msg = format("Placement rules: rule maps to"
        + " path %s, but this queue already contains static queue definitions!"
        + " This configuration is invalid and *must* be corrected", parentPath);

    handle(CHILD_STATIC_DYNAMIC_CONFLICT, null, msg);
  }

  /**
   * Applies the {@link #PARENT_CHILD_CREATE_DIFFERS} rule.
   *
   * @param policy placement policy whose name is put in the message
   */
  public void handleFSParentAndChildCreateFlagDiff(Policy policy) {
    String msg = format("Placement rules: the policy %s originally uses"
        + " true/false or false/true \"create\" settings on the Fair Scheduler"
        + " side. This is not supported and create flag will be set"
        + " to *true* in the generated JSON rule chain", policy.name());

    handle(PARENT_CHILD_CREATE_DIFFERS, null, msg);
  }

  /**
   * Reports a triggered rule according to its resolved action: ABORT is
   * passed to {@code conversionOptions.handleError}, WARNING to
   * {@code conversionOptions.handleWarning}. If no action is registered
   * for the rule, nothing happens.
   *
   * @param actionName rule key used to look up the action
   * @param fsSetting name of the Fair Scheduler setting, used to build a
   *     default message when {@code message} is null
   * @param message explicit message, or null to use the default one
   */
  private void handle(String actionName, String fsSetting, String message) {
    RuleAction action = actions.get(actionName);
    if (action == null) {
      // Actions have not been initialized for this rule.
      return;
    }

    switch (action) {
    case ABORT:
      String exceptionMessage = (message != null) ? message :
          format("Setting %s is not supported", fsSetting);
      conversionOptions.handleError(exceptionMessage);
      break;
    case WARNING:
      String loggedMsg = (message != null) ? message :
          format("Setting %s is not supported, ignoring conversion",
              fsSetting);
      conversionOptions.handleWarning(loggedMsg, LOG);
      break;
    default:
      throw new IllegalArgumentException(
          "Unknown action " + action);
    }
  }

  /**
   * Resolves and stores the action of a single rule. A missing or
   * unrecognized value results in WARNING.
   *
   * @param property rule key to resolve
   */
  private void setActionForProperty(String property) {
    String action = properties.getProperty(property);

    if (action == null) {
      LOG.info("No rule set for {}, defaulting to WARNING", property);
      actions.put(property, RuleAction.WARNING);
      return;
    }

    // Properties.load keeps trailing whitespace in values; without the
    // trim, "abort " would silently degrade to WARNING.
    String normalized = action.trim();
    if (normalized.equalsIgnoreCase("warning")) {
      actions.put(property, RuleAction.WARNING);
    } else if (normalized.equalsIgnoreCase("abort")) {
      actions.put(property, RuleAction.ABORT);
    } else {
      LOG.warn("Unknown action {} set for rule {}, defaulting to WARNING",
          action, property);
      actions.put(property, RuleAction.WARNING);
    }
  }

  /**
   * Returns the live map of resolved actions (not a copy).
   *
   * @return rule key to action mapping
   */
  @VisibleForTesting
  public Map<String, RuleAction> getActions() {
    return actions;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ConversionException 源码

hadoop ConversionOptions 源码

hadoop ConvertedConfigValidator 源码

hadoop DryRunResultHolder 源码

hadoop FSConfigToCSConfigArgumentHandler 源码

hadoop FSConfigToCSConfigConverter 源码

hadoop FSConfigToCSConfigConverterMain 源码

hadoop FSConfigToCSConfigConverterParams 源码

hadoop FSQueueConverter 源码

hadoop FSQueueConverterBuilder 源码

0  赞