hadoop SecureStorageInterfaceImpl 源码

  • 2022-10-20
  • 浏览 (197)

haddop SecureStorageInterfaceImpl 代码

文件路径:/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azure;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryPolicyFactory;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.CloudPageBlob;
import com.microsoft.azure.storage.blob.CopyState;
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.ListBlobItem;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.PageRange;
import com.microsoft.azure.storage.blob.BlockEntry;

import org.apache.hadoop.classification.InterfaceAudience;

/***
 * An implementation of the StorageInterface for SAS Key mode.
 *
 */

public class SecureStorageInterfaceImpl extends StorageInterface {

  public static final Logger LOG = LoggerFactory.getLogger(
      SecureStorageInterfaceImpl.class);
  public static final String SAS_ERROR_CODE = "SAS Error";
  private SASKeyGeneratorInterface sasKeyGenerator;
  private String storageAccount;
  private RetryPolicyFactory retryPolicy;
  private int timeoutIntervalInMs;
  private boolean useContainerSasKeyForAllAccess;

  /**
   * Configuration key to specify if containerSasKey should be used for all accesses
   */
  public static final String KEY_USE_CONTAINER_SASKEY_FOR_ALL_ACCESS =
      "fs.azure.saskey.usecontainersaskeyforallaccess";

  public SecureStorageInterfaceImpl(boolean useLocalSASKeyMode,
      Configuration conf) throws SecureModeException {

    if (useLocalSASKeyMode) {
      LOG.debug("Authenticating with SecureStorage and local SAS key");
      this.sasKeyGenerator = new LocalSASKeyGeneratorImpl(conf);
    } else {
      LOG.debug("Authenticating with SecureStorage and remote SAS key generation");
      RemoteSASKeyGeneratorImpl remoteSasKeyGenerator =
          new RemoteSASKeyGeneratorImpl(conf);
      try {
        remoteSasKeyGenerator.initialize(conf);
      } catch (IOException ioe) {
        throw new SecureModeException("Remote SAS Key mode could"
            + " not be initialized", ioe);
      }
      this.sasKeyGenerator = remoteSasKeyGenerator;
    }
    this.useContainerSasKeyForAllAccess = conf.getBoolean(KEY_USE_CONTAINER_SASKEY_FOR_ALL_ACCESS, true);
    LOG.debug("Container SAS key {} be used for all access",
        useContainerSasKeyForAllAccess ? "will" : "will not");
  }

  @Override
  public void setTimeoutInMs(int timeoutInMs) {
    timeoutIntervalInMs = timeoutInMs;
  }

  @Override
  public void setRetryPolicyFactory(RetryPolicyFactory retryPolicyFactory) {
    retryPolicy = retryPolicyFactory;
  }

  @Override
  public void createBlobClient(CloudStorageAccount account) {
    String errorMsg = "createBlobClient is an invalid operation in"
        + " SAS Key Mode";
    LOG.error(errorMsg);
    throw new UnsupportedOperationException(errorMsg);
  }

  @Override
  public void createBlobClient(URI baseUri) {
    String errorMsg = "createBlobClient is an invalid operation in "
        + "SAS Key Mode";
    LOG.error(errorMsg);
    throw new UnsupportedOperationException(errorMsg);
  }

  @Override
  public void createBlobClient(URI baseUri, StorageCredentials credentials) {
    String errorMsg = "createBlobClient is an invalid operation in SAS "
        + "Key Mode";
    LOG.error(errorMsg);
    throw new UnsupportedOperationException(errorMsg);
  }

  @Override
  public StorageCredentials getCredentials() {
    String errorMsg = "getCredentials is an invalid operation in SAS "
        + "Key Mode";
    LOG.error(errorMsg);
    throw new UnsupportedOperationException(errorMsg);
  }

  @Override
  public CloudBlobContainerWrapper getContainerReference(String name)
      throws URISyntaxException, StorageException {

    try {
      CloudBlobContainer container = new CloudBlobContainer(sasKeyGenerator.getContainerSASUri(
          storageAccount, name));
      if (retryPolicy != null) {
        container.getServiceClient().getDefaultRequestOptions().setRetryPolicyFactory(retryPolicy);
      }
      if (timeoutIntervalInMs > 0) {
        container.getServiceClient().getDefaultRequestOptions().setTimeoutIntervalInMs(timeoutIntervalInMs);
      }
      return (useContainerSasKeyForAllAccess)
          ? new SASCloudBlobContainerWrapperImpl(storageAccount, container, null)
          : new SASCloudBlobContainerWrapperImpl(storageAccount, container, sasKeyGenerator);
    } catch (SASKeyGenerationException sasEx) {
      String errorMsg = "Encountered SASKeyGeneration exception while "
          + "generating SAS Key for container : " + name
          + " inside Storage account : " + storageAccount;
      LOG.error(errorMsg);
      throw new StorageException(SAS_ERROR_CODE, errorMsg, sasEx);
    }
  }

