hadoop FSTreeTraverser source code

  • 2022-10-20

hadoop FSTreeTraverser code

File path: /hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.util.Preconditions;

/**
 * FSTreeTraverser traverses directories recursively and processes files
 * in batches.
 */
@InterfaceAudience.Private
public abstract class FSTreeTraverser {


  public static final Logger LOG = LoggerFactory
      .getLogger(FSTreeTraverser.class);

  private final FSDirectory dir;

  private long readLockReportingThresholdMs;

  private Timer timer;

  public FSTreeTraverser(FSDirectory dir, Configuration conf) {
    this.dir = dir;
    this.readLockReportingThresholdMs = conf.getLong(
        DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
        DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT);
    timer = new Timer();
  }

  public FSDirectory getFSDirectory() {
    return dir;
  }

  /**
   * Iterate through all files directly inside parent, and recurse down
   * directories. The listing is done in batches, and can optionally start
   * after a position. The iteration of the inode tree is done in a
   * depth-first fashion, but instead of holding all {@link INodeDirectory}s
   * in memory, only the path components to the current inode are held, to
   * reduce memory consumption.
   *
   * @param parent
   *          The parent directory inode
   * @param startId
   *          Id of the start inode.
   * @param startAfter
   *          Full path of a file the traverse should start after.
   * @param traverseInfo
   *          info which may be required for processing the children.
   * @throws IOException
   * @throws InterruptedException
   */
  protected void traverseDir(final INodeDirectory parent, final long startId,
      byte[] startAfter, final TraverseInfo traverseInfo)
      throws IOException, InterruptedException {
    List<byte[]> startAfters = new ArrayList<>();
    if (parent == null) {
      return;
    }
    INode curr = parent;
    // construct startAfters all the way up to the zone inode.
    startAfters.add(startAfter);
    while (curr.getId() != startId) {
      startAfters.add(0, curr.getLocalNameBytes());
      curr = curr.getParent();
    }
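    // Illustrative example (not in the original source): if startId is the
    // inode of /zone and parent is /zone/a/b, the loop above leaves
    // startAfters == [ "a", "b", startAfter ], i.e. the local names on the
    // path from just below the start inode down to the cursor.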
    curr = traverseDirInt(startId, parent, startAfters, traverseInfo);
    while (!startAfters.isEmpty()) {
      if (curr == null) {
        // lock was reacquired, re-resolve path.
        curr = resolvePaths(startId, startAfters);
      }
      curr = traverseDirInt(startId, curr, startAfters, traverseInfo);
    }
  }

  /**
   * Iterates through the parent directory, adding direct child files to the
   * current batch. If the batch size meets the configured threshold, the
   * current batch is submitted for processing.
   * <p>
   * Locks could be released and reacquired when a batch submission is
   * finished.
   *
   * @param startId
   *          Id of the start inode.
   * @return The inode which was just processed, if the lock was held for the
   *         entire process; null if the lock was released.
   * @throws IOException
   * @throws InterruptedException
   */
  protected INode traverseDirInt(final long startId, INode curr,
      List<byte[]> startAfters, final TraverseInfo traverseInfo)
      throws IOException, InterruptedException {
    assert dir.hasReadLock();
    assert dir.getFSNamesystem().hasReadLock();
    long lockStartTime = timer.monotonicNow();
    Preconditions.checkNotNull(curr, "Current inode can't be null");
    checkINodeReady(startId);
    final INodeDirectory parent = curr.isDirectory() ? curr.asDirectory()
        : curr.getParent();
    ReadOnlyList<INode> children = parent
        .getChildrenList(Snapshot.CURRENT_STATE_ID);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Traversing directory {}", parent.getFullPathName());
    }

    final byte[] startAfter = startAfters.get(startAfters.size() - 1);
    boolean lockReleased = false;
    for (int i = INodeDirectory.nextChild(children, startAfter); i < children
        .size(); ++i) {
      final INode inode = children.get(i);
      if (!processFileInode(inode, traverseInfo)) {
        // inode wasn't processed. Recurse down if it's a dir,
        // skip otherwise.
        if (!inode.isDirectory()) {
          continue;
        }

        if (!canTraverseDir(inode)) {
          continue;
        }
        // add 1 level to the depth-first search.
        curr = inode;
        if (!startAfters.isEmpty()) {
          startAfters.remove(startAfters.size() - 1);
          startAfters.add(curr.getLocalNameBytes());
        }
        startAfters.add(HdfsFileStatus.EMPTY_NAME);
        return lockReleased ? null : curr;
      }
      if (shouldSubmitCurrentBatch()) {
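        // Capture the resume cursor and the parent's full path while still
        // holding the lock: once the lock is released below, the inode and
        // parent references can no longer be trusted.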
        final byte[] currentStartAfter = inode.getLocalNameBytes();
        final String parentPath = parent.getFullPathName();
        lockReleased = true;
        readUnlock();
        submitCurrentBatch(startId);
        try {
          throttle();
          checkPauseForTesting();
        } finally {
          readLock();
          lockStartTime = timer.monotonicNow();
        }
        checkINodeReady(startId);

        // Things could have changed when the lock was released.
        // Re-resolve the parent inode.
        FSPermissionChecker pc = dir.getPermissionChecker();
        INode newParent = dir
            .resolvePath(pc, parentPath, FSDirectory.DirOp.READ)
            .getLastINode();
        if (newParent == null || !newParent.equals(parent)) {
          // parent dir is deleted or recreated. We're done.
          return null;
        }
        children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
        // -1 to counter the ++ on the for loop
        i = INodeDirectory.nextChild(children, currentStartAfter) - 1;
      }
      if ((timer.monotonicNow()
          - lockStartTime) > readLockReportingThresholdMs) {
        readUnlock();
        try {
          throttle();
        } finally {
          readLock();
          lockStartTime = timer.monotonicNow();
        }
      }
    }
    // Successfully finished this dir, adjust pointers to 1 level up, and
    // startAfter this dir.
    startAfters.remove(startAfters.size() - 1);
    if (!startAfters.isEmpty()) {
      startAfters.remove(startAfters.size() - 1);
      startAfters.add(curr.getLocalNameBytes());
    }
    curr = curr.getParent();
    return lockReleased ? null : curr;
  }

  /**
   * Resolve the traversal cursor to an inode.
   * <p>
   * The parent of the lowest-level startAfter is returned. If some level in
   * the middle of startAfters has changed, the parent of the lowest
   * unchanged level is returned.
   *
   * @param startId
   *          Id of the start inode.
   * @param startAfters
   *          the cursor, represented by a list of path bytes.
   * @return the parent inode corresponding to the startAfters, or null if the
   *         furthest parent is deleted.
   */
  private INode resolvePaths(final long startId, List<byte[]> startAfters)
      throws IOException {
    // If the readlock was reacquired, we need to resolve the paths again
    // in case things have changed. If our cursor file/dir is changed,
    // continue from the next one.
    INode zoneNode = dir.getInode(startId);
    if (zoneNode == null) {
      throw new FileNotFoundException("Zone " + startId + " is deleted.");
    }
    INodeDirectory parent = zoneNode.asDirectory();
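    // Illustrative example (not in the original source): with
    // startAfters == [ "a", "b", "file3" ], the loop below re-resolves "a"
    // and "b" from the start inode; "file3" itself is left for
    // INodeDirectory.nextChild in the caller to handle.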
    for (int i = 0; i < startAfters.size(); ++i) {
      if (i == startAfters.size() - 1) {
        // last startAfter does not need to be resolved, since search for
        // nextChild will cover that automatically.
        break;
      }
      INode curr = parent.getChild(startAfters.get(i),
          Snapshot.CURRENT_STATE_ID);
      if (curr == null) {
        // inode at this level has changed. Update startAfters to point to
        // the next dir at the parent level (and dropping any startAfters
        // at lower levels).
        for (; i < startAfters.size(); ++i) {
          startAfters.remove(startAfters.size() - 1);
        }
        break;
      }
      parent = curr.asDirectory();
    }
    return parent;
  }

  protected void readLock() {
    dir.getFSNamesystem().readLock();
    dir.readLock();
  }

  protected void readUnlock() {
    dir.readUnlock();
    dir.getFSNamesystem().readUnlock("FSTreeTraverser");
  }


  protected abstract void checkPauseForTesting() throws InterruptedException;

  /**
   * Process an inode: add it to the current batch if it's a file, no-op
   * otherwise.
   *
   * @param inode
   *          the inode
   * @return true if the inode was added to currentBatch and should be
   *         processed in the next operation; false otherwise (e.g. the
   *         inode is not a file).
   * @throws IOException
   * @throws InterruptedException
   */
  protected abstract boolean processFileInode(INode inode,
      TraverseInfo traverseInfo) throws IOException, InterruptedException;

  /**
   * Check whether the current batch can be submitted for processing.
   *
   * @return true if the batch size meets the condition, otherwise false.
   */
  protected abstract boolean shouldSubmitCurrentBatch();

  /**
   * Check whether the inode is ready for traversal. Throws an IOException
   * if it's not.
   *
   * @param startId
   *          Id of the start inode.
   * @throws IOException
   */
  protected abstract void checkINodeReady(long startId) throws IOException;

  /**
   * Submit the current batch for processing.
   *
   * @param startId
   *          Id of the start inode.
   * @throws IOException
   * @throws InterruptedException
   */
  protected abstract void submitCurrentBatch(Long startId)
      throws IOException, InterruptedException;

  /**
   * Throttles the FSTreeTraverser.
   *
   * @throws InterruptedException
   */
  protected abstract void throttle() throws InterruptedException;

  /**
   * Check whether a dir is traversable or not.
   *
   * @param inode
   *          Dir inode
   * @return true if the dir is traversable, otherwise false.
   * @throws IOException
   */
  protected abstract boolean canTraverseDir(INode inode) throws IOException;

  /**
   * Represents additional info that may be required for the traversal.
   */
  public static class TraverseInfo {

  }
}
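
FSTreeTraverser is abstract: the batching, throttling and readiness policies live in subclasses (in the Hadoop tree, the HDFS re-encryption handler supplies these hooks). As a minimal sketch of how the abstract methods fit together, a hypothetical subclass that batches file inodes and merely logs each submitted batch could look like the following. The class name LoggingTreeTraverser, the BATCH_SIZE threshold and the no-op throttle/pause hooks are all assumptions for the sketch, not part of the Hadoop source.

package org.apache.hadoop.hdfs.server.namenode;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;

// Hypothetical subclass, for illustration only. It sits in the same package
// as FSTreeTraverser so it can reach the package-private FSDirectory/INode
// APIs, like the real in-tree subclasses do.
public class LoggingTreeTraverser extends FSTreeTraverser {

  private static final int BATCH_SIZE = 1000; // hypothetical threshold

  private final List<INode> currentBatch = new ArrayList<>();

  public LoggingTreeTraverser(FSDirectory dir, Configuration conf) {
    super(dir, conf);
  }

  @Override
  protected boolean processFileInode(INode inode, TraverseInfo traverseInfo) {
    if (!inode.isFile()) {
      return false; // not a file; traverseDirInt recurses if it's a dir
    }
    currentBatch.add(inode);
    return true;
  }

  @Override
  protected boolean shouldSubmitCurrentBatch() {
    return currentBatch.size() >= BATCH_SIZE;
  }

  @Override
  protected void submitCurrentBatch(Long startId) {
    LOG.info("Submitting {} files found under inode {}",
        currentBatch.size(), startId);
    currentBatch.clear();
  }

  @Override
  protected void checkINodeReady(long startId) throws IOException {
    if (getFSDirectory().getInode(startId) == null) {
      throw new FileNotFoundException("Start inode " + startId + " is gone.");
    }
  }

  @Override
  protected void checkPauseForTesting() {
    // no test-pause support in this sketch
  }

  @Override
  protected void throttle() {
    // no throttling in this sketch; real subclasses sleep here so batch
    // processing does not monopolize the namesystem lock
  }

  @Override
  protected boolean canTraverseDir(INode inode) {
    return true; // descend into every directory
  }
}

A caller would take the locks via readLock(), invoke traverseDir(rootDir, rootDir.getId(), HdfsFileStatus.EMPTY_NAME, new TraverseInfo()) to walk everything under rootDir from the beginning, then release them via readUnlock(); traverseDirInt asserts on entry that both the FSDirectory and FSNamesystem read locks are held.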

Related information

hadoop source directory

Related articles

hadoop AclEntryStatusFormat source code

hadoop AclFeature source code

hadoop AclStorage source code

hadoop AclTransformation source code

hadoop AuditLogger source code

hadoop BackupImage source code

hadoop BackupJournalManager source code

hadoop BackupNode source code

hadoop BackupState source code

hadoop CacheManager source code
