hadoop ReencryptionHandler 源码

  • 2022-10-20
  • 浏览 (38)

hadoop ReencryptionHandler 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.ReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State;
import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ReencryptionTask;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_BATCH_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_EDEK_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY;

/**
 * Class for handling re-encrypt EDEK operations.
 * <p>
 * For each EZ, ReencryptionHandler walks the tree in a depth-first order,
 * and submits batches of (files + existing edeks) as re-encryption tasks
 * to a thread pool. Each thread in the pool then contacts the KMS to
 * re-encrypt the edeks. ReencryptionUpdater tracks the tasks and updates
 * file xattrs with the new edeks.
 * <p>
 * File renames are disabled in the EZ that's being re-encrypted. Newly created
 * files will have new edeks, because the edek cache is drained upon the
 * submission of a re-encryption command.
 * <p>
 * It is assumed only 1 ReencryptionHandler will be running, because:
 *   1. The bottleneck of the entire re-encryption appears to be on the KMS.
 *   2. Even with multiple handlers, since updater requires writelock and is
 * single-threaded, the performance gain is limited.
 * <p>
 * This class uses the FSDirectory lock for synchronization.
 */
@InterfaceAudience.Private
public class ReencryptionHandler implements Runnable {

  /** Logger used by the handler and by its nested helper classes. */
  public static final Logger LOG =
      LoggerFactory.getLogger(ReencryptionHandler.class);

  // 2000 is based on buffer size = 512 * 1024, and SetXAttr op size is
  // 100 - 200 bytes (depending on the xattr value).
  // The buffer size is hard-coded, see outputBufferCapacity from QJM.
  // Batch sizes above this value only trigger a warning in the constructor.
  private static final int MAX_BATCH_SIZE_WITHOUT_FLOODING = 2000;

  private final EncryptionZoneManager ezManager;
  private final FSDirectory dir;
  // Maximum time, in milliseconds, the main loop waits between scans.
  private final long interval;
  // Number of files after which the current batch is submitted.
  private final int reencryptBatchSize;
  // Ratio used by the traverser's throttle(); values >= 1.0 skip the
  // lock-time based throttling.
  private double throttleLimitHandlerRatio;
  // Core and maximum size of the EDEK re-encryption thread pool.
  private final int reencryptThreadPoolSize;
  // stopwatches for throttling
  private final StopWatch throttleTimerAll = new StopWatch();
  private final StopWatch throttleTimerLocked = new StopWatch();

  // Completion service on top of the EDEK thread pool; the same instance is
  // handed to the ReencryptionUpdater in the constructor.
  private ExecutorCompletionService<ReencryptionTask> batchService;
  // Work queue of the EDEK thread pool; its size is consulted when throttling.
  private BlockingQueue<Runnable> taskQueue;
  // protected by ReencryptionHandler object lock
  private final Map<Long, ZoneSubmissionTracker> submissions = new HashMap<>();

  // The current batch that the handler is working on. Handler is designed to
  // be single-threaded, see class javadoc for more details.
  private ReencryptionBatch currentBatch;

  private final ReencryptionPendingInodeIdCollector traverser;

  private final ReencryptionUpdater reencryptionUpdater;
  // Single-thread executor running reencryptionUpdater; null until
  // startUpdaterThread() is called.
  private ExecutorService updaterExecutor;

  // Vars for unit tests.
  private volatile boolean shouldPauseForTesting = false;
  private volatile int pauseAfterNthSubmission = 0;

  /**
   * Cancels every re-encryption task tracked for every zone and then shuts
   * down the updater executor, if one has been started. The FSDirectory write
   * lock is expected to be held by the caller.
   */
  void stopThreads() {
    assert dir.hasWriteLock();
    synchronized (this) {
      submissions.values().forEach(ZoneSubmissionTracker::cancelAllTasks);
    }
    final ExecutorService updater = updaterExecutor;
    if (updater != null) {
      updater.shutdownNow();
    }
  }

  /**
   * Creates a single daemon thread executor and hands the re-encryption
   * updater to it for execution.
   */
  void startUpdaterThread() {
    final ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder()
        .setNameFormat("reencryptionUpdaterThread #%d")
        .setDaemon(true);
    updaterExecutor =
        Executors.newSingleThreadExecutor(factoryBuilder.build());
    updaterExecutor.execute(reencryptionUpdater);
  }

  /**
   * Test hook: raises the pause flag and notifies one thread waiting on this
   * handler's monitor.
   */
  @VisibleForTesting
  synchronized void pauseForTesting() {
    this.shouldPauseForTesting = true;
    LOG.info("Pausing re-encrypt handler for testing.");
    this.notify();
  }

  /**
   * Test hook: clears the pause flag and notifies one thread waiting on this
   * handler's monitor.
   */
  @VisibleForTesting
  synchronized void resumeForTesting() {
    this.shouldPauseForTesting = false;
    LOG.info("Resuming re-encrypt handler for testing.");
    this.notify();
  }

