hadoop BlockReaderLocal source code

  • 2022-10-20

hadoop BlockReaderLocal code

File path: /hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.client.impl;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderIoProvider;
import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderLocalMetrics;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DirectBufferPool;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.EnumSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * BlockReaderLocal enables local short circuited reads. If the DFS client is on
 * the same machine as the datanode, then the client can read files directly
 * from the local file system rather than going through the datanode for better
 * performance. <br>
 * {@link BlockReaderLocal} works as follows:
 * <ul>
 * <li>The client performing short circuit reads must be configured at the
 * datanode.</li>
 * <li>The client gets the file descriptors for the metadata file and the data
 * file for the block using
 * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
 * </li>
 * <li>The client reads the file descriptors.</li>
 * </ul>
 */
@InterfaceAudience.Private
class BlockReaderLocal implements BlockReader {
  static final Logger LOG = LoggerFactory.getLogger(BlockReaderLocal.class);

  private static final DirectBufferPool bufferPool = new DirectBufferPool();

  private static BlockReaderLocalMetrics metrics;
  private static Lock metricsInitializationLock = new ReentrantLock();
  private final BlockReaderIoProvider blockReaderIoProvider;
  private static final Timer TIMER = new Timer();

  public static class Builder {
    private final int bufferSize;
    private boolean verifyChecksum;
    private int maxReadahead;
    private String filename;
    private ShortCircuitReplica replica;
    private long dataPos;
    private ExtendedBlock block;
    private StorageType storageType;
    private ShortCircuitConf shortCircuitConf;

    public Builder(ShortCircuitConf conf) {
      this.shortCircuitConf = conf;
      this.maxReadahead = Integer.MAX_VALUE;
      this.verifyChecksum = !conf.isSkipShortCircuitChecksums();
      this.bufferSize = conf.getShortCircuitBufferSize();
    }

    public Builder setVerifyChecksum(boolean verifyChecksum) {
      this.verifyChecksum = verifyChecksum;
      return this;
    }

    public Builder setCachingStrategy(CachingStrategy cachingStrategy) {
      long readahead = cachingStrategy.getReadahead() != null ?
          cachingStrategy.getReadahead() :
              HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
      this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead);
      return this;
    }

    public Builder setFilename(String filename) {
      this.filename = filename;
      return this;
    }

    public Builder setShortCircuitReplica(ShortCircuitReplica replica) {
      this.replica = replica;
      return this;
    }

    public Builder setStartOffset(long startOffset) {
      this.dataPos = Math.max(0, startOffset);
      return this;
    }

    public Builder setBlock(ExtendedBlock block) {
      this.block = block;
      return this;
    }

    public Builder setStorageType(StorageType storageType) {
      this.storageType = storageType;
      return this;
    }

    public BlockReaderLocal build() {
      Preconditions.checkNotNull(replica);
      return new BlockReaderLocal(this);
    }
  }

  private boolean closed = false;

  /**
   * Pair of streams for this block.
   */
  private final ShortCircuitReplica replica;

  /**
   * The data FileChannel.
   */
  private final FileChannel dataIn;

  /**
   * The next place we'll read from in the block data FileChannel.
   *
   * If data is buffered in dataBuf, this offset will be larger than the
   * offset of the next byte which a read() operation will give us.
   */
  private long dataPos;

  /**
   * The Checksum FileChannel.
   */
  private final FileChannel checksumIn;

  /**
   * Checksum type and size.
   */
  private final DataChecksum checksum;

  /**
   * If false, we will always skip the checksum.
   */
  private final boolean verifyChecksum;

  /**
   * Name of the block, for logging purposes.
   */
  private final String filename;

  /**
   * Block ID and Block Pool ID.
   */
  private final ExtendedBlock block;

  /**
   * Cache of Checksum#bytesPerChecksum.
   */
  private final int bytesPerChecksum;

  /**
   * Cache of Checksum#checksumSize.
   */
  private final int checksumSize;

  /**
   * Maximum number of chunks to allocate.
   *
   * This is used to allocate dataBuf and checksumBuf, in the event that
   * we need them.
   */
  private final int maxAllocatedChunks;

  /**
   * True if zero readahead was requested.
   */
  private final boolean zeroReadaheadRequested;

  /**
   * Maximum amount of readahead we'll do.  This will always be at least the
   * size of a single chunk, even if {@link #zeroReadaheadRequested} is true.
   * The reason is because we need to do a certain amount of buffering in order
   * to do checksumming.
   *
   * This determines how many bytes we'll use out of dataBuf and checksumBuf.
   * Why do we allocate buffers, and then (potentially) only use part of them?
   * The rationale is that allocating a lot of buffers of different sizes would
   * make it very difficult for the DirectBufferPool to re-use buffers.
   */
  private final int maxReadaheadLength;

  /**
   * Buffers data starting at the current dataPos and extending on
   * for dataBuf.limit().
   *
   * This may be null if we don't need it.
   */
  private ByteBuffer dataBuf;

  /**
   * Buffers checksums starting at the current checksumPos and extending on
   * for checksumBuf.limit().
   *
   * This may be null if we don't need it.
   */
  private ByteBuffer checksumBuf;

  /**
   * StorageType of replica on DataNode.
   */
  private StorageType storageType;

  private BlockReaderLocal(Builder builder) {
    this.replica = builder.replica;
    this.dataIn = replica.getDataStream().getChannel();
    this.dataPos = builder.dataPos;
    this.checksumIn = replica.getMetaStream().getChannel();
    BlockMetadataHeader header = builder.replica.getMetaHeader();
    this.checksum = header.getChecksum();
    this.verifyChecksum = builder.verifyChecksum &&
        (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
    this.filename = builder.filename;
    this.block = builder.block;
    this.bytesPerChecksum = checksum.getBytesPerChecksum();
    this.checksumSize = checksum.getChecksumSize();

    this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 :
        ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
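    // The "+ bytesPerChecksum - 1" rounds up to a whole number of chunks:
    // e.g. a 1 MB buffer with 512-byte chunks yields 2048 chunks.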
    // Calculate the effective maximum readahead.
    // We can't do more readahead than there is space in the buffer.
    int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
        ((Math.min(builder.bufferSize, builder.maxReadahead) +
            bytesPerChecksum - 1) / bytesPerChecksum);
    if (maxReadaheadChunks == 0) {
      this.zeroReadaheadRequested = true;
      maxReadaheadChunks = 1;
    } else {
      this.zeroReadaheadRequested = false;
    }
    this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
    this.storageType = builder.storageType;

    if (builder.shortCircuitConf.isScrMetricsEnabled()) {
      metricsInitializationLock.lock();
      try {
        if (metrics == null) {
          metrics = BlockReaderLocalMetrics.create();
        }
      } finally {
        metricsInitializationLock.unlock();
      }
    }

    this.blockReaderIoProvider = new BlockReaderIoProvider(
        builder.shortCircuitConf, metrics, TIMER);
  }

  private synchronized void createDataBufIfNeeded() {
    if (dataBuf == null) {
      dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum);
      dataBuf.position(0);
      dataBuf.limit(0);
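      // position == limit == 0 marks the freshly allocated buffer as empty
      // until it is filled.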
    }
  }

  private synchronized void freeDataBufIfExists() {
    if (dataBuf != null) {
      // When disposing of a dataBuf, we have to move our stored file index
      // backwards.
      dataPos -= dataBuf.remaining();
      dataBuf.clear();
      bufferPool.returnBuffer(dataBuf);
      dataBuf = null;
    }
  }

  private synchronized void createChecksumBufIfNeeded() {
    if (checksumBuf == null) {
      checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize);
      checksumBuf.position(0);
      checksumBuf.limit(0);
    }
  }

  private synchronized void freeChecksumBufIfExists() {
    if (checksumBuf != null) {
      checksumBuf.clear();
      bufferPool.returnBuffer(checksumBuf);
      checksumBuf = null;
    }
  }

  private synchronized int drainDataBuf(ByteBuffer buf) {
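    // Copies as much buffered data as possible from dataBuf into buf.
    // Returns -1 when there is nothing to drain (no bounce buffer, or it is
    // empty), and 0 only when buf itself has no space remaining.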
    if (dataBuf == null) return -1;
    int oldLimit = dataBuf.limit();
    int nRead = Math.min(dataBuf.remaining(), buf.remaining());
    if (nRead == 0) {
      return (dataBuf.remaining() == 0) ? -1 : 0;
    }
    try {
      dataBuf.limit(dataBuf.position() + nRead);
      buf.put(dataBuf);
    } finally {
      dataBuf.limit(oldLimit);
    }
    return nRead;
  }

  /**
   * Read from the block file into a buffer.
   *
   * This function overwrites checksumBuf.  It will increment dataPos.
   *
   * @param buf   The buffer to read into.  May be dataBuf.
   *              The position and limit of this buffer should be set to
   *              multiples of the checksum size.
   * @param canSkipChecksum  True if we can skip checksumming.
   *
   * @return      Total bytes read.  0 on EOF.
   */
  private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
      throws IOException {
    int total = 0;
    long startDataPos = dataPos;
    int startBufPos = buf.position();
    while (buf.hasRemaining()) {
      int nRead = blockReaderIoProvider.read(dataIn, buf, dataPos);
      if (nRead < 0) {
        break;
      }
      dataPos += nRead;
      total += nRead;
    }
    if (canSkipChecksum) {
      freeChecksumBufIfExists();
      return total;
    }
    if (total > 0) {
      try {
        buf.limit(buf.position());
        buf.position(startBufPos);
        createChecksumBufIfNeeded();
        int checksumsNeeded = (total + bytesPerChecksum - 1) /
            bytesPerChecksum;
        checksumBuf.clear();
        checksumBuf.limit(checksumsNeeded * checksumSize);
        long checksumPos = BlockMetadataHeader.getHeaderSize()
            + ((startDataPos / bytesPerChecksum) * checksumSize);
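        // The meta file stores one checksumSize-byte checksum per
        // bytesPerChecksum bytes of data, after a fixed-size header, so a
        // chunk index maps directly to a checksum offset.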
        while (checksumBuf.hasRemaining()) {
          int nRead = checksumIn.read(checksumBuf, checksumPos);
          if (nRead < 0) {
            throw new IOException("Got unexpected checksum file EOF at " +
                checksumPos + ", block file position " + startDataPos +
                " for block " + block + " of file " + filename);
          }
          checksumPos += nRead;
        }
        checksumBuf.flip();

        checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
      } finally {
        buf.position(buf.limit());
      }
    }
    return total;
  }

  private boolean createNoChecksumContext() {
    return !verifyChecksum ||
        // Checksums are not stored for replicas on transient storage.  We do
        // not anchor, because we do not intend for client activity to block
        // eviction from transient storage on the DataNode side.
        (storageType != null && storageType.isTransient()) ||
        replica.addNoChecksumAnchor();
  }

  private void releaseNoChecksumContext() {
    if (verifyChecksum) {
      if (storageType == null || !storageType.isTransient()) {
        replica.removeNoChecksumAnchor();
      }
    }
  }

  @Override
  public synchronized int read(ByteBuffer buf) throws IOException {
    boolean canSkipChecksum = createNoChecksumContext();
    try {
      String traceFormatStr = "read(buf.remaining={}, block={}, filename={}, "
          + "canSkipChecksum={})";
      LOG.trace(traceFormatStr + ": starting",
          buf.remaining(), block, filename, canSkipChecksum);
      int nRead;
      try {
        if (canSkipChecksum && zeroReadaheadRequested) {
          nRead = readWithoutBounceBuffer(buf);
        } else {
          nRead = readWithBounceBuffer(buf, canSkipChecksum);
        }
      } catch (IOException e) {
        LOG.trace(traceFormatStr + ": I/O error",
            buf.remaining(), block, filename, canSkipChecksum, e);
        throw e;
      }
      LOG.trace(traceFormatStr + ": returning {}",
          buf.remaining(), block, filename, canSkipChecksum, nRead);
      return nRead;
    } finally {
      if (canSkipChecksum) releaseNoChecksumContext();
    }
  }

  private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
      throws IOException {
    freeDataBufIfExists();
    freeChecksumBufIfExists();
    int total = 0;
    while (buf.hasRemaining()) {
      int nRead = blockReaderIoProvider.read(dataIn, buf, dataPos);
      if (nRead <= 0) break;
      dataPos += nRead;
      total += nRead;
    }
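    // Return -1 only at a genuine end of block; a zero-byte read anywhere
    // else just reports the bytes actually copied.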
    return (total == 0 && (dataPos == dataIn.size())) ? -1 : total;
  }

  /**
   * Fill the data buffer.  If necessary, validate the data against the
   * checksums.
   *
   * We always want the offsets of the data contained in dataBuf to be
   * aligned to the chunk boundary.  If we are validating checksums, we
   * accomplish this by seeking backwards in the file until we're on a
   * chunk boundary.  (This is necessary because we can't checksum a
   * partial chunk.)  If we are not validating checksums, we simply only
   * fill the latter part of dataBuf.
   *
   * @param canSkipChecksum  true if we can skip checksumming.
   * @return                 true if we hit EOF.
   * @throws IOException
   */
  private synchronized boolean fillDataBuf(boolean canSkipChecksum)
      throws IOException {
    createDataBufIfNeeded();
    final int slop = (int)(dataPos % bytesPerChecksum);
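    // "slop" is how far dataPos sits past the previous chunk boundary; a
    // non-zero slop means this read starts in the middle of a chunk.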
    final long oldDataPos = dataPos;
    dataBuf.limit(maxReadaheadLength);
    if (canSkipChecksum) {
      dataBuf.position(slop);
      fillBuffer(dataBuf, true);
    } else {
      dataPos -= slop;
      dataBuf.position(0);
      fillBuffer(dataBuf, false);
    }
    dataBuf.limit(dataBuf.position());
    dataBuf.position(Math.min(dataBuf.position(), slop));
    LOG.trace("loaded {} bytes into bounce buffer from offset {} of {}",
        dataBuf.remaining(), oldDataPos, block);
    return dataBuf.limit() != maxReadaheadLength;
  }

  /**
   * Read using the bounce buffer.
   *
   * A 'direct' read actually has three phases. The first drains any
   * remaining bytes from the slow read buffer. After this the read is
   * guaranteed to be on a checksum chunk boundary. If there are still bytes
   * to read, the fast direct path is used for as many remaining bytes as
   * possible, up to a multiple of the checksum chunk size. Finally, any
   * 'odd' bytes remaining at the end of the read cause another slow read to
   * be issued, which involves an extra copy.
   *
   * Every 'slow' read tries to fill the slow read buffer in one go for
   * efficiency's sake. As described above, all non-checksum-chunk-aligned
   * reads will be served from the slower read path.
   *
   * @param buf              The buffer to read into.
   * @param canSkipChecksum  True if we can skip checksums.
   */
  private synchronized int readWithBounceBuffer(ByteBuffer buf,
        boolean canSkipChecksum) throws IOException {
    int total = 0;
    int bb = drainDataBuf(buf); // drain bounce buffer if possible
    if (bb >= 0) {
      total += bb;
      if (buf.remaining() == 0) return total;
    }
    boolean eof = true, done = false;
    do {
      if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength)
            && ((dataPos % bytesPerChecksum) == 0)) {
        // Fast lane: try to read directly into user-supplied buffer, bypassing
        // bounce buffer.
        int oldLimit = buf.limit();
        int nRead;
        try {
          buf.limit(buf.position() + maxReadaheadLength);
          nRead = fillBuffer(buf, canSkipChecksum);
        } finally {
          buf.limit(oldLimit);
        }
        if (nRead < maxReadaheadLength) {
          done = true;
        }
        if (nRead > 0) {
          eof = false;
        }
        total += nRead;
      } else {
        // Slow lane: refill bounce buffer.
        if (fillDataBuf(canSkipChecksum)) {
          done = true;
        }
        bb = drainDataBuf(buf); // drain bounce buffer if possible
        if (bb >= 0) {
          eof = false;
          total += bb;
        }
      }
    } while ((!done) && (buf.remaining() > 0));
    return (eof && total == 0) ? -1 : total;
  }

  @Override
  public synchronized int read(byte[] arr, int off, int len)
        throws IOException {
    boolean canSkipChecksum = createNoChecksumContext();
    int nRead;
    try {
      final String traceFormatStr = "read(arr.length={}, off={}, len={}, "
          + "filename={}, block={}, canSkipChecksum={})";
      LOG.trace(traceFormatStr + ": starting",
          arr.length, off, len, filename, block, canSkipChecksum);
      try {
        if (canSkipChecksum && zeroReadaheadRequested) {
          nRead = readWithoutBounceBuffer(arr, off, len);
        } else {
          nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
        }
      } catch (IOException e) {
        LOG.trace(traceFormatStr + ": I/O error",
            arr.length, off, len, filename, block, canSkipChecksum, e);
        throw e;
      }
      LOG.trace(traceFormatStr + ": returning {}",
          arr.length, off, len, filename, block, canSkipChecksum, nRead);
    } finally {
      if (canSkipChecksum) releaseNoChecksumContext();
    }
    return nRead;
  }

  private synchronized int readWithoutBounceBuffer(byte arr[], int off,
        int len) throws IOException {
    freeDataBufIfExists();
    freeChecksumBufIfExists();
    int nRead = blockReaderIoProvider.read(
        dataIn, ByteBuffer.wrap(arr, off, len), dataPos);
    if (nRead > 0) {
      dataPos += nRead;
    } else if ((nRead == 0) && (dataPos == dataIn.size())) {
      return -1;
    }
    return nRead;
  }

  private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
        boolean canSkipChecksum) throws IOException {
    createDataBufIfNeeded();
    if (!dataBuf.hasRemaining()) {
      dataBuf.position(0);
      dataBuf.limit(maxReadaheadLength);
      fillDataBuf(canSkipChecksum);
    }
    if (dataBuf.remaining() == 0) return -1;
    int toRead = Math.min(dataBuf.remaining(), len);
    dataBuf.get(arr, off, toRead);
    return toRead;
  }

  @Override
  public synchronized long skip(long n) throws IOException {
    int discardedFromBuf = 0;
    long remaining = n;
    if ((dataBuf != null) && dataBuf.hasRemaining()) {
      discardedFromBuf = (int)Math.min(dataBuf.remaining(), n);
      dataBuf.position(dataBuf.position() + discardedFromBuf);
      remaining -= discardedFromBuf;
    }
    LOG.trace("skip(n={}, block={}, filename={}): discarded {} bytes from "
            + "dataBuf and advanced dataPos by {}",
        n, block, filename, discardedFromBuf, remaining);
    dataPos += remaining;
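    // Note that skip() itself does no I/O, so it can move dataPos past the
    // end of the block; later reads at that position will return no data.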
    return n;
  }

  @Override
  public int available() {
    // We never do network I/O in BlockReaderLocal.
    return Integer.MAX_VALUE;
  }

  @Override
  public synchronized void close() throws IOException {
    if (closed) return;
    closed = true;
    LOG.trace("close(filename={}, block={})", filename, block);
    replica.unref();
    freeDataBufIfExists();
    freeChecksumBufIfExists();
    if (metrics != null) {
      metrics.collectThreadLocalStates();
    }
  }

  @Override
  public synchronized void readFully(byte[] arr, int off, int len)
      throws IOException {
    BlockReaderUtil.readFully(this, arr, off, len);
  }

  @Override
  public synchronized int readAll(byte[] buf, int off, int len)
      throws IOException {
    return BlockReaderUtil.readAll(this, buf, off, len);
  }

  @Override
  public boolean isShortCircuit() {
    return true;
  }

  /**
   * Get or create a memory map for this replica.
   *
   * There are two kinds of ClientMmap objects we could fetch here: one that
   * will always read pre-checksummed data, and one that may read data that
   * hasn't been checksummed.
   *
   * If we fetch the former, "safe" kind of ClientMmap, we have to increment
   * the anchor count on the shared memory slot.  This will tell the DataNode
   * not to munlock the block until this ClientMmap is closed.
   * If we fetch the latter, we don't bother with anchoring.
   *
   * @param opts     The options to use, such as SKIP_CHECKSUMS.
   *
   * @return         null on failure; the ClientMmap otherwise.
   */
  @Override
  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
    boolean anchor = verifyChecksum &&
        !opts.contains(ReadOption.SKIP_CHECKSUMS);
    if (anchor) {
      if (!createNoChecksumContext()) {
        LOG.trace("can't get an mmap for {} of {} since SKIP_CHECKSUMS was not "
            + "given, we aren't skipping checksums, and the block is not "
            + "mlocked.", block, filename);
        return null;
      }
    }
    ClientMmap clientMmap = null;
    try {
      clientMmap = replica.getOrCreateClientMmap(anchor);
    } finally {
      if ((clientMmap == null) && anchor) {
        releaseNoChecksumContext();
      }
    }
    return clientMmap;
  }

  @VisibleForTesting
  boolean getVerifyChecksum() {
    return this.verifyChecksum;
  }

  @VisibleForTesting
  int getMaxReadaheadLength() {
    return this.maxReadaheadLength;
  }

  /**
   * Make the replica anchorable.  Normally this can only be done by the
   * DataNode.  This method is only for testing.
   */
  @VisibleForTesting
  void forceAnchorable() {
    replica.getSlot().makeAnchorable();
  }

  /**
   * Make the replica unanchorable.  Normally this can only be done by the
   * DataNode.  This method is only for testing.
   */
  @VisibleForTesting
  void forceUnanchorable() {
    replica.getSlot().makeUnanchorable();
  }

  @Override
  public DataChecksum getDataChecksum() {
    return checksum;
  }

  @Override
  public int getNetworkDistance() {
    return 0;
  }
}
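
For orientation, here is a minimal usage sketch. Everything it calls (the Builder, its setters, read, close) appears in the listing above, but the helper class, its method, and the argument values are illustrative assumptions; in the real client this wiring is done by BlockReaderFactory after the short-circuit file descriptors have been fetched from the DataNode. Since BlockReaderLocal is package-private, such code would have to live in the same package.

package org.apache.hadoop.hdfs.client.impl;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;

// Hypothetical helper, for illustration only.
class BlockReaderLocalUsageSketch {
  static int readSome(ShortCircuitConf scConf, ShortCircuitReplica replica,
      ExtendedBlock block, String src, long startOffset) throws IOException {
    // The Builder pulls the buffer size and checksum-skipping defaults from
    // the ShortCircuitConf; the replica supplies the data/meta FileChannels.
    BlockReaderLocal reader = new BlockReaderLocal.Builder(scConf)
        .setShortCircuitReplica(replica)
        .setFilename(src)
        .setBlock(block)
        .setStartOffset(startOffset)
        .build();
    try {
      // A direct buffer lets large, chunk-aligned reads take the fast path
      // in readWithBounceBuffer().
      ByteBuffer buf = ByteBuffer.allocateDirect(64 * 1024);
      return reader.read(buf);
    } finally {
      reader.close(); // unrefs the replica and returns pooled buffers
    }
  }
}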

Related information

hadoop source code directory

Related articles

hadoop BlockReaderFactory source code

hadoop BlockReaderLocalLegacy source code

hadoop BlockReaderRemote source code

hadoop BlockReaderUtil source code

hadoop CorruptFileBlockIterator source code

hadoop DfsClientConf source code

hadoop ExternalBlockReader source code

hadoop LeaseRenewer source code

hadoop SnapshotDiffReportGenerator source code

hadoop package-info source code
