hadoop MultithreadedMapRunner 源码

  • 2022-10-20
  • 浏览 (171)

hadoop MultithreadedMapRunner 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.*;

/**
 * Multithreaded implementation for {@link MapRunnable}.
 * <p>
 * It can be used instead of the default implementation,
 * of {@link org.apache.hadoop.mapred.MapRunner}, when the Map
 * operation is not CPU bound in order to improve throughput.
 * <p>
 * Map implementations using this MapRunnable must be thread-safe.
 * <p>
 * The Map-Reduce job has to be configured to use this MapRunnable class (using
 * the JobConf.setMapRunnerClass method) and
 * the number of threads the thread-pool can use with the
 * <code>mapred.map.multithreadedrunner.threads</code> property, its default
 * value is 10 threads. The value is looked up in the job configuration under
 * the key {@link MultithreadedMapper#NUM_THREADS}.
 * <p>
 * Failure handling: the first {@link IOException} and the first
 * {@link RuntimeException} thrown by any <code>Mapper.map</code> call are
 * remembered and rethrown from
 * {@link #run(RecordReader, OutputCollector, Reporter)}. Whenever
 * <code>run</code> exits before the thread-pool has terminated, the pool is
 * forcibly shut down so that no further map calls are started.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultithreadedMapRunner<K1, V1, K2, V2>
    implements MapRunnable<K1, V1, K2, V2> {

  private static final Logger LOG =
      LoggerFactory.getLogger(MultithreadedMapRunner.class.getName());

  private JobConf job;
  private Mapper<K1, V1, K2, V2> mapper;
  private ExecutorService executorService;
  // First failure of each kind reported by a worker thread. Written by the
  // workers (under the runner's monitor) and polled by the dispatching thread.
  private volatile IOException ioException;
  private volatile RuntimeException runtimeException;
  private boolean incrProcCount;

  /**
   * Reads the thread count from the job configuration, instantiates the
   * configured Mapper and creates the thread-pool used to run
   * <code>Mapper.map</code> calls in parallel.
   *
   * @param jobConf the job configuration
   */
  @SuppressWarnings("unchecked")
  public void configure(JobConf jobConf) {
    int numberOfThreads =
      jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring jobConf " + jobConf.getJobName() +
                " to use " + numberOfThreads + " threads");
    }

    this.job = jobConf;
    //increment processed counter only if skipping feature is enabled
    this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 &&
      SkipBadRecords.getAutoIncrMapperProcCount(job);
    this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
        jobConf);

    // Creating a threadpool of the configured size to execute the Mapper
    // map method in parallel. Core and maximum size are equal, and the work
    // queue is bounded to the same size and blocks the submitter when full,
    // so the dispatching thread is throttled instead of buffering the whole
    // input in memory.
    executorService = new HadoopThreadPoolExecutor(numberOfThreads,
        numberOfThreads,
        0L, TimeUnit.MILLISECONDS,
        new BlockingArrayQueue(numberOfThreads));
  }

  /**
   * A blocking array queue that replaces offer and add, which fail on a full
   * queue, with a put, which waits on a full queue.
   * <p>
   * If the waiting thread is interrupted, the element is NOT enqueued: the
   * interrupt status is restored, {@link #offer(Runnable)} returns
   * <code>false</code> and {@link #add(Runnable)} throws
   * {@link IllegalStateException}. Reporting success in that case would make
   * the caller believe the runnable had been queued when it was in fact
   * dropped.
   */
  private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;

    public BlockingArrayQueue(int capacity) {
      super(capacity);
    }

    /**
     * Inserts the runnable, waiting for space if the queue is full.
     *
     * @param r the runnable to enqueue
     * @return <code>true</code> if the runnable was enqueued,
     *         <code>false</code> if the wait was interrupted
     */
    @Override
    public boolean offer(Runnable r) {
      try {
        put(r);
        return true;
      } catch (InterruptedException ie) {
        // Keep the interrupt visible to the caller and report the failure.
        Thread.currentThread().interrupt();
        return false;
      }
    }

    /**
     * Inserts the runnable, waiting for space if the queue is full.
     *
     * @param r the runnable to enqueue
     * @return <code>true</code> once the runnable has been enqueued
     * @throws IllegalStateException if the wait was interrupted and the
     *         runnable could therefore not be enqueued
     */
    @Override
    public boolean add(Runnable r) {
      if (!offer(r)) {
        throw new IllegalStateException(
            "Interrupted while waiting for space in the queue");
      }
      return true;
    }
  }

  /**
   * Rethrows, in the calling thread, a failure recorded by a worker thread.
   * A recorded IOException takes precedence over a recorded
   * RuntimeException. Does nothing if no failure has been recorded.
   *
   * @throws IOException the first IOException raised by a Mapper.map call
   * @throws RuntimeException the first RuntimeException raised by a
   *         Mapper.map call
   */
  private void checkForExceptionsFromProcessingThreads()
      throws IOException, RuntimeException {
    // Checking if a Mapper.map within a Runnable has generated an
    // IOException. If so we rethrow it to force an abort of the Map
    // operation thus keeping the semantics of the default
    // implementation.
    if (ioException != null) {
      throw ioException;
    }

    // Checking if a Mapper.map within a Runnable has generated a
    // RuntimeException. If so we rethrow it to force an abort of the Map
    // operation thus keeping the semantics of the default
    // implementation.
    if (runtimeException != null) {
      throw runtimeException;
    }
  }

  /**
   * Reads every record from the input and dispatches one
   * <code>Mapper.map</code> call per record to the thread-pool, then waits
   * for all dispatched calls to finish. The Mapper is always closed before
   * returning.
   *
   * @param input the reader supplying the key/value records
   * @param output the collector handed to every Mapper.map call
   * @param reporter the reporter handed to every Mapper.map call
   * @throws IOException if reading the input fails, if a Mapper.map call
   *         threw an IOException, or if closing the Mapper fails
   * @throws RuntimeException if a Mapper.map call threw a RuntimeException,
   *         or wrapping the InterruptedException if the calling thread is
   *         interrupted while waiting for the map calls to finish
   */
  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                  Reporter reporter)
    throws IOException {
    try {
      // allocate key & value instances these objects will not be reused
      // because execution of Mapper.map is not serialized.
      K1 key = input.createKey();
      V1 value = input.createValue();

      while (input.next(key, value)) {

        executorService.execute(new MapperInvokeRunable(key, value, output,
                                reporter));

        checkForExceptionsFromProcessingThreads();

        // Allocate new key & value instances as mapper is running in parallel
        key = input.createKey();
        value = input.createValue();
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Finished dispatching all Mappper.map calls, job "
                  + job.getJobName());
      }

      // Graceful shutdown of the Threadpool, it will let all scheduled
      // Runnables to end.
      executorService.shutdown();

      try {

        // Now waiting for all Runnables to end.
        while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Awaiting all running Mappper.map calls to finish, job "
                      + job.getJobName());
          }

          // NOTE: while Mapper.map dispatching has concluded there are still
          // map calls in progress and exceptions would be thrown.
          checkForExceptionsFromProcessingThreads();

        }

        // NOTE: it could be that a map call has had an exception after the
        // call for awaitTermination() returning true. An edge case but it
        // could happen.
        checkForExceptionsFromProcessingThreads();

      } catch (InterruptedException iEx) {
        // Restore the interrupt status so callers further up the stack can
        // still observe the interruption after the exception is wrapped.
        Thread.currentThread().interrupt();
        throw new RuntimeException(iEx);
      }

    } finally {
      try {
        // The pool is only terminated here when every dispatched map call
        // has completed. On any other exit (IOException, RuntimeException,
        // interruption, failure while reading the input) force a shutdown so
        // queued map calls are discarded and running ones are interrupted.
        if (!executorService.isTerminated()) {
          executorService.shutdownNow();
        }
      } finally {
        mapper.close();
      }
    }
  }


  /**
   * Runnable to execute a single Mapper.map call from a forked thread.
   */
  private class MapperInvokeRunable implements Runnable {
    private final K1 key;
    private final V1 value;
    private final OutputCollector<K2, V2> output;
    private final Reporter reporter;

    /**
     * Collecting all required parameters to execute a Mapper.map call.
     *
     * @param key the input key passed to Mapper.map
     * @param value the input value passed to Mapper.map
     * @param output the collector passed to Mapper.map
     * @param reporter the reporter passed to Mapper.map; also used to
     *        increment the processed-records counter
     */
    public MapperInvokeRunable(K1 key, V1 value,
                               OutputCollector<K2, V2> output,
                               Reporter reporter) {
      this.key = key;
      this.value = value;
      this.output = output;
      this.reporter = reporter;
    }

    /**
     * Executes a Mapper.map call with the given Mapper and parameters.
     * <p>
     * This method is called from the thread-pool thread. An IOException or
     * RuntimeException thrown by the call is not propagated; the first one
     * of each kind is stored in the enclosing runner.
     */
    @Override
    public void run() {
      try {
        // map pair to output
        MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
        if (incrProcCount) {
          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
        }
      } catch (IOException ex) {
        // If there is an IOException during the call it is set in an instance
        // variable of the MultithreadedMapRunner from where it will be
        // rethrown. Only the first one is kept.
        synchronized (MultithreadedMapRunner.this) {
          if (MultithreadedMapRunner.this.ioException == null) {
            MultithreadedMapRunner.this.ioException = ex;
          }
        }
      } catch (RuntimeException ex) {
        // If there is a RuntimeException during the call it is set in an
        // instance variable of the MultithreadedMapRunner from where it will be
        // rethrown. Only the first one is kept.
        synchronized (MultithreadedMapRunner.this) {
          if (MultithreadedMapRunner.this.runtimeException == null) {
            MultithreadedMapRunner.this.runtimeException = ex;
          }
        }
      }
    }
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop BinaryPartitioner 源码

hadoop Chain 源码

hadoop ChainMapper 源码

hadoop ChainReducer 源码

hadoop CombineFileInputFormat 源码

hadoop CombineFileRecordReader 源码

hadoop CombineFileRecordReaderWrapper 源码

hadoop CombineFileSplit 源码

hadoop CombineSequenceFileInputFormat 源码

hadoop CombineTextInputFormat 源码

0  赞