hadoop TargetApplicationsNamespace 源码

  • 2022-10-20
  • 浏览 (205)

haddop TargetApplicationsNamespace 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplicationsNamespace.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;

import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
import org.apache.hadoop.yarn.api.records.ApplicationId;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF;
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF;
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_TAG;
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID;
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL;

/**
 * Class to describe the namespace of allocation tags, used by
 * {@link AllocationTags}. Each namespace can be evaluated against
 * a target set applications, represented by {@link TargetApplications}.
 * After evaluation, the namespace is interpreted to be a set of
 * applications based on the namespace type.
 */
public abstract class TargetApplicationsNamespace implements
    Evaluable<TargetApplications> {

  public final static String NAMESPACE_DELIMITER = "/";

  private AllocationTagNamespaceType nsType;
  // Namespace scope value will be delay binding by eval method.
  private Set<ApplicationId> nsScope;

  public TargetApplicationsNamespace(AllocationTagNamespaceType
      allocationTagNamespaceType) {
    this.nsType = allocationTagNamespaceType;
  }

  protected void setScopeIfNotNull(Set<ApplicationId> appIds) {
    if (appIds != null) {
      this.nsScope = appIds;
    }
  }

  /**
   * Get the type of the namespace.
   * @return namespace type.
   */
  public AllocationTagNamespaceType getNamespaceType() {
    return nsType;
  }

  /**
   * Get the scope of the namespace, in form of a set of applications.
   *
   * @return a set of applications.
   */
  public Set<ApplicationId> getNamespaceScope() {
    if (this.nsScope == null) {
      throw new IllegalStateException("Invalid namespace scope,"
          + " it is not initialized. Evaluate must be called before"
          + " a namespace can be consumed.");
    }
    return this.nsScope;
  }

  /**
   * Evaluate the namespace against given target applications
   * if it is necessary. Only self/not-self/app-label namespace types
   * require this evaluation step, because they are not binding to a
   * specific scope during initiating. So we do lazy binding for them
   * in this method.
   *
   * @param target a generic type target that impacts this evaluation.
   * @throws InvalidAllocationTagsQueryException
   */
  @Override
  public void evaluate(TargetApplications target)
      throws InvalidAllocationTagsQueryException {
    // Sub-class needs to override this when it requires the eval step.
  }

  @Override
  public String toString() {
    return this.nsType.toString();
  }

  /**
   * Namespace within application itself.
   */
  public static class Self extends TargetApplicationsNamespace {

    public Self() {
      super(SELF);
    }

    @Override
    public void evaluate(TargetApplications target)
        throws InvalidAllocationTagsQueryException {
      if (target == null || target.getCurrentApplicationId() == null) {
        throw new InvalidAllocationTagsQueryException("Namespace Self must"
            + " be evaluated against a single application ID.");
      }
      ApplicationId applicationId = target.getCurrentApplicationId();
      setScopeIfNotNull(ImmutableSet.of(applicationId));
    }
  }

  /**
   * Namespace to all applications except itself.
   */
  public static class NotSelf extends TargetApplicationsNamespace {

    private ApplicationId applicationId;

    public NotSelf() {
      super(NOT_SELF);
    }

    /**
     * The scope of self namespace is to an application itself,
     * the application ID can be delay binding to the namespace.
     *
     * @param appId application ID.
     */
    public void setApplicationId(ApplicationId appId) {
      this.applicationId = appId;
    }

    public ApplicationId getApplicationId() {
      return this.applicationId;
    }

    @Override
    public void evaluate(TargetApplications target) {
      Set<ApplicationId> otherAppIds = target.getOtherApplicationIds();
      setScopeIfNotNull(otherAppIds);
    }
  }

  /**
   * Namespace to all applications in the cluster.
   */
  public static class All extends TargetApplicationsNamespace {

    public All() {
      super(ALL);
    }
  }

  /**
   * Namespace to applications that attached with a certain application tag.
   */
  public static class AppTag extends TargetApplicationsNamespace {

    private String applicationTag;

    public AppTag(String appTag) {
      super(APP_TAG);
      this.applicationTag = appTag;
    }

    @Override
    public void evaluate(TargetApplications target) {
      setScopeIfNotNull(target.getApplicationIdsByTag(applicationTag));
    }

    @Override
    public String toString() {
      return APP_TAG.toString() + NAMESPACE_DELIMITER + this.applicationTag;
    }
  }

  /**
   * Namespace defined by a certain application ID.
   */
  public static class AppID extends TargetApplicationsNamespace {

    private ApplicationId targetAppId;
    // app-id namespace requires an extra value of an application id.
    public AppID(ApplicationId applicationId) {
      super(APP_ID);
      this.targetAppId = applicationId;
      setScopeIfNotNull(ImmutableSet.of(targetAppId));
    }

    @Override
    public String toString() {
      return APP_ID.toString() + NAMESPACE_DELIMITER + this.targetAppId;
    }
  }

  /**
   * Parse namespace from a string. The string must be in legal format
   * defined by each {@link AllocationTagNamespaceType}.
   *
   * @param namespaceStr namespace string.
   * @return an instance of {@link TargetApplicationsNamespace}.
   * @throws InvalidAllocationTagsQueryException
   * if given string is not in valid format
   */
  public static TargetApplicationsNamespace parse(String namespaceStr)
      throws InvalidAllocationTagsQueryException {
    // Return the default namespace if no valid string is given.
    if (Strings.isNullOrEmpty(namespaceStr)) {
      return new Self();
    }

    // Normalize the input, escape additional chars.
    List<String> nsValues = normalize(namespaceStr);
    // The first string should be the prefix.
    String nsPrefix = nsValues.get(0);
    AllocationTagNamespaceType allocationTagNamespaceType =
        fromString(nsPrefix);
    switch (allocationTagNamespaceType) {
    case SELF:
      return new Self();
    case NOT_SELF:
      return new NotSelf();
    case ALL:
      return new All();
    case APP_ID:
      if (nsValues.size() != 2) {
        throw new InvalidAllocationTagsQueryException(
            "Missing the application ID in the namespace string: "
                + namespaceStr);
      }
      String appIDStr = nsValues.get(1);
      return parseAppID(appIDStr);
    case APP_TAG:
      if (nsValues.size() != 2) {
        throw new InvalidAllocationTagsQueryException(
            "Missing the application tag in the namespace string: "
                + namespaceStr);
      }
      return new AppTag(nsValues.get(1));
    default:
      throw new InvalidAllocationTagsQueryException(
          "Invalid namespace string " + namespaceStr);
    }
  }

  private static AllocationTagNamespaceType fromString(String prefix) throws
      InvalidAllocationTagsQueryException {
    for (AllocationTagNamespaceType type :
        AllocationTagNamespaceType.values()) {
      if(type.getTypeKeyword().equals(prefix)) {
        return type;
      }
    }

    Set<String> values = Arrays.stream(AllocationTagNamespaceType.values())
        .map(AllocationTagNamespaceType::toString)
        .collect(Collectors.toSet());
    throw new InvalidAllocationTagsQueryException(
        "Invalid namespace prefix: " + prefix
            + ", valid values are: " + String.join(",", values));
  }

  private static TargetApplicationsNamespace parseAppID(String appIDStr)
      throws InvalidAllocationTagsQueryException {
    try {
      ApplicationId applicationId = ApplicationId.fromString(appIDStr);
      return new AppID(applicationId);
    } catch (IllegalArgumentException e) {
      throw new InvalidAllocationTagsQueryException(
          "Invalid application ID for "
              + APP_ID.getTypeKeyword() + ": " + appIDStr);
    }
  }

  /**
   * Valid given namespace string and parse it to a list of sub-strings
   * that can be consumed by the parser according to the type of the
   * namespace. Currently the size of return list should be either 1 or 2.
   * Extra slash is escaped during the normalization.
   *
   * @param namespaceStr namespace string.
   * @return a list of parsed strings.
   * @throws InvalidAllocationTagsQueryException
   * if namespace format is unexpected.
   */
  private static List<String> normalize(String namespaceStr)
      throws InvalidAllocationTagsQueryException {
    List<String> result = new ArrayList<>();
    if (namespaceStr == null) {
      return result;
    }

    String[] nsValues = namespaceStr.split(NAMESPACE_DELIMITER);
    for (String str : nsValues) {
      if (!Strings.isNullOrEmpty(str)) {
        result.add(str);
      }
    }

    // Currently we only allow 1 or 2 values for a namespace string
    if (result.size() == 0 || result.size() > 2) {
      throw new InvalidAllocationTagsQueryException("Invalid namespace string: "
          + namespaceStr + ", the syntax is <namespace_prefix> or"
          + " <namespace_prefix>/<namespace_value>");
    }

    return result;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AllocationTags 源码

hadoop AllocationTagsManager 源码

hadoop Evaluable 源码

hadoop InvalidAllocationTagsQueryException 源码

hadoop MemoryPlacementConstraintManager 源码

hadoop PlacementConstraintManager 源码

hadoop PlacementConstraintManagerService 源码

hadoop PlacementConstraintsUtil 源码

hadoop TargetApplications 源码

hadoop package-info 源码

0  赞