hadoop RouterPolicyFacade 源码

  • 2022-10-20
  • 浏览 (212)

hadoop RouterPolicyFacade 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership.  The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.hadoop.yarn.server.federation.policies;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;

/**
 * This class provides a facade to the policy subsystem, and handles the
 * lifecycle of policies (e.g., refresh from remote, default behaviors etc.).
 *
 * <p>Per-queue configurations and router policies are cached in two
 * {@link ConcurrentHashMap}s that are always updated together. {@link #reset()}
 * replaces both maps with freshly built ones that retain only the fallback
 * entry. The routing entry points take a snapshot of the pair under the same
 * monitor {@code reset()} uses, so they never observe a half-built or
 * mismatched cache.
 */
public class RouterPolicyFacade {

  private static final Logger LOG =
      LoggerFactory.getLogger(RouterPolicyFacade.class);

  private final SubClusterResolver subClusterResolver;
  private final FederationStateStoreFacade federationFacade;

  // Guarded by "this" for reassignment (see reset()); the map contents are
  // concurrent and may be updated without holding the monitor.
  private Map<String, SubClusterPolicyConfiguration> globalConfMap;

  @VisibleForTesting
  Map<String, FederationRouterPolicy> globalPolicyMap;

  /**
   * Creates the facade and primes its cache with the fallback policy.
   *
   * <p>The fallback configuration is looked up in the state store under
   * {@link YarnConfiguration#DEFAULT_FEDERATION_POLICY_KEY}. If the store has
   * no such entry, or the lookup fails, it is built from
   * {@link YarnConfiguration#FEDERATION_POLICY_MANAGER} and
   * {@link YarnConfiguration#FEDERATION_POLICY_MANAGER_PARAMS} in {@code conf}.
   *
   * @param conf the configuration used to build the XML fallback.
   * @param facade the state store facade used to look up policy
   *          configurations.
   * @param resolver the sub-cluster resolver passed to policy contexts.
   * @param homeSubcluster the home sub-cluster passed to the fallback policy
   *          context.
   * @throws FederationPolicyInitializationException if the fallback policy
   *           manager or router policy cannot be initialized.
   */
  public RouterPolicyFacade(Configuration conf,
      FederationStateStoreFacade facade, SubClusterResolver resolver,
      SubClusterId homeSubcluster)
      throws FederationPolicyInitializationException {

    this.federationFacade = facade;
    this.subClusterResolver = resolver;
    this.globalConfMap = new ConcurrentHashMap<>();
    this.globalPolicyMap = new ConcurrentHashMap<>();

    // load default behavior from store if possible
    String defaultKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
    SubClusterPolicyConfiguration configuration = null;
    try {
      configuration = federationFacade.getPolicyConfiguration(defaultKey);
    } catch (YarnException e) {
      // keep the cause so store connectivity problems remain diagnosable
      LOG.warn("No fallback behavior defined in store, defaulting to XML "
          + "configuration fallback behavior.", e);
    }

    // or from XML conf otherwise.
    if (configuration == null) {
      String defaultFederationPolicyManager =
          conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
              YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
      String defaultPolicyParamString =
          conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS,
              YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
      ByteBuffer defaultPolicyParam = ByteBuffer
          .wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8));

