hadoop ServiceApiUtil 源码

  • 2022-10-20
  • 浏览 (195)

hadoop ServiceApiUtil 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.service.utils;

import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hadoop.thirdparty.com.google.common.collect.Multimap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.ComponentContainers;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Configuration;
import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.conf.RestApiConstants;
import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
import org.apache.hadoop.yarn.service.provider.ProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages.ERROR_COMP_DOES_NOT_NEED_UPGRADE;
import static org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages.ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE;

public class ServiceApiUtil {
  // Class-wide SLF4J logger.
  private static final Logger LOG =
      LoggerFactory.getLogger(ServiceApiUtil.class);
  // JSON (de)serializer for Service definitions, using snake_case property
  // names. Deliberately non-final: setJsonSerDeser(...) replaces it.
  public static JsonSerDeser<Service> jsonSerDeser =
      new JsonSerDeser<>(Service.class,
          PropertyNamingStrategies.SNAKE_CASE);

  // JSON (de)serializer for arrays of Container, snake_case property names.
  public static final JsonSerDeser<Container[]> CONTAINER_JSON_SERDE =
      new JsonSerDeser<>(Container[].class,
          PropertyNamingStrategies.SNAKE_CASE);

  // JSON (de)serializer for arrays of ComponentContainers, snake_case
  // property names.
  public static final JsonSerDeser<ComponentContainers[]>
      COMP_CONTAINERS_JSON_SERDE = new JsonSerDeser<>(
          ComponentContainers[].class,
          PropertyNamingStrategies.SNAKE_CASE);

  // JSON (de)serializer for arrays of Component, snake_case property names.
  public static final JsonSerDeser<Component[]> COMP_JSON_SERDE =
      new JsonSerDeser<>(Component[].class,
          PropertyNamingStrategies.SNAKE_CASE);

  // Pattern for service/component names: a lowercase letter followed by
  // lowercase letters, digits or hyphens.
  // NOTE(review): presumably PatternValidator requires a full match and
  // throws on mismatch - confirm against PatternValidator.
  private static final PatternValidator namePattern
      = new PatternValidator("[a-z][a-z0-9-]*");

  // Pattern for user names: same as namePattern but additionally allows '.'.
  private static final PatternValidator userNamePattern
      = new PatternValidator("[a-z][a-z0-9-.]*");

  /**
   * Replaces the shared {@link #jsonSerDeser} that the load/save helpers of
   * this class use for service definitions. Marked as visible for testing.
   *
   * @param jsd the (de)serializer to install; declared as a raw type, so the
   *            compiler does not check that it handles {@code Service}
   */
  @VisibleForTesting
  public static void setJsonSerDeser(JsonSerDeser jsd) {
    jsonSerDeser = jsd;
  }

