hadoop DelegationTokenRenewer 源码

  • 2022-10-20
  • 浏览 (255)

hadoop DelegationTokenRenewer 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.security;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * Service to renew application delegation tokens.
 */
@Private
@Unstable
public class DelegationTokenRenewer extends AbstractService {
  
  private static final Logger LOG =
      LoggerFactory.getLogger(DelegationTokenRenewer.class);
  // Token kind compared against Token#getKind() to recognise HDFS
  // delegation tokens.
  @VisibleForTesting
  public static final Text HDFS_DELEGATION_KIND =
      new Text("HDFS_DELEGATION_TOKEN");
  public static final String SCHEME = "hdfs";

  // Size of the pending-event queue at the time it was last logged; used to
  // avoid logging the same size twice.
  private volatile int lastEventQueueSizeLogged = 0;

  // global single timer (daemon)
  private Timer renewalTimer;
  // Supplied through setRMContext(); dereferenced in serviceInit().
  private RMContext rmContext;
  
  // delegation token canceler thread
  private DelegationTokenCancelThread dtCancelThread =
    new DelegationTokenCancelThread();
  // Pool that runs the renewer event runnables; created in serviceInit().
  private ThreadPoolExecutor renewerService;

  // Application id -> the tokens registered for that application.
  private ConcurrentMap<ApplicationId, Set<DelegationTokenToRenew>> appTokens =
      new ConcurrentHashMap<ApplicationId, Set<DelegationTokenToRenew>>();

  // Token -> its renewal bookkeeping record, across all applications.
  private ConcurrentMap<Token<?>, DelegationTokenToRenew> allTokens =
      new ConcurrentHashMap<Token<?>, DelegationTokenToRenew>();

  // Application id -> wall-clock time (ms) after which its tokens are removed.
  private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap =
      new ConcurrentHashMap<ApplicationId, Long>();

  // Delay added to "now" when computing a delayed-removal deadline; read from
  // RM_NM_EXPIRY_INTERVAL_MS in serviceInit().
  private long tokenRemovalDelayMs;
  
  // Thread running DelayedTokenRemovalRunnable; only created when
  // tokenKeepAliveEnabled is true.
  private Thread delayedRemovalThread;
  // Held (write) while isServiceStarted is flipped and (read) while events are
  // dispatched or queued.
  private ReadWriteLock serviceStateLock = new ReentrantReadWriteLock();
  private volatile boolean isServiceStarted;
  // Events received before the service was started; drained in serviceStart().
  private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
  
  // OR-ed into every DelegationTokenToRenew#shouldCancelAtEnd.
  private boolean alwaysCancelDelegationTokens;
  // Read from LOG_AGGREGATION_ENABLED; when true, token removal for finished
  // applications is delayed instead of immediate.
  private boolean tokenKeepAliveEnabled;
  // When false, no new HDFS tokens are requested on behalf of users.
  private boolean hasProxyUserPrivileges;
  // Threshold compared against (maxDate - expirationDate) to decide whether a
  // replacement HDFS token must be requested.
  private long credentialsValidTimeRemaining;
  // The three settings below are read from configuration in serviceInit();
  // their consumers are outside the portion of the file reviewed here.
  private long tokenRenewerThreadTimeout;
  private long tokenRenewerThreadRetryInterval;
  private int tokenRenewerThreadRetryMaxAttempts;
  // Event -> Future of the runnable submitted for it to renewerService.
  private final Map<DelegationTokenRenewerEvent, Future<?>> futures =
      new ConcurrentHashMap<>();
  // When true, serviceStart() submits a DelegationTokenRenewerPoolTracker.
  private boolean delegationTokenRenewerPoolTrackerFlag = true;

  // this config is supposedly not used by end-users.
  public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
      YarnConfiguration.RM_PREFIX + "system-credentials.valid-time-remaining";
  public static final long DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
      10800000; // 3h

  /**
   * Creates the service, passing this class's fully-qualified name to the
   * {@link AbstractService} constructor.
   */
  public DelegationTokenRenewer() {
    super(DelegationTokenRenewer.class.getName());
  }