  public void setStorageAccountName(String storageAccount) {
    this.storageAccount = storageAccount;
  }

  @InterfaceAudience.Private
  static class SASCloudBlobContainerWrapperImpl
    extends CloudBlobContainerWrapper {

    private final CloudBlobContainer container;
    private String storageAccount;
    private SASKeyGeneratorInterface sasKeyGenerator;

    public SASCloudBlobContainerWrapperImpl(String storageAccount,
        CloudBlobContainer container, SASKeyGeneratorInterface sasKeyGenerator) {
      this.storageAccount = storageAccount;
      this.container = container;
      this.sasKeyGenerator = sasKeyGenerator;
    }

    @Override
    public String getName() {
      return container.getName();
    }

    @Override
    public boolean exists(OperationContext opContext) throws StorageException {
      return container.exists(AccessCondition.generateEmptyCondition(), null,
          opContext);
    }

    @Override
    public void create(OperationContext opContext) throws StorageException {
      container.create(null, opContext);
    }

    @Override
    public HashMap<String, String> getMetadata() {
      return container.getMetadata();
    }

    @Override
    public void setMetadata(HashMap<String, String> metadata) {
      container.setMetadata(metadata);
    }

    @Override
    public void downloadAttributes(OperationContext opContext)
        throws StorageException {
      container.downloadAttributes(AccessCondition.generateEmptyCondition(),
          null, opContext);
    }

    @Override
    public void uploadMetadata(OperationContext opContext)
        throws StorageException {
      container.uploadMetadata(AccessCondition.generateEmptyCondition(), null,
          opContext);
    }

    @Override
    public CloudBlobDirectoryWrapper getDirectoryReference(String relativePath)
        throws URISyntaxException, StorageException {

      CloudBlobDirectory dir = container.getDirectoryReference(relativePath);
      return new SASCloudBlobDirectoryWrapperImpl(dir);
    }

    @Override
    public CloudBlobWrapper getBlockBlobReference(String relativePath)
        throws URISyntaxException, StorageException {
      try {
        CloudBlockBlob blob = (sasKeyGenerator!=null)
          ? new CloudBlockBlob(sasKeyGenerator.getRelativeBlobSASUri(storageAccount, getName(), relativePath))
          : container.getBlockBlobReference(relativePath);
        blob.getServiceClient().setDefaultRequestOptions(
                container.getServiceClient().getDefaultRequestOptions());
        return new SASCloudBlockBlobWrapperImpl(blob);
      } catch (SASKeyGenerationException sasEx) {
        String errorMsg = "Encountered SASKeyGeneration exception while "
            + "generating SAS Key for relativePath : " + relativePath
            + " inside container : " + getName()  + " Storage account : " + storageAccount;
        LOG.error(errorMsg);
        throw new StorageException(SAS_ERROR_CODE, errorMsg, sasEx);
      }
    }

    @Override
    public CloudBlobWrapper getPageBlobReference(String relativePath)
        throws URISyntaxException, StorageException {
      try {
        CloudPageBlob blob   = (sasKeyGenerator!=null)
          ? new CloudPageBlob(sasKeyGenerator.getRelativeBlobSASUri(storageAccount, getName(), relativePath))
          : container.getPageBlobReference(relativePath);

        blob.getServiceClient().setDefaultRequestOptions(
                container.getServiceClient().getDefaultRequestOptions());
        return new SASCloudPageBlobWrapperImpl(blob);
      } catch (SASKeyGenerationException sasEx) {
        String errorMsg = "Encountered SASKeyGeneration exception while "
            + "generating SAS Key for relativePath : " + relativePath
            + " inside container : " + getName()
            + " Storage account : " + storageAccount;
        LOG.error(errorMsg);
        throw new StorageException(SAS_ERROR_CODE, errorMsg, sasEx);
      }
    }
  }

  //
  // WrappingIterator
  //

  /**
   * This iterator wraps every ListBlobItem as they come from the listBlobs()
   * calls to their proper wrapping objects.
   */
  private static class SASWrappingIterator implements Iterator<ListBlobItem> {
    private final Iterator<ListBlobItem> present;

    public SASWrappingIterator(Iterator<ListBlobItem> present) {
      this.present = present;
    }

    public static Iterable<ListBlobItem> wrap(
        final Iterable<ListBlobItem> present) {
      return new Iterable<ListBlobItem>() {
        @Override
        public Iterator<ListBlobItem> iterator() {
          return new SASWrappingIterator(present.iterator());
        }
      };
    }

