hadoop RemoteSASKeyGeneratorImpl 源码

  • 2022-10-20
  • 浏览 (211)

haddop RemoteSASKeyGeneratorImpl 代码

文件路径:/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azure;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.security.UserGroupInformation;

import org.apache.http.NameValuePair;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import static org.apache.hadoop.fs.azure.WasbRemoteCallHelper.REMOTE_CALL_SUCCESS_CODE;

/**
 * Class implementing a RemoteSASKeyGenerator. This class
 * uses the url passed in via the Configuration to make a
 * rest call to generate the required SAS Key.
 */
public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {

  public static final Logger LOG =
      LoggerFactory.getLogger(AzureNativeFileSystemStore.class);
  private static final ObjectReader RESPONSE_READER = new ObjectMapper()
      .readerFor(RemoteSASKeyGenerationResponse.class);

  /**
   * Configuration parameter name expected in the Configuration
   * object to provide the url of the remote service {@value}
   */
  public static final String KEY_CRED_SERVICE_URLS =
      "fs.azure.cred.service.urls";
  /**
   * Configuration key to enable http retry policy for SAS Key generation. {@value}
   */
  public static final String
      SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY =
      "fs.azure.saskeygenerator.http.retry.policy.enabled";
  /**
   * Configuration key for SAS Key Generation http retry policy spec. {@value}
   */
  public static final String
      SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY =
      "fs.azure.saskeygenerator.http.retry.policy.spec";
  /**
   * Container SAS Key generation OP name. {@value}
   */
  private static final String CONTAINER_SAS_OP = "GET_CONTAINER_SAS";
  /**
   * Relative Blob SAS Key generation OP name. {@value}
   */
  private static final String BLOB_SAS_OP = "GET_RELATIVE_BLOB_SAS";
  /**
   * Query parameter specifying the expiry period to be used for sas key
   * {@value}
   */
  private static final String SAS_EXPIRY_QUERY_PARAM_NAME = "sas_expiry";
  /**
   * Query parameter name for the storage account. {@value}
   */
  private static final String STORAGE_ACCOUNT_QUERY_PARAM_NAME =
      "storage_account";
  /**
   * Query parameter name for the storage account container. {@value}
   */
  private static final String CONTAINER_QUERY_PARAM_NAME = "container";
  /**
   * Query parameter name for the relative path inside the storage
   * account container. {@value}
   */
  private static final String RELATIVE_PATH_QUERY_PARAM_NAME = "relative_path";
  /**
   * SAS Key Generation Remote http client retry policy spec. {@value}
   */
  private static final String
      SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
      "10,3,100,2";
  /**
   * Saskey caching period
   */
  private static final String SASKEY_CACHEENTRY_EXPIRY_PERIOD =
      "fs.azure.saskey.cacheentry.expiry.period";

  private WasbRemoteCallHelper remoteCallHelper = null;
  private boolean isKerberosSupportEnabled;
  private boolean isSpnegoTokenCacheEnabled;
  private RetryPolicy retryPolicy;
  private String[] commaSeparatedUrls;
  private CachingAuthorizer<CachedSASKeyEntry, URI> cache;

  private static final int HOURS_IN_DAY = 24;
  private static final int MINUTES_IN_HOUR = 60;

  public RemoteSASKeyGeneratorImpl(Configuration conf) {
    super(conf);
  }

  public void initialize(Configuration conf) throws IOException {

    LOG.debug("Initializing RemoteSASKeyGeneratorImpl instance");

    this.retryPolicy = RetryUtils.getMultipleLinearRandomRetry(conf,
        SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY, true,
        SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY,
        SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT);

    this.isKerberosSupportEnabled =
        conf.getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
    this.isSpnegoTokenCacheEnabled =
        conf.getBoolean(Constants.AZURE_ENABLE_SPNEGO_TOKEN_CACHE, true);
    this.commaSeparatedUrls = conf.getTrimmedStrings(KEY_CRED_SERVICE_URLS);
    if (this.commaSeparatedUrls == null || this.commaSeparatedUrls.length <= 0) {
      throw new IOException(
          KEY_CRED_SERVICE_URLS + " config not set" + " in configuration.");
    }
    if (isKerberosSupportEnabled && UserGroupInformation.isSecurityEnabled()) {
      this.remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, false,
          isSpnegoTokenCacheEnabled);
    } else {
      this.remoteCallHelper = new WasbRemoteCallHelper(retryPolicy);
    }

    /* Expire the cache entry five minutes before the actual saskey expiry, so that we never encounter a case
     * where a stale sas-key-entry is picked up from the cache; which is expired on use.
     */
    long sasKeyExpiryPeriodInMinutes = getSasKeyExpiryPeriod() * HOURS_IN_DAY * MINUTES_IN_HOUR; // sas-expiry is in days, convert into mins
    long cacheEntryDurationInMinutes =
        conf.getTimeDuration(SASKEY_CACHEENTRY_EXPIRY_PERIOD, sasKeyExpiryPeriodInMinutes, TimeUnit.MINUTES);
    cacheEntryDurationInMinutes = (cacheEntryDurationInMinutes > (sasKeyExpiryPeriodInMinutes - 5))
        ? (sasKeyExpiryPeriodInMinutes - 5)
        : cacheEntryDurationInMinutes;
    this.cache = new CachingAuthorizer<>(cacheEntryDurationInMinutes, "SASKEY");
    this.cache.init(conf);
    LOG.debug("Initialization of RemoteSASKeyGenerator instance successful");
  }

  @Override
  public URI getContainerSASUri(String storageAccount,
      String container) throws SASKeyGenerationException {
    RemoteSASKeyGenerationResponse sasKeyResponse = null;
    try {
      CachedSASKeyEntry cacheKey = new CachedSASKeyEntry(storageAccount, container, "/");
      URI cacheResult = cache.get(cacheKey);
      if (cacheResult != null) {
        return cacheResult;
      }

      LOG.debug("Generating Container SAS Key: Storage Account {}, Container {}", storageAccount, container);
      URIBuilder uriBuilder = new URIBuilder();
      uriBuilder.setPath("/" + CONTAINER_SAS_OP);
      uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME, storageAccount);
      uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME, container);
      uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME,
          "" + getSasKeyExpiryPeriod());

      sasKeyResponse = makeRemoteRequest(commaSeparatedUrls, uriBuilder.getPath(),
              uriBuilder.getQueryParams());

      if (sasKeyResponse.getResponseCode() == REMOTE_CALL_SUCCESS_CODE) {
        URI sasKey = new URI(sasKeyResponse.getSasKey());
        cache.put(cacheKey, sasKey);
        return sasKey;
      } else {
        throw new SASKeyGenerationException(
            "Remote Service encountered error in SAS Key generation : "
                + sasKeyResponse.getResponseMessage());
      }
    } catch (URISyntaxException uriSyntaxEx) {
      throw new SASKeyGenerationException("Encountered URISyntaxException"
          + " while building the HttpGetRequest to remote service for ",
          uriSyntaxEx);
    }
  }

  @Override
  public URI getRelativeBlobSASUri(String storageAccount,
      String container, String relativePath) throws SASKeyGenerationException {

    try {
      CachedSASKeyEntry cacheKey = new CachedSASKeyEntry(storageAccount, container, relativePath);
      URI cacheResult = cache.get(cacheKey);
      if (cacheResult != null) {
        return cacheResult;
      }

      LOG.debug("Generating RelativePath SAS Key for relativePath {} inside Container {} inside Storage Account {}",
          relativePath, container, storageAccount);

      URIBuilder uriBuilder = new URIBuilder();
      uriBuilder.setPath("/" + BLOB_SAS_OP);
      uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME, storageAccount);
      uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME, container);
      uriBuilder.addParameter(RELATIVE_PATH_QUERY_PARAM_NAME, relativePath);
      uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME,
          "" + getSasKeyExpiryPeriod());

      RemoteSASKeyGenerationResponse sasKeyResponse =
          makeRemoteRequest(commaSeparatedUrls, uriBuilder.getPath(),
              uriBuilder.getQueryParams());
      if (sasKeyResponse.getResponseCode() == REMOTE_CALL_SUCCESS_CODE) {
        URI sasKey = new URI(sasKeyResponse.getSasKey());
        cache.put(cacheKey, sasKey);
        return sasKey;
      } else {
        throw new SASKeyGenerationException(
            "Remote Service encountered error in SAS Key generation : "
                + sasKeyResponse.getResponseMessage());
      }
    } catch (URISyntaxException uriSyntaxEx) {
      throw new SASKeyGenerationException("Encountered URISyntaxException"
          + " while building the HttpGetRequest to " + " remote service",
          uriSyntaxEx);
    }
  }

  /**
   * Helper method to make a remote request.
   *
   * @param urls        - Urls to use for the remote request
   * @param path        - hadoop.auth token for the remote request
   * @param queryParams - queryParams to be used.
   * @return RemoteSASKeyGenerationResponse
   */
  private RemoteSASKeyGenerationResponse makeRemoteRequest(String[] urls,
      String path, List<NameValuePair> queryParams)
      throws SASKeyGenerationException {

    try {
      String responseBody = remoteCallHelper
          .makeRemoteRequest(urls, path, queryParams, HttpGet.METHOD_NAME);
      return RESPONSE_READER.readValue(responseBody);

    } catch (WasbRemoteCallException remoteCallEx) {
      throw new SASKeyGenerationException("Encountered RemoteCallException"
          + " while retrieving SAS key from remote service", remoteCallEx);
    } catch (JsonParseException jsonParserEx) {
      throw new SASKeyGenerationException("Encountered JsonParseException "
          + "while parsing the response from remote"
          + " service into RemoteSASKeyGenerationResponse object",
          jsonParserEx);
    } catch (JsonMappingException jsonMappingEx) {
      throw new SASKeyGenerationException("Encountered JsonMappingException"
          + " while mapping the response from remote service into "
          + "RemoteSASKeyGenerationResponse object", jsonMappingEx);
    } catch (IOException ioEx) {
      throw new SASKeyGenerationException("Encountered IOException while "
          + "accessing remote service to retrieve SAS Key", ioEx);
    }
  }
}

