Hadoop ProviderUtils source code.
File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.provider;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
import org.apache.hadoop.yarn.service.api.records.ConfigFormat;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.exceptions.BadCommandArgumentsException;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.utils.PublishedConfiguration;
import org.apache.hadoop.yarn.service.utils.PublishedConfigurationOutputter;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_ID;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_INSTANCE_NAME;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_NAME;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_NAME_LC;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.CONTAINER_ID;
/**
 * This is a factoring out of methods handy for providers. It's bonded to a log
 * at construction time.
 */
public class ProviderUtils implements YarnServiceConstants {

  protected static final Logger log =
      LoggerFactory.getLogger(ProviderUtils.class);

  /**
   * Matches either a bare (unquoted) token or a double-quoted token, plus any
   * trailing whitespace. Compiled once: Pattern.compile is comparatively
   * expensive and the pattern never changes.
   */
  private static final Pattern WORD_OR_QUOTED =
      Pattern.compile("([^\"]\\S*|\".+?\")\\s*");

  /**
   * Add oneself to the classpath. This does not work
   * on minicluster test runs where the JAR is not built up.
   * @param providerResources map of provider resources to add these entries to
   * @param providerClass provider to add
   * @param jarName name of the jar to use
   * @param sliderFileSystem target filesystem
   * @param tempPath path in the cluster FS for temp files
   * @param libdir relative directory to place resources
   * @param miniClusterTestRun true if minicluster is being used
   * @return true if the class was found in a JAR
   *
   * @throws FileNotFoundException if the JAR was not found and this is NOT
   * a mini cluster test run
   * @throws IOException IO problems
   * @throws SliderException any Slider problem
   */
  public static boolean addProviderJar(
      Map<String, LocalResource> providerResources,
      Class<?> providerClass,
      String jarName,
      SliderFileSystem sliderFileSystem,
      Path tempPath,
      String libdir,
      boolean miniClusterTestRun) throws
      IOException,
      SliderException {
    try {
      ServiceUtils.putJar(providerResources,
          sliderFileSystem,
          providerClass,
          tempPath,
          libdir,
          jarName);
      return true;
    } catch (FileNotFoundException e) {
      if (miniClusterTestRun) {
        // Expected on minicluster runs where the JAR has not been built up;
        // report "not found" instead of failing.
        return false;
      } else {
        throw e;
      }
    }
  }

  /**
   * Loads all dependency jars from the default path.
   * @param providerResources map of provider resources to add these entries to
   * @param sliderFileSystem target filesystem
   * @param tempPath path in the cluster FS for temp files
   * @param libDir relative directory to place resources
   * @param libLocalSrcDir explicitly supplied local libs dir
   * @throws IOException trouble copying to HDFS
   * @throws SliderException trouble copying to HDFS
   */
  public static void addAllDependencyJars(
      Map<String, LocalResource> providerResources,
      SliderFileSystem sliderFileSystem,
      Path tempPath,
      String libDir,
      String libLocalSrcDir)
      throws IOException, SliderException {
    if (ServiceUtils.isSet(libLocalSrcDir)) {
      File file = new File(libLocalSrcDir);
      // Fail fast on a bad user-supplied directory before any copying starts.
      if (!file.exists() || !file.isDirectory()) {
        throw new BadCommandArgumentsException(
            "Supplied lib src dir %s is not valid", libLocalSrcDir);
      }
    }
    ServiceUtils.putAllJars(providerResources, sliderFileSystem, tempPath,
        libDir, libLocalSrcDir);
  }

  /**
   * Replaces every occurrence of each token key in the content with its
   * mapped value. Both keys and values are treated literally.
   * @param content string to perform substitution on
   * @param tokensForSubstitution map of token key to replacement value
   * @return the content with all tokens substituted
   */
  public static String substituteStrWithTokens(String content,
      Map<String, String> tokensForSubstitution) {
    for (Map.Entry<String, String> token : tokensForSubstitution.entrySet()) {
      // FIX: use literal String.replace. The previous
      // replaceAll(Pattern.quote(key), value) quoted only the key; '$' or
      // '\' in the replacement VALUE were still interpreted as regex group
      // references, corrupting output or throwing at runtime.
      content = content.replace(token.getKey(), token.getValue());
    }
    return content;
  }

