hadoop S3ADelegationTokens 源码

  • 2022-10-20
  • 浏览 (122)

haddop S3ADelegationTokens 代码

文件路径:/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/delegation/S3ADelegationTokens.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.auth.delegation;

import java.io.IOException;
import java.net.URI;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
import org.apache.hadoop.fs.s3a.auth.RoleModel;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.statistics.DelegationTokenStatistics;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.util.DurationInfo;

import static org.apache.hadoop.fs.s3a.Statistic.DELEGATION_TOKENS_ISSUED;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.Preconditions.checkState;
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.DEFAULT_DELEGATION_TOKEN_BINDING;
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.DELEGATION_TOKEN_BINDING;
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.DURATION_LOG_AT_INFO;

/**
 * Support for creating a DT from a filesystem.
 *
 * Isolated from S3A for control and testability.
 *
 * The S3A Delegation Tokens are special in that the tokens are not directly
 * used to authenticate with the AWS services.
 * Instead they can session/role  credentials requested off AWS on demand.
 *
 * The design is extensible in that different back-end bindings can be used
 * to switch to different session creation mechanisms, or indeed, to any
 * other authentication mechanism supported by an S3 service, provided it
 * ultimately accepts some form of AWS credentials for authentication through
 * the AWS SDK. That is, if someone wants to wire this up to Kerberos, or
 * OAuth2, this design should support them.
 *
 * URIs processed must be the canonical URIs for the service.
 */
@InterfaceAudience.Private
public class S3ADelegationTokens extends AbstractDTService {

  private static final Logger LOG = LoggerFactory.getLogger(
      S3ADelegationTokens.class);

  @VisibleForTesting
  static final String E_ALREADY_DEPLOYED
      = "S3A Delegation tokens has already been bound/deployed";

  public static final String E_DELEGATION_TOKENS_DISABLED
      = "Delegation tokens are not enabled";

  /**
   * User who owns this FS; fixed at instantiation time, so that
   * in calls to getDelegationToken() and similar, this user is the one whose
   * credentials are involved.
   */
  private final UserGroupInformation user;

  /**
   * Count of number of created tokens.
   * For testing and diagnostics.
   */
  private final AtomicInteger creationCount = new AtomicInteger(0);

  /**
   * Text value of this token service.
   */
  private Text service;

  /**
   * Active Delegation token.
   */
  private Optional<Token<AbstractS3ATokenIdentifier>> boundDT
      = Optional.empty();

  /**
   * The DT decoded when this instance is created by bonding
   * to an existing DT.
   */
  private Optional<AbstractS3ATokenIdentifier> decodedIdentifier
      = Optional.empty();

  /**
   * Dynamically loaded token binding; lifecycle matches this object.
   */
  private AbstractDelegationTokenBinding tokenBinding;

  /**
   * List of cred providers; unset until {@link #bindToDelegationToken(Token)}.
   */
  private Optional<AWSCredentialProviderList> credentialProviders
      = Optional.empty();

  /**
   * The access policies we want for operations.
   * There's no attempt to ask for "admin" permissions here.
   */
  protected static final EnumSet<AWSPolicyProvider.AccessLevel> ACCESS_POLICY
      = EnumSet.of(
          AWSPolicyProvider.AccessLevel.READ,
          AWSPolicyProvider.AccessLevel.WRITE);

  /**
   * Statistics for the operations.
   */
  private DelegationTokenStatistics stats;

  /**
   * Name of the token binding as extracted from token kind; used for
   * logging.
   */
  private String tokenBindingName = "";

  /**
   * Instantiate.
   * @throws IOException if login fails.
   */
  public S3ADelegationTokens() throws IOException {
    super("S3ADelegationTokens");
    user = UserGroupInformation.getCurrentUser();
  }

  @Override
  public void bindToFileSystem(final URI uri,
      final StoreContext context,
      final DelegationOperations delegationOperations)
      throws IOException {
    super.bindToFileSystem(uri, context, delegationOperations);
    service = getTokenService(getCanonicalUri());
    stats = context.getInstrumentation().newDelegationTokenStatistics();
  }

