hadoop OBSBlockOutputStream 源码

  • 2022-10-20
  • 浏览 (269)

haddop OBSBlockOutputStream 代码

文件路径:/hadoop-cloud-storage-project/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSBlockOutputStream.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.obs;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import com.obs.services.exception.ObsException;
import com.obs.services.model.CompleteMultipartUploadResult;
import com.obs.services.model.PartEtag;
import com.obs.services.model.PutObjectRequest;
import com.obs.services.model.UploadPartRequest;
import com.obs.services.model.UploadPartResult;
import com.obs.services.model.fs.WriteFileRequest;
import com.sun.istack.NotNull;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Syncable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * OBS output stream based on block buffering.
 * <p>
 * Upload files/parts directly via different buffering mechanisms: including
 * memory and disk.
 *
 * <p>If the stream is closed and no update has started, then the upload is
 * instead done as a single PUT operation.
 *
 * <p>Unstable: statistics and error handling might evolve.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
class OBSBlockOutputStream extends OutputStream implements Syncable {

  /**
   * Class logger.
   */
  private static final Logger LOG = LoggerFactory.getLogger(
      OBSBlockOutputStream.class);

  /**
   * Owner FileSystem.
   */
  private final OBSFileSystem fs;

  /**
   * Key of the object being uploaded.
   */
  private final String key;

  /**
   * Length of object.
   */
  private long objectLen;

  /**
   * Size of all blocks.
   */
  private final int blockSize;

  /**
   * Callback for progress.
   */
  private final ListeningExecutorService executorService;

  /**
   * Factory for creating blocks.
   */
  private final OBSDataBlocks.BlockFactory blockFactory;

  /**
   * Preallocated byte buffer for writing single characters.
   */
  private final byte[] singleCharWrite = new byte[1];

  /**
   * Closed flag.
   */
  private final AtomicBoolean closed = new AtomicBoolean(false);

  /**
   * Has exception flag.
   */
  private final AtomicBoolean hasException = new AtomicBoolean(false);

  /**
   * Has flushed flag.
   */
  private final AtomicBoolean appendAble;

  /**
   * Multipart upload details; null means none started.
   */
  private MultiPartUpload multiPartUpload;

  /**
   * Current data block. Null means none currently active.
   */
  private OBSDataBlocks.DataBlock activeBlock;

  /**
   * Count of blocks uploaded.
   */
  private long blockCount = 0;

  /**
   * Write operation helper; encapsulation of the filesystem operations.
   */
  private OBSWriteOperationHelper writeOperationHelper;

  /**
   * Flag for mocking upload part error.
   */
  private boolean mockUploadPartError = false;

  /**
   * An OBS output stream which uploads partitions in a separate pool of
   * threads; different {@link OBSDataBlocks.BlockFactory} instances can control
   * where data is buffered.
   *
   * @param owner        OBSFilesystem
   * @param obsObjectKey OBS object to work on
   * @param objLen       object length
   * @param execService  the executor service to use to schedule work
   * @param isAppendable if append is supported
   * @throws IOException on any problem
   */
  OBSBlockOutputStream(
      final OBSFileSystem owner,
      final String obsObjectKey,
      final long objLen,
      final ExecutorService execService,
      final boolean isAppendable)
      throws IOException {
    this.appendAble = new AtomicBoolean(isAppendable);
    this.fs = owner;
    this.key = obsObjectKey;
    this.objectLen = objLen;
    this.blockFactory = owner.getBlockFactory();
    this.blockSize = (int) owner.getPartSize();
    this.writeOperationHelper = owner.getWriteHelper();
    Preconditions.checkArgument(
        owner.getPartSize() >= OBSConstants.MULTIPART_MIN_SIZE,
        "Block size is too small: %d", owner.getPartSize());
    this.executorService = MoreExecutors.listeningDecorator(
        execService);
    this.multiPartUpload = null;
    // create that first block. This guarantees that an open + close
    // sequence writes a 0-byte entry.
    createBlockIfNeeded();
    LOG.debug(
        "Initialized OBSBlockOutputStream for {}" + " output to {}",
        owner.getWriteHelper(),
        activeBlock);
  }

  /**
   * Demand create a destination block.
   *
   * @return the active block; null if there isn't one.
   * @throws IOException on any failure to create
   */
  private synchronized OBSDataBlocks.DataBlock createBlockIfNeeded()
      throws IOException {
    if (activeBlock == null) {
      blockCount++;
      if (blockCount >= OBSConstants.MAX_MULTIPART_COUNT) {
        LOG.warn(
            "Number of partitions in stream exceeds limit for OBS: "
                + OBSConstants.MAX_MULTIPART_COUNT
                + " write may fail.");
      }
      activeBlock = blockFactory.create(blockCount, this.blockSize);
    }
    return activeBlock;
  }

