hadoop AbstractJobOrTaskStage 源码
haddop AbstractJobOrTaskStage 代码
文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import org.apache.hadoop.util.OperationDuration;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.functional.TaskPool;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_GET_FILE_STATUS;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_IS_FILE;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_STATUS;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_MKDIRS;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.createTracker;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_SUFFIX;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_MANIFEST;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MSYNC;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.enterStageWorker;
/**
 * A Stage in Task/Job Commit.
 * A stage can be executed once only, creating the return value of the
 * {@link #apply(Object)} method, and, potentially, updating the state of the
 * store via {@link ManifestStoreOperations}.
 * IOStatistics will also be updated.
 * Stages are expected to be combined to form the commit protocol.
 * @param <IN> Type of arguments to the stage.
 * @param <OUT> Type of result.
 */
public abstract class AbstractJobOrTaskStage<IN, OUT>
    implements JobOrTaskStage<IN, OUT> {
  private static final Logger LOG = LoggerFactory.getLogger(
      AbstractJobOrTaskStage.class);
  /**
   * Error text on rename failure: {@value}.
   */
  public static final String FAILED_TO_RENAME_PREFIX = "Failed to ";
  /**
   * Is this a task stage? If so, toString() includes task
   * info..
   */
  private final boolean isTaskStage;
  /**
   * Configuration of all the stages in the ongoing committer
   * operation.
   */
  private final StageConfig stageConfig;
  /**
   * Name of the stage for statistics and logging.
   */
  private final String stageStatisticName;
  /**
   * Callbacks to update store.
   * This is not made visible to the stages; they must
   * go through the wrapper classes in this class, which
   * add statistics and logging.
   */
  private final ManifestStoreOperations operations;
  /**
   * Submitter for doing IO against the store.
   */
  private final TaskPool.Submitter ioProcessors;
  /**
   * Used to stop any re-entrancy of the rename.
   * This is an execute-once operation.
   */
  private final AtomicBoolean executed = new AtomicBoolean(false);
  /**
   * Tracker of the duration of the execution of the stage.
   * set after {@link #executeStage(Object)} completes.
   */
  private DurationTracker stageExecutionTracker;
  /**
   * Name for logging.
   */
  private final String name;
  /**
   * Constructor.
   * @param isTaskStage Is this a task stage?
   * @param stageConfig stage-independent configuration.
   * @param stageStatisticName name of the stage for statistics/logging
   * @param requireIOProcessors are the IO processors required?
   */
  protected AbstractJobOrTaskStage(
      final boolean isTaskStage,
      final StageConfig stageConfig,
      final String stageStatisticName,
      final boolean requireIOProcessors) {
    this.isTaskStage = isTaskStage;
    this.stageStatisticName = stageStatisticName;
    this.stageConfig = stageConfig;
    requireNonNull(stageConfig.getDestinationDir(), "Destination Directory");
    requireNonNull(stageConfig.getJobId(), "Job ID");
    requireNonNull(stageConfig.getJobAttemptDir(), "Job attempt directory");
    this.operations = requireNonNull(stageConfig.getOperations(),
        "Operations callbacks");
    // and the processors of work if required.
    this.ioProcessors = bindProcessor(
        requireIOProcessors,
        stageConfig.getIoProcessors());
    String stageName;
    if (isTaskStage) {
      // force fast failure.
      getRequiredTaskId();
      getRequiredTaskAttemptId();
      getRequiredTaskAttemptDir();
      stageName = String.format("[Task-Attempt %s]", getRequiredTaskAttemptId());
    } else  {
      stageName = String.format("[Job-Attempt %s/%02d]",
          stageConfig.getJobId(),
          stageConfig.getJobAttemptNumber());
    }
    name = stageName;
  }
  /**
   * Bind to the processor if it is required.
   * @param required is the processor required?
   * @param processor processor
   * @return the processor binding
   * @throws NullPointerException if required == true and processor is null.
   */
  private TaskPool.Submitter bindProcessor(
      final boolean required,
      final TaskPool.Submitter processor) {
    return required
        ? requireNonNull(processor, "required IO processor is null")
        : null;
  }
  /**
   * Stage entry point.
   * Verifies that this is the first and only time the stage is invoked,
   * then calls {@link #executeStage(Object)} for the subclass
   * to perform its part of the commit protocol.
   * The duration of the stage is collected as a statistic, and its
   * entry/exit logged at INFO.
   * @param arguments arguments to the function.
   * @return the result.
   * @throws IOException failures.
   */
  @Override
  public final OUT apply(final IN arguments) throws IOException {
    executeOnlyOnce();
    progress();
    String stageName = getStageName(arguments);
    getStageConfig().enterStage(stageName);
    String statisticName = getStageStatisticName(arguments);
    // The tracker here
    LOG.info("{}: Executing Stage {}", getName(), stageName);
    stageExecutionTracker = createTracker(getIOStatistics(), statisticName);
    try {
      // exec the input function and return its value
      final OUT out = executeStage(arguments);
      LOG.info("{}: Stage {} completed after {}",
          getName(),
          stageName,
          OperationDuration.humanTime(
              stageExecutionTracker.asDuration().toMillis()));
      return out;
    } catch (IOException | RuntimeException e) {
      LOG.error("{}: Stage {} failed: after {}: {}",
          getName(),
          stageName,
          OperationDuration.humanTime(
              stageExecutionTracker.asDuration().toMillis()),
          e.toString());
      LOG.debug("{}: Stage failure:", getName(), e);
      // input function failed: note it
      stageExecutionTracker.failed();
      // and rethrow
      throw e;
    } finally {
      // update the tracker.
      // this is called after the catch() call would have
      // set the failed flag.
      stageExecutionTracker.close();
      progress();
      getStageConfig().exitStage(stageName);
    }
  }
  /**
   * The work of a stage.
   * Executed exactly once.
   * @param arguments arguments to the function.
   * @return the result.
   * @throws IOException failures.
   */
  protected abstract OUT executeStage(IN arguments) throws IOException;
  /**
   * Check that the operation has not been invoked twice.
   * This is an atomic check.
   * @throws IllegalStateException on a second invocation.
   */
  private void executeOnlyOnce() {
    Preconditions.checkState(
        !executed.getAndSet(true),
        "Stage attempted twice");
  }
  /**
   * The stage statistic name.
   * @param arguments args to the invocation.
   * @return stage name.
   */
  protected String getStageStatisticName(IN arguments) {
    return stageStatisticName;
  }
  /**
   * Stage name for reporting; defaults to
   * call {@link #getStageStatisticName(IN)}.
   * @param arguments args to the invocation.
   * @return name used in updating reports.
   */
  protected String getStageName(IN arguments) {
    return getStageStatisticName(arguments);
  }
  /**
   * Get the execution tracker; non-null
   * after stage execution.
   * @return a tracker or null.
   */
  public DurationTracker getStageExecutionTracker() {
    return stageExecutionTracker;
  }
  /**
   * Adds the duration of the job to an IOStatistics store
   * (such as the manifest to be saved).
   * @param iostats store
   * @param statistic statistic name.
   */
  public void addExecutionDurationToStatistics(IOStatisticsStore iostats,
      String statistic) {
    iostats.addTimedOperation(
        statistic,
        getStageExecutionTracker().asDuration());
  }
  /**
   * Note any rate limiting to the given timing statistic.
   * If the wait was 0, no statistics are updated.
   * @param statistic statistic key.
   * @param wait wait duration.
   */
  private void noteAnyRateLimiting(String statistic, Duration wait) {
    if (!wait.isZero()) {
      // rate limiting took place
      getIOStatistics().addTimedOperation(
          statistic,
          wait.toMillis());
    }
  }
  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder(
        "AbstractJobOrTaskStage{");
    sb.append(isTaskStage ? "Task Stage" : "Job Stage");
    sb.append(" name='").append(name).append('\'');
    sb.append(" stage='").append(stageStatisticName).append('\'');
    sb.append('}');
    return sb.toString();
  }
  /**
   * The stage configuration.
   * @return the stage configuration used by this stage.
   */
  protected StageConfig getStageConfig() {
    return stageConfig;
  }
  /**
   * Update the thread context with the stage name and
   * job ID.
   * This MUST be invoked at the start of methods invoked in helper threads,
   * to ensure that they are all annotated with job and stage.
   * @param stage stage name.
   */
  protected void updateAuditContext(final String stage) {
    enterStageWorker(stageConfig.getJobId(), stage);
  }
  /**
   * The IOStatistics are shared across all uses of the
   * StageConfig.
   * @return the (possibly shared) IOStatistics.
   */
  @Override
  public final IOStatisticsStore getIOStatistics() {
    return stageConfig.getIOStatistics();
  }
  /**
   * Call progress() on any Progressable passed in.
   */
  protected final void progress() {
    if (stageConfig.getProgressable() != null) {
      stageConfig.getProgressable().progress();
    }
  }
  /**
   * Get a file status value or, if the path doesn't exist, return null.
   * @param path path
   * @return status or null
   * @throws IOException IO Failure.
   */
  protected final FileStatus getFileStatusOrNull(
      final Path path)
      throws IOException {
    try {
      return getFileStatus(path);
    } catch (FileNotFoundException e) {
      return null;
    }
  }
  /**
   * Get a file status value or, if the path doesn't exist, return null.
   * @param path path
   * @return status or null
   * @throws IOException IO Failure.
   */
  protected final FileStatus getFileStatus(
      final Path path)
      throws IOException {
    LOG.trace("{}: getFileStatus('{}')", getName(), path);
    requireNonNull(path,
        () -> String.format("%s: Null path for getFileStatus() call", getName()));
    return trackDuration(getIOStatistics(), OP_GET_FILE_STATUS, () ->
        operations.getFileStatus(path));
  }
  /**
   * Get a file status value or, if the path doesn't exist, return null.
   * @param path path
   * @return true if the path resolves to a file
   * @throws IOException IO Failure.
   */
  protected final boolean isFile(
      final Path path)
      throws IOException {
    LOG.trace("{}: isFile('{}')", getName(), path);
    return trackDuration(getIOStatistics(), OP_IS_FILE, () -> {
      return operations.isFile(path);
    });
  }
  /**
   * Delete a path.
   * @param path path
   * @param recursive recursive delete.
   * @return status or null
   * @throws IOException IO Failure.
   */
  protected final boolean delete(
      final Path path,
      final boolean recursive)
      throws IOException {
    LOG.trace("{}: delete('{}, {}')", getName(), path, recursive);
    return delete(path, recursive, OP_DELETE);
  }
  /**
   * Delete a path.
   * @param path path
   * @param recursive recursive delete.
   * @param statistic statistic to update
   * @return status or null
   * @throws IOException IO Failure.
   */
  protected Boolean delete(
      final Path path,
      final boolean recursive,
      final String statistic)
      throws IOException {
    return trackDuration(getIOStatistics(), statistic, () -> {
      return operations.delete(path, recursive);
    });
  }
  /**
   * Create a directory.
   * @param path path
   * @param escalateFailure escalate "false" to PathIOE
   * @return true if the directory was created/exists.
   * @throws IOException IO Failure.
   */
  protected final boolean mkdirs(
      final Path path,
      final boolean escalateFailure)
      throws IOException {
    LOG.trace("{}: mkdirs('{}')", getName(), path);
    return trackDuration(getIOStatistics(), OP_MKDIRS, () -> {
      boolean success = operations.mkdirs(path);
      if (!success && escalateFailure) {
        throw new PathIOException(path.toUri().toString(),
            stageStatisticName + ": mkdirs() returned false");
      }
      return success;
    });
  }
  /**
   * List all directly files under a path.
   * Async implementations may under-report their durations.
   * @param path path
   * @return iterator over the results.
   * @throws IOException IO Failure.
   */
  protected final RemoteIterator<FileStatus> listStatusIterator(
      final Path path)
      throws IOException {
    LOG.trace("{}: listStatusIterator('{}')", getName(), path);
    return trackDuration(getIOStatistics(), OP_LIST_STATUS, () ->
        operations.listStatusIterator(path));
  }
  /**
   * Load a manifest file.
   * @param status source.
   * @return the manifest.
   * @throws IOException IO Failure.
   */
  protected final TaskManifest loadManifest(
      final FileStatus status)
      throws IOException {
    LOG.trace("{}: loadManifest('{}')", getName(), status);
    return trackDuration(getIOStatistics(), OP_LOAD_MANIFEST, () ->
        operations.loadTaskManifest(
            stageConfig.currentManifestSerializer(),
            status));
  }
  /**
   * List all the manifests in the task manifest dir.
   * @return a iterator of manifests.
   * @throws IOException IO Failure.
   */
  protected final RemoteIterator<FileStatus> listManifests()
      throws IOException {
    return RemoteIterators.filteringRemoteIterator(
        listStatusIterator(getTaskManifestDir()),
        st -> st.getPath().toUri().toString().endsWith(MANIFEST_SUFFIX));
  }
  /**
   * Make an msync() call; swallow when unsupported.
   * @param path path
   * @throws IOException IO failure
   */
  protected final void msync(Path path) throws IOException {
    LOG.trace("{}: msync('{}')", getName(), path);
    trackDurationOfInvocation(getIOStatistics(), OP_MSYNC, () ->
        operations.msync(path));
  }
  /**
   * Create a directory -failing if it exists or if
   * mkdirs() failed.
   * @param operation operation for error reporting.
   * @param path path path to create.
   * @return the path.
   * @throws IOException failure
   * @throws PathIOException mkdirs failed.
   * @throws FileAlreadyExistsException destination exists.
   */
  protected final Path createNewDirectory(
      final String operation,
      final Path path) throws IOException {
    LOG.trace("{}: {} createNewDirectory('{}')", getName(), operation, path);
    requireNonNull(path,
        () -> String.format("%s: Null path for operation %s", getName(), operation));
    // check for dir existence before trying to create.
    try {
      final FileStatus status = getFileStatus(path);
      // no exception, so the path exists.
      throw new FileAlreadyExistsException(operation
          + ": path " + path
          + " already exists and has status " + status);
    } catch (FileNotFoundException e) {
      // the path does not exist, so create it.
      mkdirs(path, true);
      return path;
    }
  }
  /**
   * Assert that a path is a directory which must exist.
   * @param operation operation for error reporting.
   * @param path path path to create.
   * @return the path
   * @throws IOException failure
   * @throws PathIOException mkdirs failed.
   * @throws FileAlreadyExistsException destination exists.
   */
  protected final Path directoryMustExist(
      final String operation,
      final Path path) throws IOException {
    final FileStatus status = getFileStatus(path);
    if (!status.isDirectory()) {
      throw new PathIOException(path.toString(),
          operation
              + ": Path is not a directory; its status is :" + status);
    }
    return path;
  }
  /**
   * Save a task manifest or summary. This will be done by
   * writing to a temp path and then renaming.
   * If the destination path exists: Delete it.
   * @param manifestData the manifest/success file
   * @param tempPath temp path for the initial save
   * @param finalPath final path for rename.
   * @throws IOException failure to load/parse
   */
  @SuppressWarnings("unchecked")
  protected final <T extends AbstractManifestData> void save(T manifestData,
      final Path tempPath,
      final Path finalPath) throws IOException {
    LOG.trace("{}: save('{}, {}, {}')", getName(), manifestData, tempPath, finalPath);
    trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () ->
        operations.save(manifestData, tempPath, true));
    renameFile(tempPath, finalPath);
  }
  /**
   * Get an etag from a FileStatus which MUST BE
   * an implementation of EtagSource and
   * whose etag MUST NOT BE null/empty.
   * @param status the status; may be null.
   * @return the etag or null if not provided
   */
  public String getEtag(FileStatus status) {
    return operations.getEtag(status);
  }
  /**
   * Rename a file from source to dest; if the underlying FS API call
   * returned false that's escalated to an IOE.
   * @param source source file.
   * @param dest dest file
   * @throws IOException failure
   * @throws PathIOException if the rename() call returned false.
   */
  protected final void renameFile(final Path source, final Path dest)
      throws IOException {
    maybeDeleteDest(true, dest);
    executeRenamingOperation("renameFile", source, dest,
        OP_RENAME_FILE, () ->
            operations.renameFile(source, dest));
  }
  /**
   * Rename a file from source to dest; if the underlying FS API call
   * returned false that's escalated to an IOE.
   * @param source source file.
   * @param dest dest file
   * @throws IOException failure
   * @throws PathIOException if the rename() call returned false.
   */
  protected final void renameDir(final Path source, final Path dest)
      throws IOException {
    maybeDeleteDest(true, dest);
    executeRenamingOperation("renameDir", source, dest,
        OP_RENAME_FILE, () ->
        operations.renameDir(source, dest)
    );
  }
  /**
   * Commit a file from the manifest using rename or, if available, resilient renaming.
   * @param entry entry from manifest
   * @throws PathIOException if the rename() call returned false and was uprated.
   * @throws IOException failure
   */
  protected final CommitOutcome commitFile(FileEntry entry,
      boolean deleteDest)
      throws IOException {
    final Path source = entry.getSourcePath();
    final Path dest = entry.getDestPath();
    maybeDeleteDest(deleteDest, dest);
    if (storeSupportsResilientCommit()) {
      // get the commit permits
      final ManifestStoreOperations.CommitFileResult result = trackDuration(getIOStatistics(),
          OP_COMMIT_FILE_RENAME, () ->
              operations.commitFile(entry));
      if (result.recovered()) {
        // recovery took place.
        getIOStatistics().incrementCounter(OP_COMMIT_FILE_RENAME_RECOVERED);
      }
      if (result.getWaitTime() != null) {
        // note any delay which took place
        noteAnyRateLimiting(STORE_IO_RATE_LIMITED, result.getWaitTime());
      }
    } else {
      // commit with a simple rename; failures will be escalated.
      executeRenamingOperation("renameFile", source, dest,
          OP_COMMIT_FILE_RENAME, () ->
              operations.renameFile(source, dest));
    }
    return new CommitOutcome();
  }
  /**
   * Does this store support resilient commit.
   * @return true if resilient commit operations are available.
   */
  protected boolean storeSupportsResilientCommit() {
    return operations.storeSupportsResilientCommit();
  }
  private void maybeDeleteDest(final boolean deleteDest, final Path dest) throws IOException {
    if (deleteDest) {
      // delete the destination, always, knowing that it's a no-op if
      // the data isn't there. Skipping the change saves one round trip
      // to actually look for the file/object
      boolean deleted = delete(dest, true);
      // log the outcome in case of emergency diagnostics traces
      // being needed.
      LOG.debug("{}: delete('{}') returned {}'", getName(), dest, deleted);
    }
  }
  /**
   * Execute an operation to rename a file/dir, commit a manifest entry.
   * The statistic is tracked; returning false from the operation is considered
   * a failure from the statistics perspective.
   * @param operation operation name
   * @param source source path
   * @param dest dest path
   * @param statistic statistic to track
   * @param action callable of the operation
   * @throws IOException on any failure
   */
  private void executeRenamingOperation(String operation,
      Path source,
      Path dest,
      String statistic,
      CallableRaisingIOE<Boolean> action) throws IOException {
    LOG.debug("{}: {} '{}' to '{}')", getName(), operation, source, dest);
    requireNonNull(source, "Null source");
    requireNonNull(dest, "Null dest");
    // duration tracking is a bit convoluted as it
    // ensures that rename failures as well as IOEs are
    // treated as failures from a statistics perspective.
    DurationTracker tracker = createTracker(getIOStatistics(), statistic);
    boolean success;
    try {
      success = action.apply();
      if (!success) {
        // record failure in the tracker before closing it
        tracker.failed();
      }
    } catch (IOException | RuntimeException e) {
      LOG.info("{}: {} raised an exception: {}", getName(), operation, e.toString());
      LOG.debug("{}: {} stack trace", getName(), operation, e);
      tracker.failed();
      throw e;
    } finally {
      // success
      // update the tracker.
      tracker.close();
    }
    // escalate the failure; this is done out of the duration tracker
    // so its file status probes aren't included.
    if (!success) {
      throw escalateRenameFailure(operation, source, dest);
    }
  }
  /**
   * Escalate a rename failure to an exception.
   * Returns an error exception to throw if one was not
   * triggered when probing for the source.
   * @param operation operation name
   * @param source source path
   * @param dest dest path
   * @return an exception to throw
   * @throws IOException raised probing for source or dest
   */
  private PathIOException escalateRenameFailure(String operation,
      Path source, Path dest) throws IOException {
    // rename just returned false.
    // collect information for a meaningful error message
    // and include in an exception raised.
    // get the source status; this will implicitly raise a FNFE.
    final FileStatus sourceStatus = getFileStatus(source);
    // and look to see if there is anything at the destination
    final FileStatus destStatus = getFileStatusOrNull(dest);
    LOG.error("{}: failure to {} {} to {} with" +
            " source status {} " +
            " and destination status {}",
        getName(), operation, source, dest,
        sourceStatus, destStatus);
    return new PathIOException(source.toString(),
        FAILED_TO_RENAME_PREFIX + operation + " to " + dest);
  }
  /**
   * Outcome from the commit.
   */
  public static final class CommitOutcome {
  }
  /**
   * Job ID: never null.
   */
  protected final String getJobId() {
    return stageConfig.getJobId();
  }
  /**
   * Job attempt number.
   */
  protected final int getJobAttemptNumber() {
    return stageConfig.getJobAttemptNumber();
  }
  /**
   * ID of the task.
   */
  protected final String getTaskId() {
    return stageConfig.getTaskId();
  }
  /**
   * Get the task ID; raise an NPE
   * if it is null.
   * @return a non-null task ID.
   */
  protected final String getRequiredTaskId() {
    return requireNonNull(getTaskId(),
        "No Task ID in stage config");
  }
  /**
   * ID of this specific attempt at a task.
   */
  protected final String getTaskAttemptId() {
    return stageConfig.getTaskAttemptId();
  }
  /**
   * Get the task attempt ID; raise an NPE
   * if it is null.
   * @return a non-null task attempt ID.
   */
  protected final String getRequiredTaskAttemptId() {
    return requireNonNull(getTaskAttemptId(),
        "No Task Attempt ID in stage config");
  }
  /**
   * Job attempt dir.
   */
  protected final Path getJobAttemptDir() {
    return stageConfig.getJobAttemptDir();
  }
  /**
   * Directory to put task manifests into.
   * @return a path under the job attempt dir.
   */
  protected final Path getTaskManifestDir() {
    return stageConfig.getTaskManifestDir();
  }
  /**
   * Task attempt dir.
   */
  protected final Path getTaskAttemptDir() {
    return stageConfig.getTaskAttemptDir();
  }
  /**
   * Get the task attemptDir; raise an NPE
   * if it is null.
   * @return a non-null task attempt dir.
   */
  protected final Path getRequiredTaskAttemptDir() {
    return requireNonNull(getTaskAttemptDir(),
        "No Task Attempt Dir");
  }
  /**
   * Destination of job.
   */
  protected final Path getDestinationDir() {
    return stageConfig.getDestinationDir();
  }
  /**
   * Stage confog name, for logging.
   * @return name.
   */
  public final String getName() {
    return name;
  }
  /**
   * Submitter for doing IO against the store other than
   * manifest processing.
   */
  protected final TaskPool.Submitter getIOProcessors() {
    return ioProcessors;
  }
  /**
   * Submitter for doing IO against the store other than
   * manifest processing.
   * The size parameter is used to select between sequential
   * and parallel runners.
   * no data, or one entry: serial.
   * everything else, parallel.
   * @param size number of items.
   * @return a submitter or null
   */
  protected final TaskPool.Submitter getIOProcessors(int size) {
    return size > 1
        ? getIOProcessors()
        : null;
  }
  /**
   * Delete a directory, possibly suppressing exceptions.
   * @param dir directory.
   * @param suppressExceptions should exceptions be suppressed?
   * @throws IOException exceptions raised in delete if not suppressed.
   * @return any exception caught and suppressed
   */
  protected IOException deleteDir(
      final Path dir,
      final Boolean suppressExceptions)
      throws IOException {
    try {
      delete(dir, true);
      return null;
    } catch (IOException ex) {
      LOG.info("Error deleting {}: {}", dir, ex.toString());
      if (!suppressExceptions) {
        throw ex;
      } else {
        return ex;
      }
    }
  }
  /**
   * Create an entry for a file to rename under the destination.
   * If the store operations supports extracting etags from file status
   * entries, that is included in the entry
   * @param status source file
   * @param destDir destination directory
   * @return an entry which includes the rename path
   */
  protected FileEntry fileEntry(FileStatus status, Path destDir) {
    // generate a new path under the dest dir
    Path dest = new Path(destDir, status.getPath().getName());
    return new FileEntry(status.getPath(),
        dest,
        status.getLen(),
        getEtag(status));
  }
}
相关信息
相关文章
hadoop CreateOutputDirectoriesStage 源码
                        
                            0
                        
                        
                             赞
                        
                    
                    
                热门推荐
- 
                        2、 - 优质文章
 - 
                        3、 gate.io
 - 
                        7、 openharmony
 - 
                        9、 golang