hadoop DelegationTokenRenewer 源码

  • 2022-10-20
  • 浏览 (778)

hadoop DelegationTokenRenewer 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs;

import org.apache.hadoop.classification.VisibleForTesting;

import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A daemon thread that waits for the next file system to renew.
 */
@InterfaceAudience.Private
public class DelegationTokenRenewer
    extends Thread {
  /**
   * Logger scoped to this class; used by {@code reset()} and
   * {@code removeRenewAction()} (other messages in this class go through
   * {@code FileSystem.LOG}).
   */
  private static final Logger LOG = LoggerFactory
      .getLogger(DelegationTokenRenewer.class);

  /**
   * Contract a file system has to implement so that the renewer can read its
   * current renew token and hand it a replacement token.
   */
  public interface Renewable {
    /**
     * Supplies the token that should be renewed.
     *
     * @return the renew token.
     */
    Token<?> getRenewToken();

    /**
     * Installs a delegation token on the implementer.
     *
     * @param <T> generic type T.
     * @param token token.
     */
    <T extends TokenIdentifier> void setDelegationToken(Token<T> token);
  }

  /**
   * An action that will renew and replace the file system's delegation
   * tokens automatically.
   *
   * <p>Instances are ordered by their renewal time (see
   * {@link #compareTo(Delayed)}) but compared for equality by their token
   * (see {@link #equals(Object)}), so a freshly built action for the same
   * token can be used to look up a queued one.
   */
  public static class RenewAction<T extends FileSystem & Renewable>
      implements Delayed {
    /** when should the renew happen, in milliseconds on the {@link Time#now()} clock */
    private long renewalTime;
    /** a weak reference to the file system so that it can be garbage collected */
    private final WeakReference<T> weakFs;
    /** the token to renew; null when the file system supplied no renew token */
    private Token<?> token;
    /** set to false by {@link #renew()} once renewing and re-obtaining a token both failed */
    boolean isValid = true;

    /**
     * Captures the file system's current renew token and schedules the first
     * renewal based on the configured renew cycle.
     *
     * @param fs the file system whose token is to be renewed.
     */
    private RenewAction(final T fs) {
      this.weakFs = new WeakReference<T>(fs);
      this.token = fs.getRenewToken();
      updateRenewalTime(renewCycle);
    }

    /**
     * @return false once a renewal attempt failed and no replacement token
     *         could be obtained; true otherwise.
     */
    public boolean isValid() {
      return isValid;
    }

    /** Get the delay until this event should happen. */
    @Override
    public long getDelay(final TimeUnit unit) {
      final long millisLeft = renewalTime - Time.now();
      return unit.convert(millisLeft, TimeUnit.MILLISECONDS);
    }

    /**
     * Orders actions by renewal time, earliest first.
     *
     * @param delayed the other action; must be a {@code RenewAction}.
     * @return -1, 0 or 1 as this action is due before, together with, or
     *         after the other one.
     */
    @Override
    public int compareTo(final Delayed delayed) {
      final RenewAction<?> that = (RenewAction<?>)delayed;
      return Long.compare(this.renewalTime, that.renewalTime);
    }

    /**
     * Hash of the token only, consistent with {@link #equals(Object)}.
     * A missing token hashes to 0 instead of throwing.
     */
    @Override
    public int hashCode() {
      return token == null ? 0 : token.hashCode();
    }

    /**
     * Two actions are equal when they carry equal tokens. An action without
     * a token is only equal to another action without a token.
     */
    @Override
    public boolean equals(final Object that) {
      if (this == that) {
        return true;
      } else if (!(that instanceof RenewAction)) {
        return false;
      }
      final Token<?> otherToken = ((RenewAction<?>)that).token;
      // token may be null when the file system had no renew token; a lookup
      // with such an action must not blow up with a NullPointerException.
      return token == null ? otherToken == null : token.equals(otherToken);
    }

    /**
     * Set a new time for the renewal: now plus nine tenths of the given delay.
     * It can only be called when the action is not in the queue, because the
     * queue orders its elements by renewal time.
     * @param delay milliseconds until the token is assumed to expire
     */
    private void updateRenewalTime(long delay) {
      renewalTime = Time.now() + delay - delay/10;
    }

    /**
     * Renew or replace the delegation token for this file system.
     * It can only be called when the action is not in the queue.
     *
     * <p>If renewing the current token fails with an {@link IOException}, a
     * new token is requested from the file system, installed on it, and the
     * renewal is rescheduled using the renew cycle.
     *
     * @return true if the file system is still reachable through the weak
     *         reference (the token was renewed or replaced), false if it has
     *         been garbage collected and nothing was done.
     * @throws IOException if the token could neither be renewed nor replaced;
     *         the cause is the renew failure and the failure to obtain a new
     *         token is attached to that cause as a suppressed exception.
     * @throws InterruptedException declared by the token renewal call.
     */
    private boolean renew() throws IOException, InterruptedException {
      final T fs = weakFs.get();
      if (fs == null) {
        return false;
      }
      synchronized(fs) {
        try {
          long expires = token.renew(fs.getConf());
          updateRenewalTime(expires - Time.now());
        } catch (IOException ie) {
          try {
            Token<?>[] tokens = fs.addDelegationTokens(null, null);
            if (tokens.length == 0) {
              throw new IOException("addDelegationTokens returned no tokens");
            }
            token = tokens[0];
            updateRenewalTime(renewCycle);
            fs.setDelegationToken(token);
          } catch (IOException ie2) {
            isValid = false;
            // keep the second failure visible instead of silently dropping it
            ie.addSuppressed(ie2);
            throw new IOException("Can't renew or get new delegation token ", ie);
          }
        }
      }
      return true;
    }

    /**
     * Cancels the token, provided the file system is still reachable through
     * the weak reference; otherwise does nothing.
     *
     * @throws IOException declared by the token cancel call.
     * @throws InterruptedException declared by the token cancel call.
     */
    private void cancel() throws IOException, InterruptedException {
      final T fs = weakFs.get();
      if (fs != null) {
        token.cancel(fs.getConf());
      }
    }

    @Override
    public String toString() {
      Renewable fs = weakFs.get();
      return fs == null? "evaporated token renew"
          : "The token will be renewed in " + getDelay(TimeUnit.SECONDS)
            + " secs, renewToken=" + token;
    }
  }

  /** assumes renew cycle for a token is 24 hours... (value is in milliseconds) */
  private static final long RENEW_CYCLE = 24L * 60 * 60 * 1000;

  /**
   * Renew cycle actually used when scheduling; mutable so that tests can
   * shorten it. Defaults to {@link #RENEW_CYCLE}.
   */
  @InterfaceAudience.Private
  @VisibleForTesting
  public static long renewCycle = RENEW_CYCLE;

  /**
   * Queue to maintain the RenewActions to be processed by the {@link #run()}.
   * The reference is never reassigned, so it is final rather than volatile.
   */
  private final DelayQueue<RenewAction<?>> queue = new DelayQueue<RenewAction<?>>();
  
  /**
   * Reports how many renew actions are currently held in the queue.
   * For testing purposes.
   *
   * @return renew queue length.
   */
  @VisibleForTesting
  protected int getRenewQueueLength() {
    final int pending = queue.size();
    return pending;
  }

  /**
   * The singleton instance. It is {@code null} until {@link #getInstance()}
   * creates it, and is set back to {@code null} by {@link #reset()}. The
   * thread itself is started lazily in {@link #addRenewAction(FileSystem)}
   */
  private static DelegationTokenRenewer INSTANCE = null;

  private DelegationTokenRenewer(final Class<? extends FileSystem> clazz) {
    super(clazz.getSimpleName() + "-" + DelegationTokenRenewer.class.getSimpleName());
    setDaemon(true);
  }

  /**
   * Returns the shared renewer, creating it on first use. Creation is
   * guarded by the class lock.
   *
   * @return the singleton renewer.
   */
  public static synchronized DelegationTokenRenewer getInstance() {
    DelegationTokenRenewer renewer = INSTANCE;
    if (renewer == null) {
      renewer = new DelegationTokenRenewer(FileSystem.class);
      INSTANCE = renewer;
    }
    return renewer;
  }

  /**
   * Discards the current singleton, if any: empties its queue, interrupts
   * the renewer thread and waits for it to finish. The singleton reference
   * is cleared even if the wait is interrupted.
   *
   * <p>If the calling thread is interrupted while waiting, the exception is
   * logged and the interrupt flag is restored so the caller can observe it.
   */
  @VisibleForTesting
  static synchronized void reset() {
    if (INSTANCE == null) {
      return;
    }
    INSTANCE.queue.clear();
    INSTANCE.interrupt();
    try {
      INSTANCE.join();
    } catch (InterruptedException e) {
      LOG.warn("Failed to reset renewer", e);
      // do not swallow the interrupt; hand it back to the caller
      Thread.currentThread().interrupt();
    } finally {
      INSTANCE = null;
    }
  }
  
  /**
   * Registers a file system for automatic token renewal, starting the
   * renewer thread first if it is not alive. When the file system has no
   * renew token, an error is logged and the returned action is not queued.
   *
   * @param <T> generic type T.
   * @param fs file system.
   * @return renew action.
   * */
  @SuppressWarnings("static-access")
  public <T extends FileSystem & Renewable> RenewAction<T> addRenewAction(final T fs) {
    synchronized (this) {
      final boolean running = isAlive();
      if (!running) {
        start();
      }
    }
    final RenewAction<T> created = new RenewAction<T>(fs);
    if (created.token == null) {
      FileSystem.LOG.error("does not have a token for renewal");
      return created;
    }
    queue.add(created);
    return created;
  }

  /**
   * Remove the associated renew action from the queue and, if one was
   * actually removed, cancel its token.
   *
   * <p>A file system without a renew token is never queued by
   * {@code addRenewAction}, so in that case this method returns without
   * touching the queue.
   *
   * <p>If cancelling is interrupted, the interruption is logged and the
   * calling thread's interrupt flag is restored.
   *
   * @param <T> generic type T.
   * @param fs file system.
   * @throws IOException raised on errors performing I/O.
   */
  public <T extends FileSystem & Renewable> void removeRenewAction(
      final T fs) throws IOException {
    RenewAction<T> action = new RenewAction<T>(fs);
    if (action.token == null) {
      // Nothing can be queued for a token-less file system; skipping the
      // lookup also avoids comparing a null token against queued actions.
      return;
    }
    if (queue.remove(action)) {
      try {
        action.cancel();
      } catch (InterruptedException ie) {
        LOG.error("Interrupted while canceling token for {} filesystem.", fs.getUri());
        LOG.debug("Exception in removeRenewAction.", ie);
        // do not swallow the interrupt; hand it back to the caller
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Renewal loop: blocks until the next action is due, renews it, and puts
   * it back in the queue if its file system is still reachable. The loop
   * ends when the thread is interrupted; any other failure is logged and
   * the failed action is not re-queued.
   */
  @Override
  public void run() {
    while (true) {
      RenewAction<?> current = null;
      try {
        current = queue.take();
        final boolean fsStillReachable = current.renew();
        if (fsStillReachable) {
          queue.add(current);
        }
      } catch (InterruptedException interrupted) {
        return;
      } catch (Exception failure) {
        FileSystem.LOG.warn("Failed to renew token, action=" + current, failure);
      }
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop Abortable 源码

hadoop AbstractFileSystem 源码

hadoop AvroFSInput 源码

hadoop BBPartHandle 源码

hadoop BBUploadHandle 源码

hadoop BatchListingOperations 源码

hadoop BatchedRemoteIterator 源码

hadoop BlockLocation 源码

hadoop BlockStoragePolicySpi 源码

hadoop BufferedFSInputStream 源码

0  赞