hadoop LogAggregationFileController source code
hadoop LogAggregationFileController code
File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation.filecontroller;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest;
import org.apache.hadoop.yarn.webapp.View.ViewContext;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
/**
* Base class to implement Log Aggregation File Controller.
*/
@Public
@Unstable
public abstract class LogAggregationFileController {

  private static final Logger LOG = LoggerFactory.getLogger(
      LogAggregationFileController.class);

  /*
   * Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
   * Group to which NMOwner belongs> App dirs will be created as 770,
   * owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
   * access / modify the files.
   * <NMGroup> should obviously be a limited access group.
   */

  /**
   * Permissions for the top level directory under which app directories will
   * be created (1777: world-writable with the sticky bit, so every user can
   * create a subdirectory but only delete their own).
   */
  protected static final FsPermission TLDIR_PERMISSIONS = FsPermission
      .createImmutable((short) 01777);

  /**
   * Permissions for the Application directory (770: owner and NM group only).
   */
  protected static final FsPermission APP_DIR_PERMISSIONS = FsPermission
      .createImmutable((short) 0770);

  /**
   * Umask for the log file. 0640 ^ 0777 == 0137, so applying this umask to a
   * full-permission create yields 0640 files (owner rw, group r).
   */
  protected static final FsPermission APP_LOG_FILE_UMASK = FsPermission
      .createImmutable((short) (0640 ^ 0777));

  /** Configuration handed to {@link #initialize(Configuration, String)}. */
  protected Configuration conf;
  /** Root directory on the remote file system under which logs are stored. */
  protected Path remoteRootLogDir;
  /** Controller-specific suffix appended below each user's log directory. */
  protected String remoteRootLogDirSuffix;
  /** Max number of aggregated log files retained per app/node combination. */
  protected int retentionSize;
  /** Configured name of this controller, used to build per-controller keys. */
  protected String fileControllerName;
  /** Whether the remote FS supports chmod/chown; assumed true until probed. */
  protected boolean fsSupportsChmod = true;

  public LogAggregationFileController() {}

