hadoop LocalityRouterPolicy source code

  • 2022-10-20

hadoop LocalityRouterPolicy code

File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.federation.policies.router;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This policy selects the subcluster depending on the node where the Client
 * wants to run its application.
 *
 * It succeeds if:
 *
 * - There are three AMContainerResourceRequests in the order
 *   NODE, RACK, ANY
 *
 * It falls back to WeightedRandomRouterPolicy in case of:
 *
 * - Null or empty AMContainerResourceRequests;
 *
 * - One AMContainerResourceRequests and it has ANY as ResourceName;
 *
 * - The node is in blacklisted SubClusters.
 *
 * It fails if:
 *
 * - The node does not exist and RelaxLocality is False;
 *
 * - We have an invalid number of resource requests (not 0, 1, or 3)
 */
public class LocalityRouterPolicy extends WeightedRandomRouterPolicy {

  public static final Logger LOG =
      LoggerFactory.getLogger(LocalityRouterPolicy.class);

  private SubClusterResolver resolver;
  private List<SubClusterId> enabledSCs;

  @Override
  public void reinitialize(FederationPolicyInitializationContext policyContext)
      throws FederationPolicyInitializationException {
    super.reinitialize(policyContext);
    resolver = policyContext.getFederationSubclusterResolver();
    Map<SubClusterIdInfo, Float> weights =
        getPolicyInfo().getRouterPolicyWeights();
    enabledSCs = new ArrayList<>();
    for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
      if (entry != null && entry.getValue() > 0) {
        enabledSCs.add(entry.getKey().toId());
      }
    }
  }

  @Override
  public SubClusterId getHomeSubcluster(
      ApplicationSubmissionContext appSubmissionContext,
      List<SubClusterId> blackListSubClusters) throws YarnException {

    // null checks and default-queue behavior
    validate(appSubmissionContext);

    List<ResourceRequest> rrList =
        appSubmissionContext.getAMContainerResourceRequests();

    // Fast path for FailForward to WeightedRandomRouterPolicy
    if (rrList == null || rrList.isEmpty() || (rrList.size() == 1
        && ResourceRequest.isAnyLocation(rrList.get(0).getResourceName()))) {
      return super.getHomeSubcluster(appSubmissionContext, blackListSubClusters);
    }

    if (rrList.size() != 3) {
      throw new FederationPolicyException(
          "Invalid number of resource requests: " + rrList.size());
    }

    Map<SubClusterId, SubClusterInfo> activeSubClusters = getActiveSubclusters();
    Set<SubClusterId> validSubClusters = activeSubClusters.keySet();
    FederationPolicyUtils.validateSubClusterAvailability(activeSubClusters.keySet(),
        blackListSubClusters);

    if (blackListSubClusters != null) {
      // Remove the blacklisted SubClusters from the active ones retrieved from the StateStore
      validSubClusters.removeAll(blackListSubClusters);
    }

    try {
      // With three requests, this has been processed by the
      // ResourceRequestInterceptorREST, and should have
      // node, rack, and any
      SubClusterId targetId = null;
      ResourceRequest nodeRequest = null;
      ResourceRequest rackRequest = null;
      ResourceRequest anyRequest = null;

      for (ResourceRequest rr : rrList) {
        // Handle "node" requests
        try {
          targetId = resolver.getSubClusterForNode(rr.getResourceName());
          nodeRequest = rr;
        } catch (YarnException e) {
          LOG.error("Cannot resolve node : {}.", e.getMessage());
        }
        // Handle "rack" requests
        try {
          resolver.getSubClustersForRack(rr.getResourceName());
          rackRequest = rr;
        } catch (YarnException e) {
          LOG.error("Cannot resolve rack : {}.", e.getMessage());
        }
        // Handle "ANY" requests
        if (ResourceRequest.isAnyLocation(rr.getResourceName())) {
          anyRequest = rr;
          continue;
        }
      }

      if (nodeRequest == null) {
        throw new YarnException("Missing node request.");
      }
      if (rackRequest == null) {
        throw new YarnException("Missing rack request.");
      }
      if (anyRequest == null) {
        throw new YarnException("Missing any request.");
      }

      LOG.info("Node request: {} , Rack request: {} , Any request: {}.",
          nodeRequest.getResourceName(), rackRequest.getResourceName(),
          anyRequest.getResourceName());

      // Handle "node" requests
      if (validSubClusters.contains(targetId) && enabledSCs
          .contains(targetId)) {
        LOG.info("Node {} is in SubCluster: {}.", nodeRequest.getResourceName(), targetId);
        return targetId;
      } else {
        throw new YarnException("The node " + nodeRequest.getResourceName()
            + " is in a blacklist SubCluster or not active. ");
      }
    } catch (YarnException e) {
      LOG.error("Validating resource requests failed, " +
          "Falling back to WeightedRandomRouterPolicy placement : {}.", e.getMessage());
      // FailForward to WeightedRandomRouterPolicy
      // Overwrite request to use a default ANY
      ResourceRequest amReq = Records.newRecord(ResourceRequest.class);
      amReq.setPriority(appSubmissionContext.getPriority());
      amReq.setResourceName(ResourceRequest.ANY);
      amReq.setCapability(appSubmissionContext.getResource());
      amReq.setNumContainers(1);
      amReq.setRelaxLocality(true);
      amReq.setNodeLabelExpression(appSubmissionContext.getNodeLabelExpression());
      amReq.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
      appSubmissionContext.setAMContainerResourceRequests(Collections.singletonList(amReq));
      return super.getHomeSubcluster(appSubmissionContext, blackListSubClusters);
    }
  }
}
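
For illustration, here is a minimal, hypothetical sketch (not part of the Hadoop source) of how a client-side submission context could be assembled with the three locality-ordered AM resource requests this policy expects. The host name host-a.example.com, the rack /rack-a, the queue name, and the resource sizes are placeholder assumptions; only the YARN record builders (ApplicationSubmissionContext, ResourceRequest, Priority, Resource, Records) are real API.

import java.util.Arrays;

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.util.Records;

/** Hypothetical helper showing the NODE, RACK, ANY request layout. */
public class LocalityRequestExample {

  public static ApplicationSubmissionContext buildContext() {
    Priority priority = Priority.newInstance(0);
    // Illustrative AM container size: 2 GB, 1 vcore.
    Resource amResource = Resource.newInstance(2048, 1);

    // The policy succeeds only when exactly three AM resource requests
    // are present, in the order NODE, RACK, ANY (see the class javadoc).
    ResourceRequest nodeReq = ResourceRequest.newInstance(
        priority, "host-a.example.com", amResource, 1);
    ResourceRequest rackReq = ResourceRequest.newInstance(
        priority, "/rack-a", amResource, 1);
    ResourceRequest anyReq = ResourceRequest.newInstance(
        priority, ResourceRequest.ANY, amResource, 1);

    ApplicationSubmissionContext ctx =
        Records.newRecord(ApplicationSubmissionContext.class);
    ctx.setQueue("default");
    ctx.setPriority(priority);
    ctx.setResource(amResource);
    ctx.setAMContainerResourceRequests(
        Arrays.asList(nodeReq, rackReq, anyReq));
    return ctx;
  }
}

Given such a context, getHomeSubcluster resolves host-a.example.com through the SubClusterResolver and, if that subcluster is active, not blacklisted, and carries a positive weight, routes the application there; otherwise it rewrites the request list to a single relaxed ANY request and falls back to WeightedRandomRouterPolicy, as shown in the catch block above.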

Related information

hadoop source code directory

Related articles

hadoop AbstractRouterPolicy source code

hadoop FederationRouterPolicy source code

hadoop HashBasedRouterPolicy source code

hadoop LoadBasedRouterPolicy source code

hadoop PriorityRouterPolicy source code

hadoop RejectRouterPolicy source code

hadoop UniformRandomRouterPolicy source code

hadoop WeightedRandomRouterPolicy source code

hadoop package-info source code