      configuration = SubClusterPolicyConfiguration.newInstance(defaultKey,
          defaultFederationPolicyManager, defaultPolicyParam);
    }

    // construct the required policy manager
    FederationPolicyInitializationContext fallbackContext =
        new FederationPolicyInitializationContext(configuration,
            subClusterResolver, federationFacade, homeSubcluster);
    FederationPolicyManager fallbackPolicyManager =
        FederationPolicyUtils.instantiatePolicyManager(configuration.getType());
    fallbackPolicyManager.setQueue(defaultKey);

    // add to the cache the fallback behavior
    globalConfMap.put(defaultKey,
        fallbackContext.getSubClusterPolicyConfiguration());
    globalPolicyMap.put(defaultKey,
        fallbackPolicyManager.getRouterPolicy(fallbackContext, null));

  }

  /**
   * This method provides a wrapper of all policy functionalities for routing.
   * Internally it manages configuration changes, and policy init/reinit.
   *
   * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
   *          has to be routed to an appropriate subCluster for execution.
   *
   * @param blackListSubClusters the list of subClusters as identified by
   *          {@link SubClusterId} to blackList from the selection of the home
   *          subCluster.
   *
   * @return the {@link SubClusterId} that will be the "home" for this
   *         application.
   *
   * @throws YarnException if there are issues initializing policies, or no
   *           valid sub-cluster id could be found for this app.
   */
  public SubClusterId getHomeSubcluster(
      ApplicationSubmissionContext appSubmissionContext,
      List<SubClusterId> blackListSubClusters) throws YarnException {

    // The maps are concurrent, but reset() may swap them mid-execution. Take
    // a local snapshot of both under the monitor reset() holds, so the pair
    // is consistent and fully populated.
    Map<String, SubClusterPolicyConfiguration> cachedConfs;
    Map<String, FederationRouterPolicy> policyMap;
    synchronized (this) {
      cachedConfs = globalConfMap;
      policyMap = globalPolicyMap;
    }

    if (appSubmissionContext == null) {
      throw new FederationPolicyException(
          "The ApplicationSubmissionContext cannot be null.");
    }

    String queue = appSubmissionContext.getQueue();

    // respecting YARN behavior we assume default queue if the queue is not
    // specified. This also ensures that "null" can be used as a key to get the
    // default behavior.
    if (queue == null) {
      queue = YarnConfiguration.DEFAULT_QUEUE_NAME;
    }

    FederationRouterPolicy policy = getFederationRouterPolicy(cachedConfs, policyMap, queue);
    if (policy == null) {
      // this should never happen, as the two maps are updated together
      throw new FederationPolicyException("No FederationRouterPolicy found "
          + "for queue: " + appSubmissionContext.getQueue() + " (for "
          + "application: " + appSubmissionContext.getApplicationId() + ") "
          + "and no default specified.");
    }

    return policy.getHomeSubcluster(appSubmissionContext, blackListSubClusters);
  }

  /**
   * This method reinitializes a policy and loads it in the policyMap.
   *
   * @param policyMap the policy cache to update.
   * @param cachedConfs the configuration cache to update alongside policyMap.
   * @param queue the queue to initialize a policy for.
   * @param conf the configuration to use for initialization.
   *
   * @throws FederationPolicyInitializationException if initialization fails.
   */
  private void singlePolicyReinit(Map<String, FederationRouterPolicy> policyMap,
      Map<String, SubClusterPolicyConfiguration> cachedConfs, String queue,
      SubClusterPolicyConfiguration conf)
      throws FederationPolicyInitializationException {

    FederationPolicyInitializationContext context =
        new FederationPolicyInitializationContext(conf, subClusterResolver,
            federationFacade, null);
    String newType = context.getSubClusterPolicyConfiguration().getType();
    // the currently cached policy (possibly null) is handed to the manager so
    // it can be reused/reinitialized rather than rebuilt from scratch
    FederationRouterPolicy routerPolicy = policyMap.get(queue);

    FederationPolicyManager federationPolicyManager =
        FederationPolicyUtils.instantiatePolicyManager(newType);
    // set queue, reinit policy if required (implementation lazily check
    // content of conf), and cache it
    federationPolicyManager.setQueue(queue);
    routerPolicy =
        federationPolicyManager.getRouterPolicy(context, routerPolicy);

    // we need the two put to be atomic (across multiple threads invoking
    // this and reset operations). The policy is stored before its conf so a
    // lock-free reader that sees the new conf also sees the policy.
    synchronized (this) {
      policyMap.put(queue, routerPolicy);
      cachedConfs.put(queue, conf);
    }
  }

  /**
   * This method flushes all cached configurations and policies, keeping only
   * the fallback entry. This should be invoked if the facade remains active
   * after very large churn of queues in the system.
   */
  public synchronized void reset() {
    final String defaultKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;

    // remember the fallback
    SubClusterPolicyConfiguration conf = globalConfMap.get(defaultKey);
    FederationRouterPolicy policy = globalPolicyMap.get(defaultKey);

    // Fully populate the replacement maps before publishing them; publishing
    // empty maps first would let a concurrent routing call observe a cache
    // without the fallback entry.
    Map<String, SubClusterPolicyConfiguration> newConfMap =
        new ConcurrentHashMap<>();
    Map<String, FederationRouterPolicy> newPolicyMap =
        new ConcurrentHashMap<>();
    newConfMap.put(defaultKey, conf);
    newPolicyMap.put(defaultKey, policy);

    globalConfMap = newConfMap;
    globalPolicyMap = newPolicyMap;
  }

  /**
   * This method provides a wrapper of all policy functionalities for routing a
   * reservation. Internally it manages configuration changes, and policy
   * init/reinit.
   *
   * @param request the reservation to route.
   *
   * @return the id of the subcluster that will be the "home" for this
   *         reservation.
   *
   * @throws YarnException if there are issues initializing policies, or no
   *           valid sub-cluster id could be found for this reservation.
   */
  public SubClusterId getReservationHomeSubCluster(
      ReservationSubmissionRequest request) throws YarnException {

    // The maps are concurrent, but reset() may swap them mid-execution. Take
    // a local snapshot of both under the monitor reset() holds, so the pair
    // is consistent and fully populated.
    Map<String, SubClusterPolicyConfiguration> cachedConfs;
    Map<String, FederationRouterPolicy> policyMap;
    synchronized (this) {
      cachedConfs = globalConfMap;
      policyMap = globalPolicyMap;
    }

    if (request == null) {
      throw new FederationPolicyException(
          "The ReservationSubmissionRequest cannot be null.");
    }

    String queue = request.getQueue();
    FederationRouterPolicy policy = getFederationRouterPolicy(cachedConfs, policyMap, queue);

    if (policy == null) {
      // this should never happen, as the two maps are updated together
      throw new FederationPolicyException("No FederationRouterPolicy found "
          + "for queue: " + request.getQueue() + " (while routing "
          + "reservation: " + request.getReservationId() + ") "
          + "and no default specified.");
    }

    return policy.getReservationHomeSubcluster(request);
  }

  /**
   * Resolves the router policy for a queue, reinitializing it when the
   * effective configuration differs from the cached one.
   *
   * <p>Lookup order:
   * <ol>
   *   <li>the queue's configuration from the state store;</li>
   *   <li>the store's configuration under
   *       {@link YarnConfiguration#DEFAULT_FEDERATION_POLICY_KEY};</li>
   *   <li>the cached fallback configuration.</li>
   * </ol>
   * For 2 and 3 the policy is cached under the default key rather than under
   * the queue name.
   *
   * @param cachedConfiguration the configuration cache snapshot.
   * @param policyMap the policy cache snapshot paired with
   *          cachedConfiguration.
   * @param queue the queue to resolve a policy for.
   * @return the cached policy for the resolved key, or null if none exists.
   * @throws FederationPolicyInitializationException if a policy has to be
   *           (re)initialized and that fails.
   */
  private FederationRouterPolicy getFederationRouterPolicy(
      Map<String, SubClusterPolicyConfiguration> cachedConfiguration,
      Map<String, FederationRouterPolicy> policyMap, String queue)
      throws FederationPolicyInitializationException {

    // the facade might cache this request, based on its parameterization
    SubClusterPolicyConfiguration configuration = null;
    // the cache key actually used; switches to the default key on fallback
    String copyQueue = queue;

    try {
      configuration = federationFacade.getPolicyConfiguration(copyQueue);
    } catch (YarnException e) {
      LOG.warn("There is no policy configured for the queue: {}, falling back to defaults.",
          copyQueue, e);
    }

    // If there is no policy configured for this queue, fallback to the baseline
    // policy that is configured either in the store or via XML config (and cached)
    if (configuration == null) {
      final String policyKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
      LOG.warn("There is no policies configured for queue: {} " +
          "we fallback to default policy for: {}. ", copyQueue, policyKey);
      copyQueue = policyKey;
      try {
        configuration = federationFacade.getPolicyConfiguration(copyQueue);
      } catch (YarnException e) {
        LOG.warn("Cannot retrieve policy configured for the queue: {}, falling back to defaults.",
            copyQueue, e);
      }
    }

    // the fallback is not configured via store, but via XML, using
    // previously loaded configuration.
    if (configuration == null) {
      configuration = cachedConfiguration.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
    }

    // if the configuration has changed since last loaded, reinit the policy
    // based on current configuration
    SubClusterPolicyConfiguration policyConfiguration =
        cachedConfiguration.get(copyQueue);
    if (policyConfiguration == null || !policyConfiguration.equals(configuration)) {
      singlePolicyReinit(policyMap, cachedConfiguration, copyQueue, configuration);
    }

    return policyMap.get(copyQueue);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractConfigurableFederationPolicy 源码

hadoop ConfigurableFederationPolicy 源码

hadoop FederationPolicyInitializationContext 源码

hadoop FederationPolicyInitializationContextValidator 源码

hadoop FederationPolicyUtils 源码

hadoop package-info 源码

0  赞