  /**
   * Synchronized accessor to the active block.
   *
   * @return the active block; null if there isn't one.
   */
  synchronized OBSDataBlocks.DataBlock getActiveBlock() {
    return activeBlock;
  }

  /**
   * Set mock error.
   *
   * @param isException mock error
   */
  @VisibleForTesting
  public void mockPutPartError(final boolean isException) {
    this.mockUploadPartError = isException;
  }

  /**
   * Predicate to query whether or not there is an active block.
   *
   * @return true if there is an active block.
   */
  private synchronized boolean hasActiveBlock() {
    return activeBlock != null;
  }

  /**
   * Clear the active block.
   */
  private synchronized void clearActiveBlock() {
    if (activeBlock != null) {
      LOG.debug("Clearing active block");
    }
    activeBlock = null;
  }

  /**
   * Check for the filesystem being open.
   *
   * @throws IOException if the filesystem is closed.
   */
  private void checkOpen() throws IOException {
    if (closed.get()) {
      throw new IOException(
          "Filesystem " + writeOperationHelper.toString(key) + " closed");
    }
  }

  /**
   * The flush operation does not trigger an upload; that awaits the next block
   * being full. What it does do is call {@code flush() } on the current block,
   * leaving it to choose how to react.
   *
   * @throws IOException Any IO problem.
   */
  @Override
  public synchronized void flush() throws IOException {
    checkOpen();
    OBSDataBlocks.DataBlock dataBlock = getActiveBlock();
    if (dataBlock != null) {
      dataBlock.flush();
    }
  }

  /**
   * Writes a byte to the destination. If this causes the buffer to reach its
   * limit, the actual upload is submitted to the threadpool.
   *
   * @param b the int of which the lowest byte is written
   * @throws IOException on any problem
   */
  @Override
  public synchronized void write(final int b) throws IOException {
    singleCharWrite[0] = (byte) b;
    write(singleCharWrite, 0, 1);
  }

  /**
   * Writes a range of bytes from to the memory buffer. If this causes the
   * buffer to reach its limit, the actual upload is submitted to the threadpool
   * and the remainder of the array is written to memory (recursively).
   *
   * @param source byte array containing
   * @param offset offset in array where to start
   * @param len    number of bytes to be written
   * @throws IOException on any problem
   */
  @Override
  public synchronized void write(@NotNull final byte[] source,
      final int offset, final int len)
      throws IOException {
    if (hasException.get()) {
      String closeWarning = String.format(
          "write has error. bs : pre upload obs[%s] has error.", key);
      LOG.warn(closeWarning);
      throw new IOException(closeWarning);
    }
    OBSDataBlocks.validateWriteArgs(source, offset, len);
    checkOpen();
    if (len == 0) {
      return;
    }

    OBSDataBlocks.DataBlock block = createBlockIfNeeded();
    int written = block.write(source, offset, len);
    int remainingCapacity = block.remainingCapacity();
    try {
      innerWrite(source, offset, len, written, remainingCapacity);
    } catch (IOException e) {
      LOG.error(
          "Write data for key {} of bucket {} error, error message {}",
          key, fs.getBucket(),
          e.getMessage());
      throw e;
    }
  }

  private synchronized void innerWrite(final byte[] source, final int offset,
      final int len,
      final int written, final int remainingCapacity)
      throws IOException {

    if (written < len) {
      // not everything was written the block has run out
      // of capacity
      // Trigger an upload then process the remainder.
      LOG.debug(
          "writing more data than block has capacity -triggering upload");
      if (appendAble.get()) {
        // to write a buffer then append to obs
        LOG.debug("[Append] open stream and single write size {} "
                + "greater than buffer size {}, append buffer to obs.",
            len, blockSize);
        flushCurrentBlock();
      } else {
        // block output stream logic, multi-part upload
        uploadCurrentBlock();
      }
      // tail recursion is mildly expensive, but given buffer sizes
      // must be MB. it's unlikely to recurse very deeply.
      this.write(source, offset + written, len - written);
    } else {
      if (remainingCapacity == 0) {
        // the whole buffer is done, trigger an upload
        if (appendAble.get()) {
          // to write a buffer then append to obs
          LOG.debug("[Append] open stream and already write size "
                  + "equal to buffer size {}, append buffer to obs.",
              blockSize);
          flushCurrentBlock();
        } else {
          // block output stream logic, multi-part upload
          uploadCurrentBlock();
        }
      }
    }
  }