  /**
   * Initialize the log file controller.
   * @param conf the Configuration
   * @param controllerName the log controller class name
   */
  public void initialize(Configuration conf, String controllerName) {
    this.conf = conf;
    int configuredRetentionSize = conf.getInt(
        YarnConfiguration.NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
        YarnConfiguration
            .DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
    // Guard against a non-positive configured value by falling back to the
    // default retention size.
    if (configuredRetentionSize <= 0) {
      this.retentionSize =
          YarnConfiguration
              .DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
    } else {
      this.retentionSize = configuredRetentionSize;
    }
    this.fileControllerName = controllerName;
    // Resolve the remote root dir before the suffix: both consult
    // per-controller configuration keys derived from fileControllerName.
    extractRemoteRootLogDir();
    extractRemoteRootLogDirSuffix();
    initInternal(conf);
  }

  /**
   * Derived classes initialize themselves using this method.
   * @param conf the Configuration
   */
  protected abstract void initInternal(Configuration conf);

  /**
   * Get the remote root log directory.
   * @return the remote root log directory path
   */
  public Path getRemoteRootLogDir() {
    return this.remoteRootLogDir;
  }

  /**
   * Get the log aggregation directory suffix.
   * @return the log aggregation directory suffix
   */
  public String getRemoteRootLogDirSuffix() {
    return this.remoteRootLogDirSuffix;
  }

  /**
   * Get the name of the file controller.
   * @return name of the file controller.
   */
  public String getFileControllerName() {
    return this.fileControllerName;
  }

  /**
   * Initialize the writer.
   * @param context the {@link LogAggregationFileControllerContext}
   * @throws IOException if fails to initialize the writer
   */
  public abstract void initializeWriter(
      LogAggregationFileControllerContext context) throws IOException;

  /**
   * Close the writer.
   * @throws LogAggregationDFSException if the closing of the writer fails
   * (for example due to HDFS quota being exceeded)
   */
  public abstract void closeWriter() throws LogAggregationDFSException;

  /**
   * Write the log content.
   * @param logKey the log key
   * @param logValue the log content
   * @throws IOException if fails to write the logs
   */
  public abstract void write(LogKey logKey, LogValue logValue)
      throws IOException;

  /**
   * Operations needed after write the log content.
   * @param record the {@link LogAggregationFileControllerContext}
   * @throws Exception if anything fails
   */
  public abstract void postWrite(LogAggregationFileControllerContext record)
      throws Exception;

  /**
   * Close the given stream unless it is {@code System.out}, which must stay
   * open for the rest of the process.
   * @param out the stream to close (may be null; cleanup handles that)
   */
  protected void closePrintStream(OutputStream out) {
    if (out != System.out) {
      IOUtils.cleanupWithLogger(LOG, out);
    }
  }

  /**
   * Output container log.
   * @param logRequest {@link ContainerLogsRequest}
   * @param os the output stream
   * @return true if we can read the aggregated logs successfully
   * @throws IOException if we can not access the log file.
   */
  public abstract boolean readAggregatedLogs(ContainerLogsRequest logRequest,
      OutputStream os) throws IOException;

  /**
   * Return a list of {@link ContainerLogMeta} for an application
   * from Remote FileSystem.
   *
   * @param logRequest {@link ContainerLogsRequest}
   * @return a list of {@link ContainerLogMeta}
   * @throws IOException if there is no available log file
   */
  public abstract List<ContainerLogMeta> readAggregatedLogsMeta(
      ContainerLogsRequest logRequest) throws IOException;

  /**
   * Returns log file metadata for a node grouped by containers.
   * Default implementation is a no-op that logs and returns an empty map;
   * controllers supporting extended queries override this.
   *
   * @param logRequest extended query information holder
   * @param currentNodeFile file status of a node in an application directory
   * @param appId id of the application, which is the same as in node path
   * @return log file metadata
   * @throws IOException if there is no node file
   */
  public Map<String, List<ContainerLogFileInfo>> getLogMetaFilesOfNode(
      ExtendedLogMetaRequest logRequest, FileStatus currentNodeFile,
      ApplicationId appId) throws IOException {
    LOG.info("User aggregated complex log queries " +
        "are not implemented for this file controller");
    return Collections.emptyMap();
  }

  /**
   * Gets all application directories of a user.
   *
   * @param user name of the user
   * @return a lazy iterator of directories
   * @throws IOException if user directory does not exist
   */
  public RemoteIterator<FileStatus> getApplicationDirectoriesOfUser(
      String user) throws IOException {
    return LogAggregationUtils.getUserRemoteLogDir(
        conf, user, getRemoteRootLogDir(), getRemoteRootLogDirSuffix());
  }

  /**
   * Gets all node files in an application directory.
   *
   * @param appDir application directory
   * @return a lazy iterator of files
   * @throws IOException if file context is not reachable
   */
  public RemoteIterator<FileStatus> getNodeFilesOfApplicationDirectory(
      FileStatus appDir) throws IOException {
    return LogAggregationUtils
        .getRemoteFiles(conf, appDir.getPath());
  }

  /**
   * Render Aggregated Logs block.
   * @param html the html
   * @param context the ViewContext
   */
  public abstract void renderAggregatedLogsBlock(Block html,
      ViewContext context);

  /**
   * Returns the owner of the application.
   *
   * @param aggregatedLogPath the aggregatedLog path
   * @param appId the ApplicationId
   * @return the application owner
   * @throws IOException if we can not get the application owner
   */
  public abstract String getApplicationOwner(Path aggregatedLogPath,
      ApplicationId appId)
      throws IOException;

  /**
   * Returns ACLs for the application. An empty map is returned if no ACLs are
   * found.
   *
   * @param aggregatedLogPath the aggregatedLog path.
   * @param appId the ApplicationId
   * @return a map of the Application ACLs.
   * @throws IOException if we can not get the application acls
   */
  public abstract Map<ApplicationAccessType, String> getApplicationAcls(
      Path aggregatedLogPath, ApplicationId appId) throws IOException;

  /**
   * Sets the remoteRootLogDirSuffix class variable extracting
   * {@link YarnConfiguration#LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT}
   * from the configuration, or
   * {@link YarnConfiguration#NM_REMOTE_APP_LOG_DIR_SUFFIX} appended by the
   * FileController's name, if the former is not set.
   */
  private void extractRemoteRootLogDirSuffix() {
    // Per-controller key, e.g. "...log-aggregation.<name>.remote-app-log-dir-suffix".
    String suffix = String.format(
        YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
        fileControllerName);
    remoteRootLogDirSuffix = conf.get(suffix);
    if (remoteRootLogDirSuffix == null
        || remoteRootLogDirSuffix.isEmpty()) {
      // Fall back to the generic suffix plus "-<controllername>".
      remoteRootLogDirSuffix = conf.get(
          YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
          YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)
          + "-" + fileControllerName.toLowerCase();
    }
  }

  /**
   * Sets the remoteRootLogDir class variable extracting
   * {@link YarnConfiguration#LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT}
   * from the configuration or {@link YarnConfiguration#NM_REMOTE_APP_LOG_DIR},
   * if the former is not set.
   */
  private void extractRemoteRootLogDir() {
    String remoteDirStr = String.format(
        YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
        fileControllerName);
    String remoteDir = conf.get(remoteDirStr);
    if (remoteDir == null || remoteDir.isEmpty()) {
      remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
          YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
    }
    remoteRootLogDir = new Path(remoteDir);
  }

  /**
   * Verify and create the remote log directory.
   * If the top-level directory already exists, its permissions are checked
   * (and a probe file is used to detect chmod support); otherwise it is
   * created with {@link #TLDIR_PERMISSIONS} and its owner/group are set.
   */
  public void verifyAndCreateRemoteLogDir() {
    // Checking the existence of the TLD
    FileSystem remoteFS = null;
    try {
      remoteFS = getFileSystem(conf);
    } catch (IOException e) {
      throw new YarnRuntimeException(
          "Unable to get Remote FileSystem instance", e);
    }
    boolean remoteExists = true;
    Path remoteRootLogDir = getRemoteRootLogDir();
    try {
      FsPermission perms =
          remoteFS.getFileStatus(remoteRootLogDir).getPermission();
      // Warn (but do not fail) on a pre-existing dir with wrong permissions.
      if (!perms.equals(TLDIR_PERMISSIONS)) {
        LOG.warn("Remote Root Log Dir [" + remoteRootLogDir
            + "] already exist, but with incorrect permissions. "
            + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
            + "]." + " The cluster may have problems with multiple users.");
      }
    } catch (FileNotFoundException e) {
      remoteExists = false;
    } catch (IOException e) {
      throw new YarnRuntimeException(
          "Failed to check permissions for dir ["
              + remoteRootLogDir + "]", e);
    }
    Path qualified =
        remoteRootLogDir.makeQualified(remoteFS.getUri(),
            remoteFS.getWorkingDirectory());
    if (!remoteExists) {
      LOG.warn("Remote Root Log Dir [" + remoteRootLogDir
          + "] does not exist. Attempting to create it.");
      try {
        remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
        // Not possible to query FileSystem API to check if it supports
        // chmod, chown etc. Hence resorting to catching exceptions here.
        // Remove when FS APi is ready
        try {
          remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
        } catch ( UnsupportedOperationException use) {
          LOG.info("Unable to set permissions for configured filesystem since"
              + " it does not support this {}", remoteFS.getScheme());
          fsSupportsChmod = false;
        }
        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
        String primaryGroupName = conf.get(
            YarnConfiguration.NM_REMOTE_APP_LOG_DIR_GROUPNAME);
        if (primaryGroupName == null || primaryGroupName.isEmpty()) {
          try {
            primaryGroupName = loginUser.getPrimaryGroupName();
          } catch (IOException e) {
            LOG.warn("No primary group found. The remote root log directory" +
                " will be created with the HDFS superuser being its " +
                "group owner. JobHistoryServer may be unable to read " +
                "the directory.");
          }
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("The group of remote root log directory has been " +
                "determined by the configuration and set to " +
                primaryGroupName);
          }
        }
        // set owner on the remote directory only if the primary group exists
        if (primaryGroupName != null) {
          try {
            remoteFS.setOwner(qualified, loginUser.getShortUserName(),
                primaryGroupName);
          } catch (UnsupportedOperationException use) {
            LOG.info(
                "File System does not support setting user/group" + remoteFS
                    .getScheme(), use);
          }
        }
      } catch (IOException e) {
        throw new YarnRuntimeException("Failed to create remoteLogDir ["
            + remoteRootLogDir + "]", e);
      }
    } else {
      //Check if FS has capability to set/modify permissions
      // A randomly named probe file avoids collisions between concurrent
      // checks from multiple NodeManagers.
      Path permissionCheckFile = new Path(qualified, String.format("%s.permission_check",
          RandomStringUtils.randomAlphanumeric(8)));
      try {
        remoteFS.createNewFile(permissionCheckFile);
        remoteFS.setPermission(permissionCheckFile, new FsPermission(TLDIR_PERMISSIONS));
      } catch (UnsupportedOperationException use) {
        LOG.info("Unable to set permissions for configured filesystem since"
            + " it does not support this {}", remoteFS.getScheme());
        fsSupportsChmod = false;
      } catch (IOException e) {
        LOG.warn("Failed to check if FileSystem supports permissions on "
            + "remoteLogDir [" + remoteRootLogDir + "]", e);
      } finally {
        try {
          remoteFS.delete(permissionCheckFile, false);
        } catch (IOException ignored) {
          // Best-effort cleanup of the probe file; a leftover file is harmless.
        }
      }
    }
  }