  /**
   * Validates a service definition and resolves it in place: components whose
   * artifact is of type SERVICE are replaced by the components of the
   * referenced external service, global configuration/artifact/resource
   * values are pushed down to components that lack them, dependencies and
   * placement policies are checked, and a default lifetime is filled in.
   *
   * @param service the service definition; it is modified by this method
   * @param fs file system wrapper used to load external service definitions
   *           and to validate component artifacts and config files
   * @param conf configuration, consulted for the registry DNS setting
   * @throws IOException declared by the loaders/validators that are called
   * @throws IllegalArgumentException if any validation fails
   */
  @VisibleForTesting
  public static void validateAndResolveService(Service service,
      SliderFileSystem fs, org.apache.hadoop.conf.Configuration conf) throws
      IOException {
    boolean dnsEnabled = conf.getBoolean(RegistryConstants.KEY_DNS_ENABLED,
        RegistryConstants.DEFAULT_DNS_ENABLED);
    // With registry DNS enabled the current user name must fit the FQDN
    // label length limit and match the user name pattern.
    if (dnsEnabled) {
      if (RegistryUtils.currentUser().length()
          > RegistryConstants.MAX_FQDN_LABEL_LENGTH) {
        throw new IllegalArgumentException(
            RestApiErrorMessages.ERROR_USER_NAME_INVALID);
      }
      userNamePattern.validate(RegistryUtils.currentUser());
    }

    if (StringUtils.isEmpty(service.getName())) {
      throw new IllegalArgumentException(
          RestApiErrorMessages.ERROR_APPLICATION_NAME_INVALID);
    }

    if (StringUtils.isEmpty(service.getVersion())) {
      throw new IllegalArgumentException(String.format(
          RestApiErrorMessages.ERROR_APPLICATION_VERSION_INVALID,
          service.getName()));
    }

    validateNameFormat(service.getName(), conf);

    // If the service has no components, throw error
    if (!hasComponent(service)) {
      throw new IllegalArgumentException(
          "No component specified for " + service.getName());
    }

    // The Kerberos principal is only checked when security is enabled.
    if (UserGroupInformation.isSecurityEnabled()) {
      validateKerberosPrincipal(service.getKerberosPrincipal());
    }

    // Validate the Docker client config.
    // An IOException from this check is reported as an
    // IllegalArgumentException, like the other validation failures.
    try {
      validateDockerClientConfiguration(service, conf);
    } catch (IOException e) {
      throw new IllegalArgumentException(e);
    }

    // Validate there are no component name collisions (collisions are not
    // currently supported) and add any components from external services
    Configuration globalConf = service.getConfiguration();
    Set<String> componentNames = new HashSet<>();
    // Removals/additions are collected and applied after the loop so the
    // component list is not modified while it is being iterated.
    List<Component> componentsToRemove = new ArrayList<>();
    List<Component> componentsToAdd = new ArrayList<>();
    for (Component comp : service.getComponents()) {
      // The allowed component name length is the FQDN label limit minus the
      // number of decimal digits of Long.MAX_VALUE.
      // NOTE(review): presumably this reserves room for a numeric instance
      // id suffix - confirm.
      int maxCompLength = RegistryConstants.MAX_FQDN_LABEL_LENGTH;
      maxCompLength = maxCompLength - Long.toString(Long.MAX_VALUE).length();
      if (dnsEnabled && comp.getName().length() > maxCompLength) {
        throw new IllegalArgumentException(String.format(RestApiErrorMessages
            .ERROR_COMPONENT_NAME_INVALID, maxCompLength, comp.getName()));
      }
      if (service.getName().equals(comp.getName())) {
        throw new IllegalArgumentException(String.format(RestApiErrorMessages
                .ERROR_COMPONENT_NAME_CONFLICTS_WITH_SERVICE_NAME,
            comp.getName(), service.getName()));
      }
      if (componentNames.contains(comp.getName())) {
        throw new IllegalArgumentException("Component name collision: " +
            comp.getName());
      }
      // If artifact is of type SERVICE (which cannot be filled from global),
      // read external service and add its components to this service
      if (comp.getArtifact() != null && comp.getArtifact().getType() ==
          Artifact.TypeEnum.SERVICE) {
        if (StringUtils.isEmpty(comp.getArtifact().getId())) {
          throw new IllegalArgumentException(
              RestApiErrorMessages.ERROR_ARTIFACT_ID_INVALID);
        }
        LOG.info("Marking {} for removal", comp.getName());
        componentsToRemove.add(comp);
        // The artifact id is used as the name of the external service.
        List<Component> externalComponents = getComponents(fs,
            comp.getArtifact().getId());
        for (Component c : externalComponents) {
          Component override = service.getComponent(c.getName());
          if (override != null && override.getArtifact() == null) {
            // allow properties from external components to be overridden /
            // augmented by properties in this component, except for artifact
            // which must be read from external component
            override.mergeFrom(c);
            LOG.info("Merging external component {} from external {}", c
                .getName(), comp.getName());
          } else {
            if (componentNames.contains(c.getName())) {
              throw new IllegalArgumentException("Component name collision: " +
                  c.getName());
            }
            componentNames.add(c.getName());
            componentsToAdd.add(c);
            LOG.info("Adding component {} from external {}", c.getName(),
                comp.getName());
          }
        }
      } else {
        // otherwise handle as a normal component
        componentNames.add(comp.getName());
        // configuration
        comp.getConfiguration().mergeFrom(globalConf);
      }
    }
    service.getComponents().removeAll(componentsToRemove);
    service.getComponents().addAll(componentsToAdd);

    // Validate components and let global values take effect if component level
    // values are not provided
    Artifact globalArtifact = service.getArtifact();
    Resource globalResource = service.getResource();
    for (Component comp : service.getComponents()) {
      // fill in global artifact unless it is type SERVICE
      if (comp.getArtifact() == null && service.getArtifact() != null
          && service.getArtifact().getType() != Artifact.TypeEnum
          .SERVICE) {
        comp.setArtifact(globalArtifact);
      }
      // fill in global resource
      if (comp.getResource() == null) {
        comp.setResource(globalResource);
      }
      // validate dependency existence
      if (comp.getDependencies() != null) {
        for (String dependency : comp.getDependencies()) {
          if (!componentNames.contains(dependency)) {
            throw new IllegalArgumentException(String.format(
                RestApiErrorMessages.ERROR_DEPENDENCY_INVALID, dependency,
                comp.getName()));
          }
        }
      }
      validateComponent(comp, fs.getFileSystem(), conf);
    }
    validatePlacementPolicy(service.getComponents(), componentNames);

    // validate dependency tree
    // The sorted result is discarded; the call is made because it throws
    // when the dependencies cannot be ordered.
    sortByDependencies(service.getComponents());

    // Service lifetime if not specified, is set to unlimited lifetime
    if (service.getLifetime() == null) {
      service.setLifetime(RestApiConstants.DEFAULT_UNLIMITED_LIFETIME);
    }
  }

  /**
   * Characters that are rejected when they appear anywhere in JVM options.
   * Compiled once, because {@link Pattern} instances are immutable and
   * compiling on every call is wasted work.
   */
  private static final Pattern JVM_OPTS_ILLEGAL_CHARS =
      Pattern.compile("[!~#?@*&%${}()<>\\[\\]|\",`;]");