  /**
   * Start an asynchronous upload of the current block.
   *
   * @throws IOException Problems opening the destination for upload or
   *                     initializing the upload.
   */
  private synchronized void uploadCurrentBlock() throws IOException {
    Preconditions.checkState(hasActiveBlock(), "No active block");
    LOG.debug("Writing block # {}", blockCount);

    try {
      if (multiPartUpload == null) {
        LOG.debug("Initiating Multipart upload");
        multiPartUpload = new MultiPartUpload();
      }
      multiPartUpload.uploadBlockAsync(getActiveBlock());
    } catch (IOException e) {
      hasException.set(true);
      LOG.error("Upload current block on ({}/{}) failed.", fs.getBucket(),
          key, e);
      throw e;
    } finally {
      // set the block to null, so the next write will create a new block.
      clearActiveBlock();
    }
  }

  /**
   * Close the stream.
   *
   * <p>This will not return until the upload is complete or the attempt to
   * perform the upload has failed. Exceptions raised in this method are
   * indicative that the write has failed and data is at risk of being lost.
   *
   * @throws IOException on any failure.
   */
  @Override
  public synchronized void close() throws IOException {
    if (closed.getAndSet(true)) {
      // already closed
      LOG.debug("Ignoring close() as stream is already closed");
      return;
    }
    if (hasException.get()) {
      String closeWarning = String.format(
          "closed has error. bs : pre write obs[%s] has error.", key);
      LOG.warn(closeWarning);
      throw new IOException(closeWarning);
    }
    // do upload
    completeCurrentBlock();

    // clear
    clearHFlushOrSync();

    // All end of write operations, including deleting fake parent
    // directories
    writeOperationHelper.writeSuccessful(key);
  }

  /**
   * If flush has take place, need to append file, else to put object.
   *
   * @throws IOException any problem in append or put object
   */
  private synchronized void putObjectIfNeedAppend() throws IOException {
    if (appendAble.get() && fs.exists(
        OBSCommonUtils.keyToQualifiedPath(fs, key))) {
      appendFsFile();
    } else {
      putObject();
    }
  }

  /**
   * Append posix file.
   *
   * @throws IOException any problem
   */
  private synchronized void appendFsFile() throws IOException {
    LOG.debug("bucket is posix, to append file. key is {}", key);
    final OBSDataBlocks.DataBlock block = getActiveBlock();
    WriteFileRequest writeFileReq;
    if (block instanceof OBSDataBlocks.DiskBlock) {
      writeFileReq = OBSCommonUtils.newAppendFileRequest(fs, key,
          objectLen, (File) block.startUpload());
    } else {
      writeFileReq = OBSCommonUtils.newAppendFileRequest(fs, key,
          objectLen, (InputStream) block.startUpload());
    }
    OBSCommonUtils.appendFile(fs, writeFileReq);
    objectLen += block.dataSize();
  }

  /**
   * Upload the current block as a single PUT request; if the buffer is empty a
   * 0-byte PUT will be invoked, as it is needed to create an entry at the far
   * end.
   *
   * @throws IOException any problem.
   */
  private synchronized void putObject() throws IOException {
    LOG.debug("Executing regular upload for {}",
        writeOperationHelper.toString(key));

    final OBSDataBlocks.DataBlock block = getActiveBlock();
    clearActiveBlock();
    final int size = block.dataSize();
    final PutObjectRequest putObjectRequest;
    if (block instanceof OBSDataBlocks.DiskBlock) {
      putObjectRequest = writeOperationHelper.newPutRequest(key,
          (File) block.startUpload());

    } else {
      putObjectRequest =
          writeOperationHelper.newPutRequest(key,
              (InputStream) block.startUpload(), size);

    }
    putObjectRequest.setAcl(fs.getCannedACL());
    fs.getSchemeStatistics().incrementWriteOps(1);
    try {
      // the putObject call automatically closes the input
      // stream afterwards.
      writeOperationHelper.putObject(putObjectRequest);
    } finally {
      OBSCommonUtils.closeAll(block);
    }
  }

  @Override
  public synchronized String toString() {
    final StringBuilder sb = new StringBuilder("OBSBlockOutputStream{");
    sb.append(writeOperationHelper.toString());
    sb.append(", blockSize=").append(blockSize);
    OBSDataBlocks.DataBlock block = activeBlock;
    if (block != null) {
      sb.append(", activeBlock=").append(block);
    }
    sb.append('}');
    return sb.toString();
  }

