hadoop S3ABlockOutputStream source code

  • 2022-10-20

Hadoop S3ABlockOutputStream code

File path: /hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.StringJoiner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.amazonaws.SdkBaseException;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.UploadPartRequest;

import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Abortable;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.PutTracker;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.util.Progressable;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
 * Upload files/parts directly via different buffering mechanisms:
 * including memory and disk.
 *
 * If the stream is closed and no update has started, then the upload
 * is instead done as a single PUT operation.
 *
 * Unstable: statistics and error handling might evolve.
 *
 * Syncable is declared as supported so the calls can be
 * explicitly rejected.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
class S3ABlockOutputStream extends OutputStream implements
    StreamCapabilities, IOStatisticsSource, Syncable, Abortable {

  private static final Logger LOG =
      LoggerFactory.getLogger(S3ABlockOutputStream.class);

  private static final String E_NOT_SYNCABLE =
      "S3A streams are not Syncable. See HADOOP-17597.";

  /** Object being uploaded. */
  private final String key;

  /** Size of all blocks. */
  private final int blockSize;

  /** IO Statistics. */
  private final IOStatistics iostatistics;

  /**
   * The options this instance was created with.
   */
  private final BlockOutputStreamBuilder builder;

  /** Total bytes for uploads submitted so far. */
  private long bytesSubmitted;

  /** Callback for progress. */
  private final ProgressListener progressListener;
  private final ListeningExecutorService executorService;

  /**
   * Factory for blocks.
   */
  private final S3ADataBlocks.BlockFactory blockFactory;

  /** Preallocated byte buffer for writing single characters. */
  private final byte[] singleCharWrite = new byte[1];

  /** Multipart upload details; null means none started. */
  private MultiPartUpload multiPartUpload;

  /** Closed flag. */
  private final AtomicBoolean closed = new AtomicBoolean(false);

  /** Current data block. Null means none currently active */
  private S3ADataBlocks.DataBlock activeBlock;

  /** Count of blocks uploaded. */
  private long blockCount = 0;

  /** Statistics to build up. */
  private final BlockOutputStreamStatistics statistics;

  /**
   * Write operation helper; encapsulation of the filesystem operations.
   * This contains the audit span for the operation, and activates/deactivates
   * it within calls.
   */
  private final WriteOperations writeOperationHelper;

  /**
   * Track multipart put operation.
   */
  private final PutTracker putTracker;

  /** Should Syncable calls be downgraded? */
  private final boolean downgradeSyncableExceptions;

  /**
   * Downgraded syncable API calls are only logged at warn
   * once across the entire process.
   */
  private static final LogExactlyOnce WARN_ON_SYNCABLE =
      new LogExactlyOnce(LOG);

  /** is client side encryption enabled? */
  private final boolean isCSEEnabled;

  /** Thread level IOStatistics Aggregator. */
  private final IOStatisticsAggregator threadIOStatisticsAggregator;

  /**
   * An S3A output stream which uploads partitions in a separate pool of
   * threads; different {@link S3ADataBlocks.BlockFactory}
   * instances can control where data is buffered.
   * @throws IOException on any problem
   */
  S3ABlockOutputStream(BlockOutputStreamBuilder builder)
      throws IOException {
    builder.validate();
    this.builder = builder;
    this.key = builder.key;
    this.blockFactory = builder.blockFactory;
    this.blockSize = (int) builder.blockSize;
    this.statistics = builder.statistics;
    // test instantiations may not provide statistics;
    this.iostatistics = statistics.getIOStatistics();
    this.writeOperationHelper = builder.writeOperations;
    this.putTracker = builder.putTracker;
    this.executorService = MoreExecutors.listeningDecorator(
        builder.executorService);
    this.multiPartUpload = null;
    final Progressable progress = builder.progress;
    this.progressListener = (progress instanceof ProgressListener) ?
        (ProgressListener) progress
        : new ProgressableListener(progress);
    downgradeSyncableExceptions = builder.downgradeSyncableExceptions;
    // create that first block. This guarantees that an open + close sequence
    // writes a 0-byte entry.
    createBlockIfNeeded();
    LOG.debug("Initialized S3ABlockOutputStream for {}" +
        " output to {}", key, activeBlock);
    if (putTracker.initialize()) {
      LOG.debug("Put tracker requests multipart upload");
      initMultipartUpload();
    }
    this.isCSEEnabled = builder.isCSEEnabled;
    this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
  }

  /**
   * Demand create a destination block.
   * @return the active block; null if there isn't one.
   * @throws IOException on any failure to create
   */
  private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded()
      throws IOException {
    if (activeBlock == null) {
      blockCount++;
      if (blockCount >= Constants.MAX_MULTIPART_COUNT) {
        LOG.error("Number of partitions in stream exceeds limit for S3: "
             + Constants.MAX_MULTIPART_COUNT +  " write may fail.");
      }
      activeBlock = blockFactory.create(blockCount, this.blockSize, statistics);
    }
    return activeBlock;
  }

  /**
   * Synchronized accessor to the active block.
   * @return the active block; null if there isn't one.
   */
  private synchronized S3ADataBlocks.DataBlock getActiveBlock() {
    return activeBlock;
  }

  /**
   * Predicate to query whether or not there is an active block.
   * @return true if there is an active block.
   */
  private synchronized boolean hasActiveBlock() {
    return activeBlock != null;
  }

  /**
   * Clear the active block.
   */
  private void clearActiveBlock() {
    if (activeBlock != null) {
      LOG.debug("Clearing active block");
    }
    synchronized (this) {
      activeBlock = null;
    }
  }

  /**
   * Check for the filesystem being open.
   * @throws IOException if the filesystem is closed.
   */
  void checkOpen() throws IOException {
    if (closed.get()) {
      throw new IOException("Filesystem " + writeOperationHelper + " closed");
    }
  }

  /**
   * The flush operation does not trigger an upload; that awaits
   * the next block being full. What it does do is call {@code flush() }
   * on the current block, leaving it to choose how to react.
   * @throws IOException Any IO problem.
   */
  @Override
  public synchronized void flush() throws IOException {
    try {
      checkOpen();
    } catch (IOException e) {
      LOG.warn("Stream closed: " + e.getMessage());
      return;
    }
    S3ADataBlocks.DataBlock dataBlock = getActiveBlock();
    if (dataBlock != null) {
      dataBlock.flush();
    }
  }

  /**
   * Writes a byte to the destination. If this causes the buffer to reach
   * its limit, the actual upload is submitted to the threadpool.
   * @param b the int of which the lowest byte is written
   * @throws IOException on any problem
   */
  @Override
  public synchronized void write(int b) throws IOException {
    singleCharWrite[0] = (byte)b;
    write(singleCharWrite, 0, 1);
  }

  /**
   * Writes a range of bytes to the memory buffer. If this causes the
   * buffer to reach its limit, the actual upload is submitted to the
   * threadpool and the remainder of the array is written to memory
   * (recursively).
   * @param source byte array containing the data to write
   * @param offset offset in array where to start
   * @param len number of bytes to be written
   * @throws IOException on any problem
   */
  @Override
  public synchronized void write(byte[] source, int offset, int len)
      throws IOException {

    S3ADataBlocks.validateWriteArgs(source, offset, len);
    checkOpen();
    if (len == 0) {
      return;
    }
    statistics.writeBytes(len);
    S3ADataBlocks.DataBlock block = createBlockIfNeeded();
    int written = block.write(source, offset, len);
    int remainingCapacity = block.remainingCapacity();
    if (written < len) {
      // not everything was written: the block has run out
      // of capacity.
      // Trigger an upload then process the remainder.
      LOG.debug("writing more data than block has capacity - triggering upload");
      uploadCurrentBlock(false);
      // tail recursion is mildly expensive, but given buffer sizes must be
      // in MB, it's unlikely to recurse very deeply.
      this.write(source, offset + written, len - written);
    } else {
      if (remainingCapacity == 0 && !isCSEEnabled) {
        // the whole buffer is done, trigger an upload
        uploadCurrentBlock(false);
      }
    }
  }
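
  // Worked example (illustrative): with an 8 MB block size, a single
  // 10 MB write() buffers 8 MB, uploads that full block, then recurses
  // once to buffer the remaining 2 MB in a fresh block. When client-side
  // encryption is enabled, an exactly-full block is not uploaded eagerly,
  // as the final part must be flagged as last at close().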

  /**
   * Start an asynchronous upload of the current block.
   *
   * @param isLast true if the part being uploaded is the last and
   *               client-side encryption is enabled.
   * @throws IOException Problems opening the destination for upload,
   *                     initializing the upload, or if a previous operation
   *                     has failed.
   */
  @Retries.RetryTranslated
  private synchronized void uploadCurrentBlock(boolean isLast)
      throws IOException {
    Preconditions.checkState(hasActiveBlock(), "No active block");
    LOG.debug("Writing block # {}", blockCount);
    initMultipartUpload();
    try {
      multiPartUpload.uploadBlockAsync(getActiveBlock(), isLast);
      bytesSubmitted += getActiveBlock().dataSize();
    } finally {
      // set the block to null, so the next write will create a new block.
      clearActiveBlock();
    }
  }

  /**
   * Init multipart upload. Assumption: this is called from
   * a synchronized block.
   * Note that this makes a blocking HTTPS request to the far end, so
   * can take time and potentially fail.
   * @throws IOException failure to initialize the upload
   */
  @Retries.RetryTranslated
  private void initMultipartUpload() throws IOException {
    if (multiPartUpload == null) {
      LOG.debug("Initiating Multipart upload");
      multiPartUpload = new MultiPartUpload(key);
    }
  }

  /**
   * Close the stream.
   *
   * This will not return until the upload is complete
   * or the attempt to perform the upload has failed.
   * Exceptions raised in this method are indicative that the write has
   * failed and data is at risk of being lost.
   * @throws IOException on any failure.
   */
  @Override
  public void close() throws IOException {
    if (closed.getAndSet(true)) {
      // already closed
      LOG.debug("Ignoring close() as stream is already closed");
      return;
    }
    S3ADataBlocks.DataBlock block = getActiveBlock();
    boolean hasBlock = hasActiveBlock();
    LOG.debug("{}: Closing block #{}: current block= {}",
        this,
        blockCount,
        hasBlock ? block : "(none)");
    long bytes = 0;
    try {
      if (multiPartUpload == null) {
        if (hasBlock) {
          // no uploads of data have taken place, put the single block up.
          // This must happen even if there is no data, so that 0 byte files
          // are created.
          bytes = putObject();
          bytesSubmitted = bytes;
        }
      } else {
        // there's an MPU in progress;
        // IF there is more data to upload, or no data has yet been uploaded,
        // PUT the final block
        if (hasBlock &&
            (block.hasData() || multiPartUpload.getPartsSubmitted() == 0)) {
          // send last part and set the value of isLastPart to true.
          // Necessary to set this "true" in case of client side encryption.
          uploadCurrentBlock(true);
        }
        // wait for the partial uploads to finish
        final List<PartETag> partETags =
            multiPartUpload.waitForAllPartUploads();
        bytes = bytesSubmitted;

        // then complete the operation
        if (putTracker.aboutToComplete(multiPartUpload.getUploadId(),
            partETags,
            bytes,
            iostatistics)) {
          multiPartUpload.complete(partETags);
        } else {
          LOG.info("File {} will be visible when the job is committed", key);
        }
      }
      if (!putTracker.outputImmediatelyVisible()) {
        // track the number of bytes uploaded as commit operations.
        statistics.commitUploaded(bytes);
      }
      LOG.debug("Upload complete to {} by {}", key, writeOperationHelper);
    } catch (IOException ioe) {
      // the operation failed.
      // if this happened during a multipart upload, abort the
      // operation, so as to not leave (billable) data
      // pending on the bucket
      maybeAbortMultipart();
      writeOperationHelper.writeFailed(ioe);
      throw ioe;
    } finally {
      cleanupOnClose();
    }
    // Note end of write. This does not change the state of the remote FS.
    writeOperationHelper.writeSuccessful(bytes);
  }
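
  // Close decision summary: if no multipart upload was started, the
  // buffered block is written as a single PUT (even when empty, so that
  // 0-byte objects are created); otherwise any trailing data is uploaded
  // with isLast=true, all part futures are awaited, and the MPU is
  // either completed or handed to the PutTracker for a later job commit.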

  /**
   * Final operations in close/abort of stream.
   * Shuts down block factory, closes any active block,
   * and pushes out statistics.
   */
  private synchronized void cleanupOnClose() {
    cleanupWithLogger(LOG, getActiveBlock(), blockFactory);
    mergeThreadIOStatistics(statistics.getIOStatistics());
    LOG.debug("Statistics: {}", statistics);
    cleanupWithLogger(LOG, statistics);
    clearActiveBlock();
  }

  /**
   * Merging the current thread's IOStatistics with the current IOStatistics
   * context.
   *
   * @param streamStatistics Stream statistics to be merged into thread
   *                         statistics aggregator.
   */
  private void mergeThreadIOStatistics(IOStatistics streamStatistics) {
    getThreadIOStatistics().aggregate(streamStatistics);
  }

  /**
   * Best effort abort of the multipart upload; sets
   * the field to null afterwards.
   * @return any exception caught during the operation.
   */
  private synchronized IOException maybeAbortMultipart() {
    if (multiPartUpload != null) {
      final IOException ioe = multiPartUpload.abort();
      multiPartUpload = null;
      return ioe;
    } else {
      return null;
    }
  }

  /**
   * Abort any active uploads, enter closed state.
   * @return the outcome
   */
  @Override
  public AbortableResult abort() {
    if (closed.getAndSet(true)) {
      // already closed
      LOG.debug("Ignoring abort() as stream is already closed");
      return new AbortableResultImpl(true, null);
    }
    try (DurationTracker d =
             statistics.trackDuration(INVOCATION_ABORT.getSymbol())) {
      return new AbortableResultImpl(false, maybeAbortMultipart());
    } finally {
      cleanupOnClose();
    }
  }

  /**
   * Abortable result.
   */
  private static final class AbortableResultImpl implements AbortableResult {

    /**
     * Had the stream already been closed/aborted?
     */
    private final boolean alreadyClosed;

    /**
     * Was any exception raised during non-essential
     * cleanup actions (i.e. MPU abort)?
     */
    private final IOException anyCleanupException;

    /**
     * Constructor.
     * @param alreadyClosed Had the stream already been closed/aborted?
     * @param anyCleanupException Was any exception raised during cleanup?
     */
    private AbortableResultImpl(final boolean alreadyClosed,
        final IOException anyCleanupException) {
      this.alreadyClosed = alreadyClosed;
      this.anyCleanupException = anyCleanupException;
    }

    @Override
    public boolean alreadyClosed() {
      return alreadyClosed;
    }

    @Override
    public IOException anyCleanupException() {
      return anyCleanupException;
    }

    @Override
    public String toString() {
      return new StringJoiner(", ",
          AbortableResultImpl.class.getSimpleName() + "[", "]")
          .add("alreadyClosed=" + alreadyClosed)
          .add("anyCleanupException=" + anyCleanupException)
          .toString();
    }
  }

  /**
   * Upload the current block as a single PUT request; if the buffer
   * is empty a 0-byte PUT will be invoked, as it is needed to create an
   * entry at the far end.
   * @throws IOException any problem.
   * @return number of bytes uploaded. If thread was interrupted while
   * waiting for upload to complete, returns zero with interrupted flag set
   * on this thread.
   */
  private int putObject() throws IOException {
    LOG.debug("Executing regular upload for {}", writeOperationHelper);

    final S3ADataBlocks.DataBlock block = getActiveBlock();
    int size = block.dataSize();
    final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
    final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
        writeOperationHelper.createPutObjectRequest(
            key,
            uploadData.getFile(),
            builder.putOptions)
        : writeOperationHelper.createPutObjectRequest(
            key,
            uploadData.getUploadStream(),
            size,
            builder.putOptions);
    BlockUploadProgress callback =
        new BlockUploadProgress(
            block, progressListener, now());
    putObjectRequest.setGeneralProgressListener(callback);
    statistics.blockUploadQueued(size);
    ListenableFuture<PutObjectResult> putObjectResult =
        executorService.submit(() -> {
          try {
            // the putObject call automatically closes the input
            // stream afterwards.
            return writeOperationHelper.putObject(putObjectRequest, builder.putOptions);
          } finally {
            cleanupWithLogger(LOG, uploadData, block);
          }
        });
    clearActiveBlock();
    //wait for completion
    try {
      putObjectResult.get();
      return size;
    } catch (InterruptedException ie) {
      LOG.warn("Interrupted object upload", ie);
      Thread.currentThread().interrupt();
      return 0;
    } catch (ExecutionException ee) {
      throw extractException("regular upload", key, ee);
    }
  }

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder(
        "S3ABlockOutputStream{");
    sb.append(writeOperationHelper.toString());
    sb.append(", blockSize=").append(blockSize);
    // unsynced access; risks consistency in exchange for no risk of deadlock.
    S3ADataBlocks.DataBlock block = activeBlock;
    if (block != null) {
      sb.append(", activeBlock=").append(block);
    }
    sb.append(" Statistics=")
        .append(IOStatisticsLogging.ioStatisticsSourceToString(this));
    sb.append('}');
    return sb.toString();
  }

  private void incrementWriteOperations() {
    writeOperationHelper.incrementWriteOperations();
  }

  /**
   * Current time.
   * @return the current time as an {@link Instant}
   */
  private Instant now() {
    return Instant.now();
  }

  /**
   * Get the statistics for this stream.
   * @return stream statistics
   */
  BlockOutputStreamStatistics getStatistics() {
    return statistics;
  }

  /**
   * Return the stream capabilities.
   * This stream always returns false when queried about hflush and hsync.
   * If asked about {@link CommitConstants#STREAM_CAPABILITY_MAGIC_OUTPUT}
   * it will return true iff this is an active "magic" output stream.
   * @param capability string to query the stream support for.
   * @return true if the capability is supported by this instance.
   */
  @SuppressWarnings("deprecation")
  @Override
  public boolean hasCapability(String capability) {
    switch (capability.toLowerCase(Locale.ENGLISH)) {

      // does the output stream have delayed visibility
    case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT:
    case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT_OLD:
      return !putTracker.outputImmediatelyVisible();

      // The flush/sync options are absolutely not supported
    case StreamCapabilities.HFLUSH:
    case StreamCapabilities.HSYNC:
      return false;

      // yes, we do statistics.
    case StreamCapabilities.IOSTATISTICS:
      return true;

      // S3A supports abort.
    case StreamCapabilities.ABORTABLE_STREAM:
      return true;

      // IOStatistics context support for thread-level IOStatistics.
    case StreamCapabilities.IOSTATISTICS_CONTEXT:
      return true;

    default:
      return false;
    }
  }
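
  // Capability probe example (illustrative): callers are expected to
  // check a capability before relying on it, e.g.
  //   if (out.hasCapability(StreamCapabilities.ABORTABLE_STREAM)) {
  //     ((Abortable) out).abort();
  //   }
  // hflush/hsync report false here; invoking them throws or warns,
  // depending on downgradeSyncableExceptions.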

  @Override
  public void hflush() throws IOException {
    statistics.hflushInvoked();
    handleSyncableInvocation();
  }

  @Override
  public void hsync() throws IOException {
    statistics.hsyncInvoked();
    handleSyncableInvocation();
  }

  /**
   * Shared processing of Syncable operation reporting/downgrade.
   */
  private void handleSyncableInvocation() {
    final UnsupportedOperationException ex
        = new UnsupportedOperationException(E_NOT_SYNCABLE);
    if (!downgradeSyncableExceptions) {
      throw ex;
    }
    // downgrading.
    WARN_ON_SYNCABLE.warn("Application invoked the Syncable API against"
        + " stream writing to {}. This is Unsupported",
        key);
    // and log at debug
    LOG.debug("Downgrading Syncable call", ex);
  }

  @Override
  public IOStatistics getIOStatistics() {
    return iostatistics;
  }

  /**
   * Get the IOStatistics aggregator passed in the builder.
   * @return an aggregator
   */
  protected IOStatisticsAggregator getThreadIOStatistics() {
    return threadIOStatisticsAggregator;
  }

  /**
   * Multiple partition upload.
   */
  private class MultiPartUpload {
    private final String uploadId;
    private final List<ListenableFuture<PartETag>> partETagsFutures;
    private int partsSubmitted;
    private int partsUploaded;
    private long bytesSubmitted;

    /**
     * Any IOException raised during block upload.
     * if non-null, then close() MUST NOT complete
     * the file upload.
     */
    private IOException blockUploadFailure;

    /**
     * Constructor.
     * Initiates the MPU request against S3.
     * @param key upload destination
     * @throws IOException failure
     */
    @Retries.RetryTranslated
    MultiPartUpload(String key) throws IOException {
      this.uploadId = trackDuration(statistics,
          OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(),
          () -> writeOperationHelper.initiateMultiPartUpload(
              key,
              builder.putOptions));

      this.partETagsFutures = new ArrayList<>(2);
      LOG.debug("Initiated multi-part upload for {} with " +
          "id '{}'", writeOperationHelper, uploadId);
    }

    /**
     * Get a count of parts submitted.
     * @return the number of parts submitted; will always be >= the
     * value of {@link #getPartsUploaded()}
     */
    public int getPartsSubmitted() {
      return partsSubmitted;
    }

    /**
     * Count of parts actually uploaded.
     * @return the count of successfully completed part uploads.
     */
    public int getPartsUploaded() {
      return partsUploaded;
    }

    /**
     * Get the upload ID; will not be null once construction completes.
     * @return the upload ID
     */
    public String getUploadId() {
      return uploadId;
    }

    /**
     * Get the count of bytes submitted.
     * @return the current upload size.
     */
    public long getBytesSubmitted() {
      return bytesSubmitted;
    }

    /**
     * A block upload has failed.
     * Record it if there has been no previous failure.
     * @param e error
     */
    public void noteUploadFailure(final IOException e) {
      if (blockUploadFailure == null) {
        blockUploadFailure = e;
      }
    }

    /**
     * If there is a block upload failure, throw it.
     * @throws IOException if one has already been caught.
     */
    public void maybeRethrowUploadFailure() throws IOException {
      if (blockUploadFailure != null) {
        throw blockUploadFailure;
      }
    }

    /**
     * Upload a block of data asynchronously.
     * This will take the block and queue it for upload; once the upload
     * completes or fails, the block is closed.
     * @param block block to upload
     * @param isLast true if this is the last part of the upload
     * @throws IOException upload failure
     * @throws PathIOException if too many blocks were written
     */
    private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
        Boolean isLast)
        throws IOException {
      LOG.debug("Queueing upload of {} for upload {}", block, uploadId);
      Preconditions.checkNotNull(uploadId, "Null uploadId");
      maybeRethrowUploadFailure();
      partsSubmitted++;
      final int size = block.dataSize();
      bytesSubmitted += size;
      final int currentPartNumber = partETagsFutures.size() + 1;
      final UploadPartRequest request;
      final S3ADataBlocks.BlockUploadData uploadData;
      try {
        uploadData = block.startUpload();
        request = writeOperationHelper.newUploadPartRequest(
            key,
            uploadId,
            currentPartNumber,
            size,
            uploadData.getUploadStream(),
            uploadData.getFile(),
            0L);
        request.setLastPart(isLast);
      } catch (SdkBaseException aws) {
        // catch and translate
        IOException e = translateException("upload", key, aws);
        // failure to start the upload.
        noteUploadFailure(e);
        throw e;
      } catch (IOException e) {
        // failure to start the upload.
        noteUploadFailure(e);
        throw e;
      }
      BlockUploadProgress callback =
          new BlockUploadProgress(
              block, progressListener, now());
      request.setGeneralProgressListener(callback);
      statistics.blockUploadQueued(block.dataSize());
      ListenableFuture<PartETag> partETagFuture =
          executorService.submit(() -> {
            // this is the queued upload operation
            // do the upload
            try {
              LOG.debug("Uploading part {} for id '{}'",
                  currentPartNumber, uploadId);
              PartETag partETag = writeOperationHelper.uploadPart(request)
                  .getPartETag();
              LOG.debug("Completed upload of {} to part {}",
                  block, partETag.getETag());
              LOG.debug("Stream statistics of {}", statistics);
              partsUploaded++;
              return partETag;
            } catch (IOException e) {
              // save immediately.
              noteUploadFailure(e);
              throw e;
            } finally {
              // close the stream and block
              cleanupWithLogger(LOG, uploadData, block);
            }
          });
      partETagsFutures.add(partETagFuture);
    }

    /**
     * Block awaiting all outstanding uploads to complete.
     * @return list of results
     * @throws IOException IO Problems
     */
    private List<PartETag> waitForAllPartUploads() throws IOException {
      LOG.debug("Waiting for {} uploads to complete", partETagsFutures.size());
      try {
        return Futures.allAsList(partETagsFutures).get();
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted partUpload", ie);
        Thread.currentThread().interrupt();
        return null;
      } catch (ExecutionException ee) {
        //there is no way of recovering so abort
        //cancel all partUploads
        LOG.debug("While waiting for upload completion", ee);
        //abort multipartupload
        this.abort();
        throw extractException("Multi-part upload with id '" + uploadId
            + "' to " + key, key, ee);
      }
    }

    /**
     * Cancel all active uploads.
     */
    private void cancelAllActiveFutures() {
      LOG.debug("Cancelling futures");
      for (ListenableFuture<PartETag> future : partETagsFutures) {
        future.cancel(true);
      }
    }

    /**
     * This completes a multipart upload.
     * Sometimes it fails; here retries are handled to avoid losing all data
     * on a transient failure.
     * @param partETags list of partial uploads
     * @throws IOException on any problem
     */
    private void complete(List<PartETag> partETags)
        throws IOException {
      maybeRethrowUploadFailure();
      AtomicInteger errorCount = new AtomicInteger(0);
      try {
        trackDurationOfInvocation(statistics,
            MULTIPART_UPLOAD_COMPLETED.getSymbol(), () -> {
              writeOperationHelper.completeMPUwithRetries(key,
                  uploadId,
                  partETags,
                  bytesSubmitted,
                  errorCount,
                  builder.putOptions);
            });
      } finally {
        statistics.exceptionInMultipartComplete(errorCount.get());
      }
    }

    /**
     * Abort a multi-part upload. Retries are not attempted on failures.
     * IOExceptions are caught; this is expected to be run as a cleanup process.
     * @return any caught exception.
     */
    private IOException abort() {
      LOG.debug("Aborting upload");
      try {
        trackDurationOfInvocation(statistics,
            OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () -> {
              cancelAllActiveFutures();
              writeOperationHelper.abortMultipartUpload(key, uploadId,
                  false, null);
            });
        return null;
      } catch (IOException e) {
        // this point is only reached if the operation failed more than
        // the allowed retry count
        LOG.warn("Unable to abort multipart upload,"
            + " you may need to purge uploaded parts", e);
        statistics.exceptionInMultipartAbort();
        return e;
      }
    }
  }

  /**
   * The upload progress listener registered for events returned
   * during the upload of a single block.
   * It updates statistics and handles the end of the upload.
   * Transfer failures are logged at WARN.
   */
  private final class BlockUploadProgress implements ProgressListener {
    private final S3ADataBlocks.DataBlock block;
    private final ProgressListener nextListener;
    private final Instant transferQueueTime;
    private Instant transferStartTime;

    /**
     * Track the progress of a single block upload.
     * @param block block to monitor
     * @param nextListener optional next progress listener
     * @param transferQueueTime time the block was transferred
     * into the queue
     */
    private BlockUploadProgress(S3ADataBlocks.DataBlock block,
        ProgressListener nextListener,
        Instant transferQueueTime) {
      this.block = block;
      this.transferQueueTime = transferQueueTime;
      this.nextListener = nextListener;
    }

    @Override
    public void progressChanged(ProgressEvent progressEvent) {
      ProgressEventType eventType = progressEvent.getEventType();
      long bytesTransferred = progressEvent.getBytesTransferred();

      int size = block.dataSize();
      switch (eventType) {

      case REQUEST_BYTE_TRANSFER_EVENT:
        // bytes uploaded
        statistics.bytesTransferred(bytesTransferred);
        break;

      case TRANSFER_PART_STARTED_EVENT:
        transferStartTime = now();
        statistics.blockUploadStarted(
            Duration.between(transferQueueTime, transferStartTime),
            size);
        incrementWriteOperations();
        break;

      case TRANSFER_PART_COMPLETED_EVENT:
        statistics.blockUploadCompleted(
            Duration.between(transferStartTime, now()),
            size);
        break;

      case TRANSFER_PART_FAILED_EVENT:
        statistics.blockUploadFailed(
            Duration.between(transferStartTime, now()),
            size);
        LOG.warn("Transfer failure of block {}", block);
        break;

      default:
        // nothing
      }

      if (nextListener != null) {
        nextListener.progressChanged(progressEvent);
      }
    }
  }

  /**
   * Bridge from AWS {@code ProgressListener} to Hadoop {@link Progressable}.
   */
  private static class ProgressableListener implements ProgressListener {
    private final Progressable progress;

    ProgressableListener(Progressable progress) {
      this.progress = progress;
    }

    @Override
    public void progressChanged(ProgressEvent progressEvent) {
      if (progress != null) {
        progress.progress();
      }
    }
  }

  /**
   * Create a builder.
   * @return a new builder instance
   */
  public static BlockOutputStreamBuilder builder() {
    return new BlockOutputStreamBuilder();
  }

  /**
   * Builder class for constructing an output stream.
   */
  public static final class BlockOutputStreamBuilder {

    /** S3 object to work on. */
    private String key;

    /** The executor service to use to schedule work. */
    private ExecutorService executorService;

    /**
     * Report progress in order to prevent timeouts.
     * If this object implements {@code ProgressListener} then it will be
     * directly wired up to the AWS client, and so receive detailed progress
     * information.
     */
    private Progressable progress;

    /** The size of a single block. */
    private long blockSize;

    /** The factory for creating stream destinations. */
    private S3ADataBlocks.BlockFactory blockFactory;

    /** The output statistics for the stream. */
    private BlockOutputStreamStatistics statistics =
        EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;

    /** Operations to write data. */
    private WriteOperations writeOperations;

    /** put tracking for commit support. */
    private PutTracker putTracker;

    /** Should Syncable calls be downgraded? */
    private boolean downgradeSyncableExceptions;

    /** is Client side Encryption enabled? */
    private boolean isCSEEnabled;

    /**
     * Put object options.
     */
    private PutObjectOptions putOptions;

    /**
     * thread-level IOStatistics Aggregator.
     */
    private IOStatisticsAggregator ioStatisticsAggregator;

    private BlockOutputStreamBuilder() {
    }

    /**
     * Validate the arguments.
     */
    public void validate() {
      requireNonNull(key, "null key");
      requireNonNull(executorService, "null executorService");
      requireNonNull(blockFactory, "null blockFactory");
      requireNonNull(statistics, "null statistics");
      requireNonNull(writeOperations, "null writeOperationHelper");
      requireNonNull(putTracker, "null putTracker");
      requireNonNull(putOptions, "null putOptions");
      Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
          "Block size is too small: %s", blockSize);
      requireNonNull(ioStatisticsAggregator, "null ioStatisticsAggregator");
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public BlockOutputStreamBuilder withKey(
        final String value) {
      key = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public BlockOutputStreamBuilder withExecutorService(
        final ExecutorService value) {
      executorService = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public BlockOutputStreamBuilder withProgress(
        final Progressable value) {
      progress = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public BlockOutputStreamBuilder withBlockSize(
        final long value) {
      blockSize = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public BlockOutputStreamBuilder withBlockFactory(
        final S3ADataBlocks.BlockFactory value) {
      blockFactory = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public BlockOutputStreamBuilder withStatistics(
        final BlockOutputStreamStatistics value) {
      statistics = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public BlockOutputStreamBuilder withWriteOperations(
        final WriteOperationHelper value) {
      writeOperations = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public BlockOutputStreamBuilder withPutTracker(
        final PutTracker value) {
      putTracker = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public BlockOutputStreamBuilder withDowngradeSyncableExceptions(
        final boolean value) {
      downgradeSyncableExceptions = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public BlockOutputStreamBuilder withCSEEnabled(boolean value) {
      isCSEEnabled = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public BlockOutputStreamBuilder withPutOptions(
        final PutObjectOptions value) {
      putOptions = value;
      return this;
    }

    /**
     * Set builder value.
     *
     * @param value new value
     * @return the builder
     */
    public BlockOutputStreamBuilder withIOStatisticsAggregator(
        final IOStatisticsAggregator value) {
      ioStatisticsAggregator = value;
      return this;
    }
  }
}
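
Usage sketch: construction goes through the builder. The snippet below is illustrative only; the constructor is package-private, so real instances are created inside the s3a module (by S3AFileSystem), and every identifier shown (pool, factory, helper, tracker, putOptions, aggregator, payload) is an assumed, pre-built object rather than code from this file.

// Minimal wiring sketch under the assumptions stated above.
S3ABlockOutputStream out = new S3ABlockOutputStream(
    S3ABlockOutputStream.builder()
        .withKey("data/part-00000")
        .withExecutorService(pool)              // bounded upload thread pool
        .withBlockSize(64 * 1024 * 1024)        // must be >= MULTIPART_MIN_SIZE
        .withBlockFactory(factory)              // memory- or disk-backed buffers
        .withWriteOperations(helper)            // a WriteOperationHelper
        .withPutTracker(tracker)                // plain or magic-commit tracker
        .withPutOptions(putOptions)
        .withDowngradeSyncableExceptions(true)
        .withIOStatisticsAggregator(aggregator));
out.write(payload);   // buffered; full blocks are uploaded asynchronously
out.close();          // single PUT, or completes the multipart upload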