  /**
   * Create remote Application directory for log aggregation.
   * @param user the user
   * @param appId the application ID
   * @param userUgi the UGI
   */
  public void createAppDir(final String user, final ApplicationId appId,
      UserGroupInformation userUgi) {
    final Path remoteRootLogDir = getRemoteRootLogDir();
    final String remoteRootLogDirSuffix = getRemoteRootLogDirSuffix();
    try {
      userUgi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          try {
            // TODO: Reuse FS for user?
            FileSystem remoteFS = getFileSystem(conf);
            // Only creating directories if they are missing to avoid
            // unnecessary load on the filesystem from all of the nodes
            Path appDir = LogAggregationUtils.getRemoteAppLogDir(
                remoteRootLogDir, appId, user, remoteRootLogDirSuffix);
            Path curDir = appDir.makeQualified(remoteFS.getUri(),
                remoteFS.getWorkingDirectory());
            Path rootLogDir = remoteRootLogDir.makeQualified(remoteFS.getUri(),
                remoteFS.getWorkingDirectory());
            // Walk upwards collecting missing ancestors, then create them
            // top-down so each new directory gets APP_DIR_PERMISSIONS.
            LinkedList<Path> pathsToCreate = new LinkedList<>();
            while (!curDir.equals(rootLogDir)) {
              if (!checkExists(remoteFS, curDir, APP_DIR_PERMISSIONS)) {
                pathsToCreate.addFirst(curDir);
                curDir = curDir.getParent();
              } else {
                break;
              }
            }
            for (Path path : pathsToCreate) {
              createDir(remoteFS, path, APP_DIR_PERMISSIONS);
            }
          } catch (IOException e) {
            LOG.error("Failed to setup application log directory for "
                + appId, e);
            throw e;
          }
          return null;
        }
      });
    } catch (Exception e) {
      // Unwrap RemoteException so callers see InvalidToken directly.
      if (e instanceof RemoteException) {
        throw new YarnRuntimeException(((RemoteException) e)
            .unwrapRemoteException(SecretManager.InvalidToken.class));
      }
      throw new YarnRuntimeException(e);
    }
  }

  /**
   * Get the {@link FileSystem} of the remote root log directory.
   * Overridable for testing.
   * @param conf the Configuration
   * @return the remote FileSystem
   * @throws IOException if the FileSystem cannot be obtained
   */
  @VisibleForTesting
  protected FileSystem getFileSystem(Configuration conf) throws IOException {
    return getRemoteRootLogDir().getFileSystem(conf);
  }

  /**
   * Create a directory with the given permission, issuing an explicit
   * setPermission only when the FS umask would otherwise alter the mode.
   * @param fs the FileSystem
   * @param path the directory to create
   * @param fsPerm the desired permission
   * @throws IOException if creation or chmod fails
   */
  protected void createDir(FileSystem fs, Path path, FsPermission fsPerm)
      throws IOException {
    if (fsSupportsChmod) {
      FsPermission dirPerm = new FsPermission(fsPerm);
      fs.mkdirs(path, dirPerm);
      FsPermission umask = FsPermission.getUMask(fs.getConf());
      // Only chmod when the umask would have clipped the requested bits.
      if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
        fs.setPermission(path, new FsPermission(fsPerm));
      }
    } else {
      fs.mkdirs(path);
    }
  }

  /**
   * Check whether a path exists, repairing its permissions to
   * {@link #APP_DIR_PERMISSIONS} as a side effect when chmod is supported.
   * @param fs the FileSystem
   * @param path the path to check
   * @param fsPerm expected permission (currently unused; APP_DIR_PERMISSIONS
   *               is applied — TODO confirm whether fsPerm was intended here)
   * @return true if the path exists
   * @throws IOException on FS errors other than file-not-found
   */
  protected boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm)
      throws IOException {
    boolean exists = true;
    try {
      FileStatus appDirStatus = fs.getFileStatus(path);
      if (fsSupportsChmod) {
        if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) {
          fs.setPermission(path, APP_DIR_PERMISSIONS);
        }
      }
    } catch (FileNotFoundException fnfe) {
      exists = false;
    }
    return exists;
  }

  /**
   * Get the remote aggregated log path.
   * @param appId the ApplicationId
   * @param user the Application Owner
   * @param nodeId the NodeManager Id
   * @return the remote aggregated log path
   */
  public Path getRemoteNodeLogFileForApp(ApplicationId appId, String user,
      NodeId nodeId) {
    return LogAggregationUtils.getRemoteNodeLogFileForApp(
        getRemoteRootLogDir(), appId, user, nodeId,
        getRemoteRootLogDirSuffix());
  }

  /**
   * Get the remote application directory for log aggregation.
   * @param appId the Application ID
   * @param appOwner the Application Owner
   * @return the remote application directory
   * @throws IOException if can not find the remote application directory
   */
  public Path getRemoteAppLogDir(ApplicationId appId, String appOwner)
      throws IOException {
    return LogAggregationUtils.getRemoteAppLogDir(conf, appId, appOwner,
        this.remoteRootLogDir, this.remoteRootLogDirSuffix);
  }

  /**
   * Get the older remote application directory for log aggregation.
   * @param appId the Application ID
   * @param appOwner the Application Owner
   * @return the older remote application directory
   * @throws IOException if can not find the remote application directory
   */
  public Path getOlderRemoteAppLogDir(ApplicationId appId, String appOwner)
      throws IOException {
    return LogAggregationUtils.getOlderRemoteAppLogDir(conf, appId, appOwner,
        this.remoteRootLogDir, this.remoteRootLogDirSuffix);
  }

  /**
   * Delete the oldest aggregated log files of this node so that at most
   * {@code retentionSize - 1} remain before a new log is uploaded.
   * All failures are logged and swallowed: logs that cannot be removed in
   * this cycle are retried in the next one.
   * @param remoteNodeLogFileForApp the log file about to be written
   * @param nodeId the NodeManager Id whose files are considered
   * @param userUgi UGI used to perform the deletes
   */
  protected void cleanOldLogs(Path remoteNodeLogFileForApp,
      final NodeId nodeId, UserGroupInformation userUgi) {
    try {
      final FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf);
      Path appDir = remoteNodeLogFileForApp.getParent().makeQualified(
          remoteFS.getUri(), remoteFS.getWorkingDirectory());
      // Keep only this node's finished (non-.tmp) aggregated log files.
      Set<FileStatus> status =
          new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
      status = status.stream().filter(
          next -> next.getPath().getName()
              .contains(LogAggregationUtils.getNodeString(nodeId))
              && !next.getPath().getName().endsWith(
                  LogAggregationUtils.TMP_FILE_SUFFIX)).collect(
          Collectors.toSet());
      // Normally, we just need to delete one oldest log
      // before we upload a new log.
      // If we can not delete the older logs in this cycle,
      // we will delete them in next cycle.
      if (status.size() >= this.retentionSize) {
        // sort by the lastModificationTime ascending
        List<FileStatus> statusList = new ArrayList<FileStatus>(status);
        Collections.sort(statusList, new Comparator<FileStatus>() {
          public int compare(FileStatus s1, FileStatus s2) {
            return s1.getModificationTime() < s2.getModificationTime() ? -1
                : s1.getModificationTime() > s2.getModificationTime() ? 1 : 0;
          }
        });
        // Inclusive bound: removes (size - retentionSize + 1) oldest files,
        // leaving retentionSize - 1 and thus room for the upcoming upload.
        for (int i = 0; i <= statusList.size() - this.retentionSize; i++) {
          final FileStatus remove = statusList.get(i);
          try {
            userUgi.doAs(new PrivilegedExceptionAction<Object>() {
              @Override
              public Object run() throws Exception {
                remoteFS.delete(remove.getPath(), false);
                return null;
              }
            });
          } catch (Exception e) {
            LOG.error("Failed to delete " + remove.getPath(), e);
          }
        }
      }
    } catch (Exception e) {
      LOG.error("Failed to clean old logs", e);
    }
  }

  /**
   * Create the aggregated log suffix. The LogAggregationFileController
   * should call this to get the suffix and append the suffix to the end
   * of each log. This would keep the aggregated log format consistent.
   *
   * @param fileName the File Name
   * @return the aggregated log suffix String
   */
  protected String aggregatedLogSuffix(String fileName) {
    StringBuilder sb = new StringBuilder();
    String endOfFile = "End of LogType:" + fileName;
    sb.append("\n" + endOfFile + "\n")
        .append(StringUtils.repeat("*", endOfFile.length() + 50)
            + "\n\n");
    return sb.toString();
  }

  /**
   * Whether the remote file system supports chmod, as detected by
   * {@link #verifyAndCreateRemoteLogDir()}.
   * @return true if chmod is supported (the initial default)
   */
  public boolean isFsSupportsChmod() {
    return fsSupportsChmod;
  }

  /**
   * Check whether a container id string belongs to the given app attempt.
   * Unparseable container ids are logged and treated as not belonging.
   * @param appAttemptId the application attempt to match against
   * @param containerIdStr container id in string form
   * @return true if the container's attempt id equals {@code appAttemptId}
   */
  protected boolean belongsToAppAttempt(ApplicationAttemptId appAttemptId,
      String containerIdStr) {
    ContainerId containerId = null;
    try {
      containerId = ContainerId.fromString(containerIdStr);
    } catch (IllegalArgumentException exc) {
      LOG.warn("Could not parse container id from aggregated log.", exc);
    }
    if (containerId != null && containerId.getApplicationAttemptId() != null) {
      return containerId.getApplicationAttemptId().equals(appAttemptId);
    }
    return false;
  }
}
Related information
Related articles
hadoop LogAggregationDFSException source code
hadoop LogAggregationFileControllerContext source code
hadoop LogAggregationFileControllerFactory source code
0
likes
Popular recommendations
-
2. - quality articles
-
3. gate.io
-
8. golang
-
9. openharmony
-
10. Auto-focusing an input element in Vue