  /**
   * Rejects JVM options that contain any of the characters in
   * {@link #JVM_OPTS_ILLEGAL_CHARS}.
   *
   * @param jvmOpts the JVM options string to check; must not be null
   * @throws IllegalArgumentException if a rejected character is found
   */
  public static void validateJvmOpts(String jvmOpts)
      throws IllegalArgumentException {
    if (JVM_OPTS_ILLEGAL_CHARS.matcher(jvmOpts).find()) {
      throw new IllegalArgumentException(
          RestApiErrorMessages.ERROR_JVM_OPTS);
    }
  }

  /**
   * Checks the format of a Kerberos principal name. An empty or missing
   * principal name is accepted; a non-empty one must contain a '/'.
   *
   * @param kerberosPrincipal holder of the principal name to check
   * @throws IOException declared for callers; not thrown by this body
   * @throws IllegalArgumentException if the name is non-empty and has no '/'
   */
  public static void validateKerberosPrincipal(
      KerberosPrincipal kerberosPrincipal) throws IOException {
    final String principalName = kerberosPrincipal.getPrincipalName();
    if (StringUtils.isEmpty(principalName) || principalName.contains("/")) {
      return;
    }
    throw new IllegalArgumentException(String.format(
        RestApiErrorMessages.ERROR_KERBEROS_PRINCIPAL_NAME_FORMAT,
        principalName));
  }

  /**
   * Verifies that the Docker client config referenced by the service, if
   * any, exists on the file system that its path resolves to.
   *
   * @param service service whose Docker client config location is checked
   * @param conf configuration used to resolve the path's file system
   * @throws IOException if the configured location does not exist, or if the
   *         file system lookup fails
   */
  private static void validateDockerClientConfiguration(Service service,
      org.apache.hadoop.conf.Configuration conf) throws IOException {
    final String configLocation = service.getDockerClientConfig();
    if (StringUtils.isEmpty(configLocation)) {
      // Nothing was supplied, so there is nothing to verify.
      return;
    }
    final Path configPath = new Path(configLocation);
    final FileSystem configFs = configPath.getFileSystem(conf);
    LOG.info("The supplied Docker client config is " + configLocation);
    if (configFs.exists(configPath)) {
      return;
    }
    throw new IOException(
        "The supplied Docker client config does not exist: "
            + configLocation);
  }

  /**
   * Validates a single component: name format, artifact, launch command,
   * resource, container count, config files and readiness check.
   *
   * @param comp the component to validate
   * @param fs file system handed to the artifact/config-file validators
   * @param conf configuration used for the name format check
   * @throws IOException declared by the client provider validators
   * @throws IllegalArgumentException if a check fails
   */
  private static void validateComponent(Component comp, FileSystem fs,
      org.apache.hadoop.conf.Configuration conf)
      throws IOException {
    final String compName = comp.getName();
    validateNameFormat(compName, conf);

    final Artifact artifact = comp.getArtifact();
    final AbstractClientProvider provider =
        ProviderFactory.getClientProvider(artifact);
    provider.validateArtifact(artifact, compName, fs);

    // A launch command may only be omitted for DOCKER artifacts.
    if (comp.getLaunchCommand() == null
        && !(artifact != null
            && artifact.getType() == Artifact.TypeEnum.DOCKER)) {
      throw new IllegalArgumentException(RestApiErrorMessages
          .ERROR_ABSENT_LAUNCH_COMMAND);
    }

    validateServiceResource(comp.getResource(), comp);

    if (comp.getNumberOfContainers() == null
        || comp.getNumberOfContainers() < 0) {
      // The offending count is appended to the format string itself; the
      // component name fills the format placeholder.
      final String countFormat =
          RestApiErrorMessages.ERROR_CONTAINERS_COUNT_FOR_COMP_INVALID
              + ": " + comp.getNumberOfContainers();
      throw new IllegalArgumentException(
          String.format(countFormat, comp.getName()));
    }
    provider.validateConfigFiles(comp.getConfiguration().getFiles(),
        comp.getName(), fs);

    // The returned probe is discarded.
    // NOTE(review): presumably called so that an invalid readiness check
    // fails here - confirm against MonitorUtils.getProbe.
    MonitorUtils.getProbe(comp.getReadinessCheck());
  }

  /**
   * Checks the format of a component or service name. The name must be
   * non-empty, must not contain '_', must not exceed the FQDN label length
   * limit when registry DNS is enabled, and must satisfy the name pattern.
   * The name itself is not modified.
   *
   * @param name the name to check
   * @param conf configuration consulted for the registry DNS setting
   * @throws IllegalArgumentException if the name is rejected
   */
  public static void validateNameFormat(String name,
      org.apache.hadoop.conf.Configuration conf) {
    if (StringUtils.isEmpty(name)) {
      throw new IllegalArgumentException("Name can not be empty!");
    }
    // Underscores are rejected up front with a dedicated message.
    if (name.indexOf('_') != -1) {
      throw new IllegalArgumentException(
          "Invalid format: " + name
              + ", can not use '_', as DNS hostname does not allow '_'. Use '-' Instead. ");
    }
    final boolean registryDnsOn = conf.getBoolean(
        RegistryConstants.KEY_DNS_ENABLED,
        RegistryConstants.DEFAULT_DNS_ENABLED);
    if (registryDnsOn
        && name.length() > RegistryConstants.MAX_FQDN_LABEL_LENGTH) {
      throw new IllegalArgumentException(String
          .format("Invalid format %s, must be no more than 63 characters ",
              name));
    }
    namePattern.validate(name);
  }