  /**
   * Test hook: arms a countdown so that the pause flag is raised once
   * {@code count} more batches have been submitted. Must not be armed while a
   * previous countdown is still pending.
   *
   * @param count number of batch submissions after which to pause
   */
  @VisibleForTesting
  void pauseForTestingAfterNthSubmission(final int count) {
    assert this.pauseAfterNthSubmission == 0;
    this.pauseAfterNthSubmission = count;
  }

  /** Test hook: forwards a pause request to the re-encryption updater. */
  @VisibleForTesting
  void pauseUpdaterForTesting() {
    final ReencryptionUpdater updater = reencryptionUpdater;
    updater.pauseForTesting();
  }

  /** Test hook: forwards a resume request to the re-encryption updater. */
  @VisibleForTesting
  void resumeUpdaterForTesting() {
    final ReencryptionUpdater updater = reencryptionUpdater;
    updater.resumeForTesting();
  }

  /**
   * Test hook: forwards a "pause after the Nth checkpoint" request for the
   * given zone to the re-encryption updater.
   *
   * @param zoneId id of the zone the request applies to
   * @param count number of checkpoints after which to pause
   */
  @VisibleForTesting
  void pauseForTestingAfterNthCheckpoint(final long zoneId, final int count) {
    final ReencryptionUpdater updater = reencryptionUpdater;
    updater.pauseForTestingAfterNthCheckpoint(zoneId, count);
  }

  /**
   * Creates the handler: reads and validates its tunables from the
   * configuration, builds the EDEK re-encryption thread pool and creates the
   * updater, the first empty batch and the tree traverser.
   *
   * @param ezMgr the encryption zone manager; it must have a key provider set
   * @param conf configuration supplying the re-encryption settings
   * @throws IllegalArgumentException if the sleep interval, the batch size,
   *         the throttle ratio or the EDEK thread count is not positive
   */
  ReencryptionHandler(final EncryptionZoneManager ezMgr,
      final Configuration conf) {
    this.ezManager = ezMgr;
    Preconditions.checkNotNull(ezManager.getProvider(),
        "No provider set, cannot re-encrypt");
    this.dir = ezMgr.getFSDirectory();
    this.interval =
        conf.getTimeDuration(DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY,
            DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT,
            TimeUnit.MILLISECONDS);
    Preconditions.checkArgument(interval > 0,
        DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY + " is not positive.");
    this.reencryptBatchSize = conf.getInt(DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY,
        DFS_NAMENODE_REENCRYPT_BATCH_SIZE_DEFAULT);
    Preconditions.checkArgument(reencryptBatchSize > 0,
        DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY + " is not positive.");
    if (reencryptBatchSize > MAX_BATCH_SIZE_WITHOUT_FLOODING) {
      LOG.warn("Re-encryption batch size is {}. It could cause edit log buffer "
          + "to be full and trigger a logSync within the writelock, greatly "
          + "impacting namenode throughput.", reencryptBatchSize);
    }
    this.throttleLimitHandlerRatio =
        conf.getDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
            DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_DEFAULT);
    LOG.info("Configured throttleLimitHandlerRatio={} for re-encryption",
        throttleLimitHandlerRatio);
    Preconditions.checkArgument(throttleLimitHandlerRatio > 0.0f,
        DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY
            + " is not positive.");
    this.reencryptThreadPoolSize =
        conf.getInt(DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY,
            DFS_NAMENODE_REENCRYPT_EDEK_THREADS_DEFAULT);
    // Validate like the other tunables so a bad value is reported with the
    // name of the offending key instead of failing inside ThreadPoolExecutor.
    Preconditions.checkArgument(reencryptThreadPoolSize > 0,
        DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY + " is not positive.");

    taskQueue = new LinkedBlockingQueue<>();
    ThreadPoolExecutor threadPool =
        new ThreadPoolExecutor(reencryptThreadPoolSize, reencryptThreadPoolSize,
            60, TimeUnit.SECONDS, taskQueue, new Daemon.DaemonFactory() {
              // Running index used to give every pool thread a distinct name.
              private final AtomicInteger ind = new AtomicInteger(0);

              @Override
              public Thread newThread(Runnable r) {
                Thread t = super.newThread(r);
                t.setName("reencryption edek Thread-" + ind.getAndIncrement());
                return t;
              }
            }, new ThreadPoolExecutor.CallerRunsPolicy() {

              @Override
              public void rejectedExecution(Runnable runnable,
                  ThreadPoolExecutor e) {
                LOG.info("Execution rejected, executing in current thread");
                super.rejectedExecution(runnable, e);
              }
            });

