hadoop AbstractJobOrTaskStage 源码

  • 2022-10-20
  • 浏览 (203)

hadoop AbstractJobOrTaskStage 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import org.apache.hadoop.util.OperationDuration;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.functional.TaskPool;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_GET_FILE_STATUS;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_IS_FILE;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_STATUS;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_MKDIRS;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.createTracker;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_SUFFIX;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_MANIFEST;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MSYNC;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.enterStageWorker;

/**
 * A Stage in Task/Job Commit.
 * A stage can be executed once only, creating the return value of the
 * {@link #apply(Object)} method, and, potentially, updating the state of the
 * store via {@link ManifestStoreOperations}.
 * IOStatistics will also be updated.
 * Stages are expected to be combined to form the commit protocol.
 * @param <IN> Type of arguments to the stage.
 * @param <OUT> Type of result.
 */
public abstract class AbstractJobOrTaskStage<IN, OUT>
    implements JobOrTaskStage<IN, OUT> {

  private static final Logger LOG = LoggerFactory.getLogger(
      AbstractJobOrTaskStage.class);

  /**
   * Prefix of the error text on rename failure: {@value}.
   * The operation name and destination are appended to it
   * when the exception is built.
   */
  public static final String FAILED_TO_RENAME_PREFIX = "Failed to ";

  /**
   * Is this a task stage? If so, the stage name includes the
   * task attempt ID rather than the job attempt.
   */
  private final boolean isTaskStage;

  /**
   * Configuration of all the stages in the ongoing committer
   * operation.
   */
  private final StageConfig stageConfig;

  /**
   * Name of the stage for statistics and logging.
   */
  private final String stageStatisticName;

  /**
   * Callbacks to update store.
   * This is not made visible to the stages; they must
   * go through the wrapper methods in this class, which
   * add statistics and logging.
   */
  private final ManifestStoreOperations operations;

  /**
   * Submitter for doing IO against the store.
   * Null if the stage was constructed without requiring IO processors.
   */
  private final TaskPool.Submitter ioProcessors;

  /**
   * Used to stop any re-entrancy of the stage.
   * This is an execute-once operation.
   */
  private final AtomicBoolean executed = new AtomicBoolean(false);

  /**
   * Tracker of the duration of the execution of the stage.
   * Null until {@link #apply(Object)} is invoked; it is assigned just
   * before {@link #executeStage(Object)} is called and closed once
   * that call has returned or failed.
   */
  private DurationTracker stageExecutionTracker;

  /**
   * Name for logging.
   */
  private final String name;

  /**
   * Constructor.
   * @param isTaskStage Is this a task stage?
   * @param stageConfig stage-independent configuration.
   * @param stageStatisticName name of the stage for statistics/logging
   * @param requireIOProcessors are the IO processors required?
   * @throws NullPointerException if the stage config or any of its
   * required values is null.
   */
  protected AbstractJobOrTaskStage(
      final boolean isTaskStage,
      final StageConfig stageConfig,
      final String stageStatisticName,
      final boolean requireIOProcessors) {
    this.isTaskStage = isTaskStage;
    this.stageStatisticName = stageStatisticName;
    // fail with a meaningful message rather than an anonymous NPE
    // on the first dereference.
    this.stageConfig = requireNonNull(stageConfig, "Stage configuration");
    requireNonNull(stageConfig.getDestinationDir(), "Destination Directory");
    requireNonNull(stageConfig.getJobId(), "Job ID");
    requireNonNull(stageConfig.getJobAttemptDir(), "Job attempt directory");
    this.operations = requireNonNull(stageConfig.getOperations(),
        "Operations callbacks");
    // and the processors of work if required.
    this.ioProcessors = bindProcessor(
        requireIOProcessors,
        stageConfig.getIoProcessors());
    String stageName;
    if (isTaskStage) {
      // force fast failure if any task-specific value is missing.
      getRequiredTaskId();
      getRequiredTaskAttemptId();
      getRequiredTaskAttemptDir();
      stageName = String.format("[Task-Attempt %s]", getRequiredTaskAttemptId());
    } else  {
      stageName = String.format("[Job-Attempt %s/%02d]",
          stageConfig.getJobId(),
          stageConfig.getJobAttemptNumber());
    }
    name = stageName;
  }

  /**
   * Bind to the processor if it is required.
   * @param required is the processor required?
   * @param processor processor
   * @return the processor if required; null otherwise, even if a
   * processor was supplied.
   * @throws NullPointerException if required == true and processor is null.
   */
  private TaskPool.Submitter bindProcessor(
      final boolean required,
      final TaskPool.Submitter processor) {
    return required
        ? requireNonNull(processor, "required IO processor is null")
        : null;
  }

  /**
   * Stage entry point.
   * Verifies that this is the first and only time the stage is invoked,
   * then calls {@link #executeStage(Object)} for the subclass
   * to perform its part of the commit protocol.
   * The duration of the stage is collected as a statistic, and its
   * entry/exit logged at INFO.
   * @param arguments arguments to the function.
   * @return the result.
   * @throws IOException failures.
   * @throws IllegalStateException if the stage has already been invoked.
   */
  @Override
  public final OUT apply(final IN arguments) throws IOException {
    executeOnlyOnce();
    progress();
    String stageName = getStageName(arguments);
    getStageConfig().enterStage(stageName);
    String statisticName = getStageStatisticName(arguments);
    LOG.info("{}: Executing Stage {}", getName(), stageName);
    // The tracker is also saved to a field so that the duration can be
    // retrieved through getStageExecutionTracker() after the stage ends.
    final DurationTracker tracker = createTracker(getIOStatistics(), statisticName);
    stageExecutionTracker = tracker;
    try {
      // exec the input function and return its value
      final OUT out = executeStage(arguments);
      LOG.info("{}: Stage {} completed after {}",
          getName(),
          stageName,
          OperationDuration.humanTime(
              tracker.asDuration().toMillis()));
      return out;
    } catch (IOException | RuntimeException e) {
      LOG.error("{}: Stage {} failed: after {}: {}",
          getName(),
          stageName,
          OperationDuration.humanTime(
              tracker.asDuration().toMillis()),
          e.toString());
      LOG.debug("{}: Stage failure:", getName(), e);
      // input function failed: note it
      tracker.failed();
      // and rethrow
      throw e;
    } finally {
      // update the tracker.
      // this is called after the catch() call would have
      // set the failed flag.
      tracker.close();
      progress();
      getStageConfig().exitStage(stageName);
    }
  }

  /**
   * The work of a stage.
   * Executed exactly once.
   * @param arguments arguments to the function.
   * @return the result.
   * @throws IOException failures.
   */
  protected abstract OUT executeStage(IN arguments) throws IOException;

  /**
   * Check that the operation has not been invoked twice.
   * This is an atomic check.
   * @throws IllegalStateException on a second invocation.
   */
  private void executeOnlyOnce() {
    Preconditions.checkState(
        !executed.getAndSet(true),
        "Stage attempted twice");
  }

  /**
   * The stage statistic name.
   * @param arguments args to the invocation.
   * @return stage name.
   */
  protected String getStageStatisticName(IN arguments) {
    return stageStatisticName;
  }

  /**
   * Stage name for reporting; defaults to
   * calling {@link #getStageStatisticName(Object)}.
   * @param arguments args to the invocation.
   * @return name used in updating reports.
   */
  protected String getStageName(IN arguments) {
    return getStageStatisticName(arguments);
  }

  /**
   * Get the execution tracker; non-null once
   * {@link #apply(Object)} has started executing the stage.
   * @return a tracker or null.
   */
  public DurationTracker getStageExecutionTracker() {
    return stageExecutionTracker;
  }

  /**
   * Adds the duration of the stage execution to an IOStatistics store
   * (such as the manifest to be saved).
   * @param iostats store
   * @param statistic statistic name.
   * @throws NullPointerException if the stage has not been executed,
   * so there is no tracker to read a duration from.
   */
  public void addExecutionDurationToStatistics(IOStatisticsStore iostats,
      String statistic) {
    final DurationTracker tracker = requireNonNull(
        getStageExecutionTracker(),
        "No execution tracker: stage has not been executed");
    iostats.addTimedOperation(
        statistic,
        tracker.asDuration());
  }

  /**
   * Note any rate limiting to the given timing statistic.
   * If the wait was 0, no statistics are updated.
   * @param statistic statistic key.
   * @param wait wait duration.
   */
  private void noteAnyRateLimiting(String statistic, Duration wait) {
    if (!wait.isZero()) {
      // rate limiting took place
      getIOStatistics().addTimedOperation(
          statistic,
          wait.toMillis());
    }
  }

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder(
        "AbstractJobOrTaskStage{");
    sb.append(isTaskStage ? "Task Stage" : "Job Stage");
    sb.append(" name='").append(name).append('\'');
    sb.append(" stage='").append(stageStatisticName).append('\'');
    sb.append('}');
    return sb.toString();
  }

  /**
   * The stage configuration.
   * @return the stage configuration used by this stage.
   */
  protected StageConfig getStageConfig() {
    return stageConfig;
  }

  /**
   * Update the thread context with the stage name and
   * job ID.
   * This MUST be invoked at the start of methods invoked in helper threads,
   * to ensure that they are all annotated with job and stage.
   * @param stage stage name.
   */
  protected void updateAuditContext(final String stage) {
    enterStageWorker(stageConfig.getJobId(), stage);
  }

  /**
   * The IOStatistics are shared across all uses of the
   * StageConfig.
   * @return the (possibly shared) IOStatistics.
   */
  @Override
  public final IOStatisticsStore getIOStatistics() {
    return stageConfig.getIOStatistics();
  }

  /**
   * Call progress() on any Progressable passed in.
   * A no-op if the stage config has no Progressable.
   */
  protected final void progress() {
    if (stageConfig.getProgressable() != null) {
      stageConfig.getProgressable().progress();
    }
  }

  /**
   * Get a file status value or, if the path doesn't exist, return null.
   * @param path path
   * @return status or null
   * @throws IOException IO Failure other than a FileNotFoundException.
   */
  protected final FileStatus getFileStatusOrNull(
      final Path path)
      throws IOException {
    try {
      return getFileStatus(path);
    } catch (FileNotFoundException e) {
      return null;
    }
  }

  /**
   * Get the status of a path.
   * Unlike {@link #getFileStatusOrNull(Path)}, nothing is caught here:
   * any exception raised by the store, including a
   * {@code FileNotFoundException}, is propagated to the caller.
   * The duration is tracked as a statistic.
   * @param path path; must not be null.
   * @return the status
   * @throws IOException IO Failure.
   * @throws NullPointerException if the path is null.
   */
  protected final FileStatus getFileStatus(
      final Path path)
      throws IOException {
    LOG.trace("{}: getFileStatus('{}')", getName(), path);
    requireNonNull(path,
        () -> String.format("%s: Null path for getFileStatus() call", getName()));
    return trackDuration(getIOStatistics(), OP_GET_FILE_STATUS, () ->
        operations.getFileStatus(path));
  }

  /**
   * Probe the store to see if a path is a file.
   * The duration is tracked as a statistic.
   * @param path path
   * @return true if the path resolves to a file
   * @throws IOException IO Failure.
   */
  protected final boolean isFile(
      final Path path)
      throws IOException {
    LOG.trace("{}: isFile('{}')", getName(), path);
    return trackDuration(getIOStatistics(), OP_IS_FILE, () -> {
      return operations.isFile(path);
    });
  }

  /**
   * Delete a path, tracking the duration against the
   * default delete statistic.
   * @param path path
   * @param recursive recursive delete.
   * @return the outcome of the store's delete call.
   * @throws IOException IO Failure.
   */
  protected final boolean delete(
      final Path path,
      final boolean recursive)
      throws IOException {
    LOG.trace("{}: delete('{}, {}')", getName(), path, recursive);
    return delete(path, recursive, OP_DELETE);
  }

  /**
   * Delete a path, tracking the duration against a
   * caller-supplied statistic.
   * @param path path
   * @param recursive recursive delete.
   * @param statistic statistic to update
   * @return the outcome of the store's delete call.
   * @throws IOException IO Failure.
   */
  protected Boolean delete(
      final Path path,
      final boolean recursive,
      final String statistic)
      throws IOException {
    return trackDuration(getIOStatistics(), statistic, () -> {
      return operations.delete(path, recursive);
    });
  }

  /**
   * Create a directory.
   * @param path path
   * @param escalateFailure escalate "false" to PathIOE
   * @return the outcome of the store's mkdirs call.
   * @throws PathIOException if mkdirs() returned false and
   * escalateFailure is true.
   * @throws IOException IO Failure.
   */
  protected final boolean mkdirs(
      final Path path,
      final boolean escalateFailure)
      throws IOException {
    LOG.trace("{}: mkdirs('{}')", getName(), path);
    return trackDuration(getIOStatistics(), OP_MKDIRS, () -> {
      boolean success = operations.mkdirs(path);
      if (!success && escalateFailure) {
        throw new PathIOException(path.toUri().toString(),
            stageStatisticName + ": mkdirs() returned false");
      }
      return success;
    });

  }

  /**
   * List all entries directly under a path.
   * Only the call which creates the iterator is timed, so
   * async implementations may under-report their durations.
   * @param path path
   * @return iterator over the results.
   * @throws IOException IO Failure.
   */
  protected final RemoteIterator<FileStatus> listStatusIterator(
      final Path path)
      throws IOException {
    LOG.trace("{}: listStatusIterator('{}')", getName(), path);
    return trackDuration(getIOStatistics(), OP_LIST_STATUS, () ->
        operations.listStatusIterator(path));
  }

  /**
   * Load a manifest file, using the stage config's current
   * manifest serializer.
   * @param status source.
   * @return the manifest.
   * @throws IOException IO Failure.
   */
  protected final TaskManifest loadManifest(
      final FileStatus status)
      throws IOException {
    LOG.trace("{}: loadManifest('{}')", getName(), status);
    return trackDuration(getIOStatistics(), OP_LOAD_MANIFEST, () ->
        operations.loadTaskManifest(
            stageConfig.currentManifestSerializer(),
            status));
  }

  /**
   * List all the manifests in the task manifest dir.
   * These are the entries whose path ends with the manifest suffix.
   * @return an iterator of manifest file statuses.
   * @throws IOException IO Failure.
   */
  protected final RemoteIterator<FileStatus> listManifests()
      throws IOException {
    return RemoteIterators.filteringRemoteIterator(
        listStatusIterator(getTaskManifestDir()),
        st -> st.getPath().toUri().toString().endsWith(MANIFEST_SUFFIX));
  }

  /**
   * Make an msync() call through the store operations.
   * Nothing is caught here; any handling of stores which do not
   * support msync is left to the store operations.
   * @param path path
   * @throws IOException IO failure
   */
  protected final void msync(Path path) throws IOException {
    LOG.trace("{}: msync('{}')", getName(), path);
    trackDurationOfInvocation(getIOStatistics(), OP_MSYNC, () ->
        operations.msync(path));
  }

  /**
   * Create a directory -failing if it exists or if
   * mkdirs() failed.
   * @param operation operation for error reporting.
   * @param path path to create.
   * @return the path.
   * @throws IOException failure
   * @throws PathIOException mkdirs failed.
   * @throws FileAlreadyExistsException destination exists.
   * @throws NullPointerException if the path is null.
   */
  protected final Path createNewDirectory(
      final String operation,
      final Path path) throws IOException {
    LOG.trace("{}: {} createNewDirectory('{}')", getName(), operation, path);
    requireNonNull(path,
        () -> String.format("%s: Null path for operation %s", getName(), operation));
    // check for dir existence before trying to create.
    try {
      final FileStatus status = getFileStatus(path);
      // no exception, so the path exists.
      throw new FileAlreadyExistsException(operation
          + ": path " + path
          + " already exists and has status " + status);
    } catch (FileNotFoundException e) {
      // the path does not exist, so create it.
      mkdirs(path, true);
      return path;
    }
  }

  /**
   * Assert that a path is a directory which must exist.
   * @param operation operation for error reporting.
   * @param path path to check.
   * @return the path
   * @throws IOException failure to get the status of the path;
   * nothing raised by the status probe is caught here.
   * @throws PathIOException the path exists but is not a directory.
   */
  protected final Path directoryMustExist(
      final String operation,
      final Path path) throws IOException {
    final FileStatus status = getFileStatus(path);
    if (!status.isDirectory()) {
      throw new PathIOException(path.toString(),
          operation
              + ": Path is not a directory; its status is :" + status);
    }
    return path;
  }

  /**
   * Save a task manifest or summary. This will be done by
   * writing to a temp path and then renaming.
   * If the destination path exists: Delete it.
   * @param manifestData the manifest/success file
   * @param tempPath temp path for the initial save
   * @param finalPath final path for rename.
   * @param <T> type of the manifest data.
   * @throws IOException failure to save or rename the file.
   */
  @SuppressWarnings("unchecked")
  protected final <T extends AbstractManifestData> void save(T manifestData,
      final Path tempPath,
      final Path finalPath) throws IOException {
    LOG.trace("{}: save('{}, {}, {}')", getName(), manifestData, tempPath, finalPath);
    trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () ->
        operations.save(manifestData, tempPath, true));
    // renameFile() deletes any existing destination before renaming.
    renameFile(tempPath, finalPath);
  }

  /**
   * Extract an etag from a FileStatus by delegating to the
   * store operations.
   * NOTE(review): the requirements on the status type and the
   * null handling are defined by the ManifestStoreOperations
   * implementation; confirm there before relying on a non-null result.
   * @param status the status.
   * @return the etag returned by the store operations.
   */
  public String getEtag(FileStatus status) {
    return operations.getEtag(status);
  }

  /**
   * Rename a file from source to dest; if the underlying FS API call
   * returned false that's escalated to an IOE.
   * Any existing destination is deleted, recursively, before the rename.
   * @param source source file.
   * @param dest dest file
   * @throws IOException failure
   * @throws PathIOException if the rename() call returned false.
   */
  protected final void renameFile(final Path source, final Path dest)
      throws IOException {
    maybeDeleteDest(true, dest);
    executeRenamingOperation("renameFile", source, dest,
        OP_RENAME_FILE, () ->
            operations.renameFile(source, dest));
  }

  /**
   * Rename a directory from source to dest; if the underlying FS API call
   * returned false that's escalated to an IOE.
   * Any existing destination is deleted, recursively, before the rename.
   * The duration is recorded against the same statistic as
   * {@link #renameFile(Path, Path)}.
   * @param source source directory.
   * @param dest dest directory
   * @throws IOException failure
   * @throws PathIOException if the rename() call returned false.
   */
  protected final void renameDir(final Path source, final Path dest)
      throws IOException {

    maybeDeleteDest(true, dest);
    executeRenamingOperation("renameDir", source, dest,
        OP_RENAME_FILE, () ->
        operations.renameDir(source, dest)
    );
  }

  /**
   * Commit a file from the manifest using rename or, if available, resilient renaming.
   * @param entry entry from manifest
   * @param deleteDest should the destination be deleted before the commit?
   * @return the outcome of the commit.
   * @throws PathIOException if the rename() call returned false and was uprated.
   * @throws IOException failure
   */
  protected final CommitOutcome commitFile(FileEntry entry,
      boolean deleteDest)
      throws IOException {

    final Path source = entry.getSourcePath();
    final Path dest = entry.getDestPath();

    maybeDeleteDest(deleteDest, dest);
    if (storeSupportsResilientCommit()) {
      // hand the commit off to the store's own commit operation
      final ManifestStoreOperations.CommitFileResult result = trackDuration(getIOStatistics(),
          OP_COMMIT_FILE_RENAME, () ->
              operations.commitFile(entry));
      if (result.recovered()) {
        // recovery took place.
        getIOStatistics().incrementCounter(OP_COMMIT_FILE_RENAME_RECOVERED);
      }
      if (result.getWaitTime() != null) {
        // note any delay which took place
        noteAnyRateLimiting(STORE_IO_RATE_LIMITED, result.getWaitTime());
      }
    } else {
      // commit with a simple rename; failures will be escalated.
      executeRenamingOperation("renameFile", source, dest,
          OP_COMMIT_FILE_RENAME, () ->
              operations.renameFile(source, dest));
    }
    return new CommitOutcome();
  }

  /**
   * Does this store support resilient commit.
   * @return true if resilient commit operations are available.
   */
  protected boolean storeSupportsResilientCommit() {
    return operations.storeSupportsResilientCommit();
  }

  /**
   * Recursively delete the destination path if requested.
   * @param deleteDest should the destination be deleted?
   * @param dest destination path.
   * @throws IOException failure of the delete call.
   */
  private void maybeDeleteDest(final boolean deleteDest, final Path dest) throws IOException {
    if (deleteDest) {
      // delete the destination, always, knowing that it's a no-op if
      // the data isn't there. Skipping the check saves one round trip
      // to actually look for the file/object
      boolean deleted = delete(dest, true);
      // log the outcome in case of emergency diagnostics traces
      // being needed.
      LOG.debug("{}: delete('{}') returned {}", getName(), dest, deleted);
    }
  }

  /**
   * Execute an operation to rename a file/dir, commit a manifest entry.
   * The statistic is tracked; returning false from the operation is considered
   * a failure from the statistics perspective.
   * @param operation operation name
   * @param source source path
   * @param dest dest path
   * @param statistic statistic to track
   * @param action callable of the operation
   * @throws IOException on any failure
   * @throws NullPointerException if the source or dest is null.
   */
  private void executeRenamingOperation(String operation,
      Path source,
      Path dest,
      String statistic,
      CallableRaisingIOE<Boolean> action) throws IOException {

    LOG.debug("{}: {} '{}' to '{}'", getName(), operation, source, dest);
    requireNonNull(source, "Null source");
    requireNonNull(dest, "Null dest");

    // duration tracking is a bit convoluted as it
    // ensures that rename failures as well as IOEs are
    // treated as failures from a statistics perspective.

    DurationTracker tracker = createTracker(getIOStatistics(), statistic);
    boolean success;
    try {
      success = action.apply();
      if (!success) {
        // record failure in the tracker before closing it
        tracker.failed();
      }
    } catch (IOException | RuntimeException e) {
      LOG.info("{}: {} raised an exception: {}", getName(), operation, e.toString());
      LOG.debug("{}: {} stack trace", getName(), operation, e);
      tracker.failed();
      throw e;
    } finally {
      // update the tracker, whatever the outcome.
      tracker.close();
    }
    // escalate the failure; this is done out of the duration tracker
    // so its file status probes aren't included.
    if (!success) {
      throw escalateRenameFailure(operation, source, dest);
    }
  }

  /**
   * Escalate a rename failure to an exception.
   * Returns an error exception to throw if one was not
   * triggered when probing for the source.
   * @param operation operation name
   * @param source source path
   * @param dest dest path
   * @return an exception to throw
   * @throws IOException raised probing for source or dest
   */
  private PathIOException escalateRenameFailure(String operation,
      Path source, Path dest) throws IOException {
    // rename just returned false.
    // collect information for a meaningful error message
    // and include in an exception raised.

    // get the source status; nothing is caught, so a failure of
    // this probe (such as a missing source) is what gets thrown.
    final FileStatus sourceStatus = getFileStatus(source);

    // and look to see if there is anything at the destination
    final FileStatus destStatus = getFileStatusOrNull(dest);

    LOG.error("{}: failure to {} {} to {} with" +
            " source status {} " +
            " and destination status {}",
        getName(), operation, source, dest,
        sourceStatus, destStatus);

    return new PathIOException(source.toString(),
        FAILED_TO_RENAME_PREFIX + operation + " to " + dest);
  }

  /**
   * Outcome from the commit.
   * Currently carries no state.
   */
  public static final class CommitOutcome {

  }

  /**
   * Job ID: never null.
   * @return the job ID from the stage config.
   */
  protected final String getJobId() {
    return stageConfig.getJobId();
  }

  /**
   * Job attempt number.
   * @return the job attempt number from the stage config.
   */
  protected final int getJobAttemptNumber() {
    return stageConfig.getJobAttemptNumber();
  }

  /**
   * ID of the task.
   * @return the task ID from the stage config; may be null.
   */
  protected final String getTaskId() {
    return stageConfig.getTaskId();
  }

  /**
   * Get the task ID; raise an NPE
   * if it is null.
   * @return a non-null task ID.
   */
  protected final String getRequiredTaskId() {
    return requireNonNull(getTaskId(),
        "No Task ID in stage config");
  }

  /**
   * ID of this specific attempt at a task.
   * @return the task attempt ID from the stage config; may be null.
   */
  protected final String getTaskAttemptId() {
    return stageConfig.getTaskAttemptId();
  }

  /**
   * Get the task attempt ID; raise an NPE
   * if it is null.
   * @return a non-null task attempt ID.
   */
  protected final String getRequiredTaskAttemptId() {
    return requireNonNull(getTaskAttemptId(),
        "No Task Attempt ID in stage config");
  }

  /**
   * Job attempt dir.
   * @return the job attempt directory from the stage config.
   */
  protected final Path getJobAttemptDir() {
    return stageConfig.getJobAttemptDir();
  }

  /**
   * Directory to put task manifests into.
   * @return a path under the job attempt dir.
   */
  protected final Path getTaskManifestDir() {
    return stageConfig.getTaskManifestDir();
  }


  /**
   * Task attempt dir.
   * @return the task attempt directory from the stage config; may be null.
   */
  protected final Path getTaskAttemptDir() {
    return stageConfig.getTaskAttemptDir();
  }

  /**
   * Get the task attemptDir; raise an NPE
   * if it is null.
   * @return a non-null task attempt dir.
   */
  protected final Path getRequiredTaskAttemptDir() {
    return requireNonNull(getTaskAttemptDir(),
        "No Task Attempt Dir");
  }

  /**
   * Destination of job.
   * @return the destination directory from the stage config.
   */
  protected final Path getDestinationDir() {
    return stageConfig.getDestinationDir();
  }

  /**
   * Stage name, for logging.
   * This is built in the constructor from the task attempt ID
   * or the job ID and attempt number.
   * @return name.
   */
  public final String getName() {
    return name;
  }

  /**
   * Submitter for doing IO against the store other than
   * manifest processing.
   * @return the submitter; null if the stage was constructed
   * without requiring IO processors.
   */
  protected final TaskPool.Submitter getIOProcessors() {
    return ioProcessors;
  }

  /**
   * Submitter for doing IO against the store other than
   * manifest processing.
   * The size parameter is used to select between sequential
   * and parallel runners.
   * no data, or one entry: serial.
   * everything else, parallel.
   * @param size number of items.
   * @return a submitter or null
   */
  protected final TaskPool.Submitter getIOProcessors(int size) {
    return size > 1
        ? getIOProcessors()
        : null;
  }

  /**
   * Delete a directory, possibly suppressing exceptions.
   * @param dir directory.
   * @param suppressExceptions should exceptions be suppressed?
   * A null value is treated as false.
   * @throws IOException exceptions raised in delete if not suppressed.
   * @return any exception caught and suppressed; null if the delete
   * call did not raise one.
   */
  protected IOException deleteDir(
      final Path dir,
      final Boolean suppressExceptions)
      throws IOException {
    try {
      delete(dir, true);
      return null;
    } catch (IOException ex) {
      LOG.info("Error deleting {}: {}", dir, ex.toString());
      // the flag is a boxed Boolean: a null-safe comparison stops a null
      // value raising an NPE which would hide the real failure.
      if (Boolean.TRUE.equals(suppressExceptions)) {
        return ex;
      }
      throw ex;
    }
  }

  /**
   * Create an entry for a file to rename under the destination.
   * Whatever {@link #getEtag(FileStatus)} returns for the status
   * is included in the entry.
   * @param status source file
   * @param destDir destination directory
   * @return an entry which includes the rename path
   */
  protected FileEntry fileEntry(FileStatus status, Path destDir) {
    // generate a new path under the dest dir
    Path dest = new Path(destDir, status.getPath().getName());
    return new FileEntry(status.getPath(),
        dest,
        status.getLen(),
        getEtag(status));
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop AbortTaskStage 源码

hadoop CleanupJobStage 源码

hadoop CommitJobStage 源码

hadoop CommitTaskStage 源码

hadoop CreateOutputDirectoriesStage 源码

hadoop JobOrTaskStage 源码

hadoop LoadManifestsStage 源码

hadoop RenameFilesStage 源码

hadoop SaveSuccessFileStage 源码

hadoop SaveTaskManifestStage 源码

0  赞