hadoop NativeMapOutputCollectorDelegator 源码

  • 2022-10-20
  • 浏览 (215)

hadoop NativeMapOutputCollectorDelegator 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred.nativetask;

import java.io.IOException;

import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputCollector;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.nativetask.handlers.NativeCollectorOnlyHandler;
import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.QuickSort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Native map output collector wrapped in the Java {@link MapOutputCollector} interface.
 *
 * <p>{@code init} checks that the job configuration can be served by the native runtime
 * (reducers present, no custom Java comparator, QuickSort as sort class, no secure shuffle,
 * natively supported key type and compression codec) and throws when it cannot. After a
 * successful {@code init}, {@code collect}, {@code flush} and {@code close} are forwarded to a
 * {@link NativeCollectorOnlyHandler}.
 */
@InterfaceAudience.Private
public class NativeMapOutputCollectorDelegator<K, V> implements MapOutputCollector<K, V> {

  private static final Logger LOG =
      LoggerFactory.getLogger(NativeMapOutputCollectorDelegator.class);
  private JobConf job;
  private NativeCollectorOnlyHandler<K, V> handler;

  private Context context;
  private StatusReportChecker updater;

  /**
   * Forwards one key/value pair for the given partition to the native handler.
   */
  @Override
  public void collect(K key, V value, int partition) throws IOException, InterruptedException {
    handler.collect(key, value, partition);
  }

  /**
   * Closes the native handler, stops the status updater and reports the final status.
   *
   * <p>The updater is stopped even when closing the handler fails. The final status report is
   * only sent when both steps succeeded.
   */
  @Override
  public void close() throws IOException, InterruptedException {
    try {
      // handler is null when init() failed before the handler was created.
      if (null != handler) {
        handler.close();
      }
    } finally {
      if (null != updater) {
        updater.stop();
      }
    }
    if (null != updater) {
      NativeRuntime.reportStatus(context.getReporter());
    }
  }

  /**
   * Flushes the native handler.
   */
  @Override
  public void flush() throws IOException, InterruptedException, ClassNotFoundException {
    handler.flush();
  }

  /**
   * Validates the job configuration against the native collector's restrictions, configures the
   * native runtime, starts the status updater and creates the native handler.
   *
   * @param context collector context supplying the job configuration, reporter and map task
   * @throws IOException if the job cannot be handled by the native collector (reported as
   *     {@link InvalidJobConfException}) or if the native handler cannot be created
   * @throws ClassNotFoundException declared by the {@link MapOutputCollector} contract
   */
  @SuppressWarnings("unchecked")
  @Override
  public void init(Context context) throws IOException, ClassNotFoundException {
    this.context = context;
    this.job = context.getJobConf();

    Platforms.init(job);

    if (job.getNumReduceTasks() == 0) {
      throw unsupported("There is no reducer, no need to use native output collector");
    }

    Class<?> comparatorClass = job.getClass(MRJobConfig.KEY_COMPARATOR, null,
        RawComparator.class);
    if (comparatorClass != null && !Platforms.define(comparatorClass)) {
      throw unsupported("Native output collector doesn't support customized java comparator "
          + job.get(MRJobConfig.KEY_COMPARATOR));
    }

    if (!QuickSort.class.getName().equals(job.get(Constants.MAP_SORT_CLASS))) {
      throw unsupported("Native-Task doesn't support sort class " +
          job.get(Constants.MAP_SORT_CLASS));
    }

    if (job.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, false)) {
      throw unsupported("Native-Task doesn't support secure shuffle");
    }

    final Class<?> keyCls = job.getMapOutputKeyClass();
    try {
      @SuppressWarnings("rawtypes")
      final INativeSerializer serializer = NativeSerialization.getInstance().getSerializer(keyCls);
      if (null == serializer) {
        throw unsupported("Key type not supported. Cannot find serializer for "
            + keyCls.getName());
      } else if (!Platforms.support(keyCls.getName(), serializer, job)) {
        throw unsupported("Native output collector doesn't support this key, " +
            "this key is not comparable in native: " + keyCls.getName());
      }
    } catch (final InvalidJobConfException e) {
      // Already logged with a precise message; do not let the generic handler below rewrap it.
      throw e;
    } catch (final IOException e) {
      String message = "Cannot find serializer for " + keyCls.getName();
      LOG.error(message, e);
      throw new IOException(message, e);
    }

    if (!NativeRuntime.isNativeLibraryLoaded()) {
      throw unsupported("NativeRuntime cannot be loaded, please check that " +
          "libnativetask.so is in hadoop library dir");
    }

    if (job.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false)) {
      final String codec = job.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC);
      // A missing codec setting is reported as an unsupported configuration instead of
      // failing with a NullPointerException on codec.getBytes().
      if (codec == null
          || !NativeRuntime.supportsCompressionCodec(codec.getBytes(Charsets.UTF_8))) {
        throw unsupported("Native output collector doesn't support compression codec " + codec);
      }
    }
    NativeRuntime.configure(job);

    final long updateInterval = job.getLong(Constants.NATIVE_STATUS_UPDATE_INTERVAL,
        Constants.NATIVE_STATUS_UPDATE_INTERVAL_DEFVAL);
    // NOTE(review): if handler creation below fails, this updater is left running because
    // nothing stops it on that path - confirm whether callers are expected to call close().
    updater = new StatusReportChecker(context.getReporter(), updateInterval);
    updater.start();

    this.handler = null;
    try {
      final Class<K> oKClass = (Class<K>) job.getMapOutputKeyClass();
      final Class<V> oVClass = (Class<V>) job.getMapOutputValueClass();
      final TaskAttemptID id = context.getMapTask().getTaskID();
      final TaskContext taskContext = new TaskContext(job, null, null, oKClass, oVClass,
          context.getReporter(), id);
      handler = NativeCollectorOnlyHandler.create(taskContext);
    } catch (final IOException e) {
      String message = "Native output collector cannot be loaded;";
      LOG.error(message, e);
      throw new IOException(message, e);
    }

    LOG.info("Native output collector can be successfully enabled!");
  }

  /**
   * Logs {@code message} at error level and returns the exception for the caller to throw.
   */
  private static InvalidJobConfException unsupported(String message) {
    LOG.error(message);
    return new InvalidJobConfException(message);
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop Command 源码

hadoop CommandDispatcher 源码

hadoop Constants 源码

hadoop DataChannel 源码

hadoop DataReceiver 源码

hadoop HadoopPlatform 源码

hadoop ICombineHandler 源码

hadoop INativeComparable 源码

hadoop INativeHandler 源码

hadoop NativeBatchProcessor 源码

0  赞