hadoop BufferPool 源码

  • 2022-10-20
  • 浏览 (665)

haddop BufferPool 代码

文件路径:/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/BufferPool.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.cosn;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;

/**
 * BufferPool class is used to manage the buffers during program execution.
 * It is provided in a thread-safe singleton mode,and
 * keeps the program's memory and disk consumption at a stable value.
 */
public final class BufferPool {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferPool.class);

  private static BufferPool ourInstance = new BufferPool();

  /**
   * Use this method to get the instance of BufferPool.
   *
   * @return the instance of BufferPool
   */
  public static BufferPool getInstance() {
    return ourInstance;
  }

  private BlockingQueue<ByteBuffer> bufferPool = null;
  private long singleBufferSize = 0;
  private File diskBufferDir = null;

  private AtomicBoolean isInitialize = new AtomicBoolean(false);

  private BufferPool() {
  }

  private File createDir(String dirPath) throws IOException {
    File dir = new File(dirPath);
    if (!dir.exists()) {
      LOG.debug("Buffer dir: [{}] does not exists. create it first.",
          dirPath);
      if (dir.mkdirs()) {
        if (!dir.setWritable(true) || !dir.setReadable(true)
            || !dir.setExecutable(true)) {
          LOG.warn("Set the buffer dir: [{}]'s permission [writable,"
              + "readable, executable] failed.", dir.getAbsolutePath());
        }
        LOG.debug("Buffer dir: [{}] is created successfully.",
            dir.getAbsolutePath());
      } else {
        // Once again, check if it has been created successfully.
        // Prevent problems created by multiple processes at the same time.
        if (!dir.exists()) {
          throw new IOException("buffer dir:" + dir.getAbsolutePath()
              + " is created unsuccessfully");
        }
      }
    } else {
      LOG.debug("buffer dir: {} already exists.", dirPath);
    }

    return dir;
  }

  /**
   * Create buffers correctly by reading the buffer file directory,
   * buffer pool size,and file block size in the configuration.
   *
   * @param conf Provides configurations for the Hadoop runtime
   * @throws IOException Configuration errors,
   *                     insufficient or no access for memory or
   *                     disk space may cause this exception
   */
  public synchronized void initialize(Configuration conf)
      throws IOException {
    if (this.isInitialize.get()) {
      return;
    }
    this.singleBufferSize = conf.getLong(CosNConfigKeys.COSN_BLOCK_SIZE_KEY,
        CosNConfigKeys.DEFAULT_BLOCK_SIZE);

    // The block size of CosN can only support up to 2GB.
    if (this.singleBufferSize < Constants.MIN_PART_SIZE
        || this.singleBufferSize > Constants.MAX_PART_SIZE) {
      String exceptionMsg = String.format(
          "The block size of CosN is limited to %d to %d",
          Constants.MIN_PART_SIZE, Constants.MAX_PART_SIZE);
      throw new IOException(exceptionMsg);
    }

    long memoryBufferLimit = conf.getLong(
        CosNConfigKeys.COSN_UPLOAD_BUFFER_SIZE_KEY,
        CosNConfigKeys.DEFAULT_UPLOAD_BUFFER_SIZE);

    this.diskBufferDir = this.createDir(conf.get(
        CosNConfigKeys.COSN_BUFFER_DIR_KEY,
        CosNConfigKeys.DEFAULT_BUFFER_DIR));

    int bufferPoolSize = (int) (memoryBufferLimit / this.singleBufferSize);
    if (0 == bufferPoolSize) {
      throw new IOException(
          String.format("The total size of the buffer [%d] is " +
                  "smaller than a single block [%d]."
                  + "please consider increase the buffer size " +
                  "or decrease the block size",
              memoryBufferLimit, this.singleBufferSize));
    }
    this.bufferPool = new LinkedBlockingQueue<>(bufferPoolSize);
    for (int i = 0; i < bufferPoolSize; i++) {
      this.bufferPool.add(ByteBuffer.allocateDirect(
          (int) this.singleBufferSize));
    }

    this.isInitialize.set(true);
  }

  /**
   * Check if the buffer pool has been initialized.
   *
   * @throws IOException if the buffer pool is not initialized
   */
  private void checkInitialize() throws IOException {
    if (!this.isInitialize.get()) {
      throw new IOException(
          "The buffer pool has not been initialized yet");
    }
  }

  /**
   * Obtain a buffer from this buffer pool through the method.
   *
   * @param bufferSize expected buffer size to get
   * @return a buffer wrapper that satisfies the bufferSize.
   * @throws IOException if the buffer pool not initialized,
   *                     or the bufferSize parameter is not within
   *                     the range[1MB to the single buffer size]
   */
  public ByteBufferWrapper getBuffer(int bufferSize) throws IOException {
    this.checkInitialize();
    if (bufferSize > 0 && bufferSize <= this.singleBufferSize) {
      ByteBufferWrapper byteBufferWrapper = this.getByteBuffer();
      if (null == byteBufferWrapper) {
        // Use a disk buffer when the memory buffer is not enough
        byteBufferWrapper = this.getMappedBuffer();
      }
      return byteBufferWrapper;
    } else {
      String exceptionMsg = String.format(
          "Parameter buffer size out of range: 1048576 to %d",
          this.singleBufferSize
      );
      throw new IOException(exceptionMsg);
    }
  }

  /**
   * Get a ByteBufferWrapper from the buffer pool.
   *
   * @return a new byte buffer wrapper
   * @throws IOException if the buffer pool is not initialized
   */
  private ByteBufferWrapper getByteBuffer() throws IOException {
    this.checkInitialize();
    ByteBuffer buffer = this.bufferPool.poll();
    return buffer == null ? null : new ByteBufferWrapper(buffer);
  }

  /**
   * Get a mapped buffer from the buffer pool.
   *
   * @return a new mapped buffer
   * @throws IOException If the buffer pool is not initialized.
   *                     or some I/O error occurs
   */
  private ByteBufferWrapper getMappedBuffer() throws IOException {
    this.checkInitialize();
    File tmpFile = File.createTempFile(Constants.BLOCK_TMP_FILE_PREFIX,
        Constants.BLOCK_TMP_FILE_SUFFIX, this.diskBufferDir);
    tmpFile.deleteOnExit();
    RandomAccessFile raf = new RandomAccessFile(tmpFile, "rw");
    raf.setLength(this.singleBufferSize);
    MappedByteBuffer buf = raf.getChannel().map(
        FileChannel.MapMode.READ_WRITE, 0, this.singleBufferSize);
    return new ByteBufferWrapper(buf, raf, tmpFile);
  }

  /**
   * return the byte buffer wrapper to the buffer pool.
   *
   * @param byteBufferWrapper the byte buffer wrapper getting from the pool
   * @throws InterruptedException if interrupted while waiting
   * @throws IOException          some io error occurs
   */
  public void returnBuffer(ByteBufferWrapper byteBufferWrapper)
      throws InterruptedException, IOException {
    if (null == this.bufferPool || null == byteBufferWrapper) {
      return;
    }

    if (byteBufferWrapper.isDiskBuffer()) {
      byteBufferWrapper.close();
    } else {
      ByteBuffer byteBuffer = byteBufferWrapper.getByteBuffer();
      if (null != byteBuffer) {
        byteBuffer.clear();
        LOG.debug("Return the buffer to the buffer pool.");
        if (!this.bufferPool.offer(byteBuffer)) {
          LOG.error("Return the buffer to buffer pool failed.");
        }
      }
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ByteBufferInputStream 源码

hadoop ByteBufferOutputStream 源码

hadoop ByteBufferWrapper 源码

hadoop Constants 源码

hadoop CosN 源码

hadoop CosNConfigKeys 源码

hadoop CosNCopyFileContext 源码

hadoop CosNCopyFileTask 源码

hadoop CosNFileReadTask 源码

hadoop CosNFileSystem 源码

0  赞