hadoop NativeBatchProcessor 源码

  • 2022-10-20
  • 浏览 (202)

hadoop NativeBatchProcessor 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.nativetask;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
import org.apache.hadoop.mapred.nativetask.util.ConfigUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Used to create a channel and to transfer data and commands between Java and
 * native code.
 *
 * <p>Data travels through two direct buffers: {@code rawOutputBuffer} carries
 * bytes from Java to native (see {@link #sendData()}), and
 * {@code rawInputBuffer} carries bytes from native back to Java (see
 * {@link #flushOutput(int)}). Control messages go through
 * {@link #call(Command, ReadWriteBuffer)} (Java to native) and
 * {@link #sendCommandToJava(int, byte[])} (native to Java).
 *
 * <p>Only {@link #close()} is synchronized; no other method of this class
 * performs any locking.
 */
@InterfaceAudience.Private
public class NativeBatchProcessor implements INativeHandler {
  private static final Logger LOG =
      LoggerFactory.getLogger(NativeBatchProcessor.class);

  private final String nativeHandlerName;
  /** Address of the native peer object; 0 means "not created / released". */
  private long nativeHandlerAddr;

  /** Set once {@link #finishSendData()} has notified the native side. */
  private boolean isInputFinished = false;

  // << Field used directly in Native, the name must NOT be changed
  private ByteBuffer rawOutputBuffer;
  private ByteBuffer rawInputBuffer;
  // >>

  private InputBuffer in;
  private OutputBuffer out;

  private CommandDispatcher commandDispatcher;
  private DataReceiver dataReceiver;

  static {
    // JNI ids can only be cached once the native library is available.
    if (NativeRuntime.isNativeLibraryLoaded()) {
      InitIDs();
    }
  }

  /**
   * Creates and initializes a handler backed by the named native class.
   *
   * <p>The direct buffer size is read from
   * {@code Constants.NATIVE_PROCESSOR_BUFFER_KB} (default 1024 KB). Which
   * buffers are allocated depends on {@code channel}: {@code IN} allocates the
   * input buffer, {@code OUT} the output buffer, {@code INOUT} both and
   * {@code NONE} neither.
   *
   * @param nativeHandlerName name of the native class to instantiate
   * @param conf configuration passed to the native side during init
   * @param channel which data directions the handler needs buffers for
   * @return the initialized handler
   * @throws IOException if buffer allocation or initialization fails
   */
  public static INativeHandler create(String nativeHandlerName,
      Configuration conf, DataChannel channel) throws IOException {

    final int bufferSize = conf.getInt(Constants.NATIVE_PROCESSOR_BUFFER_KB,
        1024) * 1024;

    LOG.info("NativeHandler: direct buffer size: {}", bufferSize);

    OutputBuffer out = null;
    InputBuffer in = null;

    switch (channel) {
    case IN:
      in = new InputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
      break;
    case OUT:
      out = new OutputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
      break;
    case INOUT:
      in = new InputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
      out = new OutputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
      break;
    case NONE:
    default:
      // No data buffers requested.
      break;
    }

    final INativeHandler handler = new NativeBatchProcessor(nativeHandlerName,
        in, out);
    handler.init(conf);
    return handler;
  }

  /**
   * Stores the handler name and, for each non-null buffer, both the wrapper
   * and its underlying {@link ByteBuffer}.
   *
   * @param nativeHandlerName name of the native class to instantiate in
   *          {@link #init(Configuration)}
   * @param input buffer for data coming from native, or {@code null}
   * @param output buffer for data going to native, or {@code null}
   * @throws IOException declared for subclasses; not thrown by this body
   */
  protected NativeBatchProcessor(String nativeHandlerName, InputBuffer input,
      OutputBuffer output) throws IOException {
    this.nativeHandlerName = nativeHandlerName;

    if (null != input) {
      this.in = input;
      this.rawInputBuffer = input.getByteBuffer();
    }
    if (null != output) {
      this.out = output;
      this.rawOutputBuffer = output.getByteBuffer();
    }
  }

  @Override
  public void setCommandDispatcher(CommandDispatcher handler) {
    this.commandDispatcher = handler;
  }

  /**
   * Creates the native peer object and hands it the serialized configuration.
   *
   * @param conf configuration to pass to the native side
   * @throws IOException declared by the interface
   * @throws RuntimeException if the native object could not be created
   */
  @Override
  public void init(Configuration conf) throws IOException {
    this.nativeHandlerAddr = NativeRuntime
        .createNativeObject(nativeHandlerName);
    if (this.nativeHandlerAddr == 0) {
      throw new RuntimeException("Native object create failed, class: "
          + nativeHandlerName);
    }
    setupHandler(nativeHandlerAddr, ConfigUtil.toBytes(conf));
  }

  /**
   * Releases the native peer object (at most once) and closes the input
   * buffer. Safe to call repeatedly: the handler address is zeroed after the
   * first release.
   */
  @Override
  public synchronized void close() throws IOException {
    if (nativeHandlerAddr != 0) {
      NativeRuntime.releaseNativeObject(nativeHandlerAddr);
      nativeHandlerAddr = 0;
    }
    IOUtils.cleanupWithLogger(LOG, in);
    in = null;
  }

  @Override
  public long getNativeHandler() {
    return nativeHandlerAddr;
  }

  /**
   * Sends a control command to the native side and wraps the returned bytes.
   *
   * @param command command whose id is passed to native
   * @param parameter optional payload; {@code null} sends no payload
   * @return a buffer over the returned bytes with its write point at the end
   * @throws IOException declared by the interface
   */
  @Override
  public ReadWriteBuffer call(Command command, ReadWriteBuffer parameter)
      throws IOException {
    final byte[] payload = (null == parameter) ? null : parameter.getBuff();
    // NOTE(review): a null array returned from native would fail on
    // bytes.length below -- confirm the native side never returns null.
    final byte[] bytes = nativeCommand(nativeHandlerAddr, command.id(),
        payload);

    final ReadWriteBuffer result = new ReadWriteBuffer(bytes);
    result.setWritePoint(bytes.length);
    return result;
  }

  /**
   * Asks the native side to process everything written to the output buffer
   * so far, then rewinds the buffer for reuse.
   *
   * <p>Requires an output buffer; {@link #finishSendData()} checks for one
   * before delegating here.
   */
  @Override
  public void sendData() throws IOException {
    nativeProcessInput(nativeHandlerAddr, rawOutputBuffer.position());
    rawOutputBuffer.position(0);
  }

  /**
   * Sends any remaining data and tells the native side that input is
   * finished. Does nothing when there is no output buffer or when input was
   * already finished.
   */
  @Override
  public void finishSendData() throws IOException {
    if (null == rawOutputBuffer || isInputFinished) {
      return;
    }

    sendData();
    nativeFinish(nativeHandlerAddr);
    isInputFinished = true;
  }

  /**
   * Forwards a command to the registered {@link CommandDispatcher}.
   *
   * <p>NOTE(review): this method has no Java caller in this class; it is
   * presumably invoked from native code, so its name and signature must stay
   * unchanged -- confirm against the JNI sources.
   *
   * @param command numeric command id
   * @param data optional payload; {@code null} means no parameter
   * @return the dispatcher's result bytes, or {@code null} when there is no
   *         dispatcher or the dispatcher returned {@code null}
   * @throws IOException wrapping any exception raised while dispatching
   */
  private byte[] sendCommandToJava(int command, byte[] data) throws IOException {
    try {
      final Command cmd = new Command(command);
      ReadWriteBuffer param = null;

      if (null != data) {
        param = new ReadWriteBuffer();
        param.reset(data);
        param.setWritePoint(data.length);
      }

      if (null == commandDispatcher) {
        return null;
      }

      final ReadWriteBuffer result = commandDispatcher.onCall(cmd, param);
      return (null == result) ? null : result.getBuff();
    } catch (Exception e) {
      // Log through the class logger instead of printing to stderr.
      LOG.error("Failed to dispatch command {} from native to Java",
          command, e);
      throw new IOException(e);
    }
  }

  /**
   * Called by native side, clean output buffer so native side can continue
   * processing.
   *
   * <p>Exposes the first {@code length} bytes of the input buffer and, if a
   * {@link DataReceiver} is registered, lets it consume them.
   *
   * @param length number of valid bytes in the input buffer
   * @throws IOException if the receiver fails to consume the data
   */
  private void flushOutput(int length) throws IOException {
    if (null == rawInputBuffer) {
      return;
    }

    rawInputBuffer.position(0);
    rawInputBuffer.limit(length);

    if (null != dataReceiver) {
      try {
        dataReceiver.receiveData();
      } catch (IOException e) {
        // Log through the class logger instead of printing to stderr.
        LOG.error("Data receiver failed while consuming {} bytes from native",
            length, e);
        throw e;
      }
    }
  }

  /**
   * Cache JNI field & method ids
   */
  private static native void InitIDs();

  /**
   * Setup native side BatchHandler
   */
  private native void setupHandler(long nativeHandlerAddr, byte[][] configs);

  /**
   * Let native side to process data in inputBuffer
   */
  private native void nativeProcessInput(long handler, int length);

  /**
   * Notice native side input is finished
   */
  private native void nativeFinish(long handler);

  /**
   * Send control message to native side
   */
  private native byte[] nativeCommand(long handler, int cmd, byte[] parameter);

  /**
   * Load data from native
   */
  private native void nativeLoadData(long handler);

  /**
   * Hook with an empty default body; subclasses may override it.
   */
  protected void finishOutput() {
  }

  @Override
  public InputBuffer getInputBuffer() {
    return this.in;
  }

  @Override
  public OutputBuffer getOutputBuffer() {
    return this.out;
  }

  /**
   * Asks the native side to load data for this handler.
   */
  @Override
  public void loadData() throws IOException {
    nativeLoadData(nativeHandlerAddr);
  }

  @Override
  public void setDataReceiver(DataReceiver handler) {
    this.dataReceiver = handler;
  }

  @Override
  public String name() {
    return nativeHandlerName;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop Command 源码

hadoop CommandDispatcher 源码

hadoop Constants 源码

hadoop DataChannel 源码

hadoop DataReceiver 源码

hadoop HadoopPlatform 源码

hadoop ICombineHandler 源码

hadoop INativeComparable 源码

hadoop INativeHandler 源码

hadoop NativeDataSource 源码

0  赞