hadoop PageBlobInputStream source code

  • 2022-10-20

hadoop PageBlobInputStream code

File path: /hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
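
PageBlobInputStream reads file data back out of the custom page blob format written by the ASV/WASB driver. The blob is stored as one contiguous range of fixed-size pages, each of which begins with a small header recording how many data bytes that page actually holds; only the last page may be partially filled. The constants used throughout the file come from PageBlobFormatHelpers in the same package, where PAGE_SIZE is 512 bytes, PAGE_HEADER_SIZE is 2 bytes, and PAGE_DATA_SIZE is PAGE_SIZE - PAGE_HEADER_SIZE = 510 bytes:

// Per-page layout assumed throughout this file (from PageBlobFormatHelpers):
// [ 2-byte header: data length ][ up to 510 bytes of file data ] = 512 bytes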

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azure;

import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_DATA_SIZE;
import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_HEADER_SIZE;
import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_SIZE;
import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.toShort;
import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.withMD5Checking;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;

import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.PageRange;

/**
 * An input stream that reads file data from a page blob stored
 * using ASV's custom format.
 */

final class PageBlobInputStream extends InputStream {
  private static final Log LOG = LogFactory.getLog(PageBlobInputStream.class);

  // The blob we're reading from.
  private final CloudPageBlobWrapper blob;
  // The operation context to use for storage requests.
  private final OperationContext opContext;
  // The number of pages remaining to be read from the server.
  private long numberOfPagesRemaining;
  // The current byte offset to start reading from the server next,
  // equivalent to (total number of pages we've read) * (page size).
  private long currentOffsetInBlob;
  // The buffer holding the current data we last read from the server.
  private byte[] currentBuffer;
  // The current byte offset we're at in the buffer.
  private int currentBufferOffset;
  // The current buffer length
  private int currentBufferLength;
  // Maximum number of pages to get per any one request.
  private static final int MAX_PAGES_PER_DOWNLOAD =
      4 * 1024 * 1024 / PAGE_SIZE;
  // Whether the stream has been closed.
  private boolean closed = false;
  // Total stream size, or -1 if not initialized.
  long pageBlobSize = -1;
  // Current position in stream of valid data.
  long filePosition = 0;

  /**
   * Helper method to extract the actual data size of a page blob.
   * This typically involves 2 service requests (one for page ranges, another
   * for the last page's data).
   *
   * @param blob The blob to get the size from.
   * @param opContext The operation context to use for the requests.
   * @return The total data size of the blob in bytes.
   * @throws IOException If the format is corrupt.
   * @throws StorageException If anything goes wrong in the requests.
   */
  public static long getPageBlobDataSize(CloudPageBlobWrapper blob,
      OperationContext opContext) throws IOException, StorageException {
    // Get the page ranges for the blob. There should be one range starting
    // at byte 0, but we tolerate (and ignore) ranges after the first one.
    ArrayList<PageRange> pageRanges =
        blob.downloadPageRanges(new BlobRequestOptions(), opContext);
    if (pageRanges.size() == 0) {
      return 0;
    }
    if (pageRanges.get(0).getStartOffset() != 0) {
      // Not expected: we always upload our page blobs as a contiguous range
      // starting at byte 0.
      throw badStartRangeException(blob, pageRanges.get(0));
    }
    long totalRawBlobSize = pageRanges.get(0).getEndOffset() + 1;

    // Get the last page.
    long lastPageStart = totalRawBlobSize - PAGE_SIZE;
    ByteArrayOutputStream baos = 
        new ByteArrayOutputStream(PageBlobFormatHelpers.PAGE_SIZE);
    blob.downloadRange(lastPageStart, PAGE_SIZE, baos,
        new BlobRequestOptions(), opContext);

    byte[] lastPage = baos.toByteArray();
    short lastPageSize = getPageSize(blob, lastPage, 0);
    long totalNumberOfPages = totalRawBlobSize / PAGE_SIZE;
    return (totalNumberOfPages - 1) * PAGE_DATA_SIZE + lastPageSize;
  }
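
  // Worked example of the size calculation above (values assume PAGE_SIZE =
  // 512 and PAGE_DATA_SIZE = 510 as in PageBlobFormatHelpers): a page range
  // ending at raw offset 1535 gives totalRawBlobSize = 1536, i.e. 3 pages.
  // If the last page's header reads 100, the logical data size is
  // (3 - 1) * 510 + 100 = 1120 bytes.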

  /**
   * Constructs a stream over the given page blob.
   */
  public PageBlobInputStream(CloudPageBlobWrapper blob,
      OperationContext opContext)
      throws IOException {
    this.blob = blob;
    this.opContext = opContext;
    ArrayList<PageRange> allRanges;
    try {
      allRanges =
          blob.downloadPageRanges(new BlobRequestOptions(), opContext);
    } catch (StorageException e) {
      throw new IOException(e);
    }
    if (allRanges.size() > 0) {
      if (allRanges.get(0).getStartOffset() != 0) {
        throw badStartRangeException(blob, allRanges.get(0));
      }
      if (allRanges.size() > 1) {
        LOG.warn(String.format(
            "Blob %s has %d page ranges beyond the first range. " 
            + "Only reading the first range.",
            blob.getUri(), allRanges.size() - 1));
      }
      numberOfPagesRemaining =
          (allRanges.get(0).getEndOffset() + 1) / PAGE_SIZE;
    } else {
      numberOfPagesRemaining = 0;
    }
  }

  /** Return the size of the remaining available bytes
   * if the size is less than or equal to {@link Integer#MAX_VALUE},
   * otherwise, return {@link Integer#MAX_VALUE}.
   *
   * This is to match the behavior of DFSInputStream.available(),
   * which some clients may rely on (HBase write-ahead log reading in
   * particular).
   */
  @Override
  public synchronized int available() throws IOException {
    if (closed) {
      throw new IOException("Stream closed");
    }
    if (pageBlobSize == -1) {
      try {
        pageBlobSize = getPageBlobDataSize(blob, opContext);
      } catch (StorageException e) {
        throw new IOException("Unable to get page blob size.", e);
      }
    }

    final long remaining = pageBlobSize - filePosition;
    return remaining <= Integer.MAX_VALUE ?
        (int) remaining : Integer.MAX_VALUE;
  }

  @Override
  public synchronized void close() throws IOException {
    closed = true;
  }

  private boolean dataAvailableInBuffer() {
    return currentBuffer != null 
        && currentBufferOffset < currentBufferLength;
  }

  /**
   * Check our buffer and download more from the server if needed.
   * If no data is available in the buffer, this method downloads up to the
   * maximum page blob download size (4 MB), or all remaining pages if less
   * than 4 MB is left.
   * If we are on the last page, the method returns true even if we have
   * reached the end of the stream.
   * @return true if there's more data in the buffer, false if buffer is empty
   *         and we reached the end of the blob.
   * @throws IOException
   */
  private synchronized boolean ensureDataInBuffer() throws IOException {
    if (dataAvailableInBuffer()) {
      // We still have some data in our buffer.
      return true;
    }
    currentBuffer = null;
    currentBufferOffset = 0;
    currentBufferLength = 0;
    if (numberOfPagesRemaining == 0) {
      // No more data to read.
      return false;
    }
    final long pagesToRead = Math.min(MAX_PAGES_PER_DOWNLOAD,
        numberOfPagesRemaining);
    final int bufferSize = (int) (pagesToRead * PAGE_SIZE);
 
    // Download page to current buffer.
    try {
      // Create a byte array output stream to capture the results of the
      // download.
      ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
      blob.downloadRange(currentOffsetInBlob, bufferSize, baos,
          withMD5Checking(), opContext);
      validateDataIntegrity(baos.toByteArray());
    } catch (StorageException e) {
      throw new IOException(e);
    }
    numberOfPagesRemaining -= pagesToRead;
    currentOffsetInBlob += bufferSize;

    return true;
  }
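
  // Note on sizing: with PAGE_SIZE = 512, MAX_PAGES_PER_DOWNLOAD works out to
  // 4 * 1024 * 1024 / 512 = 8192 pages, so each server round trip above
  // fetches at most 4 MB of raw pages (headers included).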

  private void validateDataIntegrity(byte[] buffer)
      throws IOException {

    if (buffer.length % PAGE_SIZE != 0) {
      throw new AssertionError("Unexpected buffer size: " + buffer.length);
    }

    int bufferLength = 0;
    int numberOfPages = buffer.length / PAGE_SIZE;
    long totalPagesAfterCurrent = numberOfPagesRemaining;

    for (int page = 0; page < numberOfPages; page++) {
      // Calculate the number of pages that exist in the blob after this one
      totalPagesAfterCurrent--;

      short currentPageSize = getPageSize(blob, buffer, page * PAGE_SIZE);

      // Only the last page can be partially filled.
      if (currentPageSize < PAGE_DATA_SIZE
          && totalPagesAfterCurrent > 0) {
        throw fileCorruptException(blob, String.format(
            "Page with partial data found in the middle (%d pages from the"
             + " end) that only has %d bytes of data.",
             totalPagesAfterCurrent, currentPageSize));
      }
      bufferLength += currentPageSize + PAGE_HEADER_SIZE;
    }

    currentBufferOffset = PAGE_HEADER_SIZE;
    currentBufferLength = bufferLength;
    currentBuffer = buffer;
  }

  // Reads the page size from the page header at the given offset.
  private static short getPageSize(CloudPageBlobWrapper blob,
      byte[] data, int offset) throws IOException {
    short pageSize = toShort(data[offset], data[offset + 1]);
    if (pageSize < 0 || pageSize > PAGE_DATA_SIZE) {
      throw fileCorruptException(blob, String.format(
          "Unexpected page size in the header: %d.",
          pageSize));
    }
    return pageSize;
  }

  @Override
  public synchronized int read(byte[] outputBuffer, int offset, int len)
      throws IOException {
    // If len is zero return 0 per the InputStream contract
    if (len == 0) {
      return 0;
    }

    int numberOfBytesRead = 0;
    while (len > 0) {
      if (!ensureDataInBuffer()) {
        break;
      }
      int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage();
      int numBytesToRead = Math.min(len, bytesRemainingInCurrentPage);
      System.arraycopy(currentBuffer, currentBufferOffset, outputBuffer,
          offset, numBytesToRead);
      numberOfBytesRead += numBytesToRead;
      offset += numBytesToRead;
      len -= numBytesToRead;
      if (numBytesToRead == bytesRemainingInCurrentPage) {
        // We've finished this page, move on to the next.
        advancePagesInBuffer(1);
      } else {
        currentBufferOffset += numBytesToRead;
      }
    }

    // If len was > 0 and zero bytes were read, we reached EOF.
    if (numberOfBytesRead == 0) {
      return -1;
    }

    filePosition += numberOfBytesRead;
    return numberOfBytesRead;
  }

  @Override
  public int read() throws IOException {
    byte[] oneByte = new byte[1];
    int result = read(oneByte);
    if (result < 0) {
      return result;
    }
    // Mask off sign extension so bytes 0x80-0xFF come back as 128-255,
    // per the InputStream contract, rather than as negative values.
    return oneByte[0] & 0xFF;
  }

  /**
   * Skips over and discards <code>n</code> bytes of data from this input
   * stream. The <code>skip</code> method may, for a variety of reasons, end
   * up skipping over some smaller number of bytes, possibly <code>0</code>.
   * This may result from any of a number of conditions; reaching end of file
   * before <code>n</code> bytes have been skipped is only one possibility.
   * The actual number of bytes skipped is returned. If {@code n} is
   * negative, the {@code skip} method for class {@code InputStream} always
   * returns 0, and no bytes are skipped. Subclasses may handle the negative
   * value differently.
   *
   * <p> The <code>skip</code> method of this class creates a
   * byte array and then repeatedly reads into it until <code>n</code> bytes
   * have been read or the end of the stream has been reached. Subclasses are
   * encouraged to provide a more efficient implementation of this method.
   * For instance, the implementation may depend on the ability to seek.
   *
   * @param      n   the number of bytes to be skipped.
   * @return     the actual number of bytes skipped.
   * @exception  IOException  if the stream does not support seek,
   *                          or if some other I/O error occurs.
   */
  @Override
  public synchronized long skip(long n) throws IOException {
    long skipped = skipImpl(n);
    filePosition += skipped; // track the position in the stream
    return skipped;
  }

  private long skipImpl(long n) throws IOException {

    if (n == 0) {
      return 0;
    }

    // First skip within the current buffer as much as possible.
    long skippedWithinBuffer = skipWithinBuffer(n);
    if (skippedWithinBuffer > n) {
      // TO CONSIDER: Using a contracts framework such as Google's cofoja for
      // these post-conditions.
      throw new AssertionError(String.format(
          "Bug in skipWithinBuffer: it skipped over %d bytes when asked to "
          + "skip %d bytes.", skippedWithinBuffer, n));
    }
    n -= skippedWithinBuffer;
    long skipped = skippedWithinBuffer;

    if (n == 0) {
      return skipped;
    }

    if (numberOfPagesRemaining == 0) {
      throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
    } else if (numberOfPagesRemaining > 1) {
      // skip over as many pages as we can, but we must read the last
      // page as it may not be full
      long pagesToSkipOver = Math.min(n / PAGE_DATA_SIZE,
          numberOfPagesRemaining - 1);
      numberOfPagesRemaining -= pagesToSkipOver;
      currentOffsetInBlob += pagesToSkipOver * PAGE_SIZE;
      skipped += pagesToSkipOver * PAGE_DATA_SIZE;
      n -= pagesToSkipOver * PAGE_DATA_SIZE;
    }

    if (n == 0) {
      return skipped;
    }

    // Now read in at the current position, and skip within current buffer.
    if (!ensureDataInBuffer()) {
      return skipped;
    }
    return skipped + skipWithinBuffer(n);
  }

  /**
   * Skip over n bytes within the current buffer, or just skip over the whole
   * buffer if n is greater than the bytes remaining in the buffer.
   * @param n The number of data bytes to skip.
   * @return The number of bytes actually skipped.
   * @throws IOException if data corruption is found in the buffer.
   */
  private long skipWithinBuffer(long n) throws IOException {
    if (!dataAvailableInBuffer()) {
      return 0;
    }
    long skipped = 0;
    // First skip within the current page.
    skipped = skipWithinCurrentPage(n);
    if (skipped > n) {
      throw new AssertionError(String.format(
          "Bug in skipWithinCurrentPage: it skipped over %d bytes when asked" 
          + " to skip %d bytes.", skipped, n));
    }
    n -= skipped;
    if (n == 0 || !dataAvailableInBuffer()) {
      return skipped;
    }

    // Calculate how many whole pages (pages before the possibly partially
    // filled last page) remain.
    int currentPageIndex = currentBufferOffset / PAGE_SIZE;
    int numberOfPagesInBuffer = currentBuffer.length / PAGE_SIZE;
    int wholePagesRemaining = numberOfPagesInBuffer - currentPageIndex - 1;

    if (n < (PAGE_DATA_SIZE * wholePagesRemaining)) {
      // I'm within one of the whole pages remaining, skip in there.
      advancePagesInBuffer((int) (n / PAGE_DATA_SIZE));
      currentBufferOffset += n % PAGE_DATA_SIZE;
      return n + skipped;
    }

    // Skip over the whole pages.
    advancePagesInBuffer(wholePagesRemaining);
    skipped += wholePagesRemaining * PAGE_DATA_SIZE;
    n -= wholePagesRemaining * PAGE_DATA_SIZE;

    // At this point we know we need to skip to somewhere in the last page,
    // or just go to the end.
    return skipWithinCurrentPage(n) + skipped;
  }
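
  // Worked example of the skip arithmetic above (512-byte pages with 2-byte
  // headers assumed, and a buffer that still holds several full pages): with
  // currentBufferOffset = 700, the stream is 700 % 512 - 2 = 186 data bytes
  // into the buffer's second page. Skipping n = 1000 first consumes the
  // 510 - 186 = 324 data bytes left in that page, then advances
  // 676 / 510 = 1 whole page and finally 676 % 510 = 166 bytes into the
  // page after that.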

  /**
   * Skip over n bytes within the current page, or just skip over the whole
   * page if n is greater than the bytes remaining in the page.
   * @param n The number of data bytes to skip.
   * @return The number of bytes actually skipped.
   * @throws IOException if data corruption is found in the buffer.
   */
  private long skipWithinCurrentPage(long n) throws IOException {
    int remainingBytesInCurrentPage = getBytesRemainingInCurrentPage();
    if (n <= remainingBytesInCurrentPage) {
      currentBufferOffset += n;
      return n;
    } else {
      advancePagesInBuffer(1);
      return remainingBytesInCurrentPage;
    }
  }

  /**
   * Gets the number of bytes remaining within the current page in the buffer.
   * @return The number of bytes remaining.
   * @throws IOException if data corruption is found in the buffer.
   */
  private int getBytesRemainingInCurrentPage() throws IOException {
    if (!dataAvailableInBuffer()) {
      return 0;
    }
    // Calculate our current position relative to the start of the current
    // page.
    int currentDataOffsetInPage =
        (currentBufferOffset % PAGE_SIZE) - PAGE_HEADER_SIZE;
    int pageBoundary = getCurrentPageStartInBuffer();
    // Get the data size of the current page from the header.
    short sizeOfCurrentPage = getPageSize(blob, currentBuffer, pageBoundary);
    return sizeOfCurrentPage - currentDataOffsetInPage;
  }

  private static IOException badStartRangeException(CloudPageBlobWrapper blob,
      PageRange startRange) {
    return fileCorruptException(blob, String.format(
        "Page blobs for ASV should always use a page range starting at byte 0. " 
        + "This starts at byte %d.",
        startRange.getStartOffset()));
  }

  private void advancePagesInBuffer(int numberOfPages) {
    currentBufferOffset =
        getCurrentPageStartInBuffer() 
        + (numberOfPages * PAGE_SIZE) 
        + PAGE_HEADER_SIZE;
  }

  private int getCurrentPageStartInBuffer() {
    return PAGE_SIZE * (currentBufferOffset / PAGE_SIZE);
  }
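
  // Example of the offset arithmetic in the two helpers above (512-byte pages
  // assumed): for currentBufferOffset = 700, getCurrentPageStartInBuffer()
  // returns 512 * (700 / 512) = 512, and advancePagesInBuffer(1) moves the
  // offset to 512 + 512 + 2 = 1026, the first data byte of the next page.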

  private static IOException fileCorruptException(CloudPageBlobWrapper blob,
      String reason) {
    return new IOException(String.format(
        "The page blob: '%s' is corrupt or has an unexpected format: %s.",
        blob.getUri(), reason));
  }
}
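
For orientation, here is a minimal usage sketch; it is not part of the original file. The CloudPageBlobWrapper and OperationContext would normally be supplied by AzureNativeFileSystemStore, and the helper class and method names below are hypothetical. Since PageBlobInputStream is package-private, the sketch lives in the same package:

package org.apache.hadoop.fs.azure;

import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;

import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;

final class PageBlobInputStreamExample {
  // Hypothetical helper: wraps a page blob in the input stream and reports
  // its logical data size up front.
  static DataInputStream openInputStream(CloudPageBlobWrapper blob)
      throws IOException {
    OperationContext opContext = new OperationContext();
    try {
      long size = PageBlobInputStream.getPageBlobDataSize(blob, opContext);
      System.out.println("Logical data size: " + size + " bytes");
    } catch (StorageException e) {
      throw new IOException("Unable to query page blob size.", e);
    }
    return new DataInputStream(new PageBlobInputStream(blob, opContext));
  }
}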
