hadoop AllocationTagsManager 源码

  • 2022-10-20
  • 浏览 (194)

hadoop AllocationTagsManager 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java

/*
 * *
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 * /
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.log4j.Logger;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.LongBinaryOperator;

/**
 * In-memory mapping between applications/container-tags and nodes/racks.
 * Required by constrained affinity/anti-affinity and cardinality placement.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class AllocationTagsManager {

  private static final Logger LOG = Logger.getLogger(
      AllocationTagsManager.class);

  // Read and write halves of the single ReentrantReadWriteLock created in
  // the constructor. Assigned exactly once, hence final.
  private final ReentrantReadWriteLock.ReadLock readLock;
  private final ReentrantReadWriteLock.WriteLock writeLock;
  private final RMContext rmContext;

  // Application's tags to Node
  private final Map<ApplicationId, TypeToCountedTags> perAppNodeMappings =
      new HashMap<>();
  // Application's tags to Rack
  private final Map<ApplicationId, TypeToCountedTags> perAppRackMappings =
      new HashMap<>();

  // Global tags to node mapping (used to fast return aggregated tags
  // cardinality across apps)
  private final TypeToCountedTags<NodeId> globalNodeMapping =
      new TypeToCountedTags<>();
  // Global tags to Rack mapping
  private final TypeToCountedTags<String> globalRackMapping =
      new TypeToCountedTags<>();

  /**
   * Generic store mapping type T to counted tags.
   * Currently used both for NodeId to Tag, Count and Rack to Tag, Count
   */
  @VisibleForTesting
  public static class TypeToCountedTags<T> {
    // Map<Type, Map<Tag, Count>>
    private final Map<T, Map<String, Long>> typeToTagsWithCount;

    /** Creates an empty store backed by a fresh mutable map. */
    public TypeToCountedTags() {
      this(new HashMap<>());
    }

    /**
     * Wraps the supplied map as the backing store; the map is used as-is
     * and is not copied.
     *
     * @param tags backing map from type to tag counts.
     */
    private TypeToCountedTags(Map<T, Map<String, Long>> tags) {
      this.typeToTagsWithCount = tags;
    }

    /**
     * Increments the counter of every tag in {@code tags} for the given
     * type, creating the per-type map on first use.
     * Not synchronized here: protected by external locks.
     *
     * @param type key (node or rack) the tags are recorded against.
     * @param tags tags whose counters are each increased by one.
     */
    private void addTags(T type, Set<String> tags) {
      final Map<String, Long> tagCounts =
          typeToTagsWithCount.computeIfAbsent(type, key -> new HashMap<>());

      // merge() stores 1 for an unseen tag and adds 1 to an existing count.
      tags.forEach(tag -> tagCounts.merge(tag, 1L, Long::sum));
    }

    /**
     * Increments the counter of a single tag for the given type, creating
     * the per-type map on first use.
     * Not synchronized here: protected by external locks.
     *
     * @param type key (node or rack) the tag is recorded against.
     * @param tag  tag whose counter is increased by one.
     */
    private void addTag(T type, String tag) {
      typeToTagsWithCount
          .computeIfAbsent(type, key -> new HashMap<>())
          // Stores 1 for an unseen tag, otherwise adds 1 to the old count.
          .merge(tag, 1L, Long::sum);
    }

    /**
     * Decrements the counter of {@code tag} in {@code innerMap}; the entry
     * is dropped once its count would fall below one.
     *
     * @param innerMap tag-to-count map of one node/rack.
     * @param tag      tag to decrement.
     */
    private void removeTagFromInnerMap(Map<String, Long> innerMap, String tag) {
      final Long current = innerMap.get(tag);

      // Unknown tag: nothing to decrement, just report it.
      if (current == null) {
        LOG.warn("Trying to remove tags, however the tag " + tag
            + " no longer exists on this node/rack.");
        return;
      }

      // More than one holder left: keep the entry with a lower count.
      if (current > 1) {
        innerMap.put(tag, current - 1);
        return;
      }

      // Count is 1 or an unexpected non-positive value; either way the
      // entry goes away, but a non-positive count is worth a warning.
      if (current <= 0) {
        LOG.warn(
            "Trying to remove tags from node/rack, however the count already"
                + " becomes 0 or less, it could be a potential bug.");
      }
      innerMap.remove(tag);
    }

    /**
     * Decrements the counters of all given tags for {@code type} and drops
     * the per-type map once it no longer holds any tag.
     *
     * @param type key (node or rack) the tags were recorded against.
     * @param tags tags whose counters are each decreased by one.
     */
    private void removeTags(T type, Set<String> tags) {
      final Map<String, Long> tagCounts = typeToTagsWithCount.get(type);
      if (tagCounts != null) {
        tags.forEach(tag -> removeTagFromInnerMap(tagCounts, tag));

        // Do not keep empty per-type maps around.
        if (tagCounts.isEmpty()) {
          typeToTagsWithCount.remove(type);
        }
        return;
      }

      LOG.warn("Failed to find node/rack=" + type
          + " while trying to remove tags, please double check.");
    }

    /**
     * Decrements the counter of a single tag for {@code type} and drops the
     * per-type map once it no longer holds any tag.
     *
     * @param type key (node or rack) the tag was recorded against.
     * @param tag  tag whose counter is decreased by one.
     */
    private void removeTag(T type, String tag) {
      final Map<String, Long> tagCounts = typeToTagsWithCount.get(type);
      final boolean unknownType = (tagCounts == null);
      if (unknownType) {
        LOG.warn("Failed to find node/rack=" + type
            + " while trying to remove tags, please double check.");
        return;
      }

      removeTagFromInnerMap(tagCounts, tag);

      // Do not keep empty per-type maps around.
      if (tagCounts.isEmpty()) {
        typeToTagsWithCount.remove(type);
      }
    }

    /**
     * Looks up how many times {@code tag} is recorded for {@code type}.
     *
     * @param type key (node or rack) to query.
     * @param tag  tag to look up.
     * @return the recorded count, or 0 when the type or the tag is unknown.
     */
    private long getCardinality(T type, String tag) {
      final Map<String, Long> tagCounts = typeToTagsWithCount.get(type);
      if (tagCounts == null) {
        return 0;
      }

      final Long recorded = tagCounts.get(tag);
      if (recorded == null) {
        return 0;
      }
      return recorded;
    }

    /**
     * Folds the counts recorded for {@code type} into a single value.
     *
     * <p>The first count seeds the result and {@code op} combines every
     * further count with it, so {@code op} is never invoked when fewer than
     * two counts take part. With null/empty {@code tags} all counts recorded
     * for the type are folded; otherwise only the requested tags are, and a
     * requested tag that is not recorded contributes 0.
     *
     * @param type key (node or rack) to query.
     * @param tags tags to consider; null or empty means all recorded tags.
     * @param op   operator combining the running result with the next count.
     * @return the folded value, or 0 when the type is unknown or no count
     *         takes part.
     */
    private long getCardinality(T type, Set<String> tags,
        LongBinaryOperator op) {
      final Map<String, Long> tagCounts = typeToTagsWithCount.get(type);
      if (tagCounts == null) {
        return 0;
      }

      long folded = 0;
      boolean seeded = false;

      if (tags == null || tags.isEmpty()) {
        // No explicit tags: iterate the values only, no per-tag lookup.
        for (long count : tagCounts.values()) {
          folded = seeded ? op.applyAsLong(folded, count) : count;
          seeded = true;
        }
        return folded;
      }

      for (String tag : tags) {
        final Long recorded = tagCounts.get(tag);
        final long count = (recorded == null) ? 0L : recorded;
        folded = seeded ? op.applyAsLong(folded, count) : count;
        seeded = true;
      }
      return folded;
    }

    /**
     * @return {@code true} when the type-to-tags map holds no entries.
     */
    private boolean isEmpty() {
      return this.typeToTagsWithCount.isEmpty();
    }

    /**
     * Exposes the backing map itself (not a copy), so changes made through
     * the returned reference are visible to this instance.
     *
     * @return the map from type to its tag-to-count map.
     */
    @VisibleForTesting
    public Map<T, Map<String, Long>> getTypeToTagsWithCount() {
      return this.typeToTagsWithCount;
    }

    /**
     * Merges the given {@link TypeToCountedTags} into this mapping. Types
     * unknown to this instance are added with a private copy of their tag
     * counts; for types already present, counts of equally named tags are
     * summed.
     *
     * @param target a {@link TypeToCountedTags} to merge with; ignored when
     *               it, or its backing map, is null. It is only read.
     */
    protected void absorb(final TypeToCountedTags<T> target) {
      // No-op if there is nothing to merge.
      if (target == null) {
        return;
      }
      final Map<T, Map<String, Long>> incoming =
          target.getTypeToTagsWithCount();
      if (incoming == null) {
        return;
      }

      for (Map.Entry<T, Map<String, Long>> incomingEntry :
          incoming.entrySet()) {
        final T type = incomingEntry.getKey();
        final Map<String, Long> incomingCounts = incomingEntry.getValue();

        // Install a mutable copy for an unseen type so the target's own
        // map is never shared with, or modified through, this instance.
        final Map<String, Long> existing = this.typeToTagsWithCount
            .putIfAbsent(type, Maps.newHashMap(incomingCounts));
        if (existing == null) {
          continue;
        }

        // The type was already known: add the incoming counts tag by tag.
        for (Map.Entry<String, Long> tagCount : incomingCounts.entrySet()) {
          existing.merge(tagCount.getKey(), tagCount.getValue(), Long::sum);
        }
      }
    }

    /**
     * Returns a read-only wrapper around the current mapping.
     *
     * <p>Despite the method name no entries are copied: the outer map is
     * wrapped with {@link Collections#unmodifiableMap(Map)}, so the result
     * keeps reflecting later changes made to this instance, and the inner
     * tag-to-count maps are the same mutable objects. Callers that need
     * isolation must copy the inner maps themselves.
     *
     * @return a {@link TypeToCountedTags} whose outer map rejects
     *         modification.
     */
    protected TypeToCountedTags<T> immutableCopy() {
      return new TypeToCountedTags<>(
          Collections.unmodifiableMap(this.typeToTagsWithCount));
    }
  }

  /**
   * @return the per-application tags-to-node mappings; this is the internal
   *         map itself, not a copy.
   */
  @VisibleForTesting
  public Map<ApplicationId, TypeToCountedTags> getPerAppNodeMappings() {
    return this.perAppNodeMappings;
  }

  /**
   * @return the per-application tags-to-rack mappings; this is the internal
   *         map itself, not a copy.
   */
  @VisibleForTesting
  Map<ApplicationId, TypeToCountedTags> getPerAppRackMappings() {
    return this.perAppRackMappings;
  }

  /**
   * @return the global tags-to-node mapping.
   */
  @VisibleForTesting
  TypeToCountedTags getGlobalNodeMapping() {
    return this.globalNodeMapping;
  }

  /**
   * @return the global tags-to-rack mapping.
   */
  @VisibleForTesting
  TypeToCountedTags getGlobalRackMapping() {
    return this.globalRackMapping;
  }

  public AllocationTagsManager(RMContext context) {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    readLock = lock.readLock();
    writeLock = lock.writeLock();
    rmContext = context;
  }

  /**
   * Aggregates multiple {@link TypeToCountedTags} into a single one based on
   * the scope defined by the namespace of the allocation tags; counts of
   * equally named tags are summed.
   *
   * <p>When the namespace resolves to exactly one application, that
   * application's mapping is returned directly (possibly {@code null} if
   * it has none) instead of an aggregated copy.
   *
   * @param allocationTags {@link AllocationTags}.
   * @param mapping per-application mappings to aggregate from.
   * @return an aggregated {@link TypeToCountedTags}; empty when the
   *         namespace scope is null or matches no known mapping.
   * @throws InvalidAllocationTagsQueryException declared by this method;
   *         NOTE(review): presumably raised while evaluating the
   *         namespace - confirm.
   */
  private TypeToCountedTags aggregateAllocationTags(
      AllocationTags allocationTags,
      Map<ApplicationId, TypeToCountedTags> mapping)
      throws InvalidAllocationTagsQueryException {
    // Resolve the namespace of the tags into a concrete set of apps.
    final TargetApplicationsNamespace namespace =
        allocationTags.getNamespace();
    namespace.evaluate(new TargetApplications(
        allocationTags.getCurrentApplicationId(), getApplicationIdToTags()));
    final Set<ApplicationId> scope = namespace.getNamespaceScope();

    if (scope == null) {
      return new TypeToCountedTags();
    }

    // Single application: hand back its mapping without extra computation.
    if (scope.size() == 1) {
      return mapping.get(scope.iterator().next());
    }

    final TypeToCountedTags merged = new TypeToCountedTags();
    for (ApplicationId appId : scope) {
      final TypeToCountedTags perApp = mapping.get(appId);
      if (perApp == null) {
        continue;
      }
      // Absorb through a read-only wrapper so ATM state won't be changed.
      merged.absorb(perApp.immutableCopy());
    }
    return merged;
  }

  /**
   * Notify container allocated on a node. The tags are recorded against
   * the application that owns the container; null or empty tags are
   * ignored.
   *
   * @param nodeId         allocated node.
   * @param containerId    container id; its application id is the one the
   *                       tags are recorded under.
   * @param allocationTags allocation tags, see
   *                       {@link SchedulingRequest#getAllocationTags()}.
   */
  @SuppressWarnings("unchecked")
  public void addContainer(NodeId nodeId, ContainerId containerId,
      Set<String> allocationTags) {
    final boolean nothingToAdd =
        allocationTags == null || allocationTags.isEmpty();
    if (nothingToAdd) {
      return;
    }

    addTags(nodeId,
        containerId.getApplicationAttemptId().getApplicationId(),
        allocationTags);

    if (LOG.isDebugEnabled()) {
      final String joinedTags = StringUtils.join(allocationTags, ",");
      LOG.debug("Added container=" + containerId + " with tags=["
          + joinedTags + "]");
    }
  }

  /**
   * Records the given tags for an application on a node, updating the
   * per-application and the global node and rack mappings under the write
   * lock.
   *
   * @param nodeId         node the tags are added to.
   * @param applicationId  application owning the tags.
   * @param allocationTags tags to add; each tag's count is increased by one.
   */
  public void addTags(NodeId nodeId, ApplicationId applicationId,
      Set<String> allocationTags) {
    writeLock.lock();
    try {
      TypeToCountedTags perAppTagsMapping = perAppNodeMappings
          .computeIfAbsent(applicationId, k -> new TypeToCountedTags());
      TypeToCountedTags perAppRackTagsMapping = perAppRackMappings
          .computeIfAbsent(applicationId, k -> new TypeToCountedTags());
      // Resolve the node exactly once. The previous code looked the node up
      // again after the null check, so a node removed from the RM's node
      // map in between caused a NullPointerException. The fallback also
      // covers test-cases where the context is mocked (no node map / no
      // such node) and a node that reports a null rack name.
      String nodeRack = Optional.ofNullable(rmContext.getRMNodes())
          .map(nodes -> nodes.get(nodeId))
          .map(node -> node.getRackName())
          .orElse("default-rack");
      perAppTagsMapping.addTags(nodeId, allocationTags);
      perAppRackTagsMapping.addTags(nodeRack, allocationTags);
      globalNodeMapping.addTags(nodeId, allocationTags);
      globalRackMapping.addTags(nodeRack, allocationTags);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Notify container removed. The tags are removed from the application
   * that owns the container; null or empty tags are ignored.
   *
   * @param nodeId         node the container ran on.
   * @param containerId    container id; its application id is the one the
   *                       tags are removed from.
   * @param allocationTags allocation tags of the given container.
   */
  @SuppressWarnings("unchecked")
  public void removeContainer(NodeId nodeId,
      ContainerId containerId, Set<String> allocationTags) {
    final boolean nothingToRemove =
        allocationTags == null || allocationTags.isEmpty();
    if (nothingToRemove) {
      return;
    }

    removeTags(nodeId,
        containerId.getApplicationAttemptId().getApplicationId(),
        allocationTags);

    if (LOG.isDebugEnabled()) {
      final String joinedTags = StringUtils.join(allocationTags, ",");
      LOG.debug("Removed container=" + containerId + " with tags=["
          + joinedTags + "]");
    }
  }

  /**
   * Helper method to just remove the tags associated with a container.
   * Updates the per-application and the global node and rack mappings under
   * the write lock, and drops per-application mappings that become empty.
   * Does nothing when the application has no node mapping.
   *
   * @param nodeId         node the tags are removed from.
   * @param applicationId  application owning the tags.
   * @param allocationTags tags to remove; each tag's count is decreased by
   *                       one.
   */
  public void removeTags(NodeId nodeId, ApplicationId applicationId,
      Set<String> allocationTags) {
    writeLock.lock();
    try {
      TypeToCountedTags perAppTagsMapping =
          perAppNodeMappings.get(applicationId);
      TypeToCountedTags perAppRackTagsMapping =
          perAppRackMappings.get(applicationId);
      if (perAppTagsMapping == null) {
        return;
      }
      // Resolve the node exactly once. The previous code looked the node up
      // again after the null check, so a node removed from the RM's node
      // map in between caused a NullPointerException. The fallback also
      // covers test-cases where the context is mocked (no node map / no
      // such node) and a node that reports a null rack name.
      String nodeRack = Optional.ofNullable(rmContext.getRMNodes())
          .map(nodes -> nodes.get(nodeId))
          .map(node -> node.getRackName())
          .orElse("default-rack");
      perAppTagsMapping.removeTags(nodeId, allocationTags);
      // The rack mapping is looked up independently of the node mapping,
      // so guard it separately instead of assuming it is present.
      if (perAppRackTagsMapping != null) {
        perAppRackTagsMapping.removeTags(nodeRack, allocationTags);
      }
      globalNodeMapping.removeTags(nodeId, allocationTags);
      globalRackMapping.removeTags(nodeRack, allocationTags);

      if (perAppTagsMapping.isEmpty()) {
        perAppNodeMappings.remove(applicationId);
      }
      if (perAppRackTagsMapping != null && perAppRackTagsMapping.isEmpty()) {
        perAppRackMappings.remove(applicationId);
      }
    } finally {
      writeLock.unlock();
    }
  }


  /**
   * Get Node cardinality for a specific tag.
   * When applicationId is null, the global node mapping is consulted
   * instead of a per-application one.
   *
   * @param nodeId        nodeId, required.
   * @param applicationId applicationId. When null is specified, return
   *                      the cardinality aggregated among all applications.
   * @param tag           allocation tag, see
   *                      {@link SchedulingRequest#getAllocationTags()},
   *                      If a specified tag doesn't exist,
   *                      method returns 0.
   * @return cardinality of specified query on the node; 0 when the
   *         application has no mapping.
   * @throws InvalidAllocationTagsQueryException when nodeId is null.
   */
  public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId,
      String tag) throws InvalidAllocationTagsQueryException {
    readLock.lock();
    try {
      if (nodeId == null) {
        throw new InvalidAllocationTagsQueryException(
            "Must specify nodeId/tag to query cardinality");
      }

      // No application given: answer from the global mapping.
      final TypeToCountedTags tagsByNode = (applicationId == null)
          ? globalNodeMapping
          : perAppNodeMappings.get(applicationId);

      return (tagsByNode == null)
          ? 0
          : tagsByNode.getCardinality(nodeId, tag);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Get Rack cardinality for a specific tag.
   * When applicationId is null, the global rack mapping is consulted
   * instead of a per-application one.
   *
   * @param rack          rack, required.
   * @param applicationId applicationId. When null is specified, return
   *                      the cardinality aggregated among all applications.
   * @param tag           allocation tag, see
   *                      {@link SchedulingRequest#getAllocationTags()},
   *                      If a specified tag doesn't exist,
   *                      method returns 0.
   * @return cardinality of specified query on the rack; 0 when the
   *         application has no mapping.
   * @throws InvalidAllocationTagsQueryException when rack is null.
   */
  public long getRackCardinality(String rack, ApplicationId applicationId,
      String tag) throws InvalidAllocationTagsQueryException {
    readLock.lock();
    try {
      if (rack == null) {
        throw new InvalidAllocationTagsQueryException(
            "Must specify rack/tag to query cardinality");
      }

      TypeToCountedTags tagsByRack = globalRackMapping;
      if (applicationId != null) {
        tagsByRack = perAppRackMappings.get(applicationId);
      }

      if (tagsByRack != null) {
        return tagsByRack.getCardinality(rack, tag);
      }
      return 0;
    } finally {
      readLock.unlock();
    }
  }



  /**
   * Check if given tag exists on node, i.e. whether
   * {@link #getNodeCardinality(NodeId, ApplicationId, String)} reports a
   * count greater than zero for it.
   *
   * @param nodeId        nodeId, required.
   * @param applicationId applicationId. When null is specified, the check
   *                      is made against the aggregation among all
   *                      applications.
   * @param tag           allocation tag, see
   *                      {@link SchedulingRequest#getAllocationTags()}.
   * @return {@code true} if the tag has a positive cardinality on the node,
   *         {@code false} otherwise.
   * @throws InvalidAllocationTagsQueryException when illegal query
   *                                            parameter specified
   */
  public boolean allocationTagExistsOnNode(NodeId nodeId,
      ApplicationId applicationId, String tag)
      throws InvalidAllocationTagsQueryException {
    final long cardinality = getNodeCardinality(nodeId, applicationId, tag);
    return cardinality > 0;
  }

  /**
   * Get cardinality for following conditions. External can pass-in a binary op
   * to implement customized logic.
   *
   * @param nodeId        nodeId, required.
   * @param tags          {@link AllocationTags}, allocation tags under a
   *                      specific namespace, required. See
   *                      {@link SchedulingRequest#getAllocationTags()},
   *                      When multiple tags specified. Returns cardinality
   *                      depends on op. If a specified tag doesn't exist, 0
   *                      will be its cardinality. When the contained tag set
   *                      is null/empty, all tags (of the node/app) will be
   *                      considered.
   * @param op            operator. Such as Long::max, Long::sum, etc. Required.
   *                      It is only applied when two or more values take
   *                      part in the query.
   * @return cardinality of specified query on the node.
   * @throws InvalidAllocationTagsQueryException when nodeId, tags or op is
   *                                            null
   */
  public long getNodeCardinalityByOp(NodeId nodeId, AllocationTags tags,
      LongBinaryOperator op) throws InvalidAllocationTagsQueryException {
    readLock.lock();
    try {
      final boolean missingArgument =
          nodeId == null || op == null || tags == null;
      if (missingArgument) {
        throw new InvalidAllocationTagsQueryException(
            "Must specify nodeId/tags/op to query cardinality");
      }

      // The ALL namespace is served from the global mapping; any other
      // namespace needs the per-application mappings aggregated.
      final boolean allNamespace = AllocationTagNamespaceType.ALL.equals(
          tags.getNamespace().getNamespaceType());
      final TypeToCountedTags source = allNamespace
          ? globalNodeMapping
          : aggregateAllocationTags(tags, perAppNodeMappings);

      if (source == null) {
        return 0;
      }
      return source.getCardinality(nodeId, tags.getTags(), op);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Get cardinality for following conditions. External can pass-in a binary op
   * to implement customized logic.
   *
   * @param rack          rack, required.
   * @param tags          {@link AllocationTags}, allocation tags under a
   *                      specific namespace, required. See
   *                      {@link SchedulingRequest#getAllocationTags()},
   *                      When multiple tags specified. Returns cardinality
   *                      depends on op. If a specified tag doesn't exist, 0
   *                      will be its cardinality. When the contained tag set
   *                      is null/empty, all tags (of the rack/app) will be
   *                      considered.
   * @param op            operator. Such as Long::max, Long::sum, etc. Required.
   *                      It is only applied when two or more values take
   *                      part in the query.
   * @return cardinality of specified query on the rack.
   * @throws InvalidAllocationTagsQueryException when rack, tags or op is
   *                                            null
   */
  public long getRackCardinalityByOp(String rack, AllocationTags tags,
      LongBinaryOperator op) throws InvalidAllocationTagsQueryException {
    readLock.lock();
    try {
      if (rack == null || op == null || tags == null) {
        // The message names the parameter this method actually takes (it
        // used to say "nodeId", copied from the node variant).
        throw new InvalidAllocationTagsQueryException(
            "Must specify rack/tags/op to query cardinality");
      }

      TypeToCountedTags mapping;
      if (AllocationTagNamespaceType.ALL.equals(
          tags.getNamespace().getNamespaceType())) {
        // The ALL namespace is served from the global rack mapping.
        mapping = globalRackMapping;
      } else {
        // Aggregates cardinality by rack.
        mapping = aggregateAllocationTags(tags, perAppRackMappings);
      }

      return mapping == null ? 0 :
          mapping.getCardinality(rack, tags.getTags(), op);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Returns a map whose key is the allocation tag and value is the
   * count of allocations with this tag.
   *
   * <p>The lookup is done under the read lock and the result is a snapshot
   * copy, so callers never iterate a map that the write paths of this class
   * are modifying at the same time.
   *
   * @param nodeId node to query.
   * @return allocation tag to count mapping taken from the global node
   *         mapping, or {@code null} when nothing is recorded for the node.
   */
  public Map<String, Long> getAllocationTagsWithCount(NodeId nodeId) {
    readLock.lock();
    try {
      Map<String, Long> tagsOnNode =
          globalNodeMapping.getTypeToTagsWithCount().get(nodeId);
      // Keep returning null for unknown nodes, as before.
      return tagsOnNode == null ? null : new HashMap<>(tagsOnNode);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Collects the application tags of every application that is present both
   * in the RM context's application map and in the per-application node
   * mappings of this {@link AllocationTagsManager}.
   *
   * @return a map from application ID to the value of
   * {@code getApplicationTags()} of that application; empty when the RM
   * context exposes no application map.
   * NOTE(review): the previous doc stated the value is an empty set when an
   * application has no tags - that depends on RMApp, confirm there.
   */
  private Map<ApplicationId, Set<String>> getApplicationIdToTags() {
    final Map<ApplicationId, Set<String>> tagsByApp = new HashMap<>();
    final ConcurrentMap<ApplicationId, RMApp> knownApps =
        rmContext.getRMApps();
    if (knownApps == null) {
      return tagsByApp;
    }

    for (Map.Entry<ApplicationId, RMApp> entry : knownApps.entrySet()) {
      final ApplicationId appId = entry.getKey();
      // Skip applications this manager holds no tags for.
      if (!perAppNodeMappings.containsKey(appId)) {
        continue;
      }
      tagsByApp.put(appId, entry.getValue().getApplicationTags());
    }
    return tagsByApp;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AllocationTags 源码

hadoop Evaluable 源码

hadoop InvalidAllocationTagsQueryException 源码

hadoop MemoryPlacementConstraintManager 源码

hadoop PlacementConstraintManager 源码

hadoop PlacementConstraintManagerService 源码

hadoop PlacementConstraintsUtil 源码

hadoop TargetApplications 源码

hadoop TargetApplicationsNamespace 源码

hadoop package-info 源码

0  赞