Hadoop StagingCommitter Source Code

  • 2022-10-20

Hadoop StagingCommitter code

File path: /hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.commit.staging;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.impl.CommitContext;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.functional.TaskPool;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.Invoker.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.*;
import static org.apache.hadoop.util.functional.RemoteIterators.cleanupRemoteIterator;
import static org.apache.hadoop.util.functional.RemoteIterators.toList;

/**
 * Committer based on the contributed work of the
 * <a href="https://github.com/rdblue/s3committer">Netflix multipart committers.</a>
 * <ol>
 *   <li>
 *   The working directory of each task is actually under a temporary
 *   path in the local filesystem; jobs write directly into it.
 *   </li>
 *   <li>
 *     Task Commit: list all files under the task working dir, upload
 *     each of them but do not commit the final operation.
 *     Persist the information for each pending commit into the cluster
 *     for enumeration and commit by the job committer.
 *   </li>
 *   <li>Task Abort: recursive delete of task working dir.</li>
 *   <li>Job Commit: list all pending PUTs to commit; commit them.</li>
 *   <li>
 *     Job Abort: list all pending PUTs to commit; abort them.
 *     Delete all task attempt directories.
 *   </li>
 * </ol>
 *
 * This is the base class of the Partitioned and Directory committers.
 * It does not do any conflict resolution, and is made non-abstract
 * primarily for test purposes. It is not expected to be used in production.
 */
public class StagingCommitter extends AbstractS3ACommitter {

  private static final Logger LOG = LoggerFactory.getLogger(
      StagingCommitter.class);

  /** Name: {@value}. */
  public static final String NAME = "staging";
  private final Path constructorOutputPath;
  private final long uploadPartSize;
  private final boolean uniqueFilenames;
  private final FileOutputCommitter wrappedCommitter;

  private ConflictResolution conflictResolution;
  private String s3KeyPrefix;

  /** The directory in the cluster FS for commits to go to. */
  private Path commitsDirectory;

  /**
   * Committer for a single task attempt.
   * @param outputPath final output path
   * @param context task context
   * @throws IOException on a failure
   */
  public StagingCommitter(Path outputPath,
      TaskAttemptContext context) throws IOException {
    super(outputPath, context);
    this.constructorOutputPath = requireNonNull(getOutputPath(), "output path");
    Configuration conf = getConf();
    this.uploadPartSize = conf.getLongBytes(
        MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
    this.uniqueFilenames = conf.getBoolean(
        FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
        DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES);
    setWorkPath(buildWorkPath(context, getUUID()));
    this.wrappedCommitter = createWrappedCommitter(context, conf);
    setOutputPath(constructorOutputPath);
    Path finalOutputPath = requireNonNull(getOutputPath(),
        "Output path cannot be null");
    S3AFileSystem fs = getS3AFileSystem(finalOutputPath,
        context.getConfiguration(), false);
    s3KeyPrefix = fs.pathToKey(finalOutputPath);
    LOG.debug("{}: final output path is {}", getRole(), finalOutputPath);
    // forces evaluation and caching of the resolution mode.
    ConflictResolution mode = getConflictResolutionMode(getJobContext(),
        fs.getConf());
    LOG.debug("Conflict resolution mode: {}", mode);
  }

  @Override
  public String getName() {
    return NAME;
  }

  /**
   * Create the wrapped committer.
   * This includes customizing its options, and setting up the destination
   * directory.
   * @param context job/task context.
   * @param conf config
   * @return the inner committer
   * @throws IOException on a failure
   */
  protected FileOutputCommitter createWrappedCommitter(JobContext context,
      Configuration conf) throws IOException {

    // explicitly choose commit algorithm
    initFileOutputCommitterOptions(context);
    commitsDirectory = Paths.getMultipartUploadCommitsDirectory(conf,
        getUUID());
    return new FileOutputCommitter(commitsDirectory, context);
  }

  /**
   * Init the context config with everything needed for the file output
   * committer. In particular, this code currently only works with
   * commit algorithm 1.
   * @param context context to configure.
   */
  protected void initFileOutputCommitterOptions(JobContext context) {
    context.getConfiguration()
        .setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 1);
  }
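  // Note (illustrative): v1 commits a task by renaming its attempt directory
  // into the job attempt directory; the staging committer relies on that
  // rename to promote each task's saved PendingSet file (see
  // commitTaskInternal) so the job committer can enumerate it later.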

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder("StagingCommitter{");
    sb.append(super.toString());
    sb.append(", commitsDirectory=").append(commitsDirectory);
    sb.append(", uniqueFilenames=").append(uniqueFilenames);
    sb.append(", conflictResolution=").append(conflictResolution);
    sb.append(", uploadPartSize=").append(uploadPartSize);
    if (wrappedCommitter != null) {
      sb.append(", wrappedCommitter=").append(wrappedCommitter);
    }
    sb.append('}');
    return sb.toString();
  }

  /**
   * Get the work path for a task.
   * @param context job/task context
   * @param uuid UUID
   * @return a path or null if the context is not of a task
   * @throws IOException failure to build the path
   */
  private static Path buildWorkPath(JobContext context, String uuid)
      throws IOException {
    if (context instanceof TaskAttemptContext) {
      return taskAttemptWorkingPath((TaskAttemptContext) context, uuid);
    } else {
      return null;
    }
  }

  /**
   * Is this committer using unique filenames?
   * @return true if unique filenames are used.
   */
  public Boolean useUniqueFilenames() {
    return uniqueFilenames;
  }

  /**
   * Get the filesystem for the job attempt.
   * @param context the context of the job.  This is used to get the
   * application attempt ID.
   * @return the FS to store job attempt data.
   * @throws IOException failure to create the FS.
   */
  public FileSystem getJobAttemptFileSystem(JobContext context)
      throws IOException {
    Path p = getJobAttemptPath(context);
    return p.getFileSystem(context.getConfiguration());
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param context the context of the job.  This is used to get the
   * application attempt ID.
   * @param out the output path to place these in.
   * @return the path to store job attempt data.
   */
  public static Path getJobAttemptPath(JobContext context, Path out) {
    return getJobAttemptPath(getAppAttemptId(context), out);
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param appAttemptId the ID of the application attempt for this job.
   * @param out the output path to place these in.
   * @return the path to store job attempt data.
   */
  private static Path getJobAttemptPath(int appAttemptId, Path out) {
    return new Path(getPendingJobAttemptsPath(out),
        String.valueOf(appAttemptId));
  }

  @Override
  protected Path getJobAttemptPath(int appAttemptId) {
    return new Path(getJobPath(),
        String.valueOf(appAttemptId));
  }

  /**
   * Compute the path under which all job attempts will be placed.
   * @return the path to store job attempt data.
   */
  protected Path getJobPath() {
    return getPendingJobAttemptsPath(commitsDirectory);
  }

  /**
   * Compute the path where the output of pending task attempts are stored.
   * @param context the context of the job with pending tasks.
   * @param out the output path to place these in.
   * @return the path where the output of pending task attempts are stored.
   */
  private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
    return new Path(getJobAttemptPath(context, out), TEMPORARY);
  }

  /**
   * Compute the path where the output of a task attempt is stored until
   * that task is committed.
   *
   * @param context the context of the task attempt.
   * @param out The output path to put things in.
   * @return the path where a task attempt should be stored.
   */
  public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
    return new Path(getPendingTaskAttemptsPath(context, out),
        String.valueOf(context.getTaskAttemptID()));
  }
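  // Illustrative layout, assuming TEMPORARY resolves to "_temporary"
  // (mirroring FileOutputCommitter):
  //   <out>/_temporary/<appAttemptId>/_temporary/<taskAttemptId>/...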

  /**
   * Get the location of pending job attempts.
   * @param out the base output directory.
   * @return the location of pending job attempts.
   */
  private static Path getPendingJobAttemptsPath(Path out) {
    requireNonNull(out, "Null 'out' path");
    return new Path(out, TEMPORARY);
  }

  /**
   * Compute the path where the output of a committed task is stored until
   * the entire job is committed.
   * @param context the context of the task attempt
   * @return the path where the output of a committed task is stored until
   * the entire job is committed.
   */
  public Path getCommittedTaskPath(TaskAttemptContext context) {
    return getCommittedTaskPath(getAppAttemptId(context), context);
  }

  /**
   * Validate the task attempt context; makes sure
   * that the task attempt ID data is valid.
   * @param context task context
   */
  private static void validateContext(TaskAttemptContext context) {
    requireNonNull(context, "null context");
    requireNonNull(context.getTaskAttemptID(),
        "null task attempt ID");
    requireNonNull(context.getTaskAttemptID().getTaskID(),
        "null task ID");
    requireNonNull(context.getTaskAttemptID().getJobID(),
        "null job ID");
  }

  /**
   * Compute the path where the output of a committed task is stored until the
   * entire job is committed for a specific application attempt.
   * @param appAttemptId the ID of the application attempt to use
   * @param context the context of any task.
   * @return the path where the output of a committed task is stored.
   */
  protected Path getCommittedTaskPath(int appAttemptId,
      TaskAttemptContext context) {
    validateContext(context);
    return new Path(getJobAttemptPath(appAttemptId),
        String.valueOf(context.getTaskAttemptID().getTaskID()));
  }

  @Override
  public Path getTempTaskAttemptPath(TaskAttemptContext context) {
    throw new UnsupportedOperationException("Unimplemented");
  }

  /**
   * Lists the output of a task under the task attempt path. Subclasses can
   * override this method to change how output files are identified.
   * <p>
   * This implementation lists the files that are direct children of the output
   * path and filters hidden files (file names starting with '.' or '_').
   * <p>
   * The task attempt path is provided by
   * {@link #getTaskAttemptPath(TaskAttemptContext)}
   *
   * @param context this task's {@link TaskAttemptContext}
   * @return the output files produced by this task in the task attempt path
   * @throws IOException on a failure
   */
  protected List<LocatedFileStatus> getTaskOutput(TaskAttemptContext context)
      throws IOException {

    // get files on the local FS in the attempt path
    Path attemptPath = requireNonNull(getTaskAttemptPath(context),
        "No attemptPath path");

    LOG.debug("Scanning {} for files to commit", attemptPath);

    return toList(listAndFilter(getTaskAttemptFilesystem(context),
        attemptPath, true, HIDDEN_FILE_FILTER));
  }

  /**
   * Returns the final S3 key for a relative path. Subclasses can override this
   * method to upload files to a different S3 location.
   * <p>
   * This implementation concatenates the relative path with the key prefix
   * from the output path.
   * If {@link CommitConstants#FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES} is
   * set, then the task UUID is also included in the calculation
   *
   * @param relative the path of a file relative to the task attempt path
   * @param context the JobContext or TaskAttemptContext for this job
   * @return the S3 key where the file will be uploaded
   */
  protected String getFinalKey(String relative, JobContext context) {
    if (uniqueFilenames) {
      return getS3KeyPrefix(context) + "/"
          + Paths.addUUID(relative, getUUID());
    } else {
      return getS3KeyPrefix(context) + "/" + relative;
    }
  }
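  // Example (illustrative): with unique filenames enabled,
  // Paths.addUUID("part-0000.parquet", uuid) inserts the job UUID before the
  // file extension, giving a final key such as
  //   <prefix>/part-0000-<uuid>.parquet
  // so concurrent jobs writing to one destination do not clobber each other.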

  /**
   * Returns the final S3 location for a relative path as a Hadoop {@link Path}.
   * This is a final method that calls {@link #getFinalKey(String, JobContext)}
   * to determine the final location.
   *
   * @param relative the path of a file relative to the task attempt path
   * @param context the JobContext or TaskAttemptContext for this job
   * @return the S3 Path where the file will be uploaded
   * @throws IOException IO problem
   */
  protected final Path getFinalPath(String relative, JobContext context)
      throws IOException {
    return getDestS3AFS().keyToQualifiedPath(getFinalKey(relative, context));
  }

  /**
   * Return the local work path as the destination for writing work.
   * @param context the context of the task attempt.
   * @return a path in the local filesystem.
   */
  @Override
  public Path getBaseTaskAttemptPath(TaskAttemptContext context) {
    // a path on the local FS for files that will be uploaded
    return getWorkPath();
  }

  /**
   * For a job attempt path, the staging committer returns that of the
   * wrapped committer.
   * @param context the context of the job.
   * @return a path in HDFS.
   */
  @Override
  public Path getJobAttemptPath(JobContext context) {
    return wrappedCommitter.getJobAttemptPath(context);
  }

  /**
   * Set up the job, including calling the same method on the
   * wrapped committer.
   * @param context job context
   * @throws IOException IO failure.
   */
  @Override
  public void setupJob(JobContext context) throws IOException {
    super.setupJob(context);
    wrappedCommitter.setupJob(context);
  }

  /**
   * Get the list of pending uploads for this job attempt.
   * @param commitContext job context
   * @return a list of pending uploads.
   * @throws IOException Any IO failure
   */
  @Override
  protected ActiveCommit listPendingUploadsToCommit(
      CommitContext commitContext)
      throws IOException {
    return listPendingUploads(commitContext, false);
  }

  /**
   * Get the list of pending uploads for this job attempt, swallowing
   * exceptions.
   * @param commitContext commit context
   * @return a list of pending uploads. If an exception was swallowed,
   * then this may not match the actual set of pending operations
   * @throws IOException shouldn't be raised, but retained for the compiler
   */
  protected ActiveCommit listPendingUploadsToAbort(
      CommitContext commitContext) throws IOException {
    return listPendingUploads(commitContext, true);
  }

  /**
   * Get the list of pending uploads for this job attempt.
   * @param commitContext commit context
   * @param suppressExceptions should exceptions be swallowed?
   * @return a list of pending uploads. If exceptions are being swallowed,
   * then this may not match the actual set of pending operations
   * @throws IOException Any IO failure which wasn't swallowed.
   */
  protected ActiveCommit listPendingUploads(
      CommitContext commitContext, boolean suppressExceptions) throws IOException {
    try (DurationInfo ignored = new DurationInfo(LOG,
        "Listing pending uploads")) {
      Path wrappedJobAttemptPath = getJobAttemptPath(commitContext.getJobContext());
      final FileSystem attemptFS = wrappedJobAttemptPath.getFileSystem(
          commitContext.getConf());
      return ActiveCommit.fromStatusIterator(attemptFS,
          listAndFilter(attemptFS,
              wrappedJobAttemptPath, false,
              HIDDEN_FILE_FILTER));
    } catch (FileNotFoundException e) {
      // this can mean the job was aborted early on, so don't confuse people
      // with long stack traces that aren't the underlying problem.
      maybeIgnore(suppressExceptions, "Pending upload directory not found", e);
    } catch (IOException e) {
      // unable to work with endpoint, if suppressing errors decide our actions
      maybeIgnore(suppressExceptions, "Listing pending uploads", e);
    }
    // reached iff an IOE was caught and swallowed
    return ActiveCommit.empty();
  }

  @Override
  public void cleanupStagingDirs() {
    Path workPath = getWorkPath();
    if (workPath != null) {
      LOG.debug("Cleaning up work path {}", workPath);
      ignoreIOExceptions(LOG, "cleaning up", workPath.toString(),
          () -> deleteQuietly(workPath.getFileSystem(getConf()),
              workPath, true));
    }
  }

  /**
   * Staging committer cleanup includes calling wrapped committer's
   * cleanup method, and removing all destination paths in the final
   * filesystem.
   * @param commitContext commit context
   * @param suppressExceptions should exceptions be suppressed?
   * @throws IOException IO failures if exceptions are not suppressed.
   */
  @Override
  @SuppressWarnings("deprecation")
  protected void cleanup(CommitContext commitContext,
      boolean suppressExceptions)
      throws IOException {
    maybeIgnore(suppressExceptions, "Cleanup wrapped committer",
        () -> wrappedCommitter.cleanupJob(
            commitContext.getJobContext()));
    maybeIgnore(suppressExceptions, "Delete destination paths",
        () -> deleteDestinationPaths(
            commitContext.getJobContext()));
    super.cleanup(commitContext, suppressExceptions);
  }

  @Override
  protected void abortJobInternal(CommitContext commitContext,
      boolean suppressExceptions) throws IOException {
    String r = getRole();
    JobContext context = commitContext.getJobContext();
    boolean failed = false;
    try (DurationInfo d = new DurationInfo(LOG,
        "%s: aborting job in state %s ", r, jobIdString(context))) {
      ActiveCommit pending = listPendingUploadsToAbort(commitContext);
      abortPendingUploads(commitContext,
          pending, suppressExceptions, true);
    } catch (FileNotFoundException e) {
      // nothing to list
      LOG.debug("No job directory to read uploads from");
    } catch (IOException e) {
      failed = true;
      maybeIgnore(suppressExceptions, "aborting job", e);
    } finally {
      super.abortJobInternal(commitContext, failed || suppressExceptions);
    }
  }


  /**
   * Delete the working paths of a job.
   * <ol>
   *   <li>The job attempt path</li>
   *   <li>{@code $dest/__temporary}</li>
   *   <li>the local working directory for staged files</li>
   * </ol>
   * Does not attempt to clean up the work of the wrapped committer.
   * @param context job context
   * @throws IOException IO failure
   */
  protected void deleteDestinationPaths(JobContext context) throws IOException {
    Path attemptPath = getJobAttemptPath(context);
    ignoreIOExceptions(LOG,
        "Deleting Job attempt Path", attemptPath.toString(),
        () -> deleteWithWarning(
            getJobAttemptFileSystem(context),
            attemptPath,
            true));

    // delete the __temporary directory. This will cause problems
    // if there is >1 task targeting the same dest dir
    deleteWithWarning(getDestFS(),
        new Path(getOutputPath(), TEMPORARY),
        true);
    // and the working path
    deleteTaskWorkingPathQuietly(context);
  }


  @Override
  public void setupTask(TaskAttemptContext context) throws IOException {
    Path taskAttemptPath = getTaskAttemptPath(context);
    try (DurationInfo d = new DurationInfo(LOG,
        "%s: setup task attempt path %s ", getRole(), taskAttemptPath)) {
      super.setupTask(context);
      wrappedCommitter.setupTask(context);
    }
  }

  @Override
  public boolean needsTaskCommit(TaskAttemptContext context)
      throws IOException {
    try (DurationInfo d = new DurationInfo(LOG,
        "%s: needsTaskCommit() Task %s",
        getRole(), context.getTaskAttemptID())) {
      // check for files on the local FS in the attempt path
      Path attemptPath = getTaskAttemptPath(context);
      FileSystem fs = getTaskAttemptFilesystem(context);

      // This could be made more efficient with a probe "hasChildren(Path)"
      // which returns true if there is >1 entry under a given path.
      FileStatus[] stats = fs.listStatus(attemptPath);
      LOG.debug("{} files to commit under {}", stats.length, attemptPath);
      return stats.length > 0;
    } catch (FileNotFoundException e) {
      // list didn't find a directory, so nothing to commit
      // TODO: throw this up as an error?
      LOG.info("No files to commit");
      throw e;
    }
  }

  @Override
  public void commitTask(TaskAttemptContext context) throws IOException {
    try (DurationInfo d = new DurationInfo(LOG,
        "%s: commit task %s", getRole(), context.getTaskAttemptID());
         CommitContext commitContext
             = initiateTaskOperation(context)) {
      int count = commitTaskInternal(context, getTaskOutput(context), commitContext);
      LOG.info("{}: upload file count: {}", getRole(), count);
    } catch (IOException e) {
      LOG.error("{}: commit of task {} failed",
          getRole(), context.getTaskAttemptID(), e);
      getCommitOperations().taskCompleted(false);
      throw e;
    }
    getCommitOperations().taskCompleted(true);
  }

  /**
   * Commit the task by uploading all created files and then
   * writing a pending entry for them.
   * @param context task context
   * @param taskOutput list of files from the output
   * @param commitContext commit context
   * @return number of uploads committed.
   * @throws IOException IO Failures.
   */
  protected int commitTaskInternal(final TaskAttemptContext context,
      List<? extends FileStatus> taskOutput,
      CommitContext commitContext)
      throws IOException {
    LOG.debug("{}: commitTaskInternal", getRole());
    Configuration conf = context.getConfiguration();

    final Path attemptPath = getTaskAttemptPath(context);
    FileSystem attemptFS = getTaskAttemptFilesystem(context);
    LOG.debug("{}: attempt path is {}", getRole(), attemptPath);

    // add the commits file to the wrapped committer's task attempt location;
    // it is promoted by wrappedCommitter.commitTask() at the end of this
    // method.
    Path commitsAttemptPath = wrappedCommitter.getTaskAttemptPath(context);
    FileSystem commitsFS = commitsAttemptPath.getFileSystem(conf);

    // keep track of unfinished commits in case one fails. if something fails,
    // we will try to abort the ones that had already succeeded.
    int commitCount = taskOutput.size();
    final Queue<SinglePendingCommit> commits = new ConcurrentLinkedQueue<>();
    LOG.info("{}: uploading from staging directory to S3 {}", getRole(),
        attemptPath);
    LOG.info("{}: Saving pending data information to {}",
        getRole(), commitsAttemptPath);
    if (taskOutput.isEmpty()) {
      // there is nothing to write. needsTaskCommit() should have caught
      // this, so warn that there is some kind of problem in the protocol.
      LOG.warn("{}: No files to commit", getRole());
    } else {
      boolean threw = true;
      // before the uploads, report some progress
      context.progress();

      PendingSet pendingCommits = new PendingSet(commitCount);
      pendingCommits.putExtraData(TASK_ATTEMPT_ID,
          context.getTaskAttemptID().toString());
      try {
        TaskPool.foreach(taskOutput)
            .stopOnFailure()
            .suppressExceptions(false)
            .executeWith(commitContext.getOuterSubmitter())
            .run(stat -> {
              Path path = stat.getPath();
              File localFile = new File(path.toUri().getPath());
              String relative = Paths.getRelativePath(attemptPath, path);
              String partition = Paths.getPartition(relative);
              String key = getFinalKey(relative, context);
              Path destPath = getDestS3AFS().keyToQualifiedPath(key);
              SinglePendingCommit commit = getCommitOperations()
                  .uploadFileToPendingCommit(
                      localFile,
                      destPath,
                      partition,
                      uploadPartSize,
                      context);
              LOG.debug("{}: adding pending commit {}", getRole(), commit);
              commits.add(commit);
            });

        for (SinglePendingCommit commit : commits) {
          pendingCommits.add(commit);
        }

        // maybe aggregate the thread's IOStatistics into the pending data
        if (commitContext.isCollectIOStatistics()) {
          pendingCommits.getIOStatistics().aggregate(
              commitContext.getIOStatisticsContext()
              .getIOStatistics());
        }

        // save the data
        // overwrite any existing file, so whichever task attempt
        // committed last wins.

        LOG.debug("Saving {} pending commit(s)) to file {}",
            pendingCommits.size(),
            commitsAttemptPath);
        pendingCommits.save(commitsFS, commitsAttemptPath,
            commitContext.getPendingSetSerializer());
        threw = false;

      } finally {
        if (threw) {
          LOG.error(
              "{}: Exception during commit process, aborting {} commit(s)",
              getRole(), commits.size());
          try (DurationInfo ignored = new DurationInfo(LOG,
              "Aborting %s uploads", commits.size())) {
            TaskPool.foreach(commits)
                .suppressExceptions()
                .executeWith(commitContext.getOuterSubmitter())
                .run(commitContext::abortSingleCommit);
          }
          deleteTaskAttemptPathQuietly(context);
        }
      }
      // always purge attempt information at this point.
      Paths.clearTempFolderInfo(context.getTaskAttemptID());
    }

    LOG.debug("Committing wrapped task");
    wrappedCommitter.commitTask(context);

    LOG.debug("Cleaning up attempt dir {}", attemptPath);
    attemptFS.delete(attemptPath, true);
    return commits.size();
  }

  /**
   * Abort the task.
   * The API specifies that the task has not yet been committed, so there are
   * no uploads that need to be cancelled.
   * Accordingly just delete files on the local FS, and call abortTask in
   * the wrapped committer.
   * <b>Important: this may be called in the AM after a container failure.</b>
   * When that occurs and the failed container was on a different host in the
   * cluster, the local files will not be deleted.
   * @param context task context
   * @throws IOException any failure
   */
  @Override
  public void abortTask(TaskAttemptContext context) throws IOException {
    // the API specifies that the task has not yet been committed, so there are
    // no uploads that need to be cancelled. just delete files on the local FS.
    try (DurationInfo d = new DurationInfo(LOG,
        "Abort task %s", context.getTaskAttemptID())) {
      deleteTaskAttemptPathQuietly(context);
      deleteTaskWorkingPathQuietly(context);
      wrappedCommitter.abortTask(context);
    } catch (IOException e) {
      LOG.error("{}: exception when aborting task {}",
          getRole(), context.getTaskAttemptID(), e);
      throw e;
    }
  }

  /**
   * Get the work path for a task.
   * @param context job/task context
   * @param uuid UUID
   * @return a path
   * @throws IOException failure to build the path
   */
  private static Path taskAttemptWorkingPath(TaskAttemptContext context,
      String uuid) throws IOException {
    return getTaskAttemptPath(context,
        Paths.getLocalTaskAttemptTempDir(
            context.getConfiguration(),
            uuid,
            context.getTaskAttemptID()));
  }

  /**
   * Delete the working path of a task; no-op if there is none, that
   * is: this is a job.
   * @param context job/task context
   */
  protected void deleteTaskWorkingPathQuietly(JobContext context) {
    ignoreIOExceptions(LOG, "Delete working path", "",
        () -> {
          Path path = buildWorkPath(context, getUUID());
          if (path != null) {
            deleteQuietly(path.getFileSystem(getConf()), path, true);
          }
        });
  }

  /**
   * Get the key of the destination "directory" of the job/task.
   * @param context job context
   * @return key to write to
   */
  private String getS3KeyPrefix(JobContext context) {
    return s3KeyPrefix;
  }

  /**
   * Returns the {@link ConflictResolution} mode for this commit.
   *
   * @param context the JobContext for this commit
   * @param fsConf filesystem config
   * @return the ConflictResolution mode
   */
  public final ConflictResolution getConflictResolutionMode(
      JobContext context,
      Configuration fsConf) {
    if (conflictResolution == null) {
      this.conflictResolution = ConflictResolution.valueOf(
          getConfictModeOption(context, fsConf, DEFAULT_CONFLICT_MODE));
    }
    return conflictResolution;
  }
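  // The valid modes are the ConflictResolution enum values: FAIL, APPEND
  // and REPLACE. This base class only resolves and caches the mode; the
  // Directory and Partitioned subclasses act on it.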

  /**
   * Generate a {@link PathExistsException} because the destination exists.
   * Lists some of the child entries first, to help diagnose the problem.
   * @param path path which exists
   * @param description description (usually task/job ID)
   * @return an exception to throw
   */
  protected PathExistsException failDestinationExists(final Path path,
      final String description) {

    LOG.error("{}: Failing commit by job {} to write"
            + " to existing output path {}.",
        description,
        getJobContext().getJobID(), path);
    // List the first 10 descendants, to give some details
    // on what is wrong but not overload things if there are many files.
    try {
      int limit = 10;
      RemoteIterator<LocatedFileStatus> lf
          = getDestFS().listFiles(path, true);
      LOG.info("Partial Directory listing");
      while (limit > 0 && lf.hasNext()) {
        limit--;
        LocatedFileStatus status = lf.next();
        LOG.info("{}: {}",
            status.getPath(),
            status.isDirectory()
                ? " dir"
                : ("file size " + status.getLen() + " bytes"));
      }
      cleanupRemoteIterator(lf);
    } catch (IOException e) {
      LOG.info("Discarding exception raised when listing {}: " + e, path);
      LOG.debug("stack trace ", e);
    }
    return new PathExistsException(path.toString(),
        description + ": " + InternalCommitterConstants.E_DEST_EXISTS);
  }

  /**
   * Get the conflict mode option string.
   * @param context context with the config
   * @param fsConf filesystem config
   * @param defVal default value.
   * @return the trimmed configuration option, upper case.
   */
  public static String getConfictModeOption(JobContext context,
      Configuration fsConf, String defVal) {
    return getConfigurationOption(context,
        fsConf,
        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE,
        defVal).toUpperCase(Locale.ENGLISH);
  }

  /**
   * Pre-commit actions for a job.
   * Loads all the pending files to verify they can be loaded
   * and parsed.
   * @param commitContext commit context
   * @param pending pending commits
   * @throws IOException any failure
   */
  @Override
  public void preCommitJob(
      CommitContext commitContext,
      final ActiveCommit pending) throws IOException {

    // see if the files can be loaded.
    precommitCheckPendingFiles(commitContext, pending);
  }
}
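
For reference, a minimal sketch (not part of the Hadoop source) of how a job
could select and tune this committer purely through configuration. The key
names are the documented S3A committer options that the constants used above
(MULTIPART_SIZE, FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE) resolve to; the class name
StagingCommitterSetup is only an illustrative placeholder.

import org.apache.hadoop.conf.Configuration;

public class StagingCommitterSetup {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // route committer creation for s3a:// output paths through the S3A factory
    conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");

    // select the staging committer (StagingCommitter.NAME)
    conf.set("fs.s3a.committer.name", "staging");

    // options read in the StagingCommitter constructor
    conf.set("fs.s3a.multipart.size", "64M");                // uploadPartSize
    conf.setBoolean(
        "fs.s3a.committer.staging.unique-filenames", true);  // uniqueFilenames

    // conflict handling, resolved and cached by getConflictResolutionMode()
    conf.set("fs.s3a.committer.staging.conflict-mode", "fail"); // fail|append|replace

    System.out.println("committer: " + conf.get("fs.s3a.committer.name"));
  }
}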

Related information

Hadoop source directory

Related articles

Hadoop ConflictResolution source code

Hadoop DirectoryStagingCommitter source code

Hadoop DirectoryStagingCommitterFactory source code

Hadoop PartitionedStagingCommitter source code

Hadoop PartitionedStagingCommitterFactory source code

Hadoop Paths source code

Hadoop StagingCommitterConstants source code

Hadoop StagingCommitterFactory source code

Hadoop package-info source code