  /**
   * Verifies that every placement constraint of every component declares
   * both a type and a scope. Components without a placement policy are
   * skipped.
   *
   * @param components components whose placement policies are checked
   * @param componentNames names of all components; not used by this check
   * @throws IllegalArgumentException if a constraint has no type or no scope
   */
  private static void validatePlacementPolicy(List<Component> components,
      Set<String> componentNames) {
    for (Component comp : components) {
      final PlacementPolicy policy = comp.getPlacementPolicy();
      if (policy == null) {
        continue;
      }
      for (PlacementConstraint constraint : policy.getConstraints()) {
        if (constraint.getType() == null) {
          // Unnamed constraints contribute an empty label to the message.
          final String label = constraint.getName() == null
              ? "" : constraint.getName() + " ";
          throw new IllegalArgumentException(String.format(
              RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TYPE_NULL,
              label, comp.getName()));
        }
        if (constraint.getScope() == null) {
          final String label = constraint.getName() == null
              ? "" : constraint.getName() + " ";
          throw new IllegalArgumentException(String.format(
              RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_SCOPE_NULL,
              label, comp.getName()));
        }
      }
    }
  }

  /**
   * Loads the persisted definition of the named service and returns its
   * components.
   *
   * @param fs file system wrapper used to locate and read the definition
   * @param serviceName name of the service to load
   * @return the components of the loaded service
   * @throws IOException if the definition cannot be loaded
   */
  @VisibleForTesting
  public static List<Component> getComponents(SliderFileSystem
      fs, String serviceName) throws IOException {
    final Service loaded = loadService(fs, serviceName);
    return loaded.getComponents();
  }

  /**
   * Loads a service definition from its JSON file in the service's cluster
   * directory.
   *
   * @param fs file system wrapper used to locate and read the definition
   * @param serviceName name of the service to load
   * @return the deserialized service
   * @throws IOException if the definition cannot be loaded
   */
  public static Service loadService(SliderFileSystem fs, String
      serviceName) throws IOException {
    final Path definitionPath = getServiceJsonPath(fs, serviceName);
    LOG.info("Loading service definition from " + definitionPath);
    final FileSystem fileSystem = fs.getFileSystem();
    return jsonSerDeser.load(fileSystem, definitionPath);
  }

  /**
   * Loads the definition of a specific version of a service from the
   * service's upgrade directory.
   *
   * @param fs file system wrapper used to locate and read the definition
   * @param serviceName name of the service
   * @param version version whose definition is loaded
   * @return the deserialized service
   * @throws IOException if the definition cannot be loaded
   */
  public static Service loadServiceUpgrade(SliderFileSystem fs,
      String serviceName, String version) throws IOException {
    final Path upgradeDir =
        fs.buildClusterUpgradeDirPath(serviceName, version);
    final String fileName = serviceName + ".json";
    final Path definitionPath = new Path(upgradeDir, fileName);
    LOG.info("Loading service definition from {}", definitionPath);
    final FileSystem fileSystem = fs.getFileSystem();
    return jsonSerDeser.load(fileSystem, definitionPath);
  }

  /**
   * Loads a service definition from an explicitly given path.
   *
   * @param fs file system wrapper used to read the definition
   * @param appDefPath path of the JSON definition
   * @return the deserialized service
   * @throws IOException if the definition cannot be loaded
   */
  public static Service loadServiceFrom(SliderFileSystem fs,
      Path appDefPath) throws IOException {
    LOG.info("Loading service definition from " + appDefPath);
    final FileSystem fileSystem = fs.getFileSystem();
    return jsonSerDeser.load(fileSystem, appDefPath);
  }

  /**
   * Builds the path of a service's JSON definition:
   * {@code <cluster dir of service>/<serviceName>.json}.
   *
   * @param fs file system wrapper that supplies the cluster directory
   * @param serviceName name of the service
   * @return path of the service definition file
   */
  public static Path getServiceJsonPath(SliderFileSystem fs, String serviceName) {
    final String fileName = serviceName + ".json";
    return new Path(fs.buildClusterDirPath(serviceName), fileName);
  }

