hadoop TaskPool 源码

  • 2022-10-20
  • 浏览 (213)

haddop TaskPool 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.util.functional;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromIterable;

/**
 * Utility class for parallel execution, takes closures for the various
 * actions.
 * There is no retry logic: it is expected to be handled by the closures.
 * From {@code org.apache.hadoop.fs.s3a.commit.Tasks} which came from
 * the Netflix committer patch.
 * Apache Iceberg has its own version of this, with a common ancestor
 * at some point in its history.
 * A key difference with this class is that the iterator is always,
 * internally, an {@link RemoteIterator}.
 * This is to allow tasks to be scheduled while incremental operations
 * such as paged directory listings are still collecting in results.
 *
 * While awaiting completion, this thread spins and sleeps a time of
 * {@link #SLEEP_INTERVAL_AWAITING_COMPLETION}, which, being a
 * busy-wait, is inefficient.
 * There's an implicit assumption that remote IO is being performed, and
 * so this is not impacting throughput/performance.
 *
 * History:
 * This class came with the Netflix contributions to the S3A committers
 * in HADOOP-13786.
 * It was moved into hadoop-common for use in the manifest committer and
 * anywhere else it is needed, and renamed in the process as
 * "Tasks" has too many meanings in the hadoop source.
 * The iterator was then changed from a normal java iterable
 * to a hadoop {@link org.apache.hadoop.fs.RemoteIterator}.
 * This allows a task pool to be supplied with incremental listings
 * from object stores, scheduling work as pages of listing
 * results come in, rather than blocking until the entire
 * directory/directory tree etc has been enumerated.
 *
 * There is a variant of this in Apache Iceberg in
 * {@code org.apache.iceberg.util.Tasks}
 * That is not derived from any version in the hadoop codebase, it
 * just shares a common ancestor somewhere in the Netflix codebase.
 * It is the more sophisticated version.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class TaskPool {
  private static final Logger LOG =
      LoggerFactory.getLogger(TaskPool.class);

  /**
   * Interval in milliseconds to await completion.
   */
  private static final int SLEEP_INTERVAL_AWAITING_COMPLETION = 10;

  private TaskPool() {
  }

  /**
   * Callback invoked to process an item.
   * @param <I> item type being processed
   * @param <E> exception class which may be raised
   */
  @FunctionalInterface
  public interface Task<I, E extends Exception> {
    void run(I item) throws E;
  }

  /**
   * Callback invoked on a failure.
   * @param <I> item type being processed
   * @param <E> exception class which may be raised
   */
  @FunctionalInterface
  public interface FailureTask<I, E extends Exception> {

    /**
     * process a failure.
     * @param item item the task is processing
     * @param exception the exception which was raised.
     * @throws E Exception of type E
     */
    void run(I item, Exception exception) throws E;
  }

  /**
   * Builder for task execution.
   * @param <I> item type
   */
  public static class Builder<I> {
    private final RemoteIterator<I> items;
    private Submitter service = null;
    private FailureTask<I, ?> onFailure = null;
    private boolean stopOnFailure = false;
    private boolean suppressExceptions = false;
    private Task<I, ?> revertTask = null;
    private boolean stopRevertsOnFailure = false;
    private Task<I, ?> abortTask = null;
    private boolean stopAbortsOnFailure = false;
    private int sleepInterval = SLEEP_INTERVAL_AWAITING_COMPLETION;

    /**
     * IOStatisticsContext to switch to in all threads
     * taking part in the commit operation.
     * This ensures that the IOStatistics collected in the
     * worker threads will be aggregated into the total statistics
     * of the thread calling the committer commit/abort methods.
     */
    private IOStatisticsContext ioStatisticsContext = null;

    /**
     * Create the builder.
     * @param items items to process
     */
    Builder(RemoteIterator<I> items) {
      this.items = requireNonNull(items, "items");
    }

    /**
     * Create the builder.
     * @param items items to process
     */
    Builder(Iterable<I> items) {
      this(remoteIteratorFromIterable(items));
    }

    /**
     * Declare executor service: if null, the tasks are executed in a single
     * thread.
     * @param submitter service to schedule tasks with.
     * @return this builder.
     */
    public Builder<I> executeWith(@Nullable Submitter submitter) {

      this.service = submitter;
      return this;
    }

    /**
     * Task to invoke on failure.
     * @param task task
     * @return the builder
     */
    public Builder<I> onFailure(FailureTask<I, ?> task) {
      this.onFailure = task;
      return this;
    }

    public Builder<I> stopOnFailure() {
      this.stopOnFailure = true;
      return this;
    }

    /**
     * Suppress exceptions from tasks.
     * RemoteIterator exceptions are not suppressable.
     * @return the builder.
     */
    public Builder<I> suppressExceptions() {
      return suppressExceptions(true);
    }

    /**
     * Suppress exceptions from tasks.
     * RemoteIterator exceptions are not suppressable.
     * @param suppress new value
     * @return the builder.
     */
    public Builder<I> suppressExceptions(boolean suppress) {
      this.suppressExceptions = suppress;
      return this;
    }

    /**
     * Task to revert with after another task failed.
     * @param task task to execute
     * @return the builder
     */
    public Builder<I> revertWith(Task<I, ?> task) {
      this.revertTask = task;
      return this;
    }

    /**
     * Stop trying to revert if one operation fails.
     * @return the builder
     */
    public Builder<I> stopRevertsOnFailure() {
      this.stopRevertsOnFailure = true;
      return this;
    }

    /**
     * Task to abort with after another task failed.
     * @param task task to execute
     * @return the builder
     */
    public Builder<I> abortWith(Task<I, ?> task) {
      this.abortTask = task;
      return this;
    }

    /**
     * Stop trying to abort if one operation fails.
     * @return the builder
     */
    public Builder<I> stopAbortsOnFailure() {
      this.stopAbortsOnFailure = true;
      return this;
    }

    /**
     * Set the sleep interval.
     * @param value new value
     * @return the builder
     */
    public Builder<I> sleepInterval(final int value) {
      sleepInterval = value;
      return this;
    }

    /**
     * Execute the task across the data.
     * @param task task to execute
     * @param <E> exception which may be raised in execution.
     * @return true if the operation executed successfully
     * @throws E any exception raised.
     * @throws IOException IOExceptions raised by remote iterator or in execution.
     */
    public <E extends Exception> boolean run(Task<I, E> task) throws E, IOException {
      requireNonNull(items, "items");
      if (!items.hasNext()) {
        // if there are no items, return without worrying about
        // execution pools, errors etc.
        return true;
      }
      if (service != null) {
        // thread pool, so run in parallel
        return runParallel(task);
      } else {
        // single threaded execution.
        return runSingleThreaded(task);
      }
    }

    /**
     * Single threaded execution.
     * @param task task to execute
     * @param <E> exception which may be raised in execution.
     * @return true if the operation executed successfully
     * @throws E any exception raised.
     * @throws IOException IOExceptions raised by remote iterator or in execution.
     */
    private <E extends Exception> boolean runSingleThreaded(Task<I, E> task)
        throws E, IOException {
      List<I> succeeded = new ArrayList<>();
      List<Exception> exceptions = new ArrayList<>();

      RemoteIterator<I> iterator = items;
      boolean threw = true;
      try {
        while (iterator.hasNext()) {
          I item = iterator.next();
          try {
            task.run(item);
            succeeded.add(item);

          } catch (Exception e) {
            exceptions.add(e);

            if (onFailure != null) {
              try {
                onFailure.run(item, e);
              } catch (Exception failException) {
                LOG.error("Failed to clean up on failure", e);
                // keep going
              }
            }

            if (stopOnFailure) {
              break;
            }
          }
        }

        threw = false;
      } catch (IOException iteratorIOE) {
        // an IOE is reaised here during iteration
        LOG.debug("IOException when iterating through {}", iterator, iteratorIOE);
        throw iteratorIOE;
      } finally {
        // threw handles exceptions that were *not* caught by the catch block,
        // and exceptions that were caught and possibly handled by onFailure
        // are kept in exceptions.
        if (threw || !exceptions.isEmpty()) {
          if (revertTask != null) {
            boolean failed = false;
            for (I item : succeeded) {
              try {
                revertTask.run(item);
              } catch (Exception e) {
                LOG.error("Failed to revert task", e);
                failed = true;
                // keep going
              }
              if (stopRevertsOnFailure && failed) {
                break;
              }
            }
          }

          if (abortTask != null) {
            boolean failed = false;
            while (iterator.hasNext()) {
              try {
                abortTask.run(iterator.next());
              } catch (Exception e) {
                failed = true;
                LOG.error("Failed to abort task", e);
                // keep going
              }
              if (stopAbortsOnFailure && failed) {
                break;
              }
            }
          }
        }
      }

      if (!suppressExceptions && !exceptions.isEmpty()) {
        TaskPool.<E>throwOne(exceptions);
      }

      return exceptions.isEmpty();
    }

    /**
     * Parallel execution.
     * All tasks run within the same IOStatisticsContext as the
     * thread calling this method.
     * @param task task to execute
     * @param <E> exception which may be raised in execution.
     * @return true if the operation executed successfully
     * @throws E any exception raised.
     * @throws IOException IOExceptions raised by remote iterator or in execution.
     */
    private <E extends Exception> boolean runParallel(final Task<I, E> task)
        throws E, IOException {
      final Queue<I> succeeded = new ConcurrentLinkedQueue<>();
      final Queue<Exception> exceptions = new ConcurrentLinkedQueue<>();
      final AtomicBoolean taskFailed = new AtomicBoolean(false);
      final AtomicBoolean abortFailed = new AtomicBoolean(false);
      final AtomicBoolean revertFailed = new AtomicBoolean(false);

      List<Future<?>> futures = new ArrayList<>();
      ioStatisticsContext = IOStatisticsContext.getCurrentIOStatisticsContext();

      IOException iteratorIOE = null;
      final RemoteIterator<I> iterator = this.items;
      try {
        while (iterator.hasNext()) {
          final I item = iterator.next();
          // submit a task for each item that will either run or abort the task
          futures.add(service.submit(() -> {
            setStatisticsContext();
            try {
              if (!(stopOnFailure && taskFailed.get())) {
                // prepare and run the task
                boolean threw = true;
                try {
                  LOG.debug("Executing task");
                  task.run(item);
                  succeeded.add(item);
                  LOG.debug("Task succeeded");

                  threw = false;

                } catch (Exception e) {
                  taskFailed.set(true);
                  exceptions.add(e);
                  LOG.info("Task failed {}", e.toString());
                  LOG.debug("Task failed", e);

                  if (onFailure != null) {
                    try {
                      onFailure.run(item, e);
                    } catch (Exception failException) {
                      LOG.warn("Failed to clean up on failure", e);
                      // swallow the exception
                    }
                  }
                } finally {
                  if (threw) {
                    taskFailed.set(true);
                  }
                }

              } else if (abortTask != null) {
                // abort the task instead of running it
                if (stopAbortsOnFailure && abortFailed.get()) {
                  return;
                }

                boolean failed = true;
                try {
                  LOG.info("Aborting task");
                  abortTask.run(item);
                  failed = false;
                } catch (Exception e) {
                  LOG.error("Failed to abort task", e);
                  // swallow the exception
                } finally {
                  if (failed) {
                    abortFailed.set(true);
                  }
                }
              }
            } finally {
              resetStatisticsContext();
            }
          }));
        }
      } catch (IOException e) {
        // iterator failure.
        LOG.debug("IOException when iterating through {}", iterator, e);
        iteratorIOE = e;
        // mark as a task failure so all submitted tasks will halt/abort
        taskFailed.set(true);
      }
      // let the above tasks complete (or abort)
      waitFor(futures, sleepInterval);
      int futureCount = futures.size();
      futures.clear();

      if (taskFailed.get() && revertTask != null) {
        // at least one task failed, revert any that succeeded
        LOG.info("Reverting all {} succeeded tasks from {} futures",
            succeeded.size(), futureCount);
        for (final I item : succeeded) {
          futures.add(service.submit(() -> {
            if (stopRevertsOnFailure && revertFailed.get()) {
              return;
            }

            boolean failed = true;
            setStatisticsContext();
            try {
              revertTask.run(item);
              failed = false;
            } catch (Exception e) {
              LOG.error("Failed to revert task", e);
              // swallow the exception
            } finally {
              if (failed) {
                revertFailed.set(true);
              }
              resetStatisticsContext();
            }
          }));
        }

        // let the revert tasks complete
        waitFor(futures, sleepInterval);
      }

      // give priority to execution exceptions over
      // iterator exceptions.
      if (!suppressExceptions && !exceptions.isEmpty()) {
        // there's an exception list to build up, cast and throw.
        TaskPool.<E>throwOne(exceptions);
      }

      // raise any iterator exception.
      // this can not be suppressed.
      if (iteratorIOE != null) {
        throw iteratorIOE;
      }

      // return true if all tasks succeeded.
      return !taskFailed.get();
    }

    /**
     * Set the statistics context for this thread.
     */
    private void setStatisticsContext() {
      if (ioStatisticsContext != null) {
        IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext);
      }
    }

    /**
     * Reset the statistics context if it was set earlier.
     * This unbinds the current thread from any statistics
     * context.
     */
    private void resetStatisticsContext() {
      if (ioStatisticsContext != null) {
        IOStatisticsContext.setThreadIOStatisticsContext(null);
      }
    }
  }

  /**
   * Wait for all the futures to complete; there's a small sleep between
   * each iteration; enough to yield the CPU.
   * @param futures futures.
   * @param sleepInterval Interval in milliseconds to await completion.
   */
  private static void waitFor(Collection<Future<?>> futures, int sleepInterval) {
    int size = futures.size();
    LOG.debug("Waiting for {} tasks to complete", size);
    int oldNumFinished = 0;
    while (true) {
      int numFinished = (int) futures.stream().filter(Future::isDone).count();

      if (oldNumFinished != numFinished) {
        LOG.debug("Finished count -> {}/{}", numFinished, size);
        oldNumFinished = numFinished;
      }

      if (numFinished == size) {
        // all of the futures are done, stop looping
        break;
      } else {
        try {
          Thread.sleep(sleepInterval);
        } catch (InterruptedException e) {
          futures.forEach(future -> future.cancel(true));
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
  }

  /**
   * Create a task builder for the iterable.
   * @param items item source.
   * @param <I> type of result.
   * @return builder.
   */
  public static <I> Builder<I> foreach(Iterable<I> items) {
    return new Builder<>(requireNonNull(items, "items"));
  }

  /**
   * Create a task builder for the remote iterator.
   * @param items item source.
   * @param <I> type of result.
   * @return builder.
   */
  public static <I> Builder<I> foreach(RemoteIterator<I> items) {
    return new Builder<>(items);
  }

  public static <I> Builder<I> foreach(I[] items) {
    return new Builder<>(Arrays.asList(requireNonNull(items, "items")));
  }

  /**
   * Throw one exception, adding the others as suppressed
   * exceptions attached to the one thrown.
   * This method never completes normally.
   * @param exceptions collection of exceptions
   * @param <E> class of exceptions
   * @throws E an extracted exception.
   */
  private static <E extends Exception> void throwOne(
      Collection<Exception> exceptions)
      throws E {
    Iterator<Exception> iter = exceptions.iterator();
    Exception e = iter.next();
    Class<? extends Exception> exceptionClass = e.getClass();

    while (iter.hasNext()) {
      Exception other = iter.next();
      if (!exceptionClass.isInstance(other)) {
        e.addSuppressed(other);
      }
    }

    TaskPool.<E>castAndThrow(e);
  }

  /**
   * Raise an exception of the declared type.
   * This method never completes normally.
   * @param e exception
   * @param <E> class of exceptions
   * @throws E a recast exception.
   */
  @SuppressWarnings("unchecked")
  private static <E extends Exception> void castAndThrow(Exception e) throws E {
    if (e instanceof RuntimeException) {
      throw (RuntimeException) e;
    }
    throw (E) e;
  }

  /**
   * Interface to whatever lets us submit tasks.
   */
  public interface Submitter {

    /**
     * Submit work.
     * @param task task to execute
     * @return the future of the submitted task.
     */
    Future<?> submit(Runnable task);
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop BiFunctionRaisingIOE 源码

hadoop CallableRaisingIOE 源码

hadoop CloseableTaskPoolSubmitter 源码

hadoop CommonCallableSupplier 源码

hadoop ConsumerRaisingIOE 源码

hadoop Function4RaisingIOE 源码

hadoop FunctionRaisingIOE 源码

hadoop FutureIO 源码

hadoop InvocationRaisingIOE 源码

hadoop RemoteIterators 源码

0  赞