  /**
   * Init the service.
   * This identifies the token binding class to use and creates, initializes
   * and starts it.
   * Will raise an exception if delegation tokens are not enabled.
   * @param conf configuration
   * @throws Exception any failure to start up
   */
  @Override
  protected void serviceInit(final Configuration conf) throws Exception {
    super.serviceInit(conf);
    checkState(hasDelegationTokenBinding(conf),
        E_DELEGATION_TOKENS_DISABLED);
    Class<? extends AbstractDelegationTokenBinding> binding = conf.getClass(
        DelegationConstants.DELEGATION_TOKEN_BINDING,
        SessionTokenBinding.class,
        AbstractDelegationTokenBinding.class);
    tokenBinding = binding.newInstance();
    tokenBinding.bindToFileSystem(getCanonicalUri(),
        getStoreContext(),
        getPolicyProvider());
    tokenBinding.init(conf);
    tokenBindingName = tokenBinding.getKind().toString();
    LOG.debug("Filesystem {} is using delegation tokens of kind {}",
        getCanonicalUri(), tokenBindingName);
  }

  /**
   * Service startup includes binding to any delegation token, and
   * deploying unbounded if there is none.
   * It is after this that token operations can be used.
   * @throws Exception any failure
   */
  @Override
  protected void serviceStart() throws Exception {
    super.serviceStart();
    tokenBinding.start();
    bindToAnyDelegationToken();
    LOG.debug("S3A Delegation support token {} with {}",
        identifierToString(),
        tokenBinding.getDescription());
  }

  /**
   * Get the identifier as a string, or "(none)".
   * @return a string value for logs etc.
   */
  private String identifierToString() {
    return decodedIdentifier.map(Objects::toString)
        .orElse("(none)");
  }

  /**
   * Stop the token binding.
   * @throws Exception on any failure
   */
  @SuppressWarnings("ThrowableNotThrown")
  @Override
  protected void serviceStop() throws Exception {
    LOG.debug("Stopping delegation tokens");
    try {
      super.serviceStop();
    } finally {
      ServiceOperations.stopQuietly(LOG, tokenBinding);
    }
  }


  /**
   * Perform the unbonded deployment operations.
   * Create the AWS credential provider chain to use
   * when talking to AWS when there is no delegation token to work with.
   * authenticating this client with AWS services, and saves it
   * to {@link #credentialProviders}
   *
   * @throws IOException any failure.
   */
  private void deployUnbonded()
      throws IOException {
    requireServiceStarted();
    checkState(!isBoundToDT(),
        "Already Bound to a delegation token");
    LOG.debug("No delegation tokens present: using direct authentication");
    credentialProviders = Optional.of(tokenBinding.deployUnbonded());
  }

  /**
   * Attempt to bind to any existing DT, including unmarshalling its contents
   * and creating the AWS credential provider used to authenticate
   * the client.
   *
   * If successful:
   * <ol>
   *   <li>{@link #boundDT} is set to the retrieved token.</li>
   *   <li>{@link #decodedIdentifier} is set to the extracted identifier.</li>
   *   <li>{@link #credentialProviders} is set to the credential
   *   provider(s) returned by the token binding.</li>
   * </ol>
   * If unsuccessful, {@link #deployUnbonded()} is called for the
   * unbonded codepath instead, which will set
   * {@link #credentialProviders} to its value.
   *
   * This means after this call (and only after) the token operations
   * can be invoked.
   *
   * This method is called from {@link #serviceStart()}, so a check on
   * the service state can be used to check things; the state model
   * prevents re-entrant calls.
   * @throws IOException selection/extraction/validation failure.
   */
  private void bindToAnyDelegationToken() throws IOException {
    checkState(!credentialProviders.isPresent(), E_ALREADY_DEPLOYED);
    Token<AbstractS3ATokenIdentifier> token = selectTokenFromFSOwner();
    if (token != null) {
      bindToDelegationToken(token);
    } else {
      deployUnbonded();
    }
    if (credentialProviders.get().size() == 0) {
      throw new DelegationTokenIOException("No AWS credential providers"
          + " created by Delegation Token Binding "
          + tokenBinding.getName());
    }
  }

  /**
   * This is a test-only back door which resets the state and binds to
   * a token again.
   * This allows an instance of this class to be bonded to a DT after being
   * started, so avoids the need to have the token in the current user
   * credentials. It is package scoped so as to only be usable in tests
   * in the same package.
   *
   * Yes, this is ugly, but there is no obvious/easy way to test token
   * binding without Kerberos getting involved.
   * @param token token to decode and bind to.
   * @throws IOException selection/extraction/validation failure.
   */
  @VisibleForTesting
  void resetTokenBindingToDT(final Token<AbstractS3ATokenIdentifier> token)
      throws IOException{
    credentialProviders = Optional.empty();
    bindToDelegationToken(token);
  }