    @Override
    public boolean hasNext() {
      return present.hasNext();
    }

    @Override
    public ListBlobItem next() {
      ListBlobItem unwrapped = present.next();
      if (unwrapped instanceof CloudBlobDirectory) {
        return new SASCloudBlobDirectoryWrapperImpl((CloudBlobDirectory) unwrapped);
      } else if (unwrapped instanceof CloudBlockBlob) {
        return new SASCloudBlockBlobWrapperImpl((CloudBlockBlob) unwrapped);
      } else if (unwrapped instanceof CloudPageBlob) {
        return new SASCloudPageBlobWrapperImpl((CloudPageBlob) unwrapped);
      } else {
        return unwrapped;
      }
    }

    @Override
    public void remove() {
      present.remove();
    }
  }

  //
  // CloudBlobDirectoryWrapperImpl
  //
  @InterfaceAudience.Private
  static class SASCloudBlobDirectoryWrapperImpl extends CloudBlobDirectoryWrapper {
    private final CloudBlobDirectory directory;

    public SASCloudBlobDirectoryWrapperImpl(CloudBlobDirectory directory) {
      this.directory = directory;
    }

    @Override
    public URI getUri() {
      return directory.getUri();
    }

    @Override
    public Iterable<ListBlobItem> listBlobs(String prefix,
        boolean useFlatBlobListing, EnumSet<BlobListingDetails> listingDetails,
        BlobRequestOptions options, OperationContext opContext)
        throws URISyntaxException, StorageException {
      return SASWrappingIterator.wrap(directory.listBlobs(prefix,
          useFlatBlobListing, listingDetails, options, opContext));
    }

    @Override
    public CloudBlobContainer getContainer() throws URISyntaxException,
        StorageException {
      return directory.getContainer();
    }

    @Override
    public CloudBlobDirectory getParent() throws URISyntaxException,
        StorageException {
      return directory.getParent();
    }

    @Override
    public StorageUri getStorageUri() {
      return directory.getStorageUri();
    }
  }

  abstract static class SASCloudBlobWrapperImpl implements CloudBlobWrapper {
    private final CloudBlob blob;
    @Override
    public CloudBlob getBlob() {
      return blob;
    }

    public URI getUri() {
      return getBlob().getUri();
    }

    protected SASCloudBlobWrapperImpl(CloudBlob blob) {
      this.blob = blob;
    }

    @Override
    public HashMap<String, String> getMetadata() {
      return getBlob().getMetadata();
    }

    @Override
    public void delete(OperationContext opContext, SelfRenewingLease lease)
        throws StorageException {
      getBlob().delete(DeleteSnapshotsOption.NONE, getLeaseCondition(lease),
          null, opContext);
    }

    /**
     * Return and access condition for this lease, or else null if
     * there's no lease.
     */
    private AccessCondition getLeaseCondition(SelfRenewingLease lease) {
      AccessCondition leaseCondition = null;
      if (lease != null) {
        leaseCondition = AccessCondition.generateLeaseCondition(lease.getLeaseID());
      }
      return leaseCondition;
    }

    @Override
    public boolean exists(OperationContext opContext)
        throws StorageException {
      return getBlob().exists(null, null, opContext);
    }

    @Override
    public void downloadAttributes(
        OperationContext opContext) throws StorageException {
      getBlob().downloadAttributes(null, null, opContext);
    }

    @Override
    public BlobProperties getProperties() {
      return getBlob().getProperties();
    }

    @Override
    public void setMetadata(HashMap<String, String> metadata) {
      getBlob().setMetadata(metadata);
    }

    @Override
    public InputStream openInputStream(
        BlobRequestOptions options,
        OperationContext opContext) throws StorageException {
      return getBlob().openInputStream(null, options, opContext);
    }

    public OutputStream openOutputStream(
        BlobRequestOptions options,
        OperationContext opContext) throws StorageException {
      return ((CloudBlockBlob) getBlob()).openOutputStream(null, options, opContext);
    }

    public void upload(InputStream sourceStream, OperationContext opContext)
        throws StorageException, IOException {
      getBlob().upload(sourceStream, 0, null, null, opContext);
    }

    @Override
    public CloudBlobContainer getContainer() throws URISyntaxException,
        StorageException {
      return getBlob().getContainer();
    }

    @Override
    public CloudBlobDirectory getParent() throws URISyntaxException,
        StorageException {
      return getBlob().getParent();
    }

    @Override
    public void uploadMetadata(OperationContext opContext)
        throws StorageException {
      uploadMetadata(null, null, opContext);
    }

