hadoop ReadaheadPool 源码

  • 2022-10-20
  • 浏览 (243)

hadoop ReadaheadPool 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io;

import java.io.FileDescriptor;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.nativeio.NativeIO;

import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_WILLNEED;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Owns a thread pool whose workers issue {@code POSIX_FADV_WILLNEED}
 * readahead hints against file descriptors on behalf of readers.
 *
 * <p>A process-wide instance is obtained through {@link #getInstance()};
 * access to that instance is serialized on the {@code ReadaheadPool} class
 * lock.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReadaheadPool {
  static final Logger LOG = LoggerFactory.getLogger(ReadaheadPool.class);
  private static final int POOL_SIZE = 4;
  private static final int MAX_POOL_SIZE = 16;
  private static final int CAPACITY = 1024;
  private final ThreadPoolExecutor pool;

  private static ReadaheadPool instance;

  /**
   * Lazily creates and hands out the shared pool.
   *
   * @return the singleton instance for the current process, or null when no
   *         instance exists yet and {@link NativeIO#isAvailable()} is false
   */
  public static ReadaheadPool getInstance() {
    synchronized (ReadaheadPool.class) {
      if (instance != null) {
        return instance;
      }
      if (NativeIO.isAvailable()) {
        instance = new ReadaheadPool();
      }
      return instance;
    }
  }

  /**
   * Shuts down the executor of the current singleton (if any) and forgets
   * it, so that the next {@link #getInstance()} call builds a new pool.
   */
  @VisibleForTesting
  public static void resetInstance() {
    synchronized (ReadaheadPool.class) {
      if (instance == null) {
        return;
      }
      instance.pool.shutdownNow();
      instance = null;
    }
  }

  /**
   * Builds the executor: {@code POOL_SIZE} core threads, up to
   * {@code MAX_POOL_SIZE} threads, a 3 second keep-alive, and a bounded
   * queue of {@code CAPACITY} entries. Workers are daemon threads, and a
   * rejected submission is handled by {@code DiscardOldestPolicy}.
   */
  private ReadaheadPool() {
    pool = new ThreadPoolExecutor(
        POOL_SIZE,
        MAX_POOL_SIZE,
        3L,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(CAPACITY),
        new ThreadFactoryBuilder()
            .setNameFormat("Readahead Thread #%d")
            .setDaemon(true)
            .build(),
        new ThreadPoolExecutor.DiscardOldestPolicy());
  }

  /**
   * Decides whether a new readahead should be issued for a stream and, if
   * so, submits it.
   *
   * <p>A new request is issued once {@code curPos} has reached the offset
   * of the previous request plus half of {@code readaheadLength}; before
   * that point the previous request is returned unchanged. When there is
   * no previous request, a new one is always attempted.
   *
   * @param identifier a textual identifier that will be used in error
   *        messages (e.g. the file name)
   * @param fd the file descriptor to read ahead
   * @param curPos the current offset at which reads are being issued
   * @param readaheadLength the configured length to read ahead; values
   *        less than or equal to zero disable readahead
   * @param maxOffsetToRead the maximum offset that will be readahead
   *        (useful if, for example, only some segment of the file is
   *        requested by the user). Pass {@link Long#MAX_VALUE} to allow
   *        readahead to the end of the file.
   * @param lastReadahead the result returned by the previous invocation
   *        of this function on this file descriptor, or null if this is
   *        the first call
   * @return an object representing the outstanding request (possibly
   *        {@code lastReadahead} itself), or null if no readahead was
   *        performed
   */
  public ReadaheadRequest readaheadStream(
      String identifier,
      FileDescriptor fd,
      long curPos,
      long readaheadLength,
      long maxOffsetToRead,
      ReadaheadRequest lastReadahead) {

    Preconditions.checkArgument(curPos <= maxOffsetToRead,
        "Readahead position %s higher than maxOffsetToRead %s",
        curPos, maxOffsetToRead);

    if (readaheadLength <= 0) {
      return null;
    }

    final long previousOffset =
        (lastReadahead == null) ? Long.MIN_VALUE : lastReadahead.getOffset();

    // Fire the next readahead at the halfway mark of the previous one so
    // the system has time to satisfy it before the reader gets there.
    final long triggerOffset = previousOffset + readaheadLength / 2;
    if (curPos < triggerOffset) {
      // Not far enough along yet: the earlier request still stands.
      return lastReadahead;
    }

    // Cancel whatever is still pending so each reader keeps at most one
    // outstanding request in the queue.
    if (lastReadahead != null) {
      lastReadahead.cancel();
    }

    final long remaining = maxOffsetToRead - curPos;
    final long length = Math.min(readaheadLength, remaining);
    if (length <= 0) {
      // Nothing left before maxOffsetToRead: end of the stream.
      return null;
    }

    return submitReadahead(identifier, fd, curPos, length);
  }

  /**
   * Queues a readahead request on the pool unconditionally.
   *
   * @param identifier a textual identifier used in error messages, etc.
   * @param fd the file descriptor to readahead
   * @param off the offset at which to start the readahead
   * @param len the number of bytes to read
   * @return an object representing this pending request
   */
  public ReadaheadRequest submitReadahead(
      String identifier, FileDescriptor fd, long off, long len) {
    final ReadaheadRequestImpl request =
        new ReadaheadRequestImpl(identifier, fd, off, len);
    pool.execute(request);
    if (LOG.isTraceEnabled()) {
      LOG.trace("submit readahead: " + request);
    }
    return request;
  }

  /**
   * Handle for a readahead request that has been handed to the pool. The
   * request may still be pending or may already have completed.
   */
  public interface ReadaheadRequest {
    /**
     * Cancels the request for readahead. Call this when the reader no
     * longer needs the requested data, <em>before</em> closing the related
     * file descriptor.
     *
     * It is safe to call even if the request has already been fulfilled.
     */
    void cancel();

    /**
     * @return the requested offset
     */
    long getOffset();

    /**
     * @return the requested length
     */
    long getLength();
  }

  /**
   * The pool's unit of work: a single fadvise call described by an
   * identifier, a file descriptor, an offset and a length, plus a
   * cancellation flag that the submitting reader may set.
   */
  private static class ReadaheadRequestImpl implements Runnable, ReadaheadRequest {
    private final String identifier;
    private final FileDescriptor fd;
    private final long off;
    private final long len;
    // Written by cancel() and read by run(), which execute on different
    // threads; volatile so the worker observes the update.
    private volatile boolean canceled = false;

    private ReadaheadRequestImpl(String identifier, FileDescriptor fd, long off, long len) {
      this.identifier = identifier;
      this.fd = fd;
      this.off = off;
      this.len = len;
    }

    /**
     * Issues the fadvise hint unless the request was canceled or the file
     * descriptor is no longer valid. An IOException is logged as a warning
     * unless the request has been canceled in the meantime.
     */
    @Override
    public void run() {
      if (canceled) {
        return;
      }
      // A very narrow race remains: the file may be closed right at this
      // instant. In that case we will likely get an EBADF below, notice the
      // cancellation, and ignore the error. We might also end up advising
      // on some other FD, which is wasted work but otherwise harmless.
      try {
        if (!fd.valid()) {
          return;
        }
        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
            identifier, fd, off, len, POSIX_FADV_WILLNEED);
      } catch (IOException ioe) {
        // If the reader canceled the request and closed the file, the
        // failure is expected and not worth reporting.
        if (!canceled) {
          LOG.warn("Failed readahead on " + identifier,
              ioe);
        }
      }
    }

    /**
     * Marks the request as canceled. The task is deliberately left in the
     * work queue: removing it would add complexity, and in practice the
     * queues stay very short, so there is nothing to gain.
     */
    @Override
    public void cancel() {
      canceled = true;
    }

    @Override
    public long getOffset() {
      return off;
    }

    @Override
    public long getLength() {
      return len;
    }

    @Override
    public String toString() {
      final StringBuilder text = new StringBuilder("ReadaheadRequestImpl [identifier='");
      text.append(identifier);
      text.append("', fd=").append(fd);
      text.append(", off=").append(off);
      text.append(", len=").append(len);
      text.append("]");
      return text.toString();
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractMapWritable 源码

hadoop ArrayFile 源码

hadoop ArrayPrimitiveWritable 源码

hadoop ArrayWritable 源码

hadoop BinaryComparable 源码

hadoop BloomMapFile 源码

hadoop BooleanWritable 源码

hadoop BoundedByteArrayOutputStream 源码

hadoop ByteBufferPool 源码

hadoop ByteWritable 源码

0  赞