  /**
   * Splits the content on whitespace (honoring double-quoted segments as
   * single parts, with the quotes stripped) and rejoins the parts with the
   * given delimiter.
   * @param content string to split
   * @param delimiter delimiter to join the parts with
   * @return the delimited string
   */
  public static String replaceSpacesWithDelimiter(String content,
      String delimiter) {
    List<String> parts = new ArrayList<>();
    Matcher m = WORD_OR_QUOTED.matcher(content);
    while (m.find()) {
      String part = m.group(1);
      if (part.startsWith("\"") && part.endsWith("\"")) {
        // Strip the surrounding double quotes.
        part = part.replaceAll("^\"|\"$", "");
      }
      parts.add(part);
    }
    return String.join(delimiter, parts);
  }

  /**
   * Substitutes tokens into each value of the configs map, in place.
   * @param configs map whose values are substituted (mutated in place)
   * @param tokenMap map of token key to replacement value; may be null,
   *                 in which case values are left unchanged
   */
  public static void substituteMapWithTokens(Map<String, String> configs,
      Map<String, String> tokenMap) {
    for (Map.Entry<String, String> entry : configs.entrySet()) {
      String value = entry.getValue();
      if (tokenMap != null) {
        for (Map.Entry<String, String> token : tokenMap.entrySet()) {
          // Literal replace; see substituteStrWithTokens for why
          // replaceAll is unsafe here.
          value = value.replace(token.getKey(), token.getValue());
        }
      }
      entry.setValue(value);
    }
  }

  /**
   * Computes the per-instance config directory
   * (componentDir/instanceName) and records it on the instance.
   * Note: the directory is not created here; see
   * {@link #createConfigFileAndAddLocalResource}.
   */
  public static Path initCompInstanceDir(SliderFileSystem fs,
      ContainerLaunchService.ComponentLaunchContext compLaunchContext,
      ComponentInstance instance) {
    Path compDir = fs.getComponentDir(compLaunchContext.getServiceVersion(),
        compLaunchContext.getName());
    Path compInstanceDir = new Path(compDir, instance.getCompInstanceName());
    instance.setCompInstanceDir(compInstanceDir);
    return compInstanceDir;
  }

  /**
   * Computes the per-instance PUBLIC resource directory
   * (componentPublicResourceDir/instanceName). The directory is not
   * created here and, unlike {@link #initCompInstanceDir}, is not recorded
   * on the instance.
   */
  public static Path initCompPublicResourceDir(SliderFileSystem fs,
      ContainerLaunchService.ComponentLaunchContext compLaunchContext,
      ComponentInstance instance) {
    Path compDir = fs.getComponentPublicResourceDir(
        compLaunchContext.getServiceVersion(), compLaunchContext.getName());
    return new Path(compDir, instance.getCompInstanceName());
  }

