hadoop ReencryptionUpdater 源码

  • 2022-10-20
  • 浏览 (84)

hadoop ReencryptionUpdater 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionHandler.ReencryptionBatch;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY;

/**
 * Class for finalizing re-encrypt EDEK operations, by updating file xattrs with
 * edeks returned from reencryption.
 * <p>
 * The tasks are submitted by ReencryptionHandler.
 * <p>
 * It is assumed only 1 Updater will be running, since updating file xattrs
 * requires namespace write lock, and performance gain from multi-threading
 * is limited.
 */
@InterfaceAudience.Private
public final class ReencryptionUpdater implements Runnable {

  /** Logger for the updater; also used by the nested tracker class. */
  public static final Logger LOG =
      LoggerFactory.getLogger(ReencryptionUpdater.class);

  // Test-only pause controls, read by checkPauseForTesting().
  // While true, the updater thread waits inside checkPauseForTesting().
  private volatile boolean shouldPauseForTesting = false;
  // When non-zero, a pause is triggered once the tracker of pauseZoneId
  // reports this many finished futures; it is reset to 0 when triggered.
  private volatile int pauseAfterNthCheckpoint = 0;
  // Zone inode id that pauseAfterNthCheckpoint refers to.
  private volatile long pauseZoneId = 0;

  // Ratio read from configuration in the constructor (must be > 0) and used
  // by throttle(); values >= 1.0 disable throttling.
  private double throttleLimitRatio;
  // Started in run() and restarted at the end of each throttle() call.
  private final StopWatch throttleTimerAll = new StopWatch();
  // Started/stopped around processTask() in takeAndProcessTasks(), and reset
  // at the end of each throttle() call.
  private final StopWatch throttleTimerLocked = new StopWatch();

  // Milliseconds to sleep before retrying a task that failed with a
  // retriable or safe-mode exception.
  private volatile long faultRetryInterval = 60000;
  // True from the start of run() until the updater loop exits.
  private volatile boolean isRunning = false;

  /**
   * Per-zone bookkeeping for re-encryption submissions. Holds the futures
   * submitted for the zone, a flag saying whether submission has finished,
   * and counters describing how many futures have been handled so far.
   */
  static final class ZoneSubmissionTracker {
    /** Set once the submitter has handed in every task for the zone. */
    private boolean submissionDone = false;
    /** Futures of the submitted tasks, in submission order. */
    private LinkedList<Future> tasks = new LinkedList<>();
    /** Number of tasks whose progress has been checkpointed. */
    private int numCheckpointed = 0;
    /** Number of futures the updater has taken for processing. */
    private int numFutureDone = 0;

    /** Creates an empty tracker with all counters at zero. */
    ZoneSubmissionTracker() {
    }

    /** Drops all tracked futures and returns the tracker to its initial state. */
    void reset() {
      tasks.clear();
      submissionDone = false;
      numFutureDone = 0;
      numCheckpointed = 0;
    }

    /** @return the live list of tracked futures (not a copy). */
    LinkedList<Future> getTasks() {
      return tasks;
    }

    /** Requests cancellation, with interruption, of every tracked future. */
    void cancelAllTasks() {
      if (tasks.isEmpty()) {
        return;
      }
      LOG.info("Cancelling {} re-encryption tasks", tasks.size());
      for (Future pending : tasks) {
        pending.cancel(true);
      }
    }

    /**
     * Appends a future to the tracked list.
     *
     * @param task the future to track.
     */
    void addTask(final Future task) {
      tasks.add(task);
    }

    /** @return true when submission is done and no tracked future remains. */
    private boolean isCompleted() {
      if (!submissionDone) {
        return false;
      }
      return tasks.isEmpty();
    }

    /** Marks that no further tasks will be submitted for this zone. */
    void setSubmissionDone() {
      submissionDone = true;
    }
  }

  /**
   * One batch of a re-encryption command, together with the counters that
   * record how far that batch has been applied.
   */
  static final class ReencryptionTask {
    /** Inode id of the encryption zone this batch belongs to. */
    private final long zoneId;
    /** The files (and their edeks) covered by this task. */
    private final ReencryptionBatch batch;
    /** Set once the updater has finished handling this task. */
    private boolean processed;
    /** Number of files whose xattr was rewritten. */
    private int numFilesUpdated;
    /** Failure count; seeded by the constructor, incremented on errors. */
    private int numFailures;
    /** Path of the last file updated by this task, or null if none. */
    private String lastFile;