  public synchronized void sync() {
    // need to do
  }

  @Override
  public synchronized void hflush() throws IOException {
    // hflush hsyn same
    flushOrSync();
  }

  /**
   * Flush local file or multipart to obs. focus: not posix bucket is not
   * support
   *
   * @throws IOException io exception
   */
  private synchronized void flushOrSync() throws IOException {

    checkOpen();
    if (hasException.get()) {
      String flushWarning = String.format(
          "flushOrSync has error. bs : pre write obs[%s] has error.",
          key);
      LOG.warn(flushWarning);
      throw new IOException(flushWarning);
    }
    if (fs.isFsBucket()) {
      // upload
      flushCurrentBlock();

      // clear
      clearHFlushOrSync();
    } else {
      LOG.warn("not posix bucket, not support hflush or hsync.");
      flush();
    }
  }

  /**
   * Clear for hflush or hsync.
   */
  private synchronized void clearHFlushOrSync() {
    appendAble.set(true);
    multiPartUpload = null;
  }

  /**
   * Upload block to obs.
   *
   * @param block    block
   * @param hasBlock jungle if has block
   * @throws IOException io exception
   */
  private synchronized void uploadWriteBlocks(
      final OBSDataBlocks.DataBlock block,
      final boolean hasBlock)
      throws IOException {
    if (multiPartUpload == null) {
      if (hasBlock) {
        // no uploads of data have taken place, put the single block
        // up. This must happen even if there is no data, so that 0 byte
        // files are created.
        putObjectIfNeedAppend();
      }
    } else {
      // there has already been at least one block scheduled for upload;
      // put up the current then wait
      if (hasBlock && block.hasData()) {
        // send last part
        uploadCurrentBlock();
      }
      // wait for the partial uploads to finish
      final List<PartEtag> partETags
          = multiPartUpload.waitForAllPartUploads();
      // then complete the operation
      multiPartUpload.complete(partETags);
    }
    LOG.debug("Upload complete for {}", writeOperationHelper.toString(key));
  }

  private synchronized void completeCurrentBlock() throws IOException {
    OBSDataBlocks.DataBlock block = getActiveBlock();
    boolean hasBlock = hasActiveBlock();
    LOG.debug("{}: complete block #{}: current block= {}", this, blockCount,
        hasBlock ? block : "(none)");
    try {
      uploadWriteBlocks(block, hasBlock);
    } catch (IOException ioe) {
      LOG.error("Upload data to obs error. io exception : {}",
          ioe.getMessage());
      throw ioe;
    } catch (Exception e) {
      LOG.error("Upload data to obs error. other exception : {}",
          e.getMessage());
      throw e;
    } finally {
      OBSCommonUtils.closeAll(block);
      clearActiveBlock();
    }
  }

  private synchronized void flushCurrentBlock() throws IOException {
    OBSDataBlocks.DataBlock block = getActiveBlock();
    boolean hasBlock = hasActiveBlock();
    LOG.debug(
        "{}: complete block #{}: current block= {}", this, blockCount,
        hasBlock ? block : "(none)");
    try {
      uploadWriteBlocks(block, hasBlock);
    } catch (IOException ioe) {
      LOG.error("hflush data to obs error. io exception : {}",
          ioe.getMessage());
      hasException.set(true);
      throw ioe;
    } catch (Exception e) {
      LOG.error("hflush data to obs error. other exception : {}",
          e.getMessage());
      hasException.set(true);
      throw e;
    } finally {
      OBSCommonUtils.closeAll(block);
      clearActiveBlock();
    }
  }

  @Override
  public synchronized void hsync() throws IOException {
    flushOrSync();
  }

  /**
   * Multiple partition upload.
   */
  private class MultiPartUpload {
    /**
     * Upload id for multipart upload.
     */
    private final String uploadId;

    /**
     * List for async part upload future.
     */
    private final List<ListenableFuture<PartEtag>> partETagsFutures;

    MultiPartUpload() throws IOException {
      this.uploadId = writeOperationHelper.initiateMultiPartUpload(key);
      this.partETagsFutures = new ArrayList<>(2);
      LOG.debug(
          "Initiated multi-part upload for {} with , the key is {}"
              + "id '{}'",
          writeOperationHelper,
          uploadId,
          key);
    }