  /**
   * Validates a resource specification. The resource must be present, must
   * not use a profile (alone or combined with cpus/memory), and must specify
   * a non-empty memory value and a positive cpu count.
   *
   * @param resource the resource to validate
   * @param comp the component the resource belongs to, or null when the
   *             resource is validated at service level; only affects which
   *             error message is used
   * @throws IllegalArgumentException if the resource is rejected
   */
  private static void validateServiceResource(Resource resource,
      Component comp) {
    // Only services/components of type SERVICE can skip resource requirement
    if (resource == null) {
      throw new IllegalArgumentException(resourceErrorMessage(
          RestApiErrorMessages.ERROR_RESOURCE_INVALID,
          RestApiErrorMessages.ERROR_RESOURCE_FOR_COMP_INVALID, comp));
    }

    final String profile = resource.getProfile();
    final String memory = resource.getMemory();
    final Integer cpus = resource.getCpus();

    if (StringUtils.isNotEmpty(profile)) {
      // A profile combined with explicit cpus/memory is ambiguous.
      if (cpus != null || StringUtils.isNotEmpty(memory)) {
        throw new IllegalArgumentException(resourceErrorMessage(
            RestApiErrorMessages.ERROR_RESOURCE_PROFILE_MULTIPLE_VALUES_NOT_SUPPORTED,
            RestApiErrorMessages.ERROR_RESOURCE_PROFILE_MULTIPLE_VALUES_FOR_COMP_NOT_SUPPORTED,
            comp));
      }
      // A profile on its own is rejected as well: profiles are not
      // supported yet.
      throw new IllegalArgumentException(
          RestApiErrorMessages.ERROR_RESOURCE_PROFILE_NOT_SUPPORTED_YET);
    }

    if (StringUtils.isEmpty(memory)) {
      throw new IllegalArgumentException(resourceErrorMessage(
          RestApiErrorMessages.ERROR_RESOURCE_MEMORY_INVALID,
          RestApiErrorMessages.ERROR_RESOURCE_MEMORY_FOR_COMP_INVALID, comp));
    }
    if (cpus == null) {
      throw new IllegalArgumentException(resourceErrorMessage(
          RestApiErrorMessages.ERROR_RESOURCE_CPUS_INVALID,
          RestApiErrorMessages.ERROR_RESOURCE_CPUS_FOR_COMP_INVALID, comp));
    }
    if (cpus <= 0) {
      throw new IllegalArgumentException(resourceErrorMessage(
          RestApiErrorMessages.ERROR_RESOURCE_CPUS_INVALID_RANGE,
          RestApiErrorMessages.ERROR_RESOURCE_CPUS_FOR_COMP_INVALID_RANGE,
          comp));
    }
  }

  /**
   * Picks the service-level message when no component is given, otherwise
   * formats the component-level message with the component's name.
   */
  private static String resourceErrorMessage(String serviceLevelMessage,
      String componentLevelFormat, Component comp) {
    if (comp == null) {
      return serviceLevelMessage;
    }
    return String.format(componentLevelFormat, comp.getName());
  }

  /**
   * Checks that no component requests more memory or more virtual cores
   * than the given maximum resource allows.
   *
   * @param maxResource upper bound for memory size and virtual cores
   * @param service service whose components are checked
   * @throws YarnException if a component exceeds either limit
   */
  public static void validateCompResourceSize(
      org.apache.hadoop.yarn.api.records.Resource maxResource,
      Service service) throws YarnException {
    for (Component component : service.getComponents()) {
      final long requestedMemory =
          Long.parseLong(component.getResource().getMemory());
      if (requestedMemory > maxResource.getMemorySize()) {
        final String memoryMessage =
            "Component " + component.getName() + ": specified memory size ("
                + requestedMemory
                + ") is larger than configured max container memory "
                + "size (" + maxResource.getMemorySize() + ")";
        throw new YarnException(memoryMessage);
      }

      final int requestedCores = component.getResource().getCpus();
      if (requestedCores > maxResource.getVirtualCores()) {
        final String coresMessage =
            "Component " + component.getName() + ": specified number of "
                + "virtual core (" + requestedCores
                + ") is larger than configured max "
                + "virtual core size (" + maxResource.getVirtualCores() + ")";
        throw new YarnException(coresMessage);
      }
    }
  }

  /**
   * Tells whether the service declares at least one component.
   *
   * @param service the service to inspect
   * @return true if the component list is non-null and non-empty
   */
  private static boolean hasComponent(Service service) {
    final List<Component> declared = service.getComponents();
    return declared != null && !declared.isEmpty();
  }

  /**
   * Orders components so that each one comes after the components it
   * depends on.
   *
   * @param components the components to order
   * @return the components in dependency order
   * @throws IllegalArgumentException if the components cannot be ordered
   */
  public static Collection<Component> sortByDependencies(List<Component>
      components) {
    return sortByDependencies(components, null).values();
  }