  /**
   * Bind to a delegation token retrieved for this filesystem.
   * Extract the secrets from the token and set internal fields
   * to the values.
   * <ol>
   *   <li>{@link #boundDT} is set to {@code token}.</li>
   *   <li>{@link #decodedIdentifier} is set to the extracted identifier.</li>
   *   <li>{@link #credentialProviders} is set to the credential
   *   provider(s) returned by the token binding.</li>
   * </ol>
   * @param token token to decode and bind to.
   * @throws IOException selection/extraction/validation failure.
   */
  @VisibleForTesting
  public void bindToDelegationToken(
      final Token<AbstractS3ATokenIdentifier> token)
      throws IOException {
    checkState(!credentialProviders.isPresent(), E_ALREADY_DEPLOYED);
    boundDT = Optional.of(token);
    AbstractS3ATokenIdentifier dti = extractIdentifier(token);
    LOG.info("Using delegation token {}", dti);
    decodedIdentifier = Optional.of(dti);
    try (DurationInfo ignored = new DurationInfo(LOG, DURATION_LOG_AT_INFO,
        "Creating Delegation Token")) {
      // extract the credential providers.
      credentialProviders = Optional.of(
          tokenBinding.bindToTokenIdentifier(dti));
    }
  }

  /**
   * Predicate: is there a bound DT?
   * @return true if there's a value in {@link #boundDT}.
   */
  public boolean isBoundToDT() {
    return boundDT.isPresent();
  }

  /**
   * Get any bound DT.
   * @return a delegation token if this instance was bound to it.
   */
  public Optional<Token<AbstractS3ATokenIdentifier>> getBoundDT() {
    return boundDT;
  }

  /**
   * Predicate: will this binding issue a DT if requested
   * in a call to {@link #getBoundOrNewDT(EncryptionSecrets, Text)}?
   * That is: should the filesystem declare that it is issuing
   * delegation tokens?
   * @return a declaration of what will happen when asked for a token.
   */
  public TokenIssuingPolicy getTokenIssuingPolicy() {
    return isBoundToDT()
        ? TokenIssuingPolicy.ReturnExistingToken
        : tokenBinding.getTokenIssuingPolicy();
  }

  /**
   * Get any bound DT or create a new one.
   * @return a delegation token.
   * @throws IOException if one cannot be created
   * @param encryptionSecrets encryption secrets for any new token.
   * @param renewer the token renewer.
   */
  @SuppressWarnings("OptionalGetWithoutIsPresent")
  public Token<AbstractS3ATokenIdentifier> getBoundOrNewDT(
      final EncryptionSecrets encryptionSecrets,
      final Text renewer)
      throws IOException {
    LOG.debug("Delegation token requested");
    if (isBoundToDT()) {
      // the FS was created on startup with a token, so return it.
      LOG.debug("Returning current token");
      return getBoundDT().get();
    } else {
      // not bound to a token, so create a new one.
      // issued DTs are not cached so that long-lived filesystems can
      // reliably issue session/role tokens.
      return createDelegationToken(encryptionSecrets, renewer);
    }
  }

  /**
   * How many delegation tokens have been issued?
   * @return the number times {@link #createDelegationToken(EncryptionSecrets, Text)}
   * returned a token.
   */
  public int getCreationCount() {
    return creationCount.get();
  }

  /**
   * Create a delegation token for the user.
   * This will only be called if a new DT is needed, that is: the
   * filesystem has been deployed unbonded.
   * @param encryptionSecrets encryption secrets for the token.
   * @param renewer the token renewer
   * @return the token
   * @throws IOException if one cannot be created
   */
  @VisibleForTesting
  public Token<AbstractS3ATokenIdentifier> createDelegationToken(
      final EncryptionSecrets encryptionSecrets,
      final Text renewer) throws IOException {
    requireServiceStarted();
    checkArgument(encryptionSecrets != null,
        "Null encryption secrets");
    List<RoleModel.Statement> statements = getPolicyProvider()
        .listAWSPolicyRules(ACCESS_POLICY);
    Optional<RoleModel.Policy> rolePolicy =
        statements.isEmpty() ?
            Optional.empty() : Optional.of(new RoleModel.Policy(statements));

    try(DurationInfo ignored = new DurationInfo(LOG, DURATION_LOG_AT_INFO,
        "Creating New Delegation Token", tokenBinding.getKind())) {
      Token<AbstractS3ATokenIdentifier> token = trackDuration(stats,
          DELEGATION_TOKENS_ISSUED.getSymbol(), () ->
              tokenBinding.createDelegationToken(rolePolicy,
                  encryptionSecrets, renewer));
      if (token != null) {
        token.setService(service);
        noteTokenCreated(token);
      }
      return token;
    }
  }