    /**
     * Upload a block of data asynchronously.
     *
     * @param block block to upload
     * @throws IOException upload failure
     */
    private void uploadBlockAsync(final OBSDataBlocks.DataBlock block)
        throws IOException {
      LOG.debug("Queueing upload of {}", block);

      final int size = block.dataSize();
      final int currentPartNumber = partETagsFutures.size() + 1;
      final UploadPartRequest request;
      if (block instanceof OBSDataBlocks.DiskBlock) {
        request = writeOperationHelper.newUploadPartRequest(
            key,
            uploadId,
            currentPartNumber,
            size,
            (File) block.startUpload());
      } else {
        request = writeOperationHelper.newUploadPartRequest(
            key,
            uploadId,
            currentPartNumber,
            size,
            (InputStream) block.startUpload());

      }
      ListenableFuture<PartEtag> partETagFuture = executorService.submit(
          () -> {
            // this is the queued upload operation
            LOG.debug("Uploading part {} for id '{}'",
                currentPartNumber, uploadId);
            // do the upload
            PartEtag partETag = null;
            try {
              if (mockUploadPartError) {
                throw new ObsException("mock upload part error");
              }
              UploadPartResult uploadPartResult
                  = OBSCommonUtils.uploadPart(fs, request);
              partETag =
                  new PartEtag(uploadPartResult.getEtag(),
                      uploadPartResult.getPartNumber());
              if (LOG.isDebugEnabled()) {
                LOG.debug("Completed upload of {} to part {}",
                    block, partETag);
              }
            } catch (ObsException e) {
              // catch all exception
              hasException.set(true);
              LOG.error("UploadPart failed (ObsException). {}",
                  OBSCommonUtils.translateException("UploadPart", key,
                      e).getMessage());
            } finally {
              // close the stream and block
              OBSCommonUtils.closeAll(block);
            }
            return partETag;
          });
      partETagsFutures.add(partETagFuture);
    }

    /**
     * Block awaiting all outstanding uploads to complete.
     *
     * @return list of results
     * @throws IOException IO Problems
     */
    private List<PartEtag> waitForAllPartUploads() throws IOException {
      LOG.debug("Waiting for {} uploads to complete",
          partETagsFutures.size());
      try {
        return Futures.allAsList(partETagsFutures).get();
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted partUpload", ie);
        LOG.debug("Cancelling futures");
        for (ListenableFuture<PartEtag> future : partETagsFutures) {
          future.cancel(true);
        }
        // abort multipartupload
        this.abort();
        throw new IOException(
            "Interrupted multi-part upload with id '" + uploadId
                + "' to " + key);
      } catch (ExecutionException ee) {
        // there is no way of recovering so abort
        // cancel all partUploads
        LOG.debug("While waiting for upload completion", ee);
        LOG.debug("Cancelling futures");
        for (ListenableFuture<PartEtag> future : partETagsFutures) {
          future.cancel(true);
        }
        // abort multipartupload
        this.abort();
        throw OBSCommonUtils.extractException(
            "Multi-part upload with id '" + uploadId + "' to " + key,
            key, ee);
      }
    }

    /**
     * This completes a multipart upload. Sometimes it fails; here retries are
     * handled to avoid losing all data on a transient failure.
     *
     * @param partETags list of partial uploads
     * @return result for completing multipart upload
     * @throws IOException on any problem
     */
    private CompleteMultipartUploadResult complete(
        final List<PartEtag> partETags) throws IOException {
      String operation = String.format(
          "Completing multi-part upload for key '%s',"
              + " id '%s' with %s partitions ",
          key, uploadId, partETags.size());
      try {
        LOG.debug(operation);
        return writeOperationHelper.completeMultipartUpload(key,
            uploadId, partETags);
      } catch (ObsException e) {
        throw OBSCommonUtils.translateException(operation, key, e);
      }
    }

    /**
     * Abort a multi-part upload. Retries are attempted on failures.
     * IOExceptions are caught; this is expected to be run as a cleanup
     * process.
     */
    void abort() {
      String operation =
          String.format(
              "Aborting multi-part upload for '%s', id '%s",
              writeOperationHelper, uploadId);
      try {
        LOG.debug(operation);
        writeOperationHelper.abortMultipartUpload(key, uploadId);
      } catch (ObsException e) {
        LOG.warn(
            "Unable to abort multipart upload, you may need to purge  "
                + "uploaded parts",
            e);
      }
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BasicSessionCredential 源码

hadoop DefaultOBSClientFactory 源码

hadoop FileConflictException 源码

hadoop OBS 源码

hadoop OBSClientFactory 源码

hadoop OBSCommonUtils 源码

hadoop OBSConstants 源码

hadoop OBSDataBlocks 源码

hadoop OBSFileStatus 源码

hadoop OBSFileSystem 源码

0  赞