    @Override
    public void uploadMetadata(AccessCondition accessConditions, BlobRequestOptions options,
        OperationContext opContext) throws StorageException{
      getBlob().uploadMetadata(accessConditions, options, opContext);
    }

    public void uploadProperties(OperationContext opContext, SelfRenewingLease lease)
        throws StorageException {

      // Include lease in request if lease not null.
      getBlob().uploadProperties(getLeaseCondition(lease), null, opContext);
    }

    @Override
    public int getStreamMinimumReadSizeInBytes() {
        return getBlob().getStreamMinimumReadSizeInBytes();
    }

    @Override
    public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
      getBlob().setStreamMinimumReadSizeInBytes(minimumReadSizeBytes);
    }

    @Override
    public void setWriteBlockSizeInBytes(int writeBlockSizeBytes) {
      getBlob().setStreamWriteSizeInBytes(writeBlockSizeBytes);
    }

    @Override
    public StorageUri getStorageUri() {
      return getBlob().getStorageUri();
    }

    @Override
    public CopyState getCopyState() {
      return getBlob().getCopyState();
    }

    @Override
    public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
        OperationContext opContext, boolean overwriteDestination)
            throws StorageException, URISyntaxException {
      AccessCondition dstAccessCondition =
          overwriteDestination
              ? null
              : AccessCondition.generateIfNotExistsCondition();
      getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
          null, dstAccessCondition, options, opContext);
    }

    @Override
    public void downloadRange(long offset, long length, OutputStream outStream,
        BlobRequestOptions options, OperationContext opContext)
            throws StorageException, IOException {

      getBlob().downloadRange(offset, length, outStream, null, options, opContext);
    }

    @Override
    public SelfRenewingLease acquireLease() throws StorageException {
      return new SelfRenewingLease(this, false);
    }
  }

  //
  // CloudBlockBlobWrapperImpl
  //

  static class SASCloudBlockBlobWrapperImpl extends SASCloudBlobWrapperImpl implements CloudBlockBlobWrapper {

    public SASCloudBlockBlobWrapperImpl(CloudBlockBlob blob) {
      super(blob);
    }

    public OutputStream openOutputStream(
        BlobRequestOptions options,
        OperationContext opContext) throws StorageException {
      return ((CloudBlockBlob) getBlob()).openOutputStream(null, options, opContext);
    }

    public void upload(InputStream sourceStream, OperationContext opContext)
        throws StorageException, IOException {
      getBlob().upload(sourceStream, 0, null, null, opContext);
    }

    public void uploadProperties(OperationContext opContext)
        throws StorageException {
      getBlob().uploadProperties(null, null, opContext);
    }

    @Override
    public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
        OperationContext opContext) throws IOException, StorageException {
      return ((CloudBlockBlob) getBlob()).downloadBlockList(filter, null, options, opContext);

    }

    @Override
    public void uploadBlock(String blockId, AccessCondition accessCondition,
        InputStream sourceStream,
        long length, BlobRequestOptions options,
        OperationContext opContext) throws IOException, StorageException {
      ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length,
          accessCondition, options, opContext);
    }

    @Override
    public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
        OperationContext opContext) throws IOException, StorageException {
      ((CloudBlockBlob) getBlob()).commitBlockList(blockList, accessCondition, options, opContext);
    }
  }

  static class SASCloudPageBlobWrapperImpl extends SASCloudBlobWrapperImpl implements CloudPageBlobWrapper {
    public SASCloudPageBlobWrapperImpl(CloudPageBlob blob) {
      super(blob);
    }

    public void create(final long length, BlobRequestOptions options,
        OperationContext opContext) throws StorageException {
      ((CloudPageBlob) getBlob()).create(length, null, options, opContext);
    }

    public void uploadPages(final InputStream sourceStream, final long offset,
        final long length, BlobRequestOptions options, OperationContext opContext)
        throws StorageException, IOException {
      ((CloudPageBlob) getBlob()).uploadPages(sourceStream, offset, length, null,
          options, opContext);
    }

    public ArrayList<PageRange> downloadPageRanges(BlobRequestOptions options,
        OperationContext opContext) throws StorageException {
      return ((CloudPageBlob) getBlob()).downloadPageRanges(
          null, options, opContext);
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AzureException 源码

hadoop AzureFileSystemThreadPoolExecutor 源码

hadoop AzureFileSystemThreadTask 源码

hadoop AzureLinkedStack 源码

hadoop AzureNativeFileSystemStore 源码

hadoop BlobMaterialization 源码

hadoop BlobOperationDescriptor 源码

hadoop BlockBlobAppendStream 源码

hadoop BlockBlobInputStream 源码

hadoop CachingAuthorizer 源码

0  赞