    /**
     * @param id       inode id of the encryption zone.
     * @param failures initial failure count for the batch.
     * @param theBatch the batch this task carries.
     */
    ReencryptionTask(final long id, final int failures,
        final ReencryptionBatch theBatch) {
      this.zoneId = id;
      this.batch = theBatch;
      this.numFailures = failures;
    }
  }

  /**
   * Re-encryption details of a single file: its inode id, the edek it had
   * when this object was created, and the replacement edek once available.
   * <p>
   * The constructor expects the directory read lock to be held and the inode
   * to be a valid, encrypted file.
   * <p>
   * The namespace may change while re-encryption is in flight; the updater
   * compares against the stored existing edek and skips files that changed.
   */
  static final class FileEdekInfo {
    private final long inodeId;
    private final EncryptedKeyVersion existingEdek;
    private EncryptedKeyVersion edek;

    /**
     * Captures the inode id and current edek of the given file.
     *
     * @param dir   the namespace directory; its read lock must be held.
     * @param inode the file to capture; must not be null.
     * @throws IOException if the file's encryption info cannot be read.
     */
    FileEdekInfo(FSDirectory dir, INodeFile inode) throws IOException {
      assert dir.hasReadLock();
      Preconditions.checkNotNull(inode, "INodeFile is null");
      inodeId = inode.getId();
      final INodesInPath resolved = INodesInPath.fromINode(inode);
      final FileEncryptionInfo encryptionInfo =
          FSDirEncryptionZoneOp.getFileEncryptionInfo(dir, resolved);
      Preconditions.checkNotNull(encryptionInfo,
          "FileEncryptionInfo is null for " + inodeId);
      existingEdek = EncryptedKeyVersion.createForDecryption(
          encryptionInfo.getKeyName(),
          encryptionInfo.getEzKeyVersionName(),
          encryptionInfo.getIV(),
          encryptionInfo.getEncryptedDataEncryptionKey());
    }

    /** @return the id of the file inode. */
    long getInodeId() {
      return inodeId;
    }

    /** @return the edek the file had when this object was created. */
    EncryptedKeyVersion getExistingEdek() {
      return existingEdek;
    }

    /**
     * Stores the re-encrypted edek for the file.
     *
     * @param ekv the new edek; must not be null.
     */
    void setEdek(final EncryptedKeyVersion ekv) {
      assert ekv != null;
      this.edek = ekv;
    }
  }

  /**
   * Test hook: raises the pause flag so the updater thread waits the next
   * time it reaches its pause check, then signals this object's monitor.
   */
  @VisibleForTesting
  synchronized void pauseForTesting() {
    this.shouldPauseForTesting = true;
    LOG.info("Pausing re-encrypt updater for testing.");
    this.notify();
  }

  /**
   * Test hook: clears the pause flag and signals this object's monitor so a
   * paused updater thread can continue.
   */
  @VisibleForTesting
  synchronized void resumeForTesting() {
    this.shouldPauseForTesting = false;
    LOG.info("Resuming re-encrypt updater for testing.");
    this.notify();
  }

  /**
   * Test hook: arranges for the updater to pause once the tracker of the given
   * zone reports {@code count} finished futures.
   *
   * @param zoneId inode id of the zone to watch.
   * @param count  number of finished futures after which to pause; a value of
   *               0 leaves the trigger disarmed.
   */
  @VisibleForTesting
  void pauseForTestingAfterNthCheckpoint(final long zoneId, final int count) {
    assert pauseAfterNthCheckpoint == 0;
    // Publish the zone id before arming the trigger. The updater thread treats
    // a non-zero pauseAfterNthCheckpoint as "armed" and then reads pauseZoneId;
    // both fields are volatile, so writing the id first guarantees it never
    // pairs the new count with a stale zone id.
    pauseZoneId = zoneId;
    pauseAfterNthCheckpoint = count;
  }

  /**
   * Test hook.
   *
   * @return true while the updater loop in {@link #run()} is active.
   */
  @VisibleForTesting
  boolean isRunning() {
    return this.isRunning;
  }