  /**
   * Orders components by dependency depth. Each pass over the component
   * list picks every not-yet-sorted component whose dependencies have all
   * been sorted already (the lowest depth not processed so far) and appends
   * those components to the result, preserving their original relative
   * order.
   *
   * The first pass therefore selects the components without dependencies
   * (depth 0), the next pass those depending only on depth 0 components
   * (depth 1), and so on until the result holds as many entries as there
   * are components. A pass that selects nothing while the result is still
   * incomplete raises an error.
   *
   * @param components the components to order
   * @param sortedComponents components sorted so far, keyed by name; null
   *                         to start with an empty result
   * @return the map of sorted components, in dependency order
   * @throws IllegalArgumentException if a pass makes no progress
   */
  private static Map<String, Component> sortByDependencies(List<Component>
      components, Map<String, Component> sortedComponents) {
    final Map<String, Component> ordered = sortedComponents != null
        ? sortedComponents : new LinkedHashMap<>();

    while (true) {
      final Map<String, Component> readyThisPass = new LinkedHashMap<>();
      final List<Component> blocked = new ArrayList<>();
      for (Component candidate : components) {
        final String candidateName = candidate.getName();
        if (ordered.containsKey(candidateName)) {
          continue;
        }
        final boolean dependenciesResolved =
            ServiceUtils.isEmpty(candidate.getDependencies())
                || candidate.getDependencies().stream()
                    .allMatch(ordered::containsKey);
        if (dependenciesResolved) {
          readyThisPass.put(candidateName, candidate);
        } else {
          blocked.add(candidate);
        }
      }

      // No progress in this pass: the remaining components cannot be sorted.
      if (readyThisPass.isEmpty()) {
        throw new IllegalArgumentException(String.format(RestApiErrorMessages
            .ERROR_DEPENDENCY_CYCLE, blocked));
      }
      ordered.putAll(readyThisPass);
      if (ordered.size() == components.size()) {
        return ordered;
      }
    }
  }

  /**
   * Creates the application directory with permission 750 and writes the
   * service definition into it.
   *
   * @param fs file system wrapper used for directory creation and writing
   * @param appDir directory to create
   * @param service service definition to persist
   * @throws IOException if the directory or the file cannot be written
   * @throws SliderException declared for callers
   */
  public static void createDirAndPersistApp(SliderFileSystem fs, Path appDir,
      Service service)
      throws IOException, SliderException {
    fs.createWithPermissions(appDir, new FsPermission("750"));
    final Path persistedDefinition = writeAppDefinition(fs, appDir, service);
    LOG.info("Persisted service {} version {} at {}", service.getName(),
        service.getVersion(), persistedDefinition);
  }

  /**
   * Saves the service definition as {@code <service name>.json} inside the
   * given directory.
   *
   * @param fs file system wrapper used for writing
   * @param appDir directory that receives the definition file
   * @param service service definition to save
   * @return path of the written definition file
   * @throws IOException if the file cannot be written
   */
  public static Path writeAppDefinition(SliderFileSystem fs, Path appDir,
      Service service) throws IOException {
    final String fileName = service.getName() + ".json";
    final Path target = new Path(appDir, fileName);
    jsonSerDeser.save(fs.getFileSystem(), target, service, true);
    return target;
  }

  /**
   * Saves the service definition at the service's standard JSON path, as
   * computed by {@link #getServiceJsonPath}.
   *
   * @param fs file system wrapper used for path lookup and writing
   * @param service service definition to save
   * @return path of the written definition file
   * @throws IOException if the file cannot be written
   */
  public static Path writeAppDefinition(SliderFileSystem fs, Service service)
      throws IOException {
    final Path target = getServiceJsonPath(fs, service.getName());
    jsonSerDeser.save(fs.getFileSystem(), target, service, true);
    return target;
  }

  /**
   * Collects the containers of the service that belong to the given
   * component instances.
   *
   * @param service service whose components and containers are searched
   * @param componentInstances names of the requested component instances
   * @return the containers whose component instance name was requested
   * @throws YarnException if an instance name cannot be parsed into a
   *         component name
   */
  public static List<Container> getLiveContainers(Service service,
      List<String> componentInstances)
      throws YarnException {
    List<Container> result = new ArrayList<>();

    // In order to avoid iterating over all the containers of all components,
    // first find the affected components by parsing the instance name.
    Multimap<String, String> affectedComps = ArrayListMultimap.create();
    for (String instanceName : componentInstances) {
      affectedComps.put(
          ServiceApiUtil.parseComponentName(instanceName), instanceName);
    }

    for (Component comp : service.getComponents()) {
      // Multimap.get never returns null (it returns an empty collection for
      // absent keys), so a null check cannot filter anything. containsKey
      // actually skips the components that were not requested.
      if (!affectedComps.containsKey(comp.getName())) {
        continue;
      }
      // Iterating once over the containers of the affected component to
      // find all the containers. Avoiding multiple calls to
      // service.getComponent(...) and component.getContainer(...) because they
      // iterate over all the components of the service and all the containers
      // of the components respectively.
      Collection<String> instanceNames = affectedComps.get(comp.getName());
      comp.getContainers().forEach(container -> {
        if (instanceNames.contains(container.getComponentInstanceName())) {
          result.add(container);
        }
      });
    }
    return result;
  }

  /**
   * Verifies that every requested component instance is in a state that
   * allows an upgrade.
   *
   * @param liveContainers containers of the instances requested to upgrade
   * @throws YarnException for the first container that is not upgradable
   */
  public static void validateInstancesUpgrade(List<Container>
      liveContainers) throws YarnException {
    for (Container candidate : liveContainers) {
      if (isUpgradable(candidate)) {
        continue;
      }
      // Nothing to upgrade for this instance.
      final String message = String.format(
          ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE,
          candidate.getComponentInstanceName());
      throw new YarnException(message);
    }
  }