  /**
   * Note that a token has been created; increment counters and statistics.
   * @param token token created
   */
  private void noteTokenCreated(final Token<AbstractS3ATokenIdentifier> token) {
    LOG.info("Created S3A Delegation Token: {}", token);
    creationCount.incrementAndGet();
    stats.tokenIssued();
  }

  /**
   * Get a null/possibly empty list of extra delegation token issuers.
   * These will be asked for tokens when
   * {@link DelegationTokenIssuer#getAdditionalTokenIssuers()} recursively
   * collects all DTs a filesystem can offer.
   * @return a null or empty array. Default implementation: null
   * @throws IOException failure
   */
  public DelegationTokenIssuer[] getAdditionalTokenIssuers()
      throws IOException {
    return null;
  }

  /**
   * Get the AWS credential provider.
   * @return the DT credential provider
   * @throws IOException failure to parse the DT
   * @throws IllegalStateException if this instance is not bound to a DT
   */
  public AWSCredentialProviderList getCredentialProviders()
      throws IOException {
    return credentialProviders.orElseThrow(
        () -> new DelegationTokenIOException("Not yet bonded"));
  }

  /**
   * Get the encryption secrets of the DT.
   * non-empty iff service is started and was bound to a DT.
   * @return any encryption settings propagated with the DT.
   */
  public Optional<EncryptionSecrets> getEncryptionSecrets() {
    return decodedIdentifier.map(
        AbstractS3ATokenIdentifier::getEncryptionSecrets);
  }

  /**
   * Get any decoded identifier from the bound DT; empty if not bound.
   * @return the decoded identifier.
   */
  public Optional<AbstractS3ATokenIdentifier> getDecodedIdentifier() {
    return decodedIdentifier;
  }

  /**
   * Get the service identifier of the owning FS.
   * @return a service identifier to use when registering tokens
   */
  public Text getService() {
    return service;
  }

  /**
   * The canonical name of the service.
   * This can be used as the canonical service name for the FS.
   * @return the canonicalized FS URI.
   */
  public String getCanonicalServiceName() {
    return getCanonicalUri().toString();
  }

  /**
   * Find a token for the FS user and canonical filesystem URI.
   * @return the token, or null if one cannot be found.
   * @throws IOException on a failure to unmarshall the token.
   */
  @VisibleForTesting
  public Token<AbstractS3ATokenIdentifier> selectTokenFromFSOwner()
      throws IOException {
    return lookupToken(user.getCredentials(),
        service,
        tokenBinding.getKind());
  }