  /**
   * 1. Creates all non-static config files for a component instance on HDFS
   *    for localization (resolving templates and substituting tokens).
   * 2. Adds each config file to the launcher's local resources.
   * Static files (STATIC/ARCHIVE types) are skipped here; they are handled by
   * {@link #handleStaticFilesForLocalization}.
   */
  public static synchronized void createConfigFileAndAddLocalResource(
      AbstractLauncher launcher, SliderFileSystem fs,
      ContainerLaunchService.ComponentLaunchContext compLaunchContext,
      Map<String, String> tokensForSubstitution, ComponentInstance instance,
      ServiceContext context, ProviderService.ResolvedLaunchParams
      resolvedParams)
      throws IOException {
    Path compInstanceDir = initCompInstanceDir(fs, compLaunchContext, instance);
    if (!fs.getFileSystem().exists(compInstanceDir)) {
      log.info("{} version {} : Creating dir on hdfs: {}",
          instance.getCompInstanceId(), compLaunchContext.getServiceVersion(),
          compInstanceDir);
      // Owner-only (700): per-instance config may contain sensitive values.
      fs.getFileSystem().mkdirs(compInstanceDir,
          new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
    } else {
      log.info("Component instance conf dir already exists: {}",
          compInstanceDir);
    }

    Path compPublicResourceDir = initCompPublicResourceDir(fs,
        compLaunchContext, instance);
    if (!fs.getFileSystem().exists(compPublicResourceDir)) {
      log.info("{} version {} : Creating Public Resource dir on hdfs: {}",
          instance.getCompInstanceId(), compLaunchContext.getServiceVersion(),
          compPublicResourceDir);
      // 751 so the NodeManager can traverse into it to localize PUBLIC files.
      fs.getFileSystem().mkdirs(compPublicResourceDir,
          new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE,
              FsAction.EXECUTE));
    } else {
      log.info("Component instance public resource dir already exists: {}",
          compPublicResourceDir);
    }

    // FIX: the original concatenated the instance name onto the format
    // string ("...{}{}{}" + name), so the '{}' placeholders were never
    // filled. Pass all three values as logging arguments instead.
    log.debug("Tokens substitution for component instance: {}{}{}",
        instance.getCompInstanceName(), System.lineSeparator(),
        tokensForSubstitution);

    for (ConfigFile originalFile : compLaunchContext.getConfiguration()
        .getFiles()) {
      // Only generated/templated files are handled here.
      if (isStaticFile(originalFile)) {
        continue;
      }
      // Work on a copy so the launch context's file spec is not mutated.
      ConfigFile configFile = originalFile.copy();
      String fileName = new Path(configFile.getDestFile()).getName();

      // Substitute tokens in the destination file path (literal replace;
      // see substituteStrWithTokens for why replaceAll is unsafe).
      for (Map.Entry<String, String> token
          : tokensForSubstitution.entrySet()) {
        configFile.setDestFile(configFile.getDestFile()
            .replace(token.getKey(), token.getValue()));
      }

      /* When source file is not specified, write new configs
       * to compInstanceDir/fileName.
       * When source file is specified, it reads and performs variable
       * substitution and merges in new configs, and writes a new file to
       * compInstanceDir/fileName.
       */
      Path remoteFile;
      LocalResourceVisibility visibility = configFile.getVisibility();
      if (visibility == LocalResourceVisibility.PUBLIC) {
        remoteFile = new Path(compPublicResourceDir, fileName);
      } else {
        remoteFile = new Path(compInstanceDir, fileName);
      }

      if (!fs.getFileSystem().exists(remoteFile)) {
        log.info("Saving config file on hdfs for component {}: {}",
            instance.getCompInstanceName(), configFile);
        if (configFile.getSrcFile() != null) {
          // Load and resolve the config file template.
          switch (configFile.getType()) {
          case HADOOP_XML:
            resolveHadoopXmlTemplateAndSaveOnHdfs(fs.getFileSystem(),
                tokensForSubstitution, configFile, remoteFile, context);
            break;
          case TEMPLATE:
            resolvePlainTemplateAndSaveOnHdfs(fs.getFileSystem(),
                tokensForSubstitution, configFile, remoteFile, context);
            break;
          default:
            log.info("Not supporting loading src_file for {}", configFile);
            break;
          }
        } else {
          // No src_file: materialize the inline properties map.
          resolvePropsInConfigFileAndSaveOnHdfs(fs, tokensForSubstitution,
              instance, configFile, fileName, remoteFile);
        }
      }

      // Add resource for localization.
      LocalResource configResource =
          fs.createAmResource(remoteFile, LocalResourceType.FILE,
              configFile.getVisibility());
      Path destFile = new Path(configFile.getDestFile());
      String symlink = APP_CONF_DIR + "/" + fileName;
      addLocalResource(launcher, symlink, configResource, destFile,
          resolvedParams);
    }
  }