  /**
   * Tells whether the container can be upgraded in its current state, that
   * is, whether its state is NEEDS_UPGRADE or FAILED_UPGRADE.
   *
   * @param container the container to inspect
   * @return true if the container is upgradable; false otherwise, including
   *         when the container has no state
   */
  public static boolean isUpgradable(Container container) {
    if (container.getState() == null) {
      return false;
    }
    if (container.getState().equals(ContainerState.NEEDS_UPGRADE)) {
      return true;
    }
    return container.getState().equals(ContainerState.FAILED_UPGRADE);
  }

  /**
   * Verifies that each requested component needs an upgrade and gathers the
   * upgradable containers of those components.
   *
   * @param liveService the live service whose components are examined
   * @param compNames names of the components requested to upgrade
   * @return the upgradable containers of the requested components
   * @throws YarnException if a requested component is not in state
   *         NEEDS_UPGRADE
   */
  public static List<Container> validateAndResolveCompsUpgrade(
      Service liveService, Collection<String> compNames) throws YarnException {
    Preconditions.checkNotNull(compNames);
    final HashSet<String> requested = Sets.newHashSet(compNames);
    final List<Container> upgradable = new ArrayList<>();
    for (Component liveComp : liveService.getComponents()) {
      if (!requested.contains(liveComp.getName())) {
        continue;
      }
      if (!liveComp.getState().equals(ComponentState.NEEDS_UPGRADE)) {
        // Nothing to upgrade
        throw new YarnException(String.format(
            ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName()));
      }
      liveComp.getContainers().forEach(candidate -> {
        if (isUpgradable(candidate)) {
          upgradable.add(candidate);
        }
      });
    }
    return upgradable;
  }

  /**
   * Validates that the requested components are stable and returns the
   * containers of those components that are in READY state.
   *
   * State comparisons are null-safe: a component without a state is treated
   * as not stable, and a container without a state is treated as not ready,
   * in line with the null guard in {@link #isUpgradable(Container)}.
   *
   * @param liveService the live service whose components are examined
   * @param compNames names of the requested components; must not be null
   * @return the READY containers of the requested components
   * @throws YarnException if a requested component is not in state STABLE
   */
  public static List<Container> validateAndResolveCompsStable(
      Service liveService, Collection<String> compNames) throws YarnException {
    Preconditions.checkNotNull(compNames);
    HashSet<String> requestedComps = Sets.newHashSet(compNames);
    List<Container> readyContainers = new ArrayList<>();
    for (Component liveComp : liveService.getComponents()) {
      if (!requestedComps.contains(liveComp.getName())) {
        continue;
      }
      // Constant-first equals avoids a NullPointerException when the
      // component has no state.
      if (!ComponentState.STABLE.equals(liveComp.getState())) {
        // The component is not stable. The message constant is the one the
        // upgrade validation uses as well.
        throw new YarnException(String.format(
            ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName()));
      }
      liveComp.getContainers().forEach(liveContainer -> {
        if (ContainerState.READY.equals(liveContainer.getState())) {
          readyContainers.add(liveContainer);
        }
      });
    }
    return readyContainers;
  }

  /**
   * Builds the hostname suffix of a service:
   * {@code .<serviceName>.<currentUser>}, followed by {@code .<domain>}
   * when a registry DNS domain is configured.
   *
   * @param serviceName name of the service
   * @param conf configuration consulted for the registry DNS domain
   * @return the hostname suffix, starting with a dot
   */
  public static String getHostnameSuffix(String serviceName, org.apache
      .hadoop.conf.Configuration conf) {
    final String dnsDomain = conf.get(RegistryConstants.KEY_DNS_DOMAIN);
    if (dnsDomain == null || dnsDomain.isEmpty()) {
      return MessageFormat
          .format(".{0}.{1}", serviceName, RegistryUtils.currentUser());
    }
    return MessageFormat
        .format(".{0}.{1}.{2}", serviceName,
            RegistryUtils.currentUser(), dnsDomain);
  }

  /**
   * Extracts the component instance name from either a bare instance name
   * or a hostname of the form
   * {@code <componentInstanceName><hostname suffix>}.
   *
   * @param instanceOrHostname instance name or hostname to parse
   * @param serviceName name of the service, used to build the suffix
   * @param conf configuration used to build the suffix
   * @return the component instance name without the hostname suffix
   * @throws IllegalArgumentException if the value contains a dot but does
   *         not end with the expected suffix, or still contains a dot after
   *         the suffix has been removed
   */
  public static String parseAndValidateComponentInstanceName(String
      instanceOrHostname, String serviceName, org.apache.hadoop.conf
      .Configuration conf) throws IllegalArgumentException {
    final boolean hasDot = instanceOrHostname.indexOf('.') != -1;
    final String expectedSuffix = getHostnameSuffix(serviceName, conf);

    String instanceName = instanceOrHostname;
    if (hasDot) {
      if (!instanceName.endsWith(expectedSuffix)) {
        throw new IllegalArgumentException("Specified hostname " +
            instanceName + " does not have the expected format " +
            "componentInstanceName" +
            expectedSuffix);
      }
      final int suffixStart = instanceName.length() - expectedSuffix.length();
      instanceName = instanceName.substring(0, suffixStart);
    }

    // Whatever remains must be a plain instance name without any dot.
    if (instanceName.indexOf('.') != -1) {
      throw new IllegalArgumentException("Specified hostname " +
          instanceName + " does not have the expected format " +
          "componentInstanceName" +
          expectedSuffix);
    }
    return instanceName;
  }

