hadoop CachingBlockManager 源码

  • 2022-10-20
  • 浏览 (225)

hadoop CachingBlockManager 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hadoop.fs.impl.prefetch;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.statistics.DurationTracker;

import static java.util.Objects.requireNonNull;

import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNegative;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
 * Provides read access to the underlying file one block at a time.
 * Improves read performance by prefetching and locally caching blocks.
 */
public abstract class CachingBlockManager extends BlockManager {
  private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class);
  private static final int TIMEOUT_MINUTES = 60;

  /**
   * Asynchronous tasks are performed in this pool.
   */
  private final ExecutorServiceFuturePool futurePool;

  /**
   * Pool of shared ByteBuffer instances.
   * Stays {@code null} when the underlying file is empty, and is reset to
   * {@code null} by {@link #close()}.
   */
  private BufferPool bufferPool;

  /**
   * Size of the in-memory cache in terms of number of blocks.
   * Total memory consumption is up to bufferPoolSize * blockSize.
   */
  private final int bufferPoolSize;

  /**
   * Local block cache.
   * Stays {@code null} when the underlying file is empty.
   */
  private BlockCache cache;

  /**
   * Error counts. For testing purposes.
   */
  private final AtomicInteger numCachingErrors;
  private final AtomicInteger numReadErrors;

  /**
   * Operations performed by this block manager.
   */
  private final BlockOperations ops;

  /**
   * Set to true once {@link #close()} has been called.
   * Atomic because it is written by the closing thread and read, without
   * holding this object's monitor, by callers and by tasks running in the
   * future pool.
   */
  private final AtomicBoolean closed;

  /**
   * If a single caching operation takes more than this time (in seconds),
   * we disable caching to prevent further perf degradation due to caching.
   */
  private static final int SLOW_CACHING_THRESHOLD = 5;

  /**
   * Once set to true, any further caching requests will be ignored.
   */
  private final AtomicBoolean cachingDisabled;

  private final PrefetchingStatistics prefetchingStatistics;

  /**
   * Constructs an instance of a {@code CachingBlockManager}.
   * The buffer pool and the block cache are only created when the file is
   * not empty.
   *
   * @param futurePool asynchronous tasks are performed in this pool.
   * @param blockData information about each block of the underlying file.
   * @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
   * @param prefetchingStatistics statistics for this stream.
   *
   * @throws IllegalArgumentException if bufferPoolSize is zero or negative.
   */
  public CachingBlockManager(
      ExecutorServiceFuturePool futurePool,
      BlockData blockData,
      int bufferPoolSize,
      PrefetchingStatistics prefetchingStatistics) {
    super(blockData);

    Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");

    this.futurePool = requireNonNull(futurePool);
    this.bufferPoolSize = bufferPoolSize;
    this.numCachingErrors = new AtomicInteger();
    this.numReadErrors = new AtomicInteger();
    this.cachingDisabled = new AtomicBoolean();
    this.closed = new AtomicBoolean();
    this.prefetchingStatistics = requireNonNull(prefetchingStatistics);

    if (this.getBlockData().getFileSize() > 0) {
      this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
          this.prefetchingStatistics);
      this.cache = this.createCache();
    }

    this.ops = new BlockOperations();
    this.ops.setDebug(false);
  }

  /**
   * Gets the block having the given {@code blockNumber}.
   * Acquires a buffer for the block and retries, as governed by a
   * {@code Retryer}, while the block is still being prefetched or cached.
   *
   * @param blockNumber the number of the block to get.
   * @return the buffer holding the requested block.
   *
   * @throws IllegalArgumentException if blockNumber is negative.
   * @throws IOException if this block manager is closed or the read fails.
   * @throws IllegalStateException if the block did not become available
   *         before retries were exhausted.
   */
  @Override
  public BufferData get(int blockNumber) throws IOException {
    checkNotNegative(blockNumber, "blockNumber");

    BufferData data;
    final int maxRetryDelayMs = bufferPoolSize * 120 * 1000;
    final int statusUpdateDelayMs = 120 * 1000;
    Retryer retryer = new Retryer(10, maxRetryDelayMs, statusUpdateDelayMs);
    boolean done;

    do {
      if (closed.get()) {
        throw new IOException("this stream is already closed");
      }

      data = bufferPool.acquire(blockNumber);
      done = getInternal(data);

      if (retryer.updateStatus()) {
        LOG.warn("waiting to get block: {}", blockNumber);
        LOG.info("state = {}", this.toString());
      }
    }
    while (!done && retryer.continueRetry());

    if (done) {
      return data;
    } else {
      String message = String.format("Wait failed for get(%d)", blockNumber);
      throw new IllegalStateException(message);
    }
  }

  /**
   * Tries to make the given block available to the caller.
   *
   * @param data the buffer acquired for the requested block.
   * @return true if the block was already READY or was BLANK and has now been
   *         read synchronously; false if it is in the PREFETCHING, CACHING or
   *         DONE state, in which case the caller is expected to retry.
   *
   * @throws IOException if the synchronous read fails.
   * @throws IllegalArgumentException if data is null.
   */
  private boolean getInternal(BufferData data) throws IOException {
    Validate.checkNotNull(data, "data");

    // Opportunistic check without locking.
    if (data.stateEqualsOneOf(
        BufferData.State.PREFETCHING,
        BufferData.State.CACHING,
        BufferData.State.DONE)) {
      return false;
    }

    synchronized (data) {
      // Reconfirm state after locking.
      if (data.stateEqualsOneOf(
          BufferData.State.PREFETCHING,
          BufferData.State.CACHING,
          BufferData.State.DONE)) {
        return false;
      }

      int blockNumber = data.getBlockNumber();
      if (data.getState() == BufferData.State.READY) {
        // Only record that a prefetched block was consumed; no read is needed.
        BlockOperations.Operation op = ops.getPrefetched(blockNumber);
        ops.end(op);
        return true;
      }

      data.throwIfStateIncorrect(BufferData.State.BLANK);
      read(data);
      return true;
    }
  }

  /**
   * Releases resources allocated to the given block.
   * Does nothing once this block manager has been closed.
   *
   * @param data the block to release.
   *
   * @throws IllegalArgumentException if data is null.
   */
  @Override
  public void release(BufferData data) {
    if (closed.get()) {
      return;
    }

    Validate.checkNotNull(data, "data");

    BlockOperations.Operation op = ops.release(data.getBlockNumber());
    bufferPool.release(data);
    ops.end(op);
  }

  /**
   * Closes this block manager: closes the block cache, logs the operation
   * summary and closes the buffer pool. Calls after the first one are no-ops.
   * Safe to call when the file was empty and no pool or cache was created.
   */
  @Override
  public synchronized void close() {
    // getAndSet makes the "first caller wins" decision atomic.
    if (closed.getAndSet(true)) {
      return;
    }

    final BlockOperations.Operation op = ops.close();

    // Cancel any prefetches in progress.
    // NOTE(review): closed is already true at this point, so requestCaching()
    // returns immediately for every block visited by cancelPrefetches().
    cancelPrefetches();

    cleanupWithLogger(LOG, cache);

    ops.end(op);
    LOG.info(ops.getSummary(false));

    // The pool is never created for an empty file.
    if (bufferPool != null) {
      bufferPool.close();
      bufferPool = null;
    }
  }

  /**
   * Requests optional prefetching of the given block.
   * The block is prefetched only if we can acquire a free buffer.
   *
   * @param blockNumber the number of the block to prefetch.
   *
   * @throws IllegalArgumentException if blockNumber is negative.
   */
  @Override
  public void requestPrefetch(int blockNumber) {
    checkNotNegative(blockNumber, "blockNumber");

    if (closed.get()) {
      return;
    }

    // We initiate a prefetch only if we can acquire a buffer from the shared pool.
    BufferData data = bufferPool.tryAcquire(blockNumber);
    if (data == null) {
      return;
    }

    // Opportunistic check without locking.
    if (!data.stateEqualsOneOf(BufferData.State.BLANK)) {
      // The block is ready or being prefetched/cached.
      return;
    }

    synchronized (data) {
      // Reconfirm state after locking.
      if (!data.stateEqualsOneOf(BufferData.State.BLANK)) {
        // The block is ready or being prefetched/cached.
        return;
      }

      BlockOperations.Operation op = ops.requestPrefetch(blockNumber);
      PrefetchTask prefetchTask = new PrefetchTask(data, this, Instant.now());
      Future<Void> prefetchFuture = futurePool.executeFunction(prefetchTask);
      data.setPrefetch(prefetchFuture);
      ops.end(op);
    }
  }

  /**
   * Requests cancellation of any previously issued prefetch requests.
   * Does not touch any block when no buffer pool exists (empty file, or
   * after {@link #close()}).
   */
  @Override
  public void cancelPrefetches() {
    BlockOperations.Operation op = ops.cancelPrefetches();

    // Read the field once: close() resets it to null.
    final BufferPool pool = bufferPool;
    if (pool != null) {
      for (BufferData data : pool.getAll()) {
        // We add blocks being prefetched to the local cache so that the prefetch is not wasted.
        if (data.stateEqualsOneOf(BufferData.State.PREFETCHING, BufferData.State.READY)) {
          requestCaching(data);
        }
      }
    }

    ops.end(op);
  }

  /**
   * Synchronously reads the given block, which is expected to be BLANK.
   *
   * @param data the block to read.
   * @throws IOException if the read fails.
   */
  private void read(BufferData data) throws IOException {
    synchronized (data) {
      readBlock(data, false, BufferData.State.BLANK);
    }
  }

  /**
   * Reads the given block on behalf of a prefetch task. The block is expected
   * to be in the PREFETCHING or CACHING state.
   *
   * @param data the block to prefetch.
   * @param taskQueuedStartTime when the prefetch task was created; the time
   *        elapsed since then is reported to the prefetching statistics.
   * @throws IOException if the read fails.
   */
  private void prefetch(BufferData data, Instant taskQueuedStartTime) throws IOException {
    synchronized (data) {
      prefetchingStatistics.executorAcquired(
          Duration.between(taskQueuedStartTime, Instant.now()));
      readBlock(
          data,
          true,
          BufferData.State.PREFETCHING,
          BufferData.State.CACHING);
    }
  }

  /**
   * Fills the buffer of the given block, preferring the local cache over the
   * underlying source. Returns without reading when this block manager is
   * closed or when the block is already DONE or READY. On any failure the
   * read-error counter is incremented, the block is marked DONE and the
   * exception is rethrown.
   *
   * @param data the block to read.
   * @param isPrefetch true when called from a prefetch task, which also
   *        updates the prefetching statistics.
   * @param expectedState the states the block may be in when the read starts.
   * @throws IOException if the read fails.
   */
  private void readBlock(BufferData data, boolean isPrefetch, BufferData.State... expectedState)
      throws IOException {

    if (closed.get()) {
      return;
    }

    BlockOperations.Operation op = null;
    DurationTracker tracker = null;

    synchronized (data) {
      try {
        if (data.stateEqualsOneOf(BufferData.State.DONE, BufferData.State.READY)) {
          // DONE  : Block was released, likely due to caching being disabled on slow perf.
          // READY : Block was already fetched by another thread. No need to re-read.
          return;
        }

        data.throwIfStateIncorrect(expectedState);
        int blockNumber = data.getBlockNumber();

        // Prefer reading from cache over reading from network.
        if (cache.containsBlock(blockNumber)) {
          op = ops.getCached(blockNumber);
          cache.get(blockNumber, data.getBuffer());
          data.setReady(expectedState);
          return;
        }

        if (isPrefetch) {
          tracker = prefetchingStatistics.prefetchOperationStarted();
          op = ops.prefetch(data.getBlockNumber());
        } else {
          op = ops.getRead(data.getBlockNumber());
        }

        long offset = getBlockData().getStartOffset(data.getBlockNumber());
        int size = getBlockData().getSize(data.getBlockNumber());
        ByteBuffer buffer = data.getBuffer();
        buffer.clear();
        read(buffer, offset, size);
        buffer.flip();
        data.setReady(expectedState);
      } catch (Exception e) {
        String message = String.format("error during readBlock(%s)", data.getBlockNumber());
        LOG.error(message, e);

        if (isPrefetch && tracker != null) {
          tracker.failed();
        }

        numReadErrors.incrementAndGet();
        data.setDone();
        throw e;
      } finally {
        if (op != null) {
          ops.end(op);
        }

        if (isPrefetch) {
          // NOTE(review): completion is reported even on the early-return and
          // cache-hit paths where prefetchOperationStarted() was never called;
          // confirm this is what the statistics implementation expects.
          prefetchingStatistics.prefetchOperationCompleted();
          if (tracker != null) {
            tracker.close();
          }
        }
      }
    }
  }

  /**
   * Read task that is submitted to the future pool.
   */
  private static class PrefetchTask implements Supplier<Void> {
    private final BufferData data;
    private final CachingBlockManager blockManager;
    private final Instant taskQueuedStartTime;

    PrefetchTask(BufferData data, CachingBlockManager blockManager, Instant taskQueuedStartTime) {
      this.data = data;
      this.blockManager = blockManager;
      this.taskQueuedStartTime = taskQueuedStartTime;
    }

    /**
     * Runs the prefetch. Failures are logged rather than propagated.
     */
    @Override
    public Void get() {
      try {
        blockManager.prefetch(data, taskQueuedStartTime);
      } catch (Exception e) {
        LOG.error("error during prefetch", e);
      }
      return null;
    }
  }

  private static final BufferData.State[] EXPECTED_STATE_AT_CACHING =
      new BufferData.State[] {
          BufferData.State.PREFETCHING, BufferData.State.READY
      };

  /**
   * Requests that the given block should be copied to the local cache.
   * The block must not be accessed by the caller after calling this method
   * because it will be released asynchronously relative to the caller.
   * Does nothing once this block manager has been closed.
   *
   * @param data the block to cache.
   *
   * @throws IllegalArgumentException if data is null.
   */
  @Override
  public void requestCaching(BufferData data) {
    if (closed.get()) {
      return;
    }

    // Validate before the first dereference so that a null argument is
    // reported as documented instead of as a NullPointerException.
    Validate.checkNotNull(data, "data");

    if (cachingDisabled.get()) {
      data.setDone();
      return;
    }

    // Opportunistic check without locking.
    if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
      return;
    }

    synchronized (data) {
      // Reconfirm state after locking.
      if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
        return;
      }

      if (cache.containsBlock(data.getBlockNumber())) {
        data.setDone();
        return;
      }

      BufferData.State state = data.getState();

      BlockOperations.Operation op = ops.requestCaching(data.getBlockNumber());
      Future<Void> blockFuture;
      if (state == BufferData.State.PREFETCHING) {
        // The caching task has to wait for the prefetch still in flight.
        blockFuture = data.getActionFuture();
      } else {
        // The block is READY: hand over an already completed future.
        CompletableFuture<Void> cf = new CompletableFuture<>();
        cf.complete(null);
        blockFuture = cf;
      }

      CachePutTask task = new CachePutTask(data, blockFuture, this, Instant.now());
      Future<Void> actionFuture = futurePool.executeFunction(task);
      data.setCaching(actionFuture);
      ops.end(op);
    }
  }

  /**
   * Waits for the given block to be fetched, copies it into the local cache
   * and marks it DONE. If the cache write takes longer than
   * {@link #SLOW_CACHING_THRESHOLD} seconds, caching is disabled for all
   * further requests.
   *
   * @param data the block to cache.
   * @param blockFuture completes when the block's content is available.
   * @param taskQueuedStartTime when the caching task was created; the time
   *        elapsed since then is reported to the prefetching statistics.
   */
  private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
      Instant taskQueuedStartTime) {
    prefetchingStatistics.executorAcquired(
        Duration.between(taskQueuedStartTime, Instant.now()));

    if (closed.get()) {
      return;
    }

    if (cachingDisabled.get()) {
      data.setDone();
      return;
    }

    try {
      blockFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
      if (data.stateEqualsOneOf(BufferData.State.DONE)) {
        // There was an error during prefetch.
        return;
      }
    } catch (Exception e) {
      LOG.error("error waiting on blockFuture: {}", data, e);
      data.setDone();
      return;
    }

    // Caching may have been disabled while we were waiting on the future.
    if (cachingDisabled.get()) {
      data.setDone();
      return;
    }

    BlockOperations.Operation op = null;

    synchronized (data) {
      try {
        if (data.stateEqualsOneOf(BufferData.State.DONE)) {
          return;
        }

        if (cache.containsBlock(data.getBlockNumber())) {
          data.setDone();
          return;
        }

        op = ops.addToCache(data.getBlockNumber());
        // Work on a duplicate so the position of the shared buffer is untouched.
        ByteBuffer buffer = data.getBuffer().duplicate();
        buffer.rewind();
        cachePut(data.getBlockNumber(), buffer);
        data.setDone();
      } catch (Exception e) {
        numCachingErrors.incrementAndGet();
        String message = String.format("error adding block to cache after wait: %s", data);
        LOG.error(message, e);
        data.setDone();
      }

      if (op != null) {
        BlockOperations.End endOp = (BlockOperations.End) ops.end(op);
        if (endOp.duration() > SLOW_CACHING_THRESHOLD) {
          // Only the thread that flips the flag logs the warning.
          if (!cachingDisabled.getAndSet(true)) {
            String message = String.format(
                "Caching disabled because of slow operation (%.1f sec)", endOp.duration());
            LOG.warn(message);
          }
        }
      }
    }
  }

  /**
   * Creates the local block cache. Invoked from the constructor when the
   * file is not empty; subclasses may override it to supply another cache.
   *
   * @return the block cache to use.
   */
  protected BlockCache createCache() {
    return new SingleFilePerBlockCache(prefetchingStatistics);
  }

  /**
   * Puts the given block into the local cache, unless this block manager
   * has been closed.
   *
   * @param blockNumber the number of the block being cached.
   * @param buffer the content of the block.
   * @throws IOException if the cache write fails.
   */
  protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
    if (closed.get()) {
      return;
    }

    cache.put(blockNumber, buffer);
  }

  /**
   * Caching task that is submitted to the future pool.
   */
  private static class CachePutTask implements Supplier<Void> {
    private final BufferData data;

    // Block being asynchronously fetched.
    private final Future<Void> blockFuture;

    // Block manager that manages this block.
    private final CachingBlockManager blockManager;

    private final Instant taskQueuedStartTime;

    CachePutTask(
        BufferData data,
        Future<Void> blockFuture,
        CachingBlockManager blockManager,
        Instant taskQueuedStartTime) {
      this.data = data;
      this.blockFuture = blockFuture;
      this.blockManager = blockManager;
      this.taskQueuedStartTime = taskQueuedStartTime;
    }

    @Override
    public Void get() {
      blockManager.addToCacheAndRelease(data, blockFuture, taskQueuedStartTime);
      return null;
    }
  }

  /**
   * Number of ByteBuffers available to be acquired.
   *
   * @return the number of available buffers.
   */
  public int numAvailable() {
    return bufferPool.numAvailable();
  }

  /**
   * Number of caching operations completed.
   *
   * @return the number of cached buffers.
   */
  public int numCached() {
    return cache.size();
  }

  /**
   * Number of errors encountered when caching.
   *
   * @return the number of errors encountered when caching.
   */
  public int numCachingErrors() {
    return numCachingErrors.get();
  }

  /**
   * Number of errors encountered when reading.
   *
   * @return the number of errors encountered when reading.
   */
  public int numReadErrors() {
    return numReadErrors.get();
  }

  /**
   * Tries to acquire the buffer for the given block without blocking.
   *
   * @param blockNumber the number of the block.
   * @return the result of {@code BufferPool.tryAcquire(blockNumber)}.
   */
  BufferData getData(int blockNumber) {
    return bufferPool.tryAcquire(blockNumber);
  }

  /**
   * Describes the state of the cache and of the buffer pool.
   * A missing cache or pool (empty file, or after {@link #close()}) is
   * rendered as {@code null} instead of raising an exception.
   */
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();

    // append(Object) is null-safe, unlike an explicit toString() call.
    sb.append("cache(");
    sb.append(cache);
    sb.append("); ");

    sb.append("pool: ");
    sb.append(bufferPool);

    return sb.toString();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BlockCache 源码

hadoop BlockData 源码

hadoop BlockManager 源码

hadoop BlockOperations 源码

hadoop BoundedResourcePool 源码

hadoop BufferData 源码

hadoop BufferPool 源码

hadoop EmptyPrefetchingStatistics 源码

hadoop ExecutorServiceFuturePool 源码

hadoop FilePosition 源码

0  赞