  /**
   * Registers all static files (STATIC/ARCHIVE types) of the component for
   * localization. Static files must have an existing, non-directory srcFile;
   * they are localized as-is, without any substitution.
   * @throws IOException if a srcFile is missing, absent on the filesystem,
   *                     or is a directory
   */
  public static synchronized void handleStaticFilesForLocalization(
      AbstractLauncher launcher, SliderFileSystem fs, ContainerLaunchService
      .ComponentLaunchContext componentLaunchCtx,
      ProviderService.ResolvedLaunchParams resolvedParams)
      throws IOException {
    for (ConfigFile staticFile :
        componentLaunchCtx.getConfiguration().getFiles()) {
      // Only handle static files here.
      if (!isStaticFile(staticFile)) {
        continue;
      }
      if (staticFile.getSrcFile() == null) {
        // This should not happen; AbstractClientProvider should have
        // validated this earlier.
        throw new IOException("srcFile is null, please double check.");
      }
      Path sourceFile = new Path(staticFile.getSrcFile());
      if (!fs.getFileSystem().exists(sourceFile)) {
        throw new IOException(
            "srcFile=" + sourceFile + " doesn't exist, please double check.");
      }
      FileStatus fileStatus = fs.getFileSystem().getFileStatus(sourceFile);
      if (fileStatus.isDirectory()) {
        throw new IOException("srcFile=" + sourceFile +
            " is a directory, which is not supported.");
      }
      // Add resource for localization; archives get unpacked by the NM.
      LocalResource localResource = fs.createAmResource(sourceFile,
          (staticFile.getType() == ConfigFile.TypeEnum.ARCHIVE ?
              LocalResourceType.ARCHIVE :
              LocalResourceType.FILE), staticFile.getVisibility());
      // Default destination is the source file's own name unless an
      // explicit destFile was supplied.
      Path destFile = new Path(sourceFile.getName());
      if (staticFile.getDestFile() != null && !staticFile.getDestFile()
          .isEmpty()) {
        destFile = new Path(staticFile.getDestFile());
      }
      addLocalResource(launcher, destFile.getName(), localResource, destFile,
          resolvedParams);
    }
  }

  /**
   * Registers a local resource with the launcher and records the resolved
   * symlink-to-destination mapping. An absolute destFile is passed through
   * as an explicit mount path; a relative one lands in the container work
   * directory under the symlink name.
   */
  private static void addLocalResource(AbstractLauncher launcher,
      String symlink, LocalResource localResource, Path destFile,
      ProviderService.ResolvedLaunchParams resolvedParams) {
    if (destFile.isAbsolute()) {
      launcher.addLocalResource(symlink, localResource, destFile.toString());
      log.info("Added file for localization: {} -> {}, dest mount path: {}",
          symlink, localResource.getResource().getFile(), destFile);
    } else {
      launcher.addLocalResource(symlink, localResource);
      log.info("Added file for localization: {} -> {}", symlink,
          localResource.getResource().getFile());
    }
    resolvedParams.addResolvedRsrcPath(symlink, destFile.toString());
  }

  /**
   * A static file is one uploaded by the user before the service was
   * launched; it must be localized to the container's local disk without any
   * changes.
   */
  private static boolean isStaticFile(ConfigFile file) {
    // '==' is the idiomatic (and null-safe on the constant side) enum
    // comparison; behavior is identical to equals() here.
    return file.getType() == ConfigFile.TypeEnum.ARCHIVE
        || file.getType() == ConfigFile.TypeEnum.STATIC;
  }

  /**
   * Substitutes tokens into the inline properties of a config file and
   * writes them to remoteFile on HDFS in the file's declared format, unless
   * the file already exists.
   */
  private static void resolvePropsInConfigFileAndSaveOnHdfs(
      SliderFileSystem fs,
      Map<String, String> tokensForSubstitution, ComponentInstance instance,
      ConfigFile configFile, String fileName, Path remoteFile)
      throws IOException {
    // Substitute non-template configs in place.
    substituteMapWithTokens(configFile.getProperties(), tokensForSubstitution);
    // Write configs onto HDFS.
    PublishedConfiguration publishedConfiguration =
        new PublishedConfiguration(fileName,
            configFile.getProperties().entrySet());
    if (!fs.getFileSystem().exists(remoteFile)) {
      PublishedConfigurationOutputter configurationOutputter =
          PublishedConfigurationOutputter.createOutputter(
              ConfigFormat.resolve(configFile.getType().toString()),
              publishedConfiguration);
      try (FSDataOutputStream os = fs.getFileSystem().create(remoteFile)) {
        configurationOutputter.save(os);
        os.flush();
      }
    } else {
      log.info("Component instance = {}, config file already exists: {}",
          instance.getCompInstanceName(), remoteFile);
    }
  }