/**
 * POJO representing the response expected from a Remote
 * SAS Key generation service.
 * The remote SAS Key generation service is expected to
 * return SAS key in json format:
 * {
 *   "responseCode" : 0 or non-zero <int>,
 *   "responseMessage" : relavant message on failure <String>,
 *   "sasKey" : Requested SAS Key <String>
 * }
 */
class RemoteSASKeyGenerationResponse {

  /**
   * Response code for the call.
   */
  private int responseCode;

  /**
   * An intelligent message corresponding to
   * result. Specifically in case of failure
   * the reason for failure.
   */
  private String responseMessage;

  /**
   * SAS Key corresponding to the request.
   */
  private String sasKey;

  public int getResponseCode() {
    return responseCode;
  }

  public void setResponseCode(int responseCode) {
    this.responseCode = responseCode;
  }

  public String getResponseMessage() {
    return responseMessage;
  }

  public void setResponseMessage(String responseMessage) {
    this.responseMessage = responseMessage;
  }

  public String getSasKey() {
    return sasKey;
  }

  public void setSasKey(String sasKey) {
    this.sasKey = sasKey;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AzureException 源码

hadoop AzureFileSystemThreadPoolExecutor 源码

hadoop AzureFileSystemThreadTask 源码

hadoop AzureLinkedStack 源码

hadoop AzureNativeFileSystemStore 源码

hadoop BlobMaterialization 源码

hadoop BlobOperationDescriptor 源码

hadoop BlockBlobAppendStream 源码

hadoop BlockBlobInputStream 源码

hadoop CachingAuthorizer 源码

0  赞