  // Namespace directory whose write lock guards the xattr updates.
  private final FSDirectory dir;
  // Source of completed re-encryption task futures, consumed via take().
  private final CompletionService<ReencryptionTask> batchService;
  // Owning handler; provides zone trackers, status and the traverser.
  private final ReencryptionHandler handler;

  ReencryptionUpdater(final FSDirectory fsd,
      final CompletionService<ReencryptionTask> service,
      final ReencryptionHandler rh, final Configuration conf) {
    dir = fsd;
    batchService = service;
    handler = rh;
    this.throttleLimitRatio =
        conf.getDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY,
            DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT);
    Preconditions.checkArgument(throttleLimitRatio > 0.0f,
        DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY
            + " is not positive.");
  }

  /**
   * Called by the submission thread to indicate that all tasks for a zone
   * have been submitted. If no task was ever submitted for the zone, a dummy
   * tracker is registered with the handler instead, so that the re-encryption
   * can be completed directly.
   *
   * @param zoneId Id of the zone inode.
   * @throws IOException declared for callers; propagated from the handler.
   * @throws InterruptedException declared for callers; propagated from the
   *     handler.
   */
  void markZoneSubmissionDone(final long zoneId)
      throws IOException, InterruptedException {
    final ZoneSubmissionTracker tracker = handler.getTracker(zoneId);
    final boolean nothingSubmitted =
        tracker == null || tracker.getTasks().isEmpty();
    if (nothingSubmitted) {
      // Caller thinks submission is done, but no tasks were submitted -
      // meaning no files in the EZ need to be re-encrypted. Complete directly.
      handler.addDummyTracker(zoneId, tracker);
      return;
    }
    tracker.submissionDone = true;
  }

  /**
   * Updater loop. Repeatedly takes and processes completed tasks until the
   * thread is interrupted or an unexpected error occurs. IOExceptions and
   * CancellationExceptions are logged and the loop continues.
   */
  @Override
  public void run() {
    isRunning = true;
    throttleTimerAll.start();
    boolean keepGoing = true;
    while (keepGoing) {
      try {
        // Assuming single-threaded updater.
        takeAndProcessTasks();
      } catch (InterruptedException ie) {
        LOG.warn("Re-encryption updater thread interrupted. Exiting.");
        // Restore the interrupt status for whoever owns this thread.
        Thread.currentThread().interrupt();
        keepGoing = false;
      } catch (IOException | CancellationException e) {
        LOG.warn("Re-encryption updater thread exception.", e);
      } catch (Throwable t) {
        LOG.error("Re-encryption updater thread exiting.", t);
        keepGoing = false;
      }
    }
    isRunning = false;
  }

  /**
   * Process a completed ReencryptionTask. Each inode id is resolved to an INode
   * object again; entries whose inode is gone, whose encryption info is
   * missing, or whose key / edek no longer matches what was captured at
   * submission time are skipped and removed from the batch.
   * <p>
   * Only file xattr is updated by this method. Re-encryption progress is not
   * updated. The task is marked processed on return, whether or not any file
   * was updated. Batches that are empty or already carry failures are not
   * applied at all.
   *
   * @param zoneNodePath full path of the EZ inode.
   * @param task     the completed task.
   * @throws IOException if reading or writing a file's encryption info fails.
   * @throws InterruptedException declared for callers.
   */
  private void processTaskEntries(final String zoneNodePath,
      final ReencryptionTask task) throws IOException, InterruptedException {
    assert dir.hasWriteLock();
    if (!task.batch.isEmpty() && task.numFailures == 0) {
      LOG.debug(
          "Updating file xattrs for re-encrypting zone {}," + " starting at {}",
          zoneNodePath, task.batch.getFirstFilePath());
      final int batchSize = task.batch.size();
      for (Iterator<FileEdekInfo> it = task.batch.getBatch().iterator();
           it.hasNext();) {
        FileEdekInfo entry = it.next();
        // resolve the inode again, and skip if it doesn't exist
        LOG.trace("Updating {} for re-encryption.", entry.getInodeId());
        final INode inode = dir.getInode(entry.getInodeId());
        if (inode == null) {
          LOG.debug("INode {} doesn't exist, skipping re-encrypt.",
              entry.getInodeId());
          // also remove from batch so later it's not saved.
          it.remove();
          continue;
        }

        // Cautiously check file encryption info, and only update if we're sure
        // it's still using the same edek.
        Preconditions.checkNotNull(entry.edek);
        // Resolved once and reused for both the read and the write below; the
        // directory write lock is held for the whole method.
        final INodesInPath iip = INodesInPath.fromINode(inode);
        final FileEncryptionInfo fei =
            FSDirEncryptionZoneOp.getFileEncryptionInfo(dir, iip);
        if (fei == null) {
          // The lookup can come back empty (FileEdekInfo's constructor guards
          // against the same result). Skip rather than let a
          // NullPointerException terminate the updater thread.
          LOG.debug("Inode {} has no file encryption info, skipping"
              + " re-encryption.", entry.getInodeId());
          it.remove();
          continue;
        }
        if (!fei.getKeyName().equals(entry.edek.getEncryptionKeyName())) {
          LOG.debug("Inode {} EZ key changed, skipping re-encryption.",
              entry.getInodeId());
          it.remove();
          continue;
        }
        if (fei.getEzKeyVersionName()
            .equals(entry.edek.getEncryptionKeyVersionName())) {
          // Already on the key version the new edek was produced with.
          LOG.debug(
              "Inode {} EZ key version unchanged, skipping re-encryption.",
              entry.getInodeId());
          it.remove();
          continue;
        }
        if (!Arrays.equals(fei.getEncryptedDataEncryptionKey(),
            entry.existingEdek.getEncryptedKeyVersion().getMaterial())) {
          // The file's edek is no longer the one that was re-encrypted.
          LOG.debug("Inode {} existing edek changed, skipping re-encryption",
              entry.getInodeId());
          it.remove();
          continue;
        }
        FileEncryptionInfo newFei = new FileEncryptionInfo(fei.getCipherSuite(),
            fei.getCryptoProtocolVersion(),
            entry.edek.getEncryptedKeyVersion().getMaterial(),
            entry.edek.getEncryptedKeyIv(), fei.getKeyName(),
            entry.edek.getEncryptionKeyVersionName());
        FSDirEncryptionZoneOp
            .setFileEncryptionInfo(dir, iip, newFei, XAttrSetFlag.REPLACE);
        task.lastFile = iip.getPath();
        ++task.numFilesUpdated;
      }

      LOG.info("Updated xattrs on {}({}) files in zone {} for re-encryption,"
              + " starting:{}.", task.numFilesUpdated, batchSize,
          zoneNodePath, task.batch.getFirstFilePath());
    }
    task.processed = true;
  }

  /**
   * Walks the zone's tracked futures from the front and advances the
   * re-encryption checkpoint over every leading task that has been processed.
   * The checkpoint means all files before it are re-encrypted, so the walk
   * stops at the first task that is cancelled, unfinished or unprocessed.
   *
   * @param zoneNode the EZ inode.
   * @param tracker  the zone submission tracker.
   * @return the list containing the last checkpointed xattr, or whatever the
   *   handler returns from completing the zone when the tracker is complete.
   *   Empty if no checkpoint happened.
   * @throws ExecutionException if a finished future completed exceptionally.
   * @throws IOException if completing the zone's re-encryption fails.
   * @throws InterruptedException if interrupted while reading a future.
   */
  private List<XAttr> processCheckpoints(final INode zoneNode,
      final ZoneSubmissionTracker tracker)
      throws ExecutionException, IOException, InterruptedException {
    assert dir.hasWriteLock();
    final long zoneId = zoneNode.getId();
    final String zonePath = zoneNode.getFullPathName();
    final ZoneReencryptionStatus status =
        handler.getReencryptionStatus().getZoneStatus(zoneId);
    assert status != null;
    // Always start from the head of the list: a checkpoint is only valid if
    // every task before it has been applied.
    final List<XAttr> latestCheckpoint = Lists.newArrayListWithCapacity(1);
    final ListIterator<Future> pending = tracker.getTasks().listIterator();
    synchronized (handler) {
      while (pending.hasNext()) {
        final Future<ReencryptionTask> head = pending.next();
        if (head.isCancelled() || !head.isDone()) {
          // cancelled, or an earlier task is still running: stop here.
          break;
        }
        final ReencryptionTask task = head.get();
        if (!task.processed) {
          // finished by the executor but not yet applied by the updater.
          break;
        }
        LOG.debug("Updating re-encryption checkpoint with completed task."
            + " last: {} size:{}.", task.lastFile, task.batch.size());
        assert zoneId == task.zoneId;
        try {
          final XAttr progress = FSDirEncryptionZoneOp
              .updateReencryptionProgress(dir, zoneNode, status, task.lastFile,
                  task.numFilesUpdated, task.numFailures);
          // only the most recent progress xattr is kept.
          latestCheckpoint.clear();
          latestCheckpoint.add(progress);
        } catch (IOException ie) {
          LOG.warn("Failed to update re-encrypted progress to xattr" +
                  " for zone {}", zonePath, ie);
          ++task.numFailures;
        }
        ++tracker.numCheckpointed;
        pending.remove();
      }
    }
    if (!tracker.isCompleted()) {
      return latestCheckpoint;
    }
    LOG.debug("Removed re-encryption tracker for zone {} because it completed"
            + " with {} tasks.", zonePath, tracker.numCheckpointed);
    return handler.completeReencryption(zoneNode);
  }

  /**
   * Takes the next completed task from the completion service and applies it
   * under the FSNamesystem write lock. Cancelled tasks are ignored.
   * <p>
   * A task failing with a RetriableException or SafeModeException is retried
   * after {@code faultRetryInterval} milliseconds until it succeeds. Any other
   * IOException is counted as a failure on the task, which is then marked
   * processed.
   *
   * @throws Exception InterruptedException if interrupted while waiting for a
   *     task, pausing or sleeping; ExecutionException if the task future
   *     completed exceptionally; anything unexpected thrown by processing.
   */
  private void takeAndProcessTasks() throws Exception {
    final Future<ReencryptionTask> completed = batchService.take();
    throttle();
    checkPauseForTesting();
    if (completed.isCancelled()) {
      // Ignore canceled zones. The cancellation is edit-logged by the handler.
      LOG.debug("Skipped a canceled re-encryption task");
      return;
    }
    final ReencryptionTask task = completed.get();

    boolean shouldRetry;
    do {
      dir.getFSNamesystem().writeLock();
      try {
        throttleTimerLocked.start();
        processTask(task);
        shouldRetry = false;
      } catch (RetriableException | SafeModeException re) {
        // Keep retrying until succeed.
        LOG.info("Exception when processing re-encryption task for zone {}, "
                + "retrying...", task.zoneId, re);
        shouldRetry = true;
      } catch (IOException ioe) {
        LOG.warn("Failure processing re-encryption task for zone {}",
            task.zoneId, ioe);
        ++task.numFailures;
        task.processed = true;
        shouldRetry = false;
      } finally {
        dir.getFSNamesystem().writeUnlock("reencryptUpdater");
        throttleTimerLocked.stop();
      }
      // logSync regardless, to prevent edit log buffer overflow triggering
      // logSync inside FSN writelock.
      dir.getEditLog().logSync();
      if (shouldRetry) {
        // Back off only after the write lock has been released, so the
        // namesystem is not blocked for the whole retry interval and the
        // sleep is not accounted as locked time by the throttle timer.
        Thread.sleep(faultRetryInterval);
      }
    } while (shouldRetry);
  }

  /**
   * Applies one completed task: updates the file xattrs of its batch, advances
   * the zone checkpoint, and then records the changes in the edit log.
   * <p>
   * The in-memory updates run under the directory write lock; the edit-log
   * calls are made after that lock is released. Returns without doing anything
   * if the zone inode no longer exists or the zone has no tracker.
   *
   * @param task the completed task to apply.
   * @throws InterruptedException propagated from checkpoint processing.
   * @throws ExecutionException propagated from checkpoint processing.
   * @throws IOException if the zone inode is not ready or an update fails.
   */
  private void processTask(ReencryptionTask task)
      throws InterruptedException, ExecutionException, IOException {
    final List<XAttr> xAttrs;
    final String zonePath;
    dir.writeLock();
    try {
      // May throw; the caller retries on RetriableException/SafeModeException.
      handler.getTraverser().checkINodeReady(task.zoneId);
      final INode zoneNode = dir.getInode(task.zoneId);
      if (zoneNode == null) {
        // ez removed.
        return;
      }
      zonePath = zoneNode.getFullPathName();
      LOG.info("Processing returned re-encryption task for zone {}({}), "
              + "batch size {}, start:{}", zonePath, task.zoneId,
          task.batch.size(), task.batch.getFirstFilePath());
      final ZoneSubmissionTracker tracker =
          handler.getTracker(zoneNode.getId());
      if (tracker == null) {
        // re-encryption canceled.
        LOG.info("Re-encryption was canceled.");
        return;
      }
      // Counted before the entries are applied; checkPauseForTesting()
      // compares this counter against the requested pause point.
      tracker.numFutureDone++;
      // Fault-injection hooks bracket the xattr update step.
      EncryptionFaultInjector.getInstance().reencryptUpdaterProcessOneTask();
      processTaskEntries(zonePath, task);
      EncryptionFaultInjector.getInstance().reencryptUpdaterProcessCheckpoint();
      xAttrs = processCheckpoints(zoneNode, tracker);
    } finally {
      dir.writeUnlock();
    }
    // Entries skipped by processTaskEntries() were removed from the batch, so
    // only the files still in it are passed on here.
    FSDirEncryptionZoneOp.saveFileXAttrsForBatch(dir, task.batch.getBatch());
    if (!xAttrs.isEmpty()) {
      // Log the zone xattr(s) returned by checkpoint processing.
      dir.getEditLog().logSetXAttrs(zonePath, xAttrs, false);
    }
  }

  /**
   * Test support: arms the pause flag if the watched zone has reached the
   * requested number of finished futures, then blocks for as long as the
   * pause flag is set. Must be called without the FSN/FSD write locks held.
   *
   * @throws InterruptedException if interrupted while waiting.
   */
  private synchronized void checkPauseForTesting() throws InterruptedException {
    assert !dir.hasWriteLock();
    assert !dir.getFSNamesystem().hasWriteLock();
    if (pauseAfterNthCheckpoint != 0) {
      final ZoneSubmissionTracker watched =
          handler.unprotectedGetTracker(pauseZoneId);
      if (watched != null
          && watched.numFutureDone == pauseAfterNthCheckpoint) {
        // One-shot trigger: pause now and disarm.
        shouldPauseForTesting = true;
        pauseAfterNthCheckpoint = 0;
      }
    }
    while (shouldPauseForTesting) {
      LOG.info("Sleeping in the re-encryption updater for unit test.");
      this.wait();
      LOG.info("Continuing re-encryption updater after pausing.");
    }
  }

  /**
   * Throttles the ReencryptionUpdater to prevent it from contending for the
   * FSN/FSD write locks. The configured ratio bounds the share of elapsed time
   * the updater may spend inside the locked section: if the time measured by
   * the locked timer exceeds {@code ratio * elapsed}, the thread sleeps long
   * enough to bring the share back to the ratio. A ratio of 1.0 or more
   * disables throttling. Both timers are restarted on every throttled call.
   *
   * @throws InterruptedException if interrupted while sleeping.
   */
  private void throttle() throws InterruptedException {
    if (throttleLimitRatio >= 1.0) {
      return;
    }

    final long elapsedAll = throttleTimerAll.now(TimeUnit.MILLISECONDS);
    final long expect = (long) (elapsedAll * throttleLimitRatio);
    final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Re-encryption updater throttling expect: {}, actual: {},"
              + " throttleTimerAll:{}", expect, actual, elapsedAll);
    }
    if (expect - actual < 0) {
      // in case throttleLimitRatio is very small, expect will be 0.
      // so sleepMs should not be calculated from expect, to really meet the
      // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
      // should be 1000 - throttleTimerAll.now()
      final long sleepMs = (long) (actual / throttleLimitRatio)
          - throttleTimerAll.now(TimeUnit.MILLISECONDS);
      // The timer is read again above, so enough wall-clock time may already
      // have passed to make sleepMs zero or negative. Thread.sleep rejects
      // negative values with IllegalArgumentException, which would end the
      // updater thread, so only sleep for a positive duration.
      if (sleepMs > 0) {
        LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
        Thread.sleep(sleepMs);
      }
    }
    throttleTimerAll.reset().start();
    throttleTimerLocked.reset();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AclEntryStatusFormat 源码

hadoop AclFeature 源码

hadoop AclStorage 源码

hadoop AclTransformation 源码

hadoop AuditLogger 源码

hadoop BackupImage 源码

hadoop BackupJournalManager 源码

hadoop BackupNode 源码

hadoop BackupState 源码

hadoop CacheManager 源码

0  赞