hadoop VectoredReadBenchmark source code

  • 2022-10-20

hadoop VectoredReadBenchmark code

File path: /hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java
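
The file below is a JMH micro-benchmark from the Hadoop source tree. It compares three ways of reading 100 scattered 64 KiB ranges from a data file: Hadoop's vectored read API (readVectored), a hand-rolled asynchronous reader built directly on java.nio's AsynchronousFileChannel, and a plain synchronous readFully() loop, parameterized over heap vs. direct buffers and over the checksummed vs. raw local filesystem.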

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.benchmark;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.FileSystems;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.impl.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

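/**
 * JMH micro-benchmark comparing three strategies for reading 100 scattered
 * 64 KiB ranges from a file: Hadoop's vectored read API, raw NIO
 * AsynchronousFileChannel callbacks, and a synchronous readFully() loop.
 */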
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class VectoredReadBenchmark {

  static final Path DATA_PATH = getTestDataPath();
  static final String DATA_PATH_PROPERTY = "bench.data";
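  // Each benchmark issues 100 reads of READ_SIZE bytes at SEEK_SIZE
  // intervals, so the data file must be at least ~100 MB.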
  static final int READ_SIZE = 64 * 1024;
  static final long SEEK_SIZE = 1024L * 1024;


  static Path getTestDataPath() {
    String value = System.getProperty(DATA_PATH_PROPERTY);
    return new Path(value == null ? "/tmp/taxi.orc" : value);
  }

  @State(Scope.Thread)
  public static class FileSystemChoice {

    @Param({"local", "raw"})
    private String fileSystemKind;

    private Configuration conf;
    private FileSystem fs;

    @Setup(Level.Trial)
    public void setup() {
      conf = new Configuration();
      try {
        LocalFileSystem local = FileSystem.getLocal(conf);
        fs = "raw".equals(fileSystemKind) ? local.getRaw() : local;
      } catch (IOException e) {
        throw new IllegalArgumentException("Can't get filesystem", e);
      }
    }
  }

  @State(Scope.Thread)
  public static class BufferChoice {
    @Param({"direct", "array"})
    private String bufferKind;

    private IntFunction<ByteBuffer> allocate;
    @Setup(Level.Trial)
    public void setup() {
      allocate = "array".equals(bufferKind)
                     ? ByteBuffer::allocate : ByteBuffer::allocateDirect;
    }
  }

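  /**
   * Reads 100 ranges in a single FSDataInputStream.readVectored() call,
   * then blocks on each range's future to force completion.
   */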
  @Benchmark
  public void asyncRead(FileSystemChoice fsChoice,
                        BufferChoice bufferChoice,
                        Blackhole blackhole) throws Exception {
    FSDataInputStream stream = fsChoice.fs.open(DATA_PATH);
    List<FileRange> ranges = new ArrayList<>();
    for(int m=0; m < 100; ++m) {
      FileRange range = FileRange.createFileRange(m * SEEK_SIZE, READ_SIZE);
      ranges.add(range);
    }
    stream.readVectored(ranges, bufferChoice.allocate);
    for(FileRange range: ranges) {
      blackhole.consume(range.getData().get());
    }
    stream.close();
  }

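  /**
   * Gathers the buffers produced by the asynchronous reads and lets the
   * benchmark thread block until all of them finish or one fails.
   */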
  static class Joiner implements CompletionHandler<ByteBuffer, FileRange> {
    private int remaining;
    private final ByteBuffer[] result;
    private Throwable exception = null;

    Joiner(int total) {
      remaining = total;
      result = new ByteBuffer[total];
    }

    synchronized void finish() {
      remaining -= 1;
      if (remaining == 0) {
        notify();
      }
    }

    synchronized ByteBuffer[] join() throws InterruptedException, IOException {
      while (remaining > 0 && exception == null) {
        wait();
      }
      if (exception != null) {
        throw new IOException("problem reading", exception);
      }
      return result;
    }


    @Override
    public synchronized void completed(ByteBuffer buffer, FileRange attachment) {
      result[--remaining] = buffer;
      if (remaining == 0) {
        notify();
      }
    }

    @Override
    public synchronized void failed(Throwable exc, FileRange attachment) {
      this.exception = exc;
      notify();
    }
  }

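  /**
   * A file range that acts as its own NIO completion handler: it re-issues
   * reads until the requested length has been filled, then flips the buffer
   * and reports completion to the Joiner.
   */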
  static class FileRangeCallback extends FileRangeImpl implements
          CompletionHandler<Integer, FileRangeCallback> {
    private final AsynchronousFileChannel channel;
    private final ByteBuffer buffer;
    private int completed = 0;
    private final Joiner joiner;

    FileRangeCallback(AsynchronousFileChannel channel, long offset,
                      int length, Joiner joiner, ByteBuffer buffer) {
      super(offset, length);
      this.channel = channel;
      this.joiner = joiner;
      this.buffer = buffer;
    }

    @Override
    public void completed(Integer result, FileRangeCallback attachment) {
      final int bytes = result;
      if (bytes == -1) {
        // Stop on EOF; without this return the handler would fall through
        // and treat -1 as a count of bytes read.
        failed(new EOFException("Read past end of file"), this);
        return;
      }
      completed += bytes;
      if (completed < this.getLength()) {
        channel.read(buffer, this.getOffset() + completed, this, this);
      } else {
        buffer.flip();
        joiner.finish();
      }
    }

    @Override
    public void failed(Throwable exc, FileRangeCallback attachment) {
      joiner.failed(exc, this);
    }
  }

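  /**
   * Reads the same 100 ranges directly through a raw NIO
   * AsynchronousFileChannel, bypassing the Hadoop filesystem layer.
   */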
  @Benchmark
  public void asyncFileChanArray(BufferChoice bufferChoice,
                                 Blackhole blackhole) throws Exception {
    java.nio.file.Path path = FileSystems.getDefault().getPath(DATA_PATH.toString());
    AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
    List<FileRangeImpl> ranges = new ArrayList<>();
    Joiner joiner = new Joiner(100);
    for(int m=0; m < 100; ++m) {
      ByteBuffer buffer = bufferChoice.allocate.apply(READ_SIZE);
      FileRangeCallback range = new FileRangeCallback(channel, m * SEEK_SIZE,
          READ_SIZE, joiner, buffer);
      ranges.add(range);
      channel.read(buffer, range.getOffset(), range, range);
    }
    joiner.join();
    channel.close();
    blackhole.consume(ranges);
  }

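  /**
   * Baseline: reads the same 100 ranges one at a time with readFully().
   */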
  @Benchmark
  public void syncRead(FileSystemChoice fsChoice,
                       Blackhole blackhole) throws Exception {
    FSDataInputStream stream = fsChoice.fs.open(DATA_PATH);
    List<byte[]> result = new ArrayList<>();
    for(int m=0; m < 100; ++m) {
      byte[] buffer = new byte[READ_SIZE];
      stream.readFully(m * SEEK_SIZE, buffer);
      result.add(buffer);
    }
    blackhole.consume(result);
    stream.close();
  }

  /**
   * Run the benchmarks.
   * @param args the pathname of a data file of at least 100 MB
   * @throws Exception on any failure
   */
  public static void main(String[] args) throws Exception {
    OptionsBuilder opts = new OptionsBuilder();
    opts.include("VectoredReadBenchmark");
    opts.jvmArgs("-server", "-Xms256m", "-Xmx2g",
        "-D" + DATA_PATH_PROPERTY + "=" + args[0]);
    opts.forks(1);
    new Runner(opts.build()).run();
  }
}
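
For reference, here is a minimal, self-contained sketch (not part of the Hadoop source) of the readVectored() call pattern that the asyncRead benchmark above exercises. The class name and the sample ranges are illustrative; the API calls (FileRange.createFileRange, readVectored, getData) are the same ones used in the benchmark.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VectoredReadExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    try (FSDataInputStream in = fs.open(new Path("/tmp/taxi.orc"))) {
      // Request two discontiguous ranges in a single call; the stream fills
      // each range's future with a ByteBuffer as its read completes.
      List<FileRange> ranges = Arrays.asList(
          FileRange.createFileRange(0, 64 * 1024),
          FileRange.createFileRange(1024 * 1024, 64 * 1024));
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer data = range.getData().get(); // blocks until this range is done
        System.out.println("read " + data.remaining()
            + " bytes at offset " + range.getOffset());
      }
    }
  }
}

To run the benchmark itself, pass the pathname of a data file of at least 100 MB as the first argument; main() forwards it to the forked benchmark JVM through the bench.data system property.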