  /**
   * Derives the component name from a component instance name or hostname:
   * everything from the first '.' on is dropped, then everything from the
   * last '-' on is dropped.
   *
   * @param componentInstanceName instance name, optionally followed by a
   *                              dot-separated suffix
   * @return the component name
   * @throws YarnException if the part before the first '.' has no '-'
   */
  public static String parseComponentName(String componentInstanceName)
      throws YarnException {
    final int firstDot = componentInstanceName.indexOf('.');
    final String instancePart = firstDot == -1
        ? componentInstanceName
        : componentInstanceName.substring(0, firstDot);

    final int lastDash = instancePart.lastIndexOf('-');
    if (lastDash == -1) {
      throw new YarnException("Invalid component instance (" +
          instancePart + ") name.");
    }
    return instancePart.substring(0, lastDash);
  }

  /**
   * Wraps the given text as a {@code ${...}} placeholder.
   *
   * @param s the text to wrap
   * @return {@code "${" + s + "}"}
   */
  public static String $(String s) {
    return new StringBuilder("${").append(s).append("}").toString();
  }

  /**
   * Builds a list of component names in which dependencies that have not
   * been listed yet are inserted at the front. Each component name is added
   * at the end if not already present; each of its dependencies that is not
   * yet present is inserted at index 0.
   *
   * Components without dependencies, including those whose dependency list
   * is null, only contribute their own name.
   *
   * @param service service whose components are examined
   * @return component names, without duplicates
   */
  public static List<String> resolveCompsDependency(Service service) {
    List<String> components = new ArrayList<String>();
    for (Component component : service.getComponents()) {
      if (!components.contains(component.getName())) {
        components.add(component.getName());
      }
      // Other code in this class treats a null dependency list as possible,
      // so guard against it here instead of failing with an NPE.
      Collection<String> dependencies = component.getDependencies();
      if (dependencies == null) {
        continue;
      }
      for (String depComp : dependencies) {
        if (!components.contains(depComp)) {
          components.add(0, depComp);
        }
      }
    }
    return components;
  }

  /**
   * Checks whether every service this service depends on is in state STABLE.
   * A service without dependencies is always satisfied.
   *
   * The status of each dependency is queried through a {@link ServiceClient}
   * that is created for this call and closed again in all cases, including
   * when starting the client or querying a status fails.
   *
   * @param service service whose dependencies are checked
   * @return true if all dependencies are STABLE; false if any dependency is
   *         not STABLE or if the check fails with an IOException or a
   *         YarnException
   */
  private static boolean serviceDependencySatisfied(Service service) {
    boolean result = true;
    try {
      List<String> dependencies = service
          .getDependencies();
      if (dependencies != null && dependencies.size() > 0) {
        org.apache.hadoop.conf.Configuration conf =
            new org.apache.hadoop.conf.Configuration();
        // try-with-resources closes the client even when init/start/getStatus
        // throws; an IOException from close() is handled by the catch below.
        try (ServiceClient sc = new ServiceClient()) {
          sc.init(conf);
          sc.start();
          // Every dependency is checked (no early exit) so that each
          // unsatisfied one is logged.
          for (String dependent : dependencies) {
            Service dependentService = sc.getStatus(dependent);
            if (dependentService.getState() == null ||
                !dependentService.getState().equals(ServiceState.STABLE)) {
              result = false;
              LOG.info("Service dependency is not satisfied for " +
                  "service: {} state: {}", dependent,
                  dependentService.getState());
            }
          }
        }
      }
    } catch (IOException | YarnException e) {
      LOG.warn("Caught exception: ", e);
      LOG.info("Service dependency is not satisified.");
      result = false;
    }
    return result;
  }

  /**
   * Blocks until all dependencies of the service are satisfied, re-checking
   * every 15 seconds.
   *
   * If the waiting thread is interrupted, the interrupt flag is restored and
   * the method returns without the dependencies being satisfied; callers
   * that need to distinguish this case can check
   * {@code Thread.currentThread().isInterrupted()}.
   *
   * @param service service whose dependencies are awaited
   */
  public static void checkServiceDependencySatisified(Service service) {
    while (!serviceDependencySatisfied(service)) {
      try {
        LOG.info("Waiting for service dependencies.");
        Thread.sleep(15000L);
      } catch (InterruptedException e) {
        // Swallowing the interrupt would make this wait impossible to
        // cancel. Restore the flag and stop waiting; looping on would make
        // sleep() throw again immediately and turn into a busy loop.
        LOG.warn("Interrupted while waiting for service dependencies.", e);
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ApplicationReportSerDeser 源码

hadoop ClientRegistryBinder 源码

hadoop Comparators 源码

hadoop ConfigHelper 源码

hadoop ConfigUtils 源码

hadoop CoreFileSystem 源码

hadoop Duration 源码

hadoop FilterUtils 源码

hadoop HttpUtil 源码

hadoop JsonSerDeser 源码

0  赞