hadoop ServiceApiUtil 源码
haddop ServiceApiUtil 代码
文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.utils;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hadoop.thirdparty.com.google.common.collect.Multimap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.ComponentContainers;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Configuration;
import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.conf.RestApiConstants;
import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
import org.apache.hadoop.yarn.service.provider.ProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages.ERROR_COMP_DOES_NOT_NEED_UPGRADE;
import static org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages.ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE;
public class ServiceApiUtil {
private static final Logger LOG =
LoggerFactory.getLogger(ServiceApiUtil.class);
public static JsonSerDeser<Service> jsonSerDeser =
new JsonSerDeser<>(Service.class,
PropertyNamingStrategies.SNAKE_CASE);
public static final JsonSerDeser<Container[]> CONTAINER_JSON_SERDE =
new JsonSerDeser<>(Container[].class,
PropertyNamingStrategies.SNAKE_CASE);
public static final JsonSerDeser<ComponentContainers[]>
COMP_CONTAINERS_JSON_SERDE = new JsonSerDeser<>(
ComponentContainers[].class,
PropertyNamingStrategies.SNAKE_CASE);
public static final JsonSerDeser<Component[]> COMP_JSON_SERDE =
new JsonSerDeser<>(Component[].class,
PropertyNamingStrategies.SNAKE_CASE);
private static final PatternValidator namePattern
= new PatternValidator("[a-z][a-z0-9-]*");
private static final PatternValidator userNamePattern
= new PatternValidator("[a-z][a-z0-9-.]*");
@VisibleForTesting
public static void setJsonSerDeser(JsonSerDeser jsd) {
jsonSerDeser = jsd;
}
@VisibleForTesting
public static void validateAndResolveService(Service service,
SliderFileSystem fs, org.apache.hadoop.conf.Configuration conf) throws
IOException {
boolean dnsEnabled = conf.getBoolean(RegistryConstants.KEY_DNS_ENABLED,
RegistryConstants.DEFAULT_DNS_ENABLED);
if (dnsEnabled) {
if (RegistryUtils.currentUser().length()
> RegistryConstants.MAX_FQDN_LABEL_LENGTH) {
throw new IllegalArgumentException(
RestApiErrorMessages.ERROR_USER_NAME_INVALID);
}
userNamePattern.validate(RegistryUtils.currentUser());
}
if (StringUtils.isEmpty(service.getName())) {
throw new IllegalArgumentException(
RestApiErrorMessages.ERROR_APPLICATION_NAME_INVALID);
}
if (StringUtils.isEmpty(service.getVersion())) {
throw new IllegalArgumentException(String.format(
RestApiErrorMessages.ERROR_APPLICATION_VERSION_INVALID,
service.getName()));
}
validateNameFormat(service.getName(), conf);
// If the service has no components, throw error
if (!hasComponent(service)) {
throw new IllegalArgumentException(
"No component specified for " + service.getName());
}
if (UserGroupInformation.isSecurityEnabled()) {
validateKerberosPrincipal(service.getKerberosPrincipal());
}
// Validate the Docker client config.
try {
validateDockerClientConfiguration(service, conf);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
// Validate there are no component name collisions (collisions are not
// currently supported) and add any components from external services
Configuration globalConf = service.getConfiguration();
Set<String> componentNames = new HashSet<>();
List<Component> componentsToRemove = new ArrayList<>();
List<Component> componentsToAdd = new ArrayList<>();
for (Component comp : service.getComponents()) {
int maxCompLength = RegistryConstants.MAX_FQDN_LABEL_LENGTH;
maxCompLength = maxCompLength - Long.toString(Long.MAX_VALUE).length();
if (dnsEnabled && comp.getName().length() > maxCompLength) {
throw new IllegalArgumentException(String.format(RestApiErrorMessages
.ERROR_COMPONENT_NAME_INVALID, maxCompLength, comp.getName()));
}
if (service.getName().equals(comp.getName())) {
throw new IllegalArgumentException(String.format(RestApiErrorMessages
.ERROR_COMPONENT_NAME_CONFLICTS_WITH_SERVICE_NAME,
comp.getName(), service.getName()));
}
if (componentNames.contains(comp.getName())) {
throw new IllegalArgumentException("Component name collision: " +
comp.getName());
}
// If artifact is of type SERVICE (which cannot be filled from global),
// read external service and add its components to this service
if (comp.getArtifact() != null && comp.getArtifact().getType() ==
Artifact.TypeEnum.SERVICE) {
if (StringUtils.isEmpty(comp.getArtifact().getId())) {
throw new IllegalArgumentException(
RestApiErrorMessages.ERROR_ARTIFACT_ID_INVALID);
}
LOG.info("Marking {} for removal", comp.getName());
componentsToRemove.add(comp);
List<Component> externalComponents = getComponents(fs,
comp.getArtifact().getId());
for (Component c : externalComponents) {
Component override = service.getComponent(c.getName());
if (override != null && override.getArtifact() == null) {
// allow properties from external components to be overridden /
// augmented by properties in this component, except for artifact
// which must be read from external component
override.mergeFrom(c);
LOG.info("Merging external component {} from external {}", c
.getName(), comp.getName());
} else {
if (componentNames.contains(c.getName())) {
throw new IllegalArgumentException("Component name collision: " +
c.getName());
}
componentNames.add(c.getName());
componentsToAdd.add(c);
LOG.info("Adding component {} from external {}", c.getName(),
comp.getName());
}
}
} else {
// otherwise handle as a normal component
componentNames.add(comp.getName());
// configuration
comp.getConfiguration().mergeFrom(globalConf);
}
}
service.getComponents().removeAll(componentsToRemove);
service.getComponents().addAll(componentsToAdd);
// Validate components and let global values take effect if component level
// values are not provided
Artifact globalArtifact = service.getArtifact();
Resource globalResource = service.getResource();
for (Component comp : service.getComponents()) {
// fill in global artifact unless it is type SERVICE
if (comp.getArtifact() == null && service.getArtifact() != null
&& service.getArtifact().getType() != Artifact.TypeEnum
.SERVICE) {
comp.setArtifact(globalArtifact);
}
// fill in global resource
if (comp.getResource() == null) {
comp.setResource(globalResource);
}
// validate dependency existence
if (comp.getDependencies() != null) {
for (String dependency : comp.getDependencies()) {
if (!componentNames.contains(dependency)) {
throw new IllegalArgumentException(String.format(
RestApiErrorMessages.ERROR_DEPENDENCY_INVALID, dependency,
comp.getName()));
}
}
}
validateComponent(comp, fs.getFileSystem(), conf);
}
validatePlacementPolicy(service.getComponents(), componentNames);
// validate dependency tree
sortByDependencies(service.getComponents());
// Service lifetime if not specified, is set to unlimited lifetime
if (service.getLifetime() == null) {
service.setLifetime(RestApiConstants.DEFAULT_UNLIMITED_LIFETIME);
}
}
public static void validateJvmOpts(String jvmOpts)
throws IllegalArgumentException {
Pattern pattern = Pattern.compile("[!~#?@*&%${}()<>\\[\\]|\",`;]");
Matcher matcher = pattern.matcher(jvmOpts);
if (matcher.find()) {
throw new IllegalArgumentException(
RestApiErrorMessages.ERROR_JVM_OPTS);
}
}
public static void validateKerberosPrincipal(
KerberosPrincipal kerberosPrincipal) throws IOException {
if (!StringUtils.isEmpty(kerberosPrincipal.getPrincipalName())) {
if (!kerberosPrincipal.getPrincipalName().contains("/")) {
throw new IllegalArgumentException(String.format(
RestApiErrorMessages.ERROR_KERBEROS_PRINCIPAL_NAME_FORMAT,
kerberosPrincipal.getPrincipalName()));
}
}
}
private static void validateDockerClientConfiguration(Service service,
org.apache.hadoop.conf.Configuration conf) throws IOException {
String dockerClientConfig = service.getDockerClientConfig();
if (!StringUtils.isEmpty(dockerClientConfig)) {
Path dockerClientConfigPath = new Path(dockerClientConfig);
FileSystem fs = dockerClientConfigPath.getFileSystem(conf);
LOG.info("The supplied Docker client config is " + dockerClientConfig);
if (!fs.exists(dockerClientConfigPath)) {
throw new IOException(
"The supplied Docker client config does not exist: "
+ dockerClientConfig);
}
}
}
private static void validateComponent(Component comp, FileSystem fs,
org.apache.hadoop.conf.Configuration conf)
throws IOException {
validateNameFormat(comp.getName(), conf);
AbstractClientProvider compClientProvider = ProviderFactory
.getClientProvider(comp.getArtifact());
compClientProvider.validateArtifact(comp.getArtifact(), comp.getName(), fs);
if (comp.getLaunchCommand() == null && (comp.getArtifact() == null || comp
.getArtifact().getType() != Artifact.TypeEnum.DOCKER)) {
throw new IllegalArgumentException(RestApiErrorMessages
.ERROR_ABSENT_LAUNCH_COMMAND);
}
validateServiceResource(comp.getResource(), comp);
if (comp.getNumberOfContainers() == null
|| comp.getNumberOfContainers() < 0) {
throw new IllegalArgumentException(String.format(
RestApiErrorMessages.ERROR_CONTAINERS_COUNT_FOR_COMP_INVALID
+ ": " + comp.getNumberOfContainers(), comp.getName()));
}
compClientProvider.validateConfigFiles(comp.getConfiguration()
.getFiles(), comp.getName(), fs);
MonitorUtils.getProbe(comp.getReadinessCheck());
}
// Check component or service name format and transform to lower case.
public static void validateNameFormat(String name,
org.apache.hadoop.conf.Configuration conf) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Name can not be empty!");
}
// validate component name
if (name.contains("_")) {
throw new IllegalArgumentException(
"Invalid format: " + name
+ ", can not use '_', as DNS hostname does not allow '_'. Use '-' Instead. ");
}
boolean dnsEnabled = conf.getBoolean(RegistryConstants.KEY_DNS_ENABLED,
RegistryConstants.DEFAULT_DNS_ENABLED);
if (dnsEnabled && name.length() > RegistryConstants.MAX_FQDN_LABEL_LENGTH) {
throw new IllegalArgumentException(String
.format("Invalid format %s, must be no more than 63 characters ",
name));
}
namePattern.validate(name);
}
private static void validatePlacementPolicy(List<Component> components,
Set<String> componentNames) {
for (Component comp : components) {
PlacementPolicy placementPolicy = comp.getPlacementPolicy();
if (placementPolicy != null) {
for (PlacementConstraint constraint : placementPolicy
.getConstraints()) {
if (constraint.getType() == null) {
throw new IllegalArgumentException(String.format(
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TYPE_NULL,
constraint.getName() == null ? "" : constraint.getName() + " ",
comp.getName()));
}
if (constraint.getScope() == null) {
throw new IllegalArgumentException(String.format(
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_SCOPE_NULL,
constraint.getName() == null ? "" : constraint.getName() + " ",
comp.getName()));
}
}
}
}
}
@VisibleForTesting
public static List<Component> getComponents(SliderFileSystem
fs, String serviceName) throws IOException {
return loadService(fs, serviceName).getComponents();
}
public static Service loadService(SliderFileSystem fs, String
serviceName) throws IOException {
Path serviceJson = getServiceJsonPath(fs, serviceName);
LOG.info("Loading service definition from " + serviceJson);
return jsonSerDeser.load(fs.getFileSystem(), serviceJson);
}
public static Service loadServiceUpgrade(SliderFileSystem fs,
String serviceName, String version) throws IOException {
Path versionPath = fs.buildClusterUpgradeDirPath(serviceName, version);
Path versionedDef = new Path(versionPath, serviceName + ".json");
LOG.info("Loading service definition from {}", versionedDef);
return jsonSerDeser.load(fs.getFileSystem(), versionedDef);
}
public static Service loadServiceFrom(SliderFileSystem fs,
Path appDefPath) throws IOException {
LOG.info("Loading service definition from " + appDefPath);
return jsonSerDeser.load(fs.getFileSystem(), appDefPath);
}
public static Path getServiceJsonPath(SliderFileSystem fs, String serviceName) {
Path serviceDir = fs.buildClusterDirPath(serviceName);
return new Path(serviceDir, serviceName + ".json");
}
private static void validateServiceResource(Resource resource,
Component comp) {
// Only services/components of type SERVICE can skip resource requirement
if (resource == null) {
throw new IllegalArgumentException(
comp == null ? RestApiErrorMessages.ERROR_RESOURCE_INVALID : String
.format(RestApiErrorMessages.ERROR_RESOURCE_FOR_COMP_INVALID,
comp.getName()));
}
// One and only one of profile OR cpus & memory can be specified. Specifying
// both raises validation error.
if (StringUtils.isNotEmpty(resource.getProfile()) && (
resource.getCpus() != null || StringUtils
.isNotEmpty(resource.getMemory()))) {
throw new IllegalArgumentException(comp == null ?
RestApiErrorMessages.ERROR_RESOURCE_PROFILE_MULTIPLE_VALUES_NOT_SUPPORTED :
String.format(
RestApiErrorMessages.ERROR_RESOURCE_PROFILE_MULTIPLE_VALUES_FOR_COMP_NOT_SUPPORTED,
comp.getName()));
}
// Currently resource profile is not supported yet, so we will raise
// validation error if only resource profile is specified
if (StringUtils.isNotEmpty(resource.getProfile())) {
throw new IllegalArgumentException(
RestApiErrorMessages.ERROR_RESOURCE_PROFILE_NOT_SUPPORTED_YET);
}
String memory = resource.getMemory();
Integer cpus = resource.getCpus();
if (StringUtils.isEmpty(memory)) {
throw new IllegalArgumentException(
comp == null ? RestApiErrorMessages.ERROR_RESOURCE_MEMORY_INVALID :
String.format(
RestApiErrorMessages.ERROR_RESOURCE_MEMORY_FOR_COMP_INVALID,
comp.getName()));
}
if (cpus == null) {
throw new IllegalArgumentException(
comp == null ? RestApiErrorMessages.ERROR_RESOURCE_CPUS_INVALID :
String.format(
RestApiErrorMessages.ERROR_RESOURCE_CPUS_FOR_COMP_INVALID,
comp.getName()));
}
if (cpus <= 0) {
throw new IllegalArgumentException(comp == null ?
RestApiErrorMessages.ERROR_RESOURCE_CPUS_INVALID_RANGE : String
.format(
RestApiErrorMessages.ERROR_RESOURCE_CPUS_FOR_COMP_INVALID_RANGE,
comp.getName()));
}
}
// check if comp mem size exceeds cluster limit
public static void validateCompResourceSize(
org.apache.hadoop.yarn.api.records.Resource maxResource,
Service service) throws YarnException {
for (Component component : service.getComponents()) {
long mem = Long.parseLong(component.getResource().getMemory());
if (mem > maxResource.getMemorySize()) {
throw new YarnException(
"Component " + component.getName() + ": specified memory size ("
+ mem + ") is larger than configured max container memory " +
"size (" + maxResource.getMemorySize() + ")");
}
int cpu = component.getResource().getCpus();
if (cpu > maxResource.getVirtualCores()) {
throw new YarnException(
"Component " + component.getName() + ": specified number of " +
"virtual core (" + cpu + ") is larger than configured max " +
"virtual core size (" + maxResource.getVirtualCores() + ")");
}
}
}
private static boolean hasComponent(Service service) {
if (service.getComponents() == null || service.getComponents()
.isEmpty()) {
return false;
}
return true;
}
public static Collection<Component> sortByDependencies(List<Component>
components) {
Map<String, Component> sortedComponents =
sortByDependencies(components, null);
return sortedComponents.values();
}
/**
* Each internal call of sortByDependencies will identify all of the
* components with the same dependency depth (the lowest depth that has not
* been processed yet) and add them to the sortedComponents list, preserving
* their original ordering in the components list.
*
* So the first time it is called, all components with no dependencies
* (depth 0) will be identified. The next time it is called, all components
* that have dependencies only on the the depth 0 components will be
* identified (depth 1). This will be repeated until all components have
* been added to the sortedComponents list. If no new components are
* identified but the sortedComponents list is not complete, an error is
* thrown.
*/
private static Map<String, Component> sortByDependencies(List<Component>
components, Map<String, Component> sortedComponents) {
if (sortedComponents == null) {
sortedComponents = new LinkedHashMap<>();
}
Map<String, Component> componentsToAdd = new LinkedHashMap<>();
List<Component> componentsSkipped = new ArrayList<>();
for (Component component : components) {
String name = component.getName();
if (sortedComponents.containsKey(name)) {
continue;
}
boolean dependenciesAlreadySorted = true;
if (!ServiceUtils.isEmpty(component.getDependencies())) {
for (String dependency : component.getDependencies()) {
if (!sortedComponents.containsKey(dependency)) {
dependenciesAlreadySorted = false;
break;
}
}
}
if (dependenciesAlreadySorted) {
componentsToAdd.put(name, component);
} else {
componentsSkipped.add(component);
}
}
if (componentsToAdd.size() == 0) {
throw new IllegalArgumentException(String.format(RestApiErrorMessages
.ERROR_DEPENDENCY_CYCLE, componentsSkipped));
}
sortedComponents.putAll(componentsToAdd);
if (sortedComponents.size() == components.size()) {
return sortedComponents;
}
return sortByDependencies(components, sortedComponents);
}
public static void createDirAndPersistApp(SliderFileSystem fs, Path appDir,
Service service)
throws IOException, SliderException {
FsPermission appDirPermission = new FsPermission("750");
fs.createWithPermissions(appDir, appDirPermission);
Path appJson = writeAppDefinition(fs, appDir, service);
LOG.info("Persisted service {} version {} at {}", service.getName(),
service.getVersion(), appJson);
}
public static Path writeAppDefinition(SliderFileSystem fs, Path appDir,
Service service) throws IOException {
Path appJson = new Path(appDir, service.getName() + ".json");
jsonSerDeser.save(fs.getFileSystem(), appJson, service, true);
return appJson;
}
public static Path writeAppDefinition(SliderFileSystem fs, Service service)
throws IOException {
Path appJson = getServiceJsonPath(fs, service.getName());
jsonSerDeser.save(fs.getFileSystem(), appJson, service, true);
return appJson;
}
public static List<Container> getLiveContainers(Service service,
List<String> componentInstances)
throws YarnException {
List<Container> result = new ArrayList<>();
// In order to avoid iterating over all the containers of all components,
// first find the affected components by parsing the instance name.
Multimap<String, String> affectedComps = ArrayListMultimap.create();
for (String instanceName : componentInstances) {
affectedComps.put(
ServiceApiUtil.parseComponentName(instanceName), instanceName);
}
service.getComponents().forEach(comp -> {
// Iterating once over the containers of the affected component to
// find all the containers. Avoiding multiple calls to
// service.getComponent(...) and component.getContainer(...) because they
// iterate over all the components of the service and all the containers
// of the components respectively.
if (affectedComps.get(comp.getName()) != null) {
Collection<String> instanceNames = affectedComps.get(comp.getName());
comp.getContainers().forEach(container -> {
if (instanceNames.contains(container.getComponentInstanceName())) {
result.add(container);
}
});
}
});
return result;
}
/**
* Validates that the component instances that are requested to upgrade
* require an upgrade.
*/
public static void validateInstancesUpgrade(List<Container>
liveContainers) throws YarnException {
for (Container liveContainer : liveContainers) {
if (!isUpgradable(liveContainer)) {
// Nothing to upgrade
throw new YarnException(String.format(
ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE,
liveContainer.getComponentInstanceName()));
}
}
}
/**
* Returns whether the container can be upgraded in the current state.
*/
public static boolean isUpgradable(Container container) {
return container.getState() != null &&
(container.getState().equals(ContainerState.NEEDS_UPGRADE) ||
container.getState().equals(ContainerState.FAILED_UPGRADE));
}
/**
* Validates the components that are requested to upgrade require an upgrade.
* It returns the instances of the components which need upgrade.
*/
public static List<Container> validateAndResolveCompsUpgrade(
Service liveService, Collection<String> compNames) throws YarnException {
Preconditions.checkNotNull(compNames);
HashSet<String> requestedComps = Sets.newHashSet(compNames);
List<Container> containerNeedUpgrade = new ArrayList<>();
for (Component liveComp : liveService.getComponents()) {
if (requestedComps.contains(liveComp.getName())) {
if (!liveComp.getState().equals(ComponentState.NEEDS_UPGRADE)) {
// Nothing to upgrade
throw new YarnException(String.format(
ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName()));
}
liveComp.getContainers().forEach(liveContainer -> {
if (isUpgradable(liveContainer)) {
containerNeedUpgrade.add(liveContainer);
}
});
}
}
return containerNeedUpgrade;
}
/**
* Validates the components that are requested are stable for upgrade.
* It returns the instances of the components which are in ready state.
*/
public static List<Container> validateAndResolveCompsStable(
Service liveService, Collection<String> compNames) throws YarnException {
Preconditions.checkNotNull(compNames);
HashSet<String> requestedComps = Sets.newHashSet(compNames);
List<Container> containerNeedUpgrade = new ArrayList<>();
for (Component liveComp : liveService.getComponents()) {
if (requestedComps.contains(liveComp.getName())) {
if (!liveComp.getState().equals(ComponentState.STABLE)) {
// Nothing to upgrade
throw new YarnException(String.format(
ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName()));
}
liveComp.getContainers().forEach(liveContainer -> {
if (liveContainer.getState().equals(ContainerState.READY)) {
containerNeedUpgrade.add(liveContainer);
}
});
}
}
return containerNeedUpgrade;
}
public static String getHostnameSuffix(String serviceName, org.apache
.hadoop.conf.Configuration conf) {
String domain = conf.get(RegistryConstants.KEY_DNS_DOMAIN);
String hostnameSuffix;
if (domain == null || domain.isEmpty()) {
hostnameSuffix = MessageFormat
.format(".{0}.{1}", serviceName, RegistryUtils.currentUser());
} else {
hostnameSuffix = MessageFormat
.format(".{0}.{1}.{2}", serviceName,
RegistryUtils.currentUser(), domain);
}
return hostnameSuffix;
}
public static String parseAndValidateComponentInstanceName(String
instanceOrHostname, String serviceName, org.apache.hadoop.conf
.Configuration conf) throws IllegalArgumentException {
int idx = instanceOrHostname.indexOf('.');
String hostnameSuffix = getHostnameSuffix(serviceName, conf);
if (idx != -1) {
if (!instanceOrHostname.endsWith(hostnameSuffix)) {
throw new IllegalArgumentException("Specified hostname " +
instanceOrHostname + " does not have the expected format " +
"componentInstanceName" +
hostnameSuffix);
}
instanceOrHostname = instanceOrHostname.substring(0, instanceOrHostname
.length() - hostnameSuffix.length());
}
idx = instanceOrHostname.indexOf('.');
if (idx != -1) {
throw new IllegalArgumentException("Specified hostname " +
instanceOrHostname + " does not have the expected format " +
"componentInstanceName" +
hostnameSuffix);
}
return instanceOrHostname;
}
public static String parseComponentName(String componentInstanceName)
throws YarnException {
int idx = componentInstanceName.indexOf('.');
if (idx != -1) {
componentInstanceName = componentInstanceName.substring(0, idx);
}
idx = componentInstanceName.lastIndexOf('-');
if (idx == -1) {
throw new YarnException("Invalid component instance (" +
componentInstanceName + ") name.");
}
return componentInstanceName.substring(0, idx);
}
public static String $(String s) {
return "${" + s +"}";
}
public static List<String> resolveCompsDependency(Service service) {
List<String> components = new ArrayList<String>();
for (Component component : service.getComponents()) {
int depSize = component.getDependencies().size();
if (!components.contains(component.getName())) {
components.add(component.getName());
}
if (depSize != 0) {
for (String depComp : component.getDependencies()) {
if (!components.contains(depComp)) {
components.add(0, depComp);
}
}
}
}
return components;
}
private static boolean serviceDependencySatisfied(Service service) {
boolean result = true;
try {
List<String> dependencies = service
.getDependencies();
org.apache.hadoop.conf.Configuration conf =
new org.apache.hadoop.conf.Configuration();
if (dependencies != null && dependencies.size() > 0) {
ServiceClient sc = new ServiceClient();
sc.init(conf);
sc.start();
for (String dependent : dependencies) {
Service dependentService = sc.getStatus(dependent);
if (dependentService.getState() == null ||
!dependentService.getState().equals(ServiceState.STABLE)) {
result = false;
LOG.info("Service dependency is not satisfied for " +
"service: {} state: {}", dependent,
dependentService.getState());
}
}
sc.close();
}
} catch (IOException | YarnException e) {
LOG.warn("Caught exception: ", e);
LOG.info("Service dependency is not satisified.");
result = false;
}
return result;
}
public static void checkServiceDependencySatisified(Service service) {
while (!serviceDependencySatisfied(service)) {
try {
LOG.info("Waiting for service dependencies.");
Thread.sleep(15000L);
} catch (InterruptedException e) {
}
}
}
}
相关信息
相关文章
hadoop ApplicationReportSerDeser 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