  /**
   * 1. Substitutes a config template — only handles the HADOOP_XML format.
   * 2. Saves the result on HDFS.
   * Load failures are logged and skipped (best-effort) rather than failing
   * the launch.
   */
  @SuppressWarnings("unchecked")
  private static void resolveHadoopXmlTemplateAndSaveOnHdfs(FileSystem fs,
      Map<String, String> tokensForSubstitution, ConfigFile configFile,
      Path remoteFile, ServiceContext context) throws IOException {
    Map<String, String> conf;
    try {
      conf = (Map<String, String>) context.configCache.get(configFile);
    } catch (ExecutionException e) {
      log.error("Failed to load config file: " + configFile, e);
      return;
    }
    // Make a copy for substitution.
    org.apache.hadoop.conf.Configuration confCopy =
        new org.apache.hadoop.conf.Configuration(false);
    for (Map.Entry<String, String> entry : conf.entrySet()) {
      confCopy.set(entry.getKey(), entry.getValue());
    }
    // Merge in (and let them override with) the explicitly supplied
    // properties.
    for (Map.Entry<String, String> entry
        : configFile.getProperties().entrySet()) {
      confCopy.set(entry.getKey(), entry.getValue());
    }
    // Substitute tokens in every value. Literal replace (see
    // substituteStrWithTokens); the set() call is hoisted out of the inner
    // token loop — one write per key instead of one per token.
    for (Map.Entry<String, String> entry : confCopy) {
      String val = entry.getValue();
      if (val != null) {
        for (Map.Entry<String, String> token
            : tokensForSubstitution.entrySet()) {
          val = val.replace(token.getKey(), token.getValue());
        }
        confCopy.set(entry.getKey(), val);
      }
    }
    // Save on HDFS.
    try (OutputStream output = fs.create(remoteFile)) {
      confCopy.writeXml(output);
      log.info("Reading config from: {}, writing to: {}",
          configFile.getSrcFile(), remoteFile);
    }
  }

  /**
   * 1) Reads the template as a string (from the service config cache).
   * 2) Does token substitution.
   * 3) Saves the result on HDFS.
   * Failures are logged (with cause) and skipped, keeping the best-effort
   * contract of the original.
   */
  private static void resolvePlainTemplateAndSaveOnHdfs(FileSystem fs,
      Map<String, String> tokensForSubstitution, ConfigFile configFile,
      Path remoteFile, ServiceContext context) {
    String content;
    try {
      content = (String) context.configCache.get(configFile);
    } catch (ExecutionException e) {
      log.error("Failed to load config file: " + configFile, e);
      return;
    }
    // Substitute tokens.
    content = substituteStrWithTokens(content, tokensForSubstitution);
    try (OutputStream output = fs.create(remoteFile)) {
      // Plain JDK write; behaviorally identical to commons-io
      // IOUtils.write(content, output, UTF_8) without the extra dependency.
      output.write(content.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      // FIX: include the exception so the failure cause is not swallowed.
      log.error("Failed to create " + remoteFile, e);
    }
  }

  /**
   * Get initial component token map to be substituted into config values.
   * Includes component/instance/container identity tokens plus the
   * dependency host/IP tokens exported by the component.
   * @return tokens to replace
   */
  public static Map<String, String> initCompTokensForSubstitute(
      ComponentInstance instance, Container container,
      ContainerLaunchService.ComponentLaunchContext componentLaunchContext) {
    Map<String, String> tokens = new HashMap<>();
    tokens.put(COMPONENT_NAME, componentLaunchContext.getName());
    tokens
        .put(COMPONENT_NAME_LC, componentLaunchContext.getName().toLowerCase());
    tokens.put(COMPONENT_INSTANCE_NAME, instance.getCompInstanceName());
    tokens.put(CONTAINER_ID, container.getId().toString());
    tokens.put(COMPONENT_ID,
        String.valueOf(instance.getCompInstanceId().getId()));
    tokens.putAll(instance.getComponent().getDependencyHostIpTokens());
    return tokens;
  }
}
Related reading: Hadoop AbstractClientProvider source code.