    // Let idle core threads exit after the 60 second keep-alive.
    threadPool.allowCoreThreadTimeOut(true);
    this.batchService = new ExecutorCompletionService<>(threadPool);
    reencryptionUpdater =
        new ReencryptionUpdater(dir, batchService, this, conf);
    currentBatch = new ReencryptionBatch(reencryptBatchSize);
    traverser = new ReencryptionPendingInodeIdCollector(dir, this, conf);
  }

  /**
   * @return the re-encryption status object owned by the encryption zone
   *         manager
   */
  ReencryptionStatus getReencryptionStatus() {
    final ReencryptionStatus status = ezManager.getReencryptionStatus();
    return status;
  }

  /**
   * Cancels the re-encryption of a zone: marks its status as canceled and
   * cancels and drops the tasks tracked for it. The FSDirectory write lock is
   * expected to be held by the caller.
   *
   * @param zoneId id of the zone to cancel
   * @param zoneName name of the zone, used only in the error message
   * @throws IOException if the zone has no status or is already completed
   */
  void cancelZone(final long zoneId, final String zoneName) throws IOException {
    assert dir.hasWriteLock();
    final ZoneReencryptionStatus status =
        getReencryptionStatus().getZoneStatus(zoneId);
    final boolean underReencryption =
        status != null && status.getState() != State.Completed;
    if (!underReencryption) {
      throw new IOException("Zone " + zoneName + " is not under re-encryption");
    }
    status.cancel();
    removeZoneTrackerStopTasks(zoneId);
  }

  /**
   * Drops a zone from re-encryption: cancels and forgets its tracked tasks,
   * then removes the zone from the re-encryption status. The FSDirectory
   * write lock is expected to be held by the caller.
   *
   * @param zoneId id of the zone to remove
   */
  void removeZone(final long zoneId) {
    assert dir.hasWriteLock();
    LOG.info("Removing zone {} from re-encryption.", zoneId);
    removeZoneTrackerStopTasks(zoneId);
    final ReencryptionStatus status = getReencryptionStatus();
    status.removeZone(zoneId);
  }

  /**
   * Cancels all tasks of the zone's submission tracker and removes the
   * tracker from {@code submissions}. Does nothing if the zone has no tracker.
   *
   * @param zoneId id of the zone whose tracker is dropped
   */
  synchronized private void removeZoneTrackerStopTasks(final long zoneId) {
    final ZoneSubmissionTracker tracker = submissions.get(zoneId);
    if (tracker == null) {
      return;
    }
    tracker.cancelAllTasks();
    submissions.remove(zoneId);
  }

  /**
   * Looks up the submission tracker of a zone. The FSDirectory read lock is
   * expected to be held by the caller.
   *
   * @param zoneId id of the zone
   * @return the tracker registered for the zone, or null if there is none
   */
  ZoneSubmissionTracker getTracker(final long zoneId) {
    assert dir.hasReadLock();
    final ZoneSubmissionTracker tracker = unprotectedGetTracker(zoneId);
    return tracker;
  }

  /**
   * Looks up the submission tracker of a zone without requiring the
   * FSDirectory lock; {@code submissions} is guarded by this object's
   * monitor instead.
   *
   * @param zoneId id of the zone
   * @return the tracker registered for the zone, or null if there is none
   */
  synchronized ZoneSubmissionTracker unprotectedGetTracker(final long zoneId) {
    final ZoneSubmissionTracker tracker = submissions.get(zoneId);
    return tracker;
  }

  /**
   * Add a dummy tracker (with 1 task that has 0 files to re-encrypt)
   * for the zone. This is necessary to complete the re-encryption in case
   * no file in the entire zone needs re-encryption at all. We cannot simply
   * update zone status and set zone xattrs, because in the handler we only hold
   * readlock, and setting xattrs requires upgrading to a writelock.
   *
   * @param zoneId id of the zone the dummy task is submitted for
   * @param zst tracker to register for the zone; a new tracker is created
   *            when this is null
   */
  void addDummyTracker(final long zoneId, ZoneSubmissionTracker zst) {
    assert dir.hasReadLock();
    if (zst == null) {
      zst = new ZoneSubmissionTracker();
    }
    zst.setSubmissionDone();

    // An empty batch makes the callable return a task with zero failures
    // without contacting the KMS.
    final Future<ReencryptionTask> future = batchService.submit(
        new EDEKReencryptCallable(zoneId, new ReencryptionBatch(), this));
    zst.addTask(future);
    synchronized (this) {
      submissions.put(zoneId, zst);
    }
  }

  /**
   * Main loop of the handler thread. Each iteration waits up to
   * {@code interval} milliseconds (or until notified), takes at most one
   * unprocessed zone and submits that whole zone for re-encryption through
   * {@link #reencryptEncryptionZone(long)} before looking for the next one.
   * <p>
   * The loop ends when the thread is interrupted, or when a throwable other
   * than an {@link IOException} escapes the processing of a zone.
   */
  @Override
  public void run() {
    LOG.info("Starting up re-encrypt thread with interval={} millisecond.",
        interval);
    while (true) {
      try {
        // Wait until the interval elapses or this object is notified.
        synchronized (this) {
          wait(interval);
        }
        traverser.checkPauseForTesting();
      } catch (InterruptedException ie) {
        LOG.info("Re-encrypt handler interrupted. Exiting");
        // Restore the interrupt flag before leaving the loop.
        Thread.currentThread().interrupt();
        return;
      }

      final Long zoneId;
      // Pick the next zone and mark it started under the namesystem read lock.
      dir.getFSNamesystem().readLock();
      try {
        zoneId = getReencryptionStatus().getNextUnprocessedZone();
        if (zoneId == null) {
          // empty queue.
          continue;
        }
        LOG.info("Executing re-encrypt commands on zone {}. Current zones:{}",
            zoneId, getReencryptionStatus());
        getReencryptionStatus().markZoneStarted(zoneId);
        resetSubmissionTracker(zoneId);
      } finally {
        dir.getFSNamesystem().readUnlock("reEncryptThread");
      }

      try {
        reencryptEncryptionZone(zoneId);
      } catch (RetriableException | SafeModeException re) {
        // The zone is marked for retry so a later iteration can pick it up.
        LOG.info("Re-encryption caught exception, will retry", re);
        getReencryptionStatus().markZoneForRetry(zoneId);
      } catch (IOException ioe) {
        // Only logged; the loop carries on with the next iteration.
        LOG.warn("IOException caught when re-encrypting zone {}", zoneId, ioe);
      } catch (InterruptedException ie) {
        LOG.info("Re-encrypt handler interrupted. Exiting.");
        Thread.currentThread().interrupt();
        return;
      } catch (Throwable t) {
        // Anything unexpected terminates the handler thread.
        LOG.error("Re-encrypt handler thread exiting. Exception caught when"
            + " re-encrypting zone {}.", zoneId, t);
        return;
      }
    }
  }

  /**
   * Walks one encryption zone and submits its files for re-encryption.
   * <p>
   * The throttle stopwatches are restarted, then, under the traverser's read
   * lock, the zone is traversed either from its root (fresh re-encryption) or
   * from the last checkpointed file (resumed re-encryption). Afterwards the
   * remaining partial batch is submitted and the updater is told that
   * submission for the zone is done. Nothing is submitted when the zone inode
   * has disappeared or is not a directory.
   *
   * @param zoneId the Zone's id.
   * @throws IOException propagated from the traversal or batch submission
   * @throws InterruptedException propagated from the traversal or batch
   *         submission
   */
  void reencryptEncryptionZone(final long zoneId)
      throws IOException, InterruptedException {
    throttleTimerAll.reset().start();
    throttleTimerLocked.reset();

    traverser.readLock();
    try {
      final INode zoneRoot = dir.getInode(zoneId);
      if (zoneRoot == null) {
        LOG.info("Directory with id {} removed during re-encrypt, skipping",
            zoneId);
        return;
      }
      if (!zoneRoot.isDirectory()) {
        LOG.info("Cannot re-encrypt directory with id {} because it's not a"
            + " directory.", zoneId);
        return;
      }

      final ZoneReencryptionStatus status =
          getReencryptionStatus().getZoneStatus(zoneId);
      assert status != null;
      // The full path is costly to build, so it is logged once here and the
      // id is used everywhere else.
      LOG.info("Re-encrypting zone {}(id={})", zoneRoot.getFullPathName(),
          zoneId);

      final boolean resuming = status.getLastCheckpointFile() != null;
      if (resuming) {
        // continue after the file recorded by the last checkpoint
        restoreFromLastProcessedFile(zoneId, status);
      } else {
        // fresh run: start at the zone root
        traverser.traverseDir(zoneRoot.asDirectory(), zoneId,
            HdfsFileStatus.EMPTY_NAME,
            new ZoneTraverseInfo(status.getEzKeyVersionName()));
      }

      // flush whatever is left in the current batch, then mark complete
      traverser.submitCurrentBatch(zoneId);
      LOG.info("Submission completed of zone {} for re-encryption.", zoneId);
      reencryptionUpdater.markZoneSubmissionDone(zoneId);
    } finally {
      traverser.readUnlock();
    }
  }

  /**
   * Prepares the submission tracker of a zone for a new re-encryption run:
   * an existing tracker is reset, otherwise a new one is registered.
   *
   * @param zoneId id of the zone whose tracker is prepared
   */
  synchronized private void resetSubmissionTracker(final long zoneId) {
    final ZoneSubmissionTracker existing = submissions.get(zoneId);
    if (existing != null) {
      existing.reset();
      return;
    }
    submissions.put(zoneId, new ZoneSubmissionTracker());
  }

  /**
   * Finishes the re-encryption of a zone: logs the outcome, forgets the
   * zone's submission tracker and delegates to
   * {@code FSDirEncryptionZoneOp.updateReencryptionFinish}. Both the
   * FSDirectory and the namesystem write locks are expected to be held.
   *
   * @param zoneNode inode of the zone root
   * @return the value returned by {@code updateReencryptionFinish}
   * @throws IOException propagated from {@code updateReencryptionFinish}
   */
  List<XAttr> completeReencryption(final INode zoneNode) throws IOException {
    assert dir.hasWriteLock();
    assert dir.getFSNamesystem().hasWriteLock();
    final Long id = zoneNode.getId();
    final ZoneReencryptionStatus status =
        getReencryptionStatus().getZoneStatus(id);
    assert status != null;
    LOG.info("Re-encryption completed on zone {}. Re-encrypted {} files,"
            + " failures encountered: {}.", zoneNode.getFullPathName(),
        status.getFilesReencrypted(), status.getNumReencryptionFailures());
    synchronized (this) {
      submissions.remove(id);
    }
    final INodesInPath zoneIIP = INodesInPath.fromINode(zoneNode);
    return FSDirEncryptionZoneOp.updateReencryptionFinish(dir, zoneIIP, status);
  }

  /**
   * Restore the re-encryption from the progress inside ReencryptionStatus.
   * This means start from exactly the lastProcessedFile (LPF), skipping all
   * earlier paths in lexicographic order. Lexicographically-later directories
   * on the LPF parent paths are added to subdirs.
   *
   * @param zoneId id of the zone being resumed
   * @param zs status of the zone, providing the last checkpoint file and the
   *           zone key version name
   * @throws IOException if the last checkpoint file cannot be resolved to an
   *         inode, or if the traversal fails
   * @throws InterruptedException propagated from the traversal
   */
  private void restoreFromLastProcessedFile(final long zoneId,
      final ZoneReencryptionStatus zs)
      throws IOException, InterruptedException {
    final String checkpointPath = zs.getLastCheckpointFile();
    final INodesInPath lpfIIP =
        dir.getINodesInPath(checkpointPath, FSDirectory.DirOp.READ);
    final INode lastProcessed = lpfIIP.getLastINode();
    if (lastProcessed == null) {
      // NOTE(review): assumes getLastINode() returns null when the path no
      // longer resolves (e.g. the file was deleted). Raising an IOException
      // keeps the handler thread alive, whereas a NullPointerException would
      // be treated as fatal by the main loop.
      throw new IOException("Last checkpoint file " + checkpointPath
          + " of zone " + zoneId + " cannot be found, cannot resume"
          + " re-encryption.");
    }
    final INodeDirectory parent = lastProcessed.getParent();
    final byte[] startAfter = lastProcessed.getLocalNameBytes();
    traverser.traverseDir(parent, zoneId, startAfter,
        new ZoneTraverseInfo(zs.getEzKeyVersionName()));
  }

  /**
   * A list of files, each wrapped with its EDEK information, that is
   * submitted as a single re-encryption task.
   */
  final class ReencryptionBatch {
    // Full path of the first file that was added; kept for log messages only.
    private String firstFilePath;
    private final List<FileEdekInfo> batch;

    /** Creates a batch sized for the configured re-encryption batch size. */
    ReencryptionBatch() {
      this(reencryptBatchSize);
    }

    /**
     * @param initialCapacity initial capacity of the backing list
     */
    ReencryptionBatch(int initialCapacity) {
      this.batch = new ArrayList<>(initialCapacity);
    }

    /**
     * Appends a file to the batch, remembering its full path if it is the
     * first entry. The FSDirectory read lock is expected to be held.
     *
     * @param inode the file to add; must not be null
     * @throws IOException propagated from building the file's EDEK info
     */
    void add(final INodeFile inode) throws IOException {
      assert dir.hasReadLock();
      Preconditions.checkNotNull(inode, "INodeFile is null");
      final boolean firstEntry = batch.isEmpty();
      if (firstEntry) {
        this.firstFilePath = inode.getFullPathName();
      }
      final FileEdekInfo info = new FileEdekInfo(dir, inode);
      batch.add(info);
    }

    /** @return full path of the first file added, for logging */
    String getFirstFilePath() {
      return this.firstFilePath;
    }

    /** @return true if no file has been added */
    boolean isEmpty() {
      return this.batch.isEmpty();
    }

    /** @return number of files in the batch */
    int size() {
      return this.batch.size();
    }

    /** Removes all files from the batch. */
    void clear() {
      this.batch.clear();
    }

    /** @return the backing list itself, not a copy */
    List<FileEdekInfo> getBatch() {
      return this.batch;
    }
  }

  /**
   * Task that asks the key provider to re-encrypt the EDEKs of one batch and
   * stores the results back into the batch entries. No NN locks held.
   */
  private static class EDEKReencryptCallable
      implements Callable<ReencryptionTask> {
    private final long zoneNodeId;
    private final ReencryptionBatch batch;
    private final ReencryptionHandler handler;

    /**
     * @param zoneId id of the zone the batch belongs to
     * @param currentBatch the files whose EDEKs are re-encrypted
     * @param rh the handler, used to reach the key provider
     */
    EDEKReencryptCallable(final long zoneId,
        final ReencryptionBatch currentBatch, final ReencryptionHandler rh) {
      this.zoneNodeId = zoneId;
      this.batch = currentBatch;
      this.handler = rh;
    }

    /**
     * Re-encrypts the batch and reports the outcome.
     *
     * @return a task carrying the zone id, the batch and the number of
     *         failures: 0 for an empty or successful batch, the batch size
     *         when the re-encryption call failed
     */
    @Override
    public ReencryptionTask call() {
      LOG.info("Processing batched re-encryption for zone {}, batch size {},"
          + " start:{}", zoneNodeId, batch.size(), batch.getFirstFilePath());
      if (batch.isEmpty()) {
        return new ReencryptionTask(zoneNodeId, 0, batch);
      }
      final StopWatch kmsTimer = new StopWatch().start();

      final boolean succeeded = reencryptEdeks();
      final int failures = succeeded ? 0 : batch.size();
      final String outcome = succeeded ? "Completed" : "Failed to";
      LOG.info("{} re-encrypting one batch of {} edeks from KMS,"
              + " time consumed: {}, start: {}.", outcome,
          batch.size(), kmsTimer.stop(), batch.getFirstFilePath());
      return new ReencryptionTask(zoneNodeId, failures, batch);
    }

    /**
     * Collects the existing EDEKs of the batch, has the provider re-encrypt
     * them and writes the list entries back to the files in the same order.
     *
     * @return true on success, false if the provider call threw a
     *         GeneralSecurityException or an IOException
     */
    private boolean reencryptEdeks() {
      // talk to the kms without holding any lock
      final List<FileEdekInfo> files = batch.getBatch();
      final List<EncryptedKeyVersion> edeks = new ArrayList<>(batch.size());
      for (FileEdekInfo file : files) {
        edeks.add(file.getExistingEdek());
      }
      // The provider already has LoadBalancingKMSClientProvider's retries. If
      // that fails, just fail this callable.
      try {
        handler.ezManager.getProvider().reencryptEncryptedKeys(edeks);
        EncryptionFaultInjector.getInstance().reencryptEncryptedKeys();
      } catch (GeneralSecurityException | IOException ex) {
        LOG.warn("Failed to re-encrypt one batch of {} edeks, start:{}",
            batch.size(), batch.getFirstFilePath(), ex);
        return false;
      }
      for (int i = 0; i < files.size(); i++) {
        assert i < edeks.size();
        files.get(i).setEdek(edeks.get(i));
      }
      return true;
    }
  }


  /**
   * Called when a new zone is submitted for re-encryption. Notifies one
   * thread waiting on this handler's monitor, which lets the background
   * thread stop waiting for the rest of its
   * DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY interval.
   */
  synchronized void notifyNewSubmission() {
    LOG.debug("Notifying handler for new re-encryption command.");
    notify();
  }

  /** @return the tree traverser used by this handler */
  public ReencryptionPendingInodeIdCollector getTraverser() {
    return this.traverser;
  }

  /**
   * ReencryptionPendingInodeIdCollector which throttle based on configured
   * throttle ratio.
   */
  class ReencryptionPendingInodeIdCollector extends FSTreeTraverser {

    private final ReencryptionHandler reencryptionHandler;

    /**
     * @param dir directory passed on to the FSTreeTraverser constructor
     * @param rHandler the owning handler, used as the monitor to wait on
     * @param conf configuration passed on to the FSTreeTraverser constructor
     */
    ReencryptionPendingInodeIdCollector(FSDirectory dir,
        ReencryptionHandler rHandler, Configuration conf) {
      super(dir, conf);
      reencryptionHandler = rHandler;
    }

    /**
     * Test hook: blocks for as long as the pause flag is set, waiting on the
     * handler's monitor for up to 30 seconds at a time. Must be called
     * without the FSDirectory or namesystem read lock.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    protected void checkPauseForTesting()
        throws InterruptedException {
      assert !dir.hasReadLock();
      assert !dir.getFSNamesystem().hasReadLock();
      while (shouldPauseForTesting) {
        LOG.info("Sleeping in the re-encrypt handler for unit test.");
        synchronized (reencryptionHandler) {
          // Re-check under the monitor so that a resume which happened just
          // before the lock was taken does not cause a needless wait.
          if (shouldPauseForTesting) {
            reencryptionHandler.wait(30000);
          }
        }
        LOG.info("Continuing re-encrypt handler after pausing.");
      }
    }

    /**
     * Considers one inode for re-encryption and appends it to the current
     * batch when it qualifies.
     *
     * @param inode
     *          the inode
     * @param traverseInfo
     *          traversal context; when it is a ZoneTraverseInfo, files whose
     *          key version name equals the one it carries are skipped
     * @return true if inode is added to currentBatch and should be
     *         re-encrypted. false otherwise: the inode is not a file, it has
     *         no encryption info, or its edek's key version is not changed.
     * @throws IOException
     *           propagated from reading the encryption info or adding to
     *           the batch
     * @throws InterruptedException
     *           declared by the overridden method
     */
    @Override
    public boolean processFileInode(INode inode, TraverseInfo traverseInfo)
        throws IOException, InterruptedException {
      assert dir.hasReadLock();
      if (LOG.isTraceEnabled()) {
        LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
      }
      if (!inode.isFile()) {
        return false;
      }

      final FileEncryptionInfo encryptionInfo =
          FSDirEncryptionZoneOp.getFileEncryptionInfo(
              dir, INodesInPath.fromINode(inode));
      if (encryptionInfo == null) {
        LOG.warn("File {} skipped re-encryption because it is not encrypted! "
            + "This is very likely a bug.", inode.getId());
        return false;
      }

      if (traverseInfo instanceof ZoneTraverseInfo) {
        final String zoneKeyVersion =
            ((ZoneTraverseInfo) traverseInfo).getEzKeyVerName();
        if (zoneKeyVersion.equals(encryptionInfo.getEzKeyVersionName())) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("File {} skipped re-encryption because edek's key version"
                + " name is not changed.", inode.getFullPathName());
          }
          return false;
        }
      }

      currentBatch.add(inode.asFile());
      return true;
    }

    /**
     * Verifies that the zone can still be re-encrypted. It fails when the
     * zone has no re-encryption status, when the re-encryption was canceled,
     * when the NN is in safe mode, or when the NN rejects WRITE operations.
     *
     * @param zoneId
     *          id of the zone to check
     * @throws IOException
     *           if zone does not exist / is cancelled, or if NN is not ready
     *           for write.
     */
    @Override
    protected void checkINodeReady(long zoneId) throws IOException {
      final ZoneReencryptionStatus status =
          getReencryptionStatus().getZoneStatus(zoneId);
      if (status == null) {
        throw new IOException("Zone " + zoneId + " status cannot be found.");
      }
      if (status.isCanceled()) {
        throw new IOException("Re-encryption is canceled for zone " + zoneId);
      }
      final FSNamesystem namesystem = dir.getFSNamesystem();
      namesystem.checkNameNodeSafeMode(
          "NN is in safe mode, cannot re-encrypt.");
      // Re-encryption is expected to be cancelled when the NN becomes
      // standby; this is only a sanity double check.
      dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE);
    }

    /**
     * Submit the current batch to the thread pool and start a new, empty
     * batch. Does nothing when the current batch is empty.
     *
     * @param zoneId
     *          Id of the EZ INode
     * @throws IOException
     *           declared by the overridden method
     * @throws InterruptedException
     *           declared by the overridden method
     */
    @Override
    protected void submitCurrentBatch(final Long zoneId) throws IOException,
        InterruptedException {
      if (currentBatch.isEmpty()) {
        return;
      }
      ZoneSubmissionTracker zst;
      // Tracker lookup, task submission and registration are done under the
      // handler's monitor, which guards the submissions map.
      synchronized (ReencryptionHandler.this) {
        zst = submissions.get(zoneId);
        if (zst == null) {
          zst = new ZoneSubmissionTracker();
          submissions.put(zoneId, zst);
        }
        final Future<ReencryptionTask> future =
            batchService.submit(new EDEKReencryptCallable(zoneId,
                currentBatch, reencryptionHandler));
        zst.addTask(future);
      }
      LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
          currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
      // The submitted batch now belongs to the callable; start a fresh one.
      currentBatch = new ReencryptionBatch(reencryptBatchSize);
      // flip the pause flag if this is nth submission.
      // The actual pause need to happen outside of the lock.
      if (pauseAfterNthSubmission > 0 && --pauseAfterNthSubmission == 0) {
        shouldPauseForTesting = true;
      }
    }

    /**
     * Throttles the ReencryptionHandler in 3 aspects:
     * 1. Prevents generating more Callables than the CPU could possibly
     * handle.
     * 2. Prevents generating more Callables than the ReencryptionUpdater
     * can handle, under its own throttling.
     * 3. Prevents contending FSN/FSD read locks. This is done based
     * on the DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY
     * configuration.
     * <p>
     * Item 1 and 2 are to control NN heap usage.
     * <p>
     * Must be called without the FSDirectory or namesystem read lock. Both
     * throttle stopwatches are reset at the end, unless the configured ratio
     * is 1.0 or higher, in which case item 3 is skipped entirely.
     *
     * @throws InterruptedException if interrupted while sleeping
     */
    @VisibleForTesting
    @Override
    protected void throttle() throws InterruptedException {
      assert !dir.hasReadLock();
      assert !dir.getFSNamesystem().hasReadLock();
      // 1. wait until the thread pool queue is shorter than the core count.
      final int numCores = Runtime.getRuntime().availableProcessors();
      if (taskQueue.size() >= numCores) {
        LOG.debug("Re-encryption handler throttling because queue size {} is"
            + " larger than number of cores {}", taskQueue.size(), numCores);
        while (taskQueue.size() >= numCores) {
          Thread.sleep(100);
        }
      }

      // 2. if tasks are piling up on the updater, don't create new callables
      // until the queue size goes down.
      final int maxTasksPiled = numCores * 2;
      int numTasks = numTasksSubmitted();
      if (numTasks >= maxTasksPiled) {
        LOG.debug("Re-encryption handler throttling because total tasks pending"
            + " re-encryption updater is {}", numTasks);
        while (numTasks >= maxTasksPiled) {
          Thread.sleep(500);
          numTasks = numTasksSubmitted();
        }
      }

      // 3. keep the share of time spent under the read lock below the ratio.
      if (throttleLimitHandlerRatio >= 1.0) {
        return;
      }
      final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
          * throttleLimitHandlerRatio);
      final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Re-encryption handler throttling expect: {}, actual: {},"
            + " throttleTimerAll:{}", expect, actual,
            throttleTimerAll.now(TimeUnit.MILLISECONDS));
      }
      if (expect - actual < 0) {
        // in case throttleLimitHandlerRatio is very small, expect will be 0.
        // so sleepMs should not be calculated from expect, to really meet the
        // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
        // should be 1000 - throttleTimerAll.now()
        //
        // throttleTimerAll is read again here, later than the reading that
        // produced expect, so the difference can have dropped below zero in
        // the meantime. Thread.sleep rejects negative values with an
        // IllegalArgumentException, hence the clamp.
        final long sleepMs = Math.max(0L,
            (long) (actual / throttleLimitHandlerRatio)
                - throttleTimerAll.now(TimeUnit.MILLISECONDS));
        LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
        Thread.sleep(sleepMs);
      }
      throttleTimerAll.reset().start();
      throttleTimerLocked.reset();
    }

    /**
     * @return the total number of tasks currently held by all zone submission
     *         trackers, counted under the handler's monitor
     */
    private int numTasksSubmitted() {
      synchronized (ReencryptionHandler.this) {
        return submissions.values().stream()
            .mapToInt(tracker -> tracker.getTasks().size())
            .sum();
      }
    }

    /**
     * @return true once the current batch holds at least the configured
     *         number of files
     */
    @Override
    public boolean shouldSubmitCurrentBatch() {
      final int filesInBatch = currentBatch.size();
      return filesInBatch >= reencryptBatchSize;
    }

    /**
     * Decides whether the traversal may descend into a directory.
     *
     * @param inode the directory inode about to be traversed
     * @return false if the inode is the root of an encryption zone (a nested
     *         EZ, which is skipped), true otherwise
     * @throws IOException propagated from the encryption zone root check
     */
    @Override
    public boolean canTraverseDir(INode inode) throws IOException {
      // Building the full path is costly, so do it once and reuse it for
      // both the check and the log message.
      final String fullPath = inode.getFullPathName();
      if (ezManager.isEncryptionZoneRoot(inode, fullPath)) {
        // nested EZ, ignore.
        LOG.info("{}({}) is a nested EZ, skipping for re-encryption",
            fullPath, inode.getId());
        return false;
      }
      return true;
    }

    /**
     * Takes the read lock through the superclass and then starts the
     * stopwatch that measures time spent under the lock.
     */
    @Override
    protected void readLock() {
      super.readLock();
      ReencryptionHandler.this.throttleTimerLocked.start();
    }

    /**
     * Releases the read lock through the superclass and then stops the
     * stopwatch that measures time spent under the lock.
     */
    @Override
    protected void readUnlock() {
      super.readUnlock();
      ReencryptionHandler.this.throttleTimerLocked.stop();
    }
  }

  /**
   * Traversal context that carries the key version name of the encryption
   * zone, so that files already on that key version can be skipped.
   */
  private static class ZoneTraverseInfo extends TraverseInfo {
    // Set once in the constructor and never reassigned.
    private final String ezKeyVerName;

    /**
     * @param ezKeyVerName key version name of the encryption zone
     */
    ZoneTraverseInfo(String ezKeyVerName) {
      this.ezKeyVerName = ezKeyVerName;
    }

    /** @return the key version name given at construction */
    public String getEzKeyVerName() {
      return ezKeyVerName;
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AclEntryStatusFormat 源码

hadoop AclFeature 源码

hadoop AclStorage 源码

hadoop AclTransformation 源码

hadoop AuditLogger 源码

hadoop BackupImage 源码

hadoop BackupJournalManager 源码

hadoop BackupNode 源码

hadoop BackupState 源码

hadoop CacheManager 源码

0  赞