  /**
   * Get the service identifier of a filesystem.
   * This must be unique for (S3A, the FS URI)
   * @param fsURI filesystem URI
   * @return identifier to use.
   */
  private static Text getTokenService(final URI fsURI) {
    return getTokenService(fsURI.toString());
  }

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder(
        "S3ADelegationTokens{");
    sb.append("canonicalServiceURI=").append(getCanonicalUri());
    sb.append("; owner=").append(user.getShortUserName());
    sb.append("; isBoundToDT=").append(isBoundToDT());
    sb.append("; token creation count=").append(getCreationCount());
    sb.append("; tokenManager=").append(tokenBinding);
    sb.append("; token=").append(identifierToString());
    sb.append('}');
    return sb.toString();
  }

  /**
   * Get the kind of the issued tokens.
   * @return token kind.
   */
  public Text getTokenKind() {
    return tokenBinding.getKind();
  }

  /**
   * Get the service identifier of a filesystem URI.
   * This must be unique for (S3a, the FS URI)
   * @param fsURI filesystem URI as a string
   * @return identifier to use.
   */
  @VisibleForTesting
  static Text getTokenService(final String fsURI) {
    return new Text(fsURI);
  }

  /**
   * From a token, get the session token identifier.
   * @param token token to process
   * @return the session token identifier
   * @throws IOException failure to validate/read data encoded in identifier.
   * @throws IllegalArgumentException if the token isn't an S3A session token
   */
  public AbstractS3ATokenIdentifier extractIdentifier(
      final Token<? extends AbstractS3ATokenIdentifier> token)
      throws IOException {

    checkArgument(token != null, "null token");
    AbstractS3ATokenIdentifier identifier;
    // harden up decode beyond that Token does itself
    try {
      identifier = token.decodeIdentifier();
    } catch (RuntimeException e) {
      Throwable cause = e.getCause();
      if (cause != null) {
        // its a wrapping around class instantiation.
        throw new DelegationTokenIOException("Decoding S3A token " + cause,
            cause);
      } else {
        throw e;
      }
    }
    if (identifier == null) {
      throw new DelegationTokenIOException("Failed to unmarshall token for "
          + getCanonicalUri());
    }
    identifier.validate();
    return identifier;
  }

  /**
   * Return a string for use in building up the User-Agent field, so
   * get into the S3 access logs. Useful for diagnostics.
   * Delegates to {{@link AbstractDelegationTokenBinding#getUserAgentField()}}
   * for the current binding.
   * @return a string for the S3 logs or "" for "nothing to add"
   */
  public String getUserAgentField() {
    return tokenBinding.getUserAgentField();
  }

  /**
   * Look up a token from the credentials, verify it is of the correct
   * kind.
   * @param credentials credentials to look up.
   * @param service service name
   * @param kind token kind to look for
   * @return the token or null if no suitable token was found
   * @throws DelegationTokenIOException wrong token kind found
   */
  @VisibleForTesting
  public static Token<AbstractS3ATokenIdentifier> lookupToken(
      final Credentials credentials,
      final Text service,
      final Text kind)
      throws DelegationTokenIOException {

    LOG.debug("Looking for token for service {} in credentials", service);
    Token<?> token = credentials.getToken(service);
    if (token != null) {
      Text tokenKind = token.getKind();
      LOG.debug("Found token of kind {}", tokenKind);
      if (kind.equals(tokenKind)) {
        // the Oauth implementation catches and logs here; this one
        // throws the failure up.
        return (Token<AbstractS3ATokenIdentifier>) token;
      } else {

        // there's a token for this URI, but its not the right DT kind
        throw new DelegationTokenIOException(
            DelegationTokenIOException.TOKEN_MISMATCH + ": expected token"
            + " for " + service
            + " of type " + kind
            + " but got a token of type " + tokenKind);
      }
    }
    // A token for the service was not found
    LOG.debug("No token for {} found", service);
    return null;
  }

  /**
   * Look up any token from the service; cast it to one of ours.
   * @param credentials credentials
   * @param service service to look up
   * @return any token found or null if none was
   * @throws ClassCastException if the token is of a wrong type.
   */
  public static Token<AbstractS3ATokenIdentifier> lookupToken(
      final Credentials credentials,
      final Text service) {
    return (Token<AbstractS3ATokenIdentifier>) credentials.getToken(service);
  }

  /**
   * Look for any S3A token for the given FS service.
   * @param credentials credentials to scan.
   * @param uri the URI of the FS to look for
   * @return the token or null if none was found
   */
  public static Token<AbstractS3ATokenIdentifier> lookupS3ADelegationToken(
      final Credentials credentials,
      final URI uri) {
    return lookupToken(credentials, getTokenService(uri.toString()));
  }

  /**
   * Predicate: does this configuration enable delegation tokens?
   * That is: is there any text in the option
   * {@link DelegationConstants#DELEGATION_TOKEN_BINDING} ?
   * @param conf configuration to examine
   * @return true iff the trimmed configuration option is not empty.
   */
  public static boolean hasDelegationTokenBinding(Configuration conf) {
    return StringUtils.isNotEmpty(
        conf.getTrimmed(DELEGATION_TOKEN_BINDING,
            DEFAULT_DELEGATION_TOKEN_BINDING));
  }

  /**
   * How will tokens be issued on request?
   *
   * The {@link #RequestNewToken} policy does not guarantee that a tokens
   * can be created, only that an attempt will be made to request one.
   * It may fail (wrong credential types, wrong role, etc).
   */
  public enum TokenIssuingPolicy {

    /** The existing token will be returned. */
    ReturnExistingToken,

    /** No tokens will be issued. */
    NoTokensAvailable,

    /** An attempt will be made to request a new DT. */
    RequestNewToken
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AWSPolicyProvider 源码

hadoop AbstractDTService 源码

hadoop AbstractDelegationTokenBinding 源码

hadoop AbstractS3ATokenIdentifier 源码

hadoop DelegationConstants 源码

hadoop DelegationOperations 源码

hadoop DelegationTokenIOException 源码

hadoop DelegationTokenProvider 源码

hadoop EncryptionSecretOperations 源码

hadoop EncryptionSecrets 源码

0  赞