  /**
   * Loads the renewer's settings from {@code conf}, then creates the worker
   * pool, the pending-event queue and the daemon renewal timer.
   *
   * @param conf configuration the settings are read from
   * @throws Exception if initialization fails
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    // Boolean switches.
    this.alwaysCancelDelegationTokens = conf.getBoolean(
        YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
        YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_ALWAYS_CANCEL);
    this.hasProxyUserPrivileges = conf.getBoolean(
        YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
        YarnConfiguration.DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED);
    this.tokenKeepAliveEnabled = conf.getBoolean(
        YarnConfiguration.LOG_AGGREGATION_ENABLED,
        YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);

    // Delays and thresholds.
    this.tokenRemovalDelayMs = conf.getInt(
        YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
        YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
    this.credentialsValidTimeRemaining = conf.getLong(
        RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING,
        DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING);

    // Renewer-thread timeout and retry policy.
    this.tokenRenewerThreadTimeout = conf.getTimeDuration(
        YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT,
        YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT,
        TimeUnit.MILLISECONDS);
    this.tokenRenewerThreadRetryInterval = conf.getTimeDuration(
        YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL,
        YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_INTERVAL,
        TimeUnit.MILLISECONDS);
    this.tokenRenewerThreadRetryMaxAttempts = conf.getInt(
        YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
        YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS);

    setLocalSecretManagerAndServiceAddr();

    // Runtime machinery; the timer thread is a daemon.
    this.renewerService = createNewThreadPoolService(conf);
    this.pendingEventQueue =
        new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
    this.renewalTimer = new Timer(true);

    super.serviceInit(conf);
  }

  /**
   * Builds the fixed-size pool that executes renewer events. Threads are named
   * {@code "DelegationTokenRenewer #N"}, idle threads (core threads included)
   * are released after three seconds, and the work queue is unbounded.
   *
   * @param conf configuration supplying the thread count
   * @return the newly created executor
   */
  protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) {
    final int poolSize = conf.getInt(
        YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT,
        YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT);

    final ThreadFactory namedThreads = new ThreadFactoryBuilder()
        .setNameFormat("DelegationTokenRenewer #%d")
        .build();

    final ThreadPoolExecutor executor = new ThreadPoolExecutor(
        poolSize, poolSize, 3L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(), namedThreads);
    executor.allowCoreThreadTimeOut(true);
    return executor;
  }

  // enable RM to short-circuit token operations directly to itself
  // Hands the RM's own delegation-token secret manager and the client
  // service's bind address to RMDelegationTokenIdentifier.Renewer.
  // Requires rmContext to have been set (see setRMContext) beforehand.
  private void setLocalSecretManagerAndServiceAddr() {
    RMDelegationTokenIdentifier.Renewer.setSecretManager(rmContext
      .getRMDelegationTokenSecretManager(), rmContext.getClientRMService()
      .getBindAddress());
  }

  /**
   * Starts the helper threads, marks the service as started and then
   * dispatches every event that was queued before the service was running.
   *
   * @throws Exception if startup fails
   */
  @Override
  protected void serviceStart() throws Exception {
    dtCancelThread.start();

    if (tokenKeepAliveEnabled) {
      final Runnable removalLoop = new DelayedTokenRemovalRunnable(getConfig());
      delayedRemovalThread = new Thread(removalLoop, "DelayedTokenCanceller");
      delayedRemovalThread.start();
    }

    setLocalSecretManagerAndServiceAddr();

    // Flip the started flag under the write lock so no event is being
    // queued while the flag changes.
    serviceStateLock.writeLock().lock();
    try {
      isServiceStarted = true;
    } finally {
      serviceStateLock.writeLock().unlock();
    }

    if (delegationTokenRenewerPoolTrackerFlag) {
      renewerService.submit(new DelegationTokenRenewerPoolTracker());
    }

    // Replay events that arrived before start.
    while (!pendingEventQueue.isEmpty()) {
      processDelegationTokenRenewerEvent(pendingEventQueue.take());
    }

    super.serviceStart();
  }

  /**
   * Dispatches {@code evt} to the worker pool when the service is started;
   * otherwise parks it in the pending queue, logging the queue size each time
   * it reaches a new multiple of 1000.
   *
   * @param evt the event to process
   */
  private void processDelegationTokenRenewerEvent(
      DelegationTokenRenewerEvent evt) {
    serviceStateLock.readLock().lock();
    try {
      if (!isServiceStarted) {
        pendingEventQueue.add(evt);
        final int pending = pendingEventQueue.size();
        final boolean atLogStep = pending != 0 && pending % 1000 == 0;
        if (atLogStep && lastEventQueueSizeLogged != pending) {
          lastEventQueueSizeLogged = pending;
          LOG.info("Size of pending " +
              "DelegationTokenRenewerEvent queue is " + pending);
        }
        return;
      }

      final Future<?> submitted =
          renewerService.submit(new DelegationTokenRenewerRunnable(evt));
      futures.put(evt, submitted);
    } finally {
      serviceStateLock.readLock().unlock();
    }
  }

  /**
   * Stops the renewer: cancels the renewal timer, forgets all tracked tokens,
   * shuts the worker pool down and interrupts the helper threads, waiting up
   * to one second for each of them to exit.
   *
   * <p>Fields created in {@code serviceInit} are null-checked, so stopping a
   * service whose initialization did not complete does not throw.
   */
  @Override
  protected void serviceStop() {
    if (renewalTimer != null) {
      renewalTimer.cancel();
    }
    appTokens.clear();
    allTokens.clear();

    serviceStateLock.writeLock().lock();
    try {
      isServiceStarted = false;
      // renewerService is only assigned in serviceInit, just like
      // renewalTimer above, so it needs the same guard.
      if (renewerService != null) {
        renewerService.shutdown();
      }
    } finally {
      serviceStateLock.writeLock().unlock();
    }

    dtCancelThread.interrupt();
    try {
      dtCancelThread.join(1000);
    } catch (InterruptedException e) {
      // Report through the logger (not stderr), matching the handling of the
      // delayed removal thread below.
      LOG.info("Interrupted while joining on delegation token cancel thread.",
          e);
    }
    if (tokenKeepAliveEnabled && delayedRemovalThread != null) {
      delayedRemovalThread.interrupt();
      try {
        delayedRemovalThread.join(1000);
      } catch (InterruptedException e) {
        LOG.info("Interrupted while joining on delayed removal thread.", e);
      }
    }
  }

  /**
   * Bookkeeping record for a single delegation token tracked by this service:
   * the token itself, the applications referring to it, the configuration
   * used to renew/cancel it, its current expiration time and the timer task
   * scheduled for its next renewal. Two records are equal when their tokens
   * are equal.
   */
  @VisibleForTesting
  protected class DelegationTokenToRenew {
    public final Token<?> token;
    public final Collection<ApplicationId> referringAppIds;
    public final Configuration conf;
    public long expirationDate;
    public RenewalTimerTask timerTask;
    public volatile boolean shouldCancelAtEnd;
    public long maxDate;
    public String user;

    /**
     * @param applicationIds applications that refer to the token; copied into
     *          a synchronized set
     * @param token the token to track
     * @param conf configuration handed to the token on renew/cancel
     * @param expirationDate initial expiration time
     * @param shouldCancelAtEnd whether the token is cancelled once unused;
     *          forced to true when the service always cancels tokens
     * @param user owner of the token
     * @throws YarnRuntimeException if the identifier of an HDFS delegation
     *           token cannot be decoded
     */
    public DelegationTokenToRenew(Collection<ApplicationId> applicationIds,
        Token<?> token,
        Configuration conf, long expirationDate, boolean shouldCancelAtEnd,
        String user) {
      this.token = token;
      this.user = user;
      this.conf = conf;
      this.expirationDate = expirationDate;
      this.timerTask = null;
      this.shouldCancelAtEnd =
          shouldCancelAtEnd || alwaysCancelDelegationTokens;

      // maxDate is only filled in for HDFS delegation tokens; it stays 0 for
      // every other kind.
      if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
        final AbstractDelegationTokenIdentifier decoded;
        try {
          decoded =
              (AbstractDelegationTokenIdentifier) token.decodeIdentifier();
        } catch (IOException e) {
          throw new YarnRuntimeException(e);
        }
        this.maxDate = decoded.getMaxDate();
      }

      this.referringAppIds = Collections.synchronizedSet(
          new HashSet<ApplicationId>(applicationIds));
    }

    /** Remembers the timer task scheduled for the next renewal. */
    public void setTimerTask(RenewalTimerTask tTask) {
      this.timerTask = tTask;
    }

    /** Cancels the scheduled renewal task, if there is one. */
    @VisibleForTesting
    public void cancelTimer() {
      if (timerTask == null) {
        return;
      }
      timerTask.cancel();
    }

    /** @return true if a renewal task exists and has been cancelled */
    @VisibleForTesting
    public boolean isTimerCancelled() {
      if (timerTask == null) {
        return false;
      }
      return timerTask.cancelled.get();
    }

    @Override
    public String toString() {
      return token + ";exp=" + expirationDate + "; apps=" + referringAppIds;
    }

    @Override
    public boolean equals(Object obj) {
      if (!(obj instanceof DelegationTokenToRenew)) {
        return false;
      }
      final DelegationTokenToRenew other = (DelegationTokenToRenew) obj;
      return token.equals(other.token);
    }

    @Override
    public int hashCode() {
      return token.hashCode();
    }
  }
  
  
  /**
   * Daemon thread that cancels delegation tokens asynchronously. Tokens are
   * handed over through {@link #cancelToken} and cancelled one at a time as
   * the login user.
   */
  private static class DelegationTokenCancelThread extends Thread {
    /** A token paired with the configuration used to cancel it. */
    private static class TokenWithConf {
      Token<?> token;
      Configuration conf;
      TokenWithConf(Token<?> token, Configuration conf) {
        this.token = token;
        this.conf = conf;
      }
    }
    private LinkedBlockingQueue<TokenWithConf> queue =
      new LinkedBlockingQueue<TokenWithConf>();

    public DelegationTokenCancelThread() {
      super("Delegation Token Canceler");
      setDaemon(true);
    }

    /**
     * Queues {@code token} for cancellation, retrying every 100 ms for as
     * long as the queue refuses it.
     *
     * @param token the token to cancel
     * @param conf configuration passed to the token's cancel call
     * @throws RuntimeException if the calling thread is interrupted while
     *           waiting to retry; the interrupt status is restored first
     */
    public void cancelToken(Token<?> token,
        Configuration conf) {
      TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
      while (!queue.offer(tokenWithConf)) {
        LOG.warn("Unable to add token " + token + " for cancellation. " +
        		 "Will retry..");
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          // Keep the interrupt visible to callers further up the stack.
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
    }

    /**
     * Takes tokens off the queue and cancels them until interrupted. A failed
     * cancellation is logged and skipped; any other {@link Throwable}
     * terminates the JVM.
     */
    @Override
    public void run() {
      TokenWithConf tokenWithConf = null;
      while (true) {
        try {
          tokenWithConf = queue.take();
          final TokenWithConf current = tokenWithConf;
          if (LOG.isDebugEnabled()) {
            LOG.debug("Cancelling token " + tokenWithConf.token.getService());
          }
          // need to use doAs so that http can find the kerberos tgt
          UserGroupInformation.getLoginUser()
            .doAs(new PrivilegedExceptionAction<Void>(){

              @Override
              public Void run() throws Exception {
                current.token.cancel(current.conf);
                return null;
              }
            });
        } catch (IOException | RuntimeException e) {
          LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +
              StringUtils.stringifyException(e));
        } catch (InterruptedException ie) {
          // Interrupted by serviceStop(): exit the thread.
          return;
        } catch (Throwable t) {
          LOG.warn("Got exception " + StringUtils.stringifyException(t) +
                   ". Exiting..");
          System.exit(-1);
        }
      }
    }
  }

  /**
   * Returns a snapshot of every token currently registered for any
   * application.
   *
   * @return a new set holding the registered tokens
   */
  @VisibleForTesting
  public Set<Token<?>> getDelegationTokens() {
    Set<Token<?>> tokens = new HashSet<Token<?>>();
    for (Set<DelegationTokenToRenew> tokenList : appTokens.values()) {
      // The per-application sets are Collections.synchronizedSet wrappers;
      // iterating one requires holding its monitor, otherwise a concurrent
      // add/remove can cause a ConcurrentModificationException.
      synchronized (tokenList) {
        for (DelegationTokenToRenew token : tokenList) {
          tokens.add(token.token);
        }
      }
    }
    return tokens;
  }

  /**
   * Submits an application's tokens for renewal without blocking the caller:
   * a submit event is created and handed to the event processor.
   *
   * @param applicationId added application
   * @param ts tokens
   * @param shouldCancelAtEnd true if tokens should be canceled when the app is
   * done else false.
   * @param user user
   * @param tokenConf tokenConf sent by the app-submitter
   */
  public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
      boolean shouldCancelAtEnd, String user, Configuration tokenConf) {
    final DelegationTokenRenewerEvent submitEvent =
        new DelegationTokenRenewerAppSubmitEvent(
            applicationId, ts, shouldCancelAtEnd, user, tokenConf);
    processDelegationTokenRenewerEvent(submitEvent);
  }

  /**
   * Recovery-time counterpart of {@code addApplicationAsync}: wraps the
   * arguments in a recover event and hands it to the event processor.
   *
   * @param applicationId added application
   * @param ts tokens
   * @param shouldCancelAtEnd true if tokens should be canceled when the app is
   * done else false.
   * @param user user
   * @param tokenConf tokenConf sent by the app-submitter
   */
  public void addApplicationAsyncDuringRecovery(ApplicationId applicationId,
      Credentials ts, boolean shouldCancelAtEnd, String user,
      Configuration tokenConf) {
    final DelegationTokenRenewerEvent recoverEvent =
        new DelegationTokenRenewerAppRecoverEvent(
            applicationId, ts, shouldCancelAtEnd, user, tokenConf);
    processDelegationTokenRenewerEvent(recoverEvent);
  }


  /**
   * Only for testing: registers and renews an application's tokens on the
   * calling thread, using a freshly created {@link Configuration} as the
   * token configuration.
   *
   * @param applicationId added application
   * @param ts tokens
   * @param shouldCancelAtEnd true if tokens should be canceled when the app is
   * done else false.
   * @param user user
   * @throws IOException if a token cannot be renewed
   * @throws InterruptedException if the calling thread is interrupted
   */
  public void addApplicationSync(ApplicationId applicationId, Credentials ts,
      boolean shouldCancelAtEnd, String user) throws IOException,
      InterruptedException {
    final AbstractDelegationTokenRenewerAppEvent submitEvent =
        new DelegationTokenRenewerAppSubmitEvent(
            applicationId, ts, shouldCancelAtEnd, user, new Configuration());
    handleAppSubmitEvent(submitEvent);
  }

  /**
   * Registers the tokens carried by a submit/recover event for renewal.
   *
   * <p>Works in two phases: first every managed token that is not already
   * tracked is renewed once (which validates it and yields its expiration
   * time); only afterwards are the tokens recorded in {@code allTokens} /
   * {@code appTokens} and renewal timers set. If the application brought no
   * HDFS delegation token, a new one is requested on the user's behalf.
   *
   * @param evt the submit or recover event
   * @throws IOException if a token cannot be renewed
   * @throws InterruptedException if interrupted while requesting a new HDFS
   *           token
   */
  private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
      throws IOException, InterruptedException {
    ApplicationId applicationId = evt.getApplicationId();
    Credentials ts = evt.getCredentials();
    boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
    if (ts == null) {
      return; // nothing to add
    }

    LOG.debug("Registering tokens for renewal for: appId = {}", applicationId);

    Collection<Token<?>> tokens = ts.getAllTokens();
    long now = System.currentTimeMillis();

    // find tokens for renewal, but don't add timers until we know
    // all renewable tokens are valid
    // At RM restart it is safe to assume that all the previously added tokens
    // are valid
    // Note: this replaces any set previously stored for the application.
    appTokens.put(applicationId,
      Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>()));
    Set<DelegationTokenToRenew> tokenList = new HashSet<DelegationTokenToRenew>();
    boolean hasHdfsToken = false;
    for (Token<?> token : tokens) {
      if (token.isManaged()) {
        if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
          LOG.info(applicationId + " found existing hdfs token " + token);
          hasHdfsToken = true;
        }
        // Tokens whose renewer is the empty string are not tracked at all.
        if (skipTokenRenewal(token)) {
          continue;
        }

        // Reuse the record if another application already registered the
        // same token; otherwise create one and renew it once.
        DelegationTokenToRenew dttr = allTokens.get(token);
        if (dttr == null) {
          Configuration tokenConf;
          if (evt.tokenConf != null) {
            // Override conf with app provided conf - this is required in cases
            // where RM does not have the required conf to communicate with
            // remote hdfs cluster. The conf is provided by the application
            // itself.
            tokenConf = evt.tokenConf;
            LOG.info("Using app provided token conf for renewal,"
                + " number of configs = " + tokenConf.size());
            if (LOG.isDebugEnabled()) {
              for (Iterator<Map.Entry<String, String>> itor =
                   tokenConf.iterator(); itor.hasNext(); ) {
                Map.Entry<String, String> entry = itor.next();
                LOG.debug("Token conf key is {} and value is {}",
                    entry.getKey(), entry.getValue());
              }
            }
          }  else {
            tokenConf = getConfig();
          }
          dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token,
              tokenConf, now, shouldCancelAtEnd, evt.getUser());
          try {
            renewToken(dttr);
          } catch (IOException ioe) {
            // Special case: an HDFS token that is past its max date while
            // recovering an application is replaced by a newly requested
            // token instead of failing the whole event.
            if (ioe instanceof SecretManager.InvalidToken
                && dttr.maxDate < Time.now()
                && evt instanceof DelegationTokenRenewerAppRecoverEvent
                && token.getKind().equals(HDFS_DELEGATION_KIND)) {
              LOG.info("Failed to renew hdfs token " + dttr
                  + " on recovery as it expired, requesting new hdfs token for "
                  + applicationId + ", user=" + evt.getUser(), ioe);
              requestNewHdfsDelegationTokenAsProxyUser(
                  Arrays.asList(applicationId), evt.getUser(),
                  evt.shouldCancelAtEnd());
              continue;
            }
            throw new IOException("Failed to renew token: " + dttr.token, ioe);
          }
        }
        tokenList.add(dttr);
      }
    }

    if (!tokenList.isEmpty()) {
      // Renewing token and adding it to timer calls are separated purposefully
      // If user provides incorrect token then it should not be added for
      // renewal.
      for (DelegationTokenToRenew dtr : tokenList) {
        // putIfAbsent decides atomically which record wins when several
        // applications register the same token concurrently.
        DelegationTokenToRenew currentDtr =
            allTokens.putIfAbsent(dtr.token, dtr);
        if (currentDtr != null) {
          // another job beat us
          currentDtr.referringAppIds.add(applicationId);
          appTokens.get(applicationId).add(currentDtr);
        } else {
          appTokens.get(applicationId).add(dtr);
          setTimerForTokenRenewal(dtr);
        }
      }
    }

    // No HDFS token among the managed tokens: request one on behalf of the
    // user (the callee returns immediately unless proxy-user privileges are
    // enabled).
    if (!hasHdfsToken) {
      requestNewHdfsDelegationTokenAsProxyUser(Arrays.asList(applicationId),
          evt.getUser(),
        shouldCancelAtEnd);
    }
  }

  /**
   * Task - to renew a token
   *
   */
  private class RenewalTimerTask extends TimerTask {
    private DelegationTokenToRenew dttr;
    private AtomicBoolean cancelled = new AtomicBoolean(false);
    
    RenewalTimerTask(DelegationTokenToRenew t) {  
      dttr = t;  
    }
    
    @Override
    public void run() {
      if (cancelled.get()) {
        return;
      }

      Token<?> token = dttr.token;

      try {
        requestNewHdfsDelegationTokenIfNeeded(dttr);
        // if the token is not replaced by a new token, renew the token
        if (!dttr.isTimerCancelled()) {
          renewToken(dttr);
          setTimerForTokenRenewal(dttr);// set the next one
        } else {
          LOG.info("The token was removed already. Token = [" +dttr +"]");
        }
      } catch (Exception e) {
        LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
        removeFailedDelegationToken(dttr);
      }
    }

    @Override
    public boolean cancel() {
      cancelled.set(true);
      return super.cancel();
    }
  }

  /**
   * Decides whether a token must be left out of renewal because its renewer
   * is set to the empty string.
   *
   * <p>Caller is expected to have examined that token.isManaged() returns
   * true before calling this method.
   *
   * <p>A token whose identifier cannot be decoded, or whose identifier is not
   * an {@link AbstractDelegationTokenIdentifier} (and therefore carries no
   * renewer), is never skipped. Checking the type explicitly avoids the
   * {@link ClassCastException} an unchecked cast would raise for such tokens.
   *
   * @param token the managed token to inspect
   * @return true if the token's renewer is the empty string
   * @throws IOException if decoding the identifier fails
   */
  private boolean skipTokenRenewal(Token<?> token)
      throws IOException {
    Object identifier = token.decodeIdentifier();
    if (!(identifier instanceof AbstractDelegationTokenIdentifier)) {
      // Covers both a null identifier and a foreign identifier type.
      return false;
    }
    Text renewer =
        ((AbstractDelegationTokenIdentifier) identifier).getRenewer();
    return renewer != null && renewer.toString().isEmpty();
  }

  /**
   * Schedules the next renewal of {@code token} on the shared timer. The task
   * fires one tenth of the remaining lifetime before the token expires;
   * nothing is scheduled for a token that has already expired.
   *
   * @param token the tracked token to schedule
   * @throws IOException declared for overriding implementations
   */
  @VisibleForTesting
  protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
      throws IOException {
    final long remainingMs =
        token.expirationDate - System.currentTimeMillis();
    if (remainingMs <= 0) {
      LOG.info("Will not renew token " + token);
      return;
    }

    // A TimerTask cannot be rescheduled, so a new one is created each time
    // and remembered on the token so that it can be cancelled later.
    final Date fireAt = new Date(token.expirationDate - remainingMs / 10);
    token.setTimerTask(new RenewalTimerTask(token));
    renewalTimer.schedule(token.timerTask, fireAt);

    LOG.info("Renew " + token + " in " + remainingMs + " ms, appId = "
        + token.referringAppIds);
  }

  /**
   * Renews the token as the login user and stores the returned expiration
   * time on {@code dttr}.
   *
   * @param dttr the tracked token to renew
   * @throws IOException if renewal fails or the thread is interrupted
   */
  @VisibleForTesting
  protected void renewToken(final DelegationTokenToRenew dttr)
      throws IOException {
    // need to use doAs so that http can find the kerberos tgt
    // NOTE: token renewers should be responsible for the correct UGI!
    final PrivilegedExceptionAction<Long> renewAction =
        new PrivilegedExceptionAction<Long>() {
          @Override
          public Long run() throws Exception {
            return dttr.token.renew(dttr.conf);
          }
        };

    try {
      dttr.expirationDate =
          UserGroupInformation.getLoginUser().doAs(renewAction);
    } catch (InterruptedException e) {
      throw new IOException(e);
    }
    LOG.info("Renewed delegation-token= [" + dttr + "]");
  }

  /**
   * Requests a new HDFS delegation token when {@code dttr} is close to its
   * max date, and stops tracking the old HDFS tokens of every application
   * that referred to it.
   *
   * <p>Nothing happens unless proxy-user privileges are enabled, the token is
   * an HDFS delegation token, and the gap between its max date and current
   * expiration is below {@code credentialsValidTimeRemaining}.
   *
   * @param dttr the tracked token that is about to be renewed
   * @throws IOException if obtaining or renewing the new token fails
   * @throws InterruptedException if interrupted while obtaining the new token
   */
  void requestNewHdfsDelegationTokenIfNeeded(
      final DelegationTokenToRenew dttr) throws IOException,
      InterruptedException {

    if (hasProxyUserPrivileges
        && dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining
        && dttr.token.getKind().equals(HDFS_DELEGATION_KIND)) {

      // Take over the referring applications atomically; the old record no
      // longer refers to any of them afterwards.
      final Collection<ApplicationId> applicationIds;
      synchronized (dttr.referringAppIds) {
        applicationIds = new HashSet<>(dttr.referringAppIds);
        dttr.referringAppIds.clear();
      }
      // remove all old expiring hdfs tokens for this application.
      for (ApplicationId appId : applicationIds) {
        Set<DelegationTokenToRenew> tokenSet = appTokens.get(appId);
        if (tokenSet == null || tokenSet.isEmpty()) {
          continue;
        }
        synchronized (tokenSet) {
          // The iterator must be created while holding the set's monitor: a
          // modification between creating it and entering the block would
          // make it fail with ConcurrentModificationException.
          Iterator<DelegationTokenToRenew> iter = tokenSet.iterator();
          while (iter.hasNext()) {
            DelegationTokenToRenew t = iter.next();
            if (t.token.getKind().equals(HDFS_DELEGATION_KIND)) {
              iter.remove();
              allTokens.remove(t.token);
              t.cancelTimer();
              LOG.info("Removed expiring token " + t);
            }
          }
        }
      }
      LOG.info("Token= (" + dttr + ") is expiring, request new token.");
      requestNewHdfsDelegationTokenAsProxyUser(applicationIds, dttr.user,
          dttr.shouldCancelAtEnd);
    }
  }

  /**
   * Obtains new HDFS tokens on behalf of {@code user}, starts renewing every
   * managed one for the given applications, and publishes the obtained
   * credentials as the applications' system credentials. Returns without
   * doing anything when proxy-user privileges are disabled.
   *
   * @param referringAppIds applications the new tokens are for
   * @param user user to obtain the tokens for
   * @param shouldCancelAtEnd whether the new tokens are cancelled once unused
   * @throws IOException if obtaining, renewing or serialising tokens fails
   * @throws InterruptedException if interrupted while obtaining tokens
   */
  private void requestNewHdfsDelegationTokenAsProxyUser(
      Collection<ApplicationId> referringAppIds,
      String user, boolean shouldCancelAtEnd) throws IOException,
      InterruptedException {
    if (!hasProxyUserPrivileges) {
      LOG.info("RM proxy-user privilege is not enabled. Skip requesting hdfs tokens.");
      return;
    }

    // Get new hdfs tokens for this user
    final Credentials freshCredentials = new Credentials();
    final Token<?>[] obtained =
        obtainSystemTokensForUser(user, freshCredentials);

    // Add new tokens to the toRenew list.
    LOG.info("Received new tokens for " + referringAppIds + ". Received "
        + obtained.length + " tokens.");
    boolean anyManagedToken = false;
    for (Token<?> candidate : obtained) {
      if (!candidate.isManaged()) {
        continue;
      }
      final DelegationTokenToRenew replacement =
          new DelegationTokenToRenew(referringAppIds, candidate, getConfig(),
              Time.now(), shouldCancelAtEnd, user);
      // renew the token to get the next expiration date.
      renewToken(replacement);
      setTimerForTokenRenewal(replacement);
      for (ApplicationId appId : referringAppIds) {
        appTokens.get(appId).add(replacement);
      }
      LOG.info("Received new token " + candidate);
      anyManagedToken = true;
    }

    if (anyManagedToken) {
      this.rmContext.incrTokenSequenceNo();
    }

    // Serialise the credentials once and register them for each application.
    final DataOutputBuffer serialized = new DataOutputBuffer();
    freshCredentials.writeTokenStorageToStream(serialized);
    final ByteBuffer payload =
        ByteBuffer.wrap(serialized.getData(), 0, serialized.getLength());
    for (ApplicationId appId : referringAppIds) {
      final SystemCredentialsForAppsProto appCredentials =
          YarnServerBuilderUtils.newSystemCredentialsForAppsProto(appId,
              payload);
      rmContext.getSystemCredentialsForApps().put(appId, appCredentials);
    }
  }

  /**
   * Obtains delegation tokens from the default file system while
   * impersonating {@code user}, naming the login user as their renewer.
   *
   * @param user user to impersonate
   * @param credentials receives the obtained tokens
   * @return the tokens that were added to {@code credentials}
   * @throws IOException if the tokens cannot be obtained
   * @throws InterruptedException if the privileged action is interrupted
   */
  @VisibleForTesting
  protected Token<?>[] obtainSystemTokensForUser(String user,
      final Credentials credentials) throws IOException, InterruptedException {
    // Get new hdfs tokens on behalf of this user
    final UserGroupInformation impersonated =
        UserGroupInformation.createProxyUser(user,
            UserGroupInformation.getLoginUser());

    return impersonated.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
      @Override
      public Token<?>[] run() throws Exception {
        final FileSystem proxyFs = FileSystem.get(getConfig());
        try {
          final String renewerName =
              UserGroupInformation.getLoginUser().getUserName();
          return proxyFs.addDelegationTokens(renewerName, credentials);
        } finally {
          // Close the FileSystem created by the new proxy user,
          // So that we don't leave an entry in the FileSystem cache
          proxyFs.close();
        }
      }
    });
  }

  /**
   * Hands the token to the cancel thread if it is flagged for cancellation;
   * otherwise only logs that it was left alone.
   */
  private void cancelToken(DelegationTokenToRenew t) {
    if (!t.shouldCancelAtEnd) {
      LOG.info("Did not cancel "+t);
      return;
    }
    dtCancelThread.cancelToken(t.token, t.conf);
  }
  
  /**
   * Stops tracking a token whose renewal failed: removes it from every
   * referring application's token set and from {@code allTokens}, then
   * cancels its timer task.
   *
   * <p>An application that has already been removed from {@code appTokens}
   * (for example because it finished concurrently) is skipped. This method is
   * called from the {@code catch} block of {@code RenewalTimerTask.run()}, so
   * an exception thrown here would propagate out of the timer task.
   *
   * @param t the tracked token to drop
   */
  private void removeFailedDelegationToken(DelegationTokenToRenew t) {
    Collection<ApplicationId> applicationIds = t.referringAppIds;
    synchronized (applicationIds) {
      LOG.error("removing failed delegation token for appid=" + applicationIds
          + ";t=" + t.token.getService());
      for (ApplicationId applicationId : applicationIds) {
        Set<DelegationTokenToRenew> tokenSet = appTokens.get(applicationId);
        if (tokenSet != null) {
          tokenSet.remove(t);
        }
      }
    }
    allTokens.remove(t.token);

    // cancel the timer
    t.cancelTimer();
  }

  /**
   * Notifies the renewer that an application has completed by submitting a
   * {@code FINISH_APPLICATION} event for it.
   *
   * @param applicationId completed application
   */
  public void applicationFinished(ApplicationId applicationId) {
    final DelegationTokenRenewerEvent finishEvent =
        new DelegationTokenRenewerEvent(applicationId,
            DelegationTokenRenewerEventType.FINISH_APPLICATION);
    processDelegationTokenRenewerEvent(finishEvent);
  }

  /**
   * Handles a finished application: with keep-alive enabled its removal is
   * deferred by {@code tokenRemovalDelayMs}; otherwise its tokens are removed
   * right away.
   *
   * @param evt the finish event
   */
  private void handleAppFinishEvent(DelegationTokenRenewerEvent evt) {
    final ApplicationId finishedApp = evt.getApplicationId();
    if (tokenKeepAliveEnabled) {
      final long removeAfter = System.currentTimeMillis() + tokenRemovalDelayMs;
      delayedRemovalMap.put(finishedApp, removeAfter);
      return;
    }
    removeApplicationFromRenewal(finishedApp);
  }
  
  /**
   * Extends the keep-alive deadline of the given applications to
   * {@code tokenRemovalDelayMs} from now, adding any application that is not
   * on the keep-alive list yet. Does nothing when keep-alive is disabled or
   * the list is null or empty.
   *
   * @param appIds
   *          the list of applicationIds to be kept alive.
   */
  public void updateKeepAliveApplications(List<ApplicationId> appIds) {
    if (!tokenKeepAliveEnabled || appIds == null || appIds.isEmpty()) {
      return;
    }
    for (ApplicationId keptAlive : appIds) {
      final long removeAfter = System.currentTimeMillis() + tokenRemovalDelayMs;
      delayedRemovalMap.put(keptAlive, removeAfter);
    }
  }

  /**
   * Drops an application from renewal: removes its system credentials and its
   * token set, and for every token no other application still refers to,
   * cancels the timer, cancels the token (if so flagged) and removes it from
   * {@code allTokens}.
   *
   * @param applicationId the application to remove
   */
  private void removeApplicationFromRenewal(ApplicationId applicationId) {
    rmContext.getSystemCredentialsForApps().remove(applicationId);

    final Set<DelegationTokenToRenew> removed =
        appTokens.remove(applicationId);
    if (removed == null || removed.isEmpty()) {
      return;
    }

    synchronized (removed) {
      for (DelegationTokenToRenew dttr : removed) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Removing delegation token for appId=" + applicationId
              + "; token=" + dttr.token.getService());
        }

        // Detach this application; tokens still used elsewhere are kept.
        final boolean stillReferenced;
        synchronized (dttr.referringAppIds) {
          dttr.referringAppIds.remove(applicationId);
          stillReferenced = !dttr.referringAppIds.isEmpty();
        }
        if (stillReferenced) {
          continue;
        }

        dttr.cancelTimer();
        cancelToken(dttr);
        allTokens.remove(dttr.token);
      }
    }
  }

  /**
   * Background task that removes applications from token renewal once their
   * expiry time recorded in {@code delayedRemovalMap} has passed. Keep-alive
   * requests overwrite that expiry time, which postpones the removal.
   */
  private class DelayedTokenRemovalRunnable implements Runnable {

    // Pause between two scans of delayedRemovalMap, in milliseconds.
    private long waitTimeMs;

    DelayedTokenRemovalRunnable(Configuration conf) {
      waitTimeMs = conf.getLong(
          YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
          YarnConfiguration.DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS);
    }

    /**
     * Repeatedly scans {@code delayedRemovalMap}, removes every application
     * whose expiry time is in the past, and then waits {@code waitTimeMs}
     * before the next scan. Returns once the thread is interrupted.
     */
    @Override
    public void run() {
      List<ApplicationId> expiredApps = new ArrayList<ApplicationId>();
      while (!Thread.currentThread().isInterrupted()) {
        // First collect, then remove, so the map is not modified while it
        // is being traversed.
        expiredApps.clear();
        for (Entry<ApplicationId, Long> pending
            : delayedRemovalMap.entrySet()) {
          if (pending.getValue() < System.currentTimeMillis()) {
            expiredApps.add(pending.getKey());
          }
        }
        for (ApplicationId expiredApp : expiredApps) {
          removeApplicationFromRenewal(expiredApp);
          delayedRemovalMap.remove(expiredApp);
        }
        try {
          synchronized (this) {
            wait(waitTimeMs);
          }
        } catch (InterruptedException e) {
          LOG.info("Delayed Deletion Thread Interrupted. Shutting it down");
          return;
        }
      }
    }
  }
  
  /**
   * Sets the RM context used by this renewer.
   *
   * @param rmContext the context to store in {@code this.rmContext}
   */
  public void setRMContext(RMContext rmContext) {
    this.rmContext = rmContext;
  }

  /**
   * Sets {@code delegationTokenRenewerPoolTrackerFlag}. Intended for tests.
   * NOTE(review): presumably the flag controls whether the pool tracker is
   * started; the code reading it is not in this part of the file - confirm.
   *
   * @param flag the new value of the flag
   */
  @VisibleForTesting
  public void setDelegationTokenRenewerPoolTracker(boolean flag) {
    delegationTokenRenewerPoolTrackerFlag = flag;
  }

  /**
   * Builds the timer task that retries a token renewer event. When run, the
   * task logs the retry, increments the event's attempt counter, passes
   * every token of the event's credentials that is still present in
   * {@code allTokens} to {@code removeFailedDelegationToken}, and finally
   * re-submits the application as a recover event carrying the updated
   * attempt count.
   *
   * @param evt the application event to retry
   * @return Timer Task
   */
  private TimerTask getTimerTask(AbstractDelegationTokenRenewerAppEvent evt) {
    return new TimerTask() {
      @Override
      public void run() {
        // The attempt number is logged before it is incremented.
        LOG.info("Retrying token renewer thread for appid = {} and "
            + "attempt is {}", evt.getApplicationId(),
            evt.getAttempt());
        evt.incrAttempt();

        Credentials creds = evt.getCredentials();
        for (Token<?> candidate : creds.getAllTokens()) {
          DelegationTokenToRenew tracked = allTokens.get(candidate);
          if (tracked == null) {
            continue;
          }
          removeFailedDelegationToken(tracked);
        }

        // Retry as a recover event that inherits the attempt count.
        DelegationTokenRenewerAppRecoverEvent retryEvent =
            new DelegationTokenRenewerAppRecoverEvent(
                evt.getApplicationId(), creds,
                evt.shouldCancelAtEnd(), evt.getUser(), evt.getTokenConf());
        retryEvent.setAttempt(evt.getAttempt());
        processDelegationTokenRenewerEvent(retryEvent);
      }
    };
  }

  /**
   * Runnable class to set timeout for futures of all threads running in
   * renewerService thread pool executor asynchronously.
   *
   * Every future in {@code futures} is waited on for at most
   * {@code tokenRenewerThreadTimeout} milliseconds. A task that has not
   * finished by then is cancelled and removed from {@code futures}; if its
   * event is an application event with attempts left, a retry is scheduled
   * on {@code renewalTimer} after {@code tokenRenewerThreadRetryInterval}.
   *
   * The tracker stops when its thread is interrupted.
   */
  private final class DelegationTokenRenewerPoolTracker
      implements Runnable {

    DelegationTokenRenewerPoolTracker() {
    }

    /**
     * Keep traversing the {@code Future}s of renewer pool threads and wait
     * for specific timeout. In case of timeout exception, retry the event
     * till no. of attempts reaches max attempts with specific interval.
     */
    @Override
    public void run() {
      // NOTE(review): the loop does not pause when futures is empty, and
      // completed futures are not removed here - confirm they are removed
      // elsewhere.
      while (!Thread.currentThread().isInterrupted()) {
        for (Map.Entry<DelegationTokenRenewerEvent, Future<?>> entry : futures
            .entrySet()) {
          DelegationTokenRenewerEvent evt = entry.getKey();
          Future<?> future = entry.getValue();
          try {
            future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS);
          } catch (TimeoutException e) {
            cancelAndRetry(evt, future);
          } catch (InterruptedException e) {
            // Restore the interrupt status and stop; otherwise the
            // interrupt would be swallowed and the loop could never end.
            LOG.info("Token renewer pool tracker interrupted. "
                + "Shutting it down");
            Thread.currentThread().interrupt();
            return;
          } catch (Exception e) {
            LOG.info("Problem in submitting renew tasks in token renewer "
                + "thread.", e);
          }
        }
      }
    }

    /**
     * Cancels a task that ran into the timeout and, when possible, schedules
     * a retry of its event.
     *
     * @param evt the event whose task timed out
     * @param future the future of the timed-out task
     */
    private void cancelAndRetry(DelegationTokenRenewerEvent evt,
        Future<?> future) {
      // The task may have completed between the timeout and this check.
      if (future.isDone() || future.isCancelled()) {
        return;
      }
      // Cancel thread and retry the same event in case of timeout
      future.cancel(true);
      futures.remove(evt);
      if (evt.getAttempt() >= tokenRenewerThreadRetryMaxAttempts) {
        LOG.info(
            "Exhausted max retry attempts {} in token renewer "
                + "thread for {}",
            tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId());
      } else if (evt instanceof AbstractDelegationTokenRenewerAppEvent) {
        renewalTimer.schedule(
            getTimerTask((AbstractDelegationTokenRenewerAppEvent) evt),
            tokenRenewerThreadRetryInterval);
      } else {
        // A retry needs the credentials that only application events carry.
        // An unchecked cast here would throw ClassCastException out of the
        // TimeoutException handler and terminate the tracker thread.
        LOG.warn("Token renewer thread timed out for {}; event of type {} "
            + "cannot be retried", evt.getApplicationId(), evt.getType());
      }
    }
  }

  /*
   * Runs as a separate thread and processes one renewer event. Doing the
   * work here keeps token renewal on application submission, and token
   * removal on application finish, asynchronous.
   */
  private final class DelegationTokenRenewerRunnable
      implements Runnable {

    // The event this task processes.
    private DelegationTokenRenewerEvent evt;

    public DelegationTokenRenewerRunnable(DelegationTokenRenewerEvent evt) {
      this.evt = evt;
    }

    /**
     * Dispatches the event: submit and recover events are recognised by
     * their class, a finish event by its type. Anything else is ignored.
     */
    @Override
    public void run() {
      if (evt instanceof DelegationTokenRenewerAppSubmitEvent) {
        handleDTRenewerAppSubmitEvent(
            (DelegationTokenRenewerAppSubmitEvent) evt);
        return;
      }
      if (evt instanceof DelegationTokenRenewerAppRecoverEvent) {
        handleDTRenewerAppRecoverEvent(
            (DelegationTokenRenewerAppRecoverEvent) evt);
        return;
      }
      DelegationTokenRenewerEventType type = evt.getType();
      if (type.equals(DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
        handleAppFinishEvent(evt);
      }
    }

    /**
     * Sets up token renewal for a submitted application and then sends the
     * START event for it; on any failure sends APP_REJECTED instead.
     *
     * Applications submitted with delegation tokens are not handed to the
     * scheduler from RMAppManager but from here, so that token renewal is
     * asynchronous and the client thread is not blocked during submission.
     */
    @SuppressWarnings("unchecked")
    private void handleDTRenewerAppSubmitEvent(
        DelegationTokenRenewerAppSubmitEvent event) {
      try {
        // Setup tokens for renewal
        handleAppSubmitEvent(event);
        RMAppEvent startEvent =
            new RMAppEvent(event.getApplicationId(), RMAppEventType.START);
        rmContext.getDispatcher().getEventHandler().handle(startEvent);
      } catch (Throwable failure) {
        LOG.warn(
            "Unable to add the application to the delegation token renewer.",
            failure);
        // Sending APP_REJECTED is fine, since we assume that the
        // RMApp is in NEW state and thus we haven't yet informed the
        // Scheduler about the existence of the application
        RMAppEvent rejectEvent = new RMAppEvent(event.getApplicationId(),
            RMAppEventType.APP_REJECTED, failure.getMessage());
        rmContext.getDispatcher().getEventHandler().handle(rejectEvent);
      }
    }
  }

  /**
   * Sets up token renewal for an application during recovery. Any failure
   * is logged as a warning and not propagated.
   *
   * @param event the recover event to process
   */
  @SuppressWarnings("unchecked")
  private void handleDTRenewerAppRecoverEvent(
      DelegationTokenRenewerAppRecoverEvent event) {
    try {
      handleAppSubmitEvent(event);
    } catch (Throwable failure) {
      LOG.warn("Unable to add the application to the delegation token"
          + " renewer on recovery.", failure);
    }
  }

  /**
   * Application event created with type
   * {@link DelegationTokenRenewerEventType#VERIFY_AND_START_APPLICATION}.
   */
  static class DelegationTokenRenewerAppSubmitEvent
      extends AbstractDelegationTokenRenewerAppEvent {
    // All arguments are passed through unchanged to the superclass; only the
    // event type is fixed here.
    public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
        Credentials credentails, boolean shouldCancelAtEnd, String user,
        Configuration tokenConf) {
      super(appId, credentails, shouldCancelAtEnd, user,
          DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION, tokenConf);
    }
  }

  /**
   * Application event created with type
   * {@link DelegationTokenRenewerEventType#RECOVER_APPLICATION}.
   */
  static class DelegationTokenRenewerAppRecoverEvent
      extends AbstractDelegationTokenRenewerAppEvent {
    // All arguments are passed through unchanged to the superclass; only the
    // event type is fixed here.
    public DelegationTokenRenewerAppRecoverEvent(ApplicationId appId,
        Credentials credentails, boolean shouldCancelAtEnd, String user,
        Configuration tokenConf) {
      super(appId, credentails, shouldCancelAtEnd, user,
          DelegationTokenRenewerEventType.RECOVER_APPLICATION, tokenConf);
    }
  }

  /**
   * Renewer event for a single application that additionally carries the
   * application's credentials, the cancel-at-end flag, the user and the
   * token configuration.
   */
  static class AbstractDelegationTokenRenewerAppEvent extends
      DelegationTokenRenewerEvent {

    private Credentials credentials;
    private boolean shouldCancelAtEnd;
    private String user;
    private Configuration tokenConf;

    /**
     * Stores the given values; the application id and the event type are
     * handed to the superclass.
     */
    public AbstractDelegationTokenRenewerAppEvent(ApplicationId appId,
        Credentials credentials, boolean shouldCancelAtEnd, String user,
        DelegationTokenRenewerEventType type, Configuration tokenConf) {
      super(appId, type);
      this.credentials = credentials;
      this.shouldCancelAtEnd = shouldCancelAtEnd;
      this.user = user;
      this.tokenConf = tokenConf;
    }

    /** @return the credentials given at construction */
    public Credentials getCredentials() {
      return this.credentials;
    }

    /** @return the cancel-at-end flag given at construction */
    public boolean shouldCancelAtEnd() {
      return this.shouldCancelAtEnd;
    }

    /** @return the user given at construction */
    public String getUser() {
      return this.user;
    }

    /** @return the token configuration given at construction */
    private Configuration getTokenConf() {
      return this.tokenConf;
    }
  }
  
  /** Types of the events processed by the delegation token renewer. */
  enum DelegationTokenRenewerEventType {
    // Type used by DelegationTokenRenewerAppSubmitEvent.
    VERIFY_AND_START_APPLICATION,
    // Type used by DelegationTokenRenewerAppRecoverEvent.
    RECOVER_APPLICATION,
    // Type of the event created by applicationFinished().
    FINISH_APPLICATION
  }
  
  /**
   * Base renewer event: an event type together with the application it
   * concerns and a counter of processing attempts.
   */
  private static class DelegationTokenRenewerEvent extends
      AbstractEvent<DelegationTokenRenewerEventType> {

    private ApplicationId appId;
    // Number of the current attempt; a new event starts at attempt 1.
    private int attempt = 1;

    public DelegationTokenRenewerEvent(ApplicationId appId,
        DelegationTokenRenewerEventType type) {
      super(type);
      this.appId = appId;
    }

    /** @return the id of the application this event is about */
    public ApplicationId getApplicationId() {
      return this.appId;
    }

    /** @return the current attempt number */
    public int getAttempt() {
      return this.attempt;
    }

    /** Replaces the attempt number with the given value. */
    public void setAttempt(int attempt) {
      this.attempt = attempt;
    }

    /** Advances the attempt number by one. */
    public void incrAttempt() {
      this.attempt += 1;
    }
  }

  /**
   * Returns the {@code allTokens} map itself, not a copy. Only for testing.
   *
   * @return the map held in {@code allTokens}
   */
  protected ConcurrentMap<Token<?>, DelegationTokenToRenew> getAllTokens() {
    return allTokens;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AMRMTokenSecretManager 源码

hadoop AppPriorityACLsManager 源码

hadoop CapacityQueueACLsManager 源码

hadoop CapacityReservationsACLsManager 源码

hadoop ClientToAMTokenSecretManagerInRM 源码

hadoop FairQueueACLsManager 源码

hadoop FairReservationsACLsManager 源码

hadoop GenericQueueACLsManager 源码

hadoop NMTokenSecretManagerInRM 源码

hadoop ProxyCAManager 源码

0  赞