hadoop FSDirStatAndListingOp 源码

  • 2022-10-20
  • 浏览 (181)

hadoop FSDirStatAndListingOp 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.namenode;

import org.apache.hadoop.util.Preconditions;

import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.security.AccessControlException;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;

import static org.apache.hadoop.util.Time.now;

class FSDirStatAndListingOp {
  /**
   * Resolve {@code srcArg}, normalize {@code startAfter}, check access and
   * return a partial listing of the resolved path.
   *
   * <p>When {@code startAfter} begins with the path separator and is a
   * reserved name, its path components are resolved and only the last
   * component is kept as the key to resume the listing from.
   *
   * @param fsd The FS directory
   * @param pc The permission checker
   * @param srcArg The string representation of the path to list
   * @param startAfter the name (or reserved path) to start listing after
   * @param needLocation if block locations are returned
   * @return a partial listing starting after startAfter; the delegate
   *         returns null when the resolved path has no last inode
   * @throws DirectoryListingStartAfterNotFoundException if a reserved
   *         startAfter path cannot be resolved; the underlying failure is
   *         attached as the cause
   * @throws IOException if path resolution or the access check fails
   */
  static DirectoryListing getListingInt(FSDirectory fsd, FSPermissionChecker pc,
      final String srcArg, byte[] startAfter, boolean needLocation)
      throws IOException {
    final INodesInPath iip = fsd.resolvePath(pc, srcArg, DirOp.READ);

    // Get file name when startAfter is an INodePath.  This is not the
    // common case so avoid any unnecessary processing unless required.
    if (startAfter.length > 0 && startAfter[0] == Path.SEPARATOR_CHAR) {
      final String startAfterString = DFSUtil.bytes2String(startAfter);
      if (FSDirectory.isReservedName(startAfterString)) {
        try {
          byte[][] components = INode.getPathComponents(startAfterString);
          components = FSDirectory.resolveComponents(components, fsd);
          startAfter = components[components.length - 1];
        } catch (IOException e) {
          // Possibly the inode is deleted. Attach the original failure as the
          // cause so the reason the lookup failed is not silently dropped.
          final DirectoryListingStartAfterNotFoundException notFound =
              new DirectoryListingStartAfterNotFoundException(
                  "Can't find startAfter " + startAfterString);
          notFound.initCause(e);
          throw notFound;
        }
      }
    }

    // Only directories need the READ_EXECUTE check here; a missing or
    // non-directory target is passed straight to getListing().
    if (fsd.isPermissionEnabled()) {
      if (iip.getLastINode() != null && iip.getLastINode().isDirectory()) {
        fsd.checkPathAccess(pc, iip, FsAction.READ_EXECUTE);
      }
    }
    return getListing(fsd, iip, startAfter, needLocation);
  }

  /**
   * Look up the status of a single path given in string form.
   *
   * @param fsd The FS directory
   * @param pc The permission checker
   * @param srcArg The string representation of the path to the file
   * @param resolveLink selects the resolution mode: {@code DirOp.READ} when
   *        true, {@code DirOp.READ_LINK} otherwise (per the original
   *        contract this governs whether UnresolvedLinkException is thrown
   *        if src refers to a symlink)
   * @param needLocation Include {@link LocatedBlocks} in result.
   * @param needBlockToken Include block tokens in {@link LocatedBlocks}.
   * @return object containing information regarding the file
   *         or null if file not found
   * @throws IOException if the path cannot be resolved
   */
  static HdfsFileStatus getFileInfo(FSDirectory fsd, FSPermissionChecker pc,
      String srcArg, boolean resolveLink, boolean needLocation,
      boolean needBlockToken) throws IOException {
    final DirOp dirOp;
    if (resolveLink) {
      dirOp = DirOp.READ;
    } else {
      dirOp = DirOp.READ_LINK;
    }

    // Regular users: any AccessControlException propagates to the caller.
    if (!pc.isSuperUser()) {
      final INodesInPath resolved = fsd.resolvePath(pc, srcArg, dirOp);
      return getFileInfo(fsd, resolved, needLocation, needBlockToken);
    }

    // superuser can only get an ACE if an existing ancestor is a file.
    // right or (almost certainly) wrong, current fs contracts expect
    // superuser to receive null instead.
    final INodesInPath iip;
    try {
      iip = fsd.resolvePath(pc, srcArg, dirOp);
      pc.checkSuperuserPrivilege(iip.getPath());
    } catch (AccessControlException ace) {
      return null;
    }
    return getFileInfo(fsd, iip, needLocation, needBlockToken);
  }

  /**
   * Tell whether the file at {@code src} is closed, i.e. not under
   * construction.
   *
   * @param fsd The FS directory
   * @param pc The permission checker
   * @param src The string representation of the path to the file
   * @return true if the file is not under construction
   * @throws IOException if the path cannot be resolved or does not refer to
   *         a file accepted by {@code INodeFile.valueOf}
   */
  static boolean isFileClosed(FSDirectory fsd, FSPermissionChecker pc,
      String src) throws IOException {
    final INodesInPath resolved = fsd.resolvePath(pc, src, DirOp.READ);
    final INodeFile file = INodeFile.valueOf(resolved.getLastINode(), src);
    final boolean underConstruction = file.isUnderConstruction();
    return !underConstruction;
  }

  /**
   * Compute the content summary of {@code src}.
   *
   * <p>If permissions are enabled and
   * {@code fsd.isPermissionContentSummarySubAccess()} is set, a single
   * READ_EXECUTE permission check is made up front and no permission checker
   * is handed to the traversal. Otherwise the checker is passed along and
   * getContentSummaryInt() checks access (if enabled) while traversing all
   * sub directories.
   *
   * @param fsd The FS directory
   * @param pc The permission checker
   * @param src The string representation of the path
   * @return the content summary of the resolved path
   * @throws IOException if resolution, the permission check or the
   *         computation fails
   */
  static ContentSummary getContentSummary(
      FSDirectory fsd, FSPermissionChecker pc, String src) throws IOException {
    final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ_LINK);

    // Checker used during traversal; cleared once the up-front check is done.
    FSPermissionChecker traversalChecker = pc;
    final boolean checkUpFront = fsd.isPermissionEnabled()
        && fsd.isPermissionContentSummarySubAccess();
    if (checkUpFront) {
      fsd.checkPermission(pc, iip, false, null, null, null,
          FsAction.READ_EXECUTE);
      traversalChecker = null;
    }
    return getContentSummaryInt(fsd, traversalChecker, iip);
  }

  /**
   * Get block locations within the specified range.
   *
   * <p>The whole lookup runs under the directory read lock. The path is
   * resolved without a permission checker and access is verified explicitly
   * afterwards when permissions are enabled.
   *
   * @see ClientProtocol#getBlockLocations(String, long, long)
   * @param fsd The FS directory
   * @param pc The permission checker
   * @param src The string representation of the path to the file
   * @param offset start of the requested range; must not be negative
   * @param length length of the requested range; must not be negative
   * @param needBlockToken passed to the block manager when building the
   *        located blocks
   * @return the located blocks, the resolved path and whether the access
   *         time should be updated
   * @throws IOException if resolution or an access check fails
   */
  static GetBlockLocationsResult getBlockLocations(
      FSDirectory fsd, FSPermissionChecker pc, String src, long offset,
      long length, boolean needBlockToken) throws IOException {
    Preconditions.checkArgument(offset >= 0,
        "Negative offset is not supported. File: " + src);
    Preconditions.checkArgument(length >= 0,
        "Negative length is not supported. File: " + src);
    final BlockManager blockManager = fsd.getBlockManager();
    fsd.readLock();
    try {
      // Just get INodesInPath without access checks, since we check for path
      // access later
      final INodesInPath iip = fsd.resolvePath(null, src, DirOp.READ);
      final String resolvedPath = iip.getPath();
      final INodeFile inode =
          INodeFile.valueOf(iip.getLastINode(), resolvedPath);
      if (fsd.isPermissionEnabled()) {
        fsd.checkUnreadableBySuperuser(pc, iip);
        fsd.checkPathAccess(pc, iip, FsAction.READ);
      }

      final boolean inSnapshot = iip.isSnapshot();
      final long fileSize;
      final long rangeLength;
      final boolean isUc;
      if (inSnapshot) {
        // if src indicates a snapshot file, we need to make sure the returned
        // blocks do not exceed the size of the snapshot file.
        fileSize = inode.computeFileSize(iip.getPathSnapshotId());
        rangeLength = Math.min(length, fileSize - offset);
        isUc = false;
      } else {
        fileSize = inode.computeFileSizeNotIncludingLastUcBlock();
        rangeLength = length;
        isUc = inode.isUnderConstruction();
      }

      final FileEncryptionInfo feInfo =
          FSDirEncryptionZoneOp.getFileEncryptionInfo(fsd, iip);
      final ErasureCodingPolicy ecPolicy =
          FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy(
              fsd.getFSNamesystem(), iip);

      final LocatedBlocks blocks = blockManager.createLocatedBlocks(
          inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
          rangeLength, needBlockToken, inSnapshot, feInfo, ecPolicy);

      // Access time is only refreshed for live files, and only once the
      // configured precision window has elapsed.
      final long currentTime = now();
      final boolean updateAccessTime = fsd.isAccessTimeSupported()
          && !inSnapshot
          && currentTime
              > inode.getAccessTime() + fsd.getAccessTimePrecision();
      return new GetBlockLocationsResult(updateAccessTime, blocks, iip);
    } finally {
      fsd.readUnlock();
    }
  }

  /**
   * Pick the effective storage policy: the inode's own policy unless it is
   * unspecified, in which case the parent's policy is used.
   *
   * @param inodePolicy policy id set on the inode itself
   * @param parentPolicy policy id to fall back to
   * @return the effective storage policy id
   */
  private static byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
    if (inodePolicy == HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
      return parentPolicy;
    }
    return inodePolicy;
  }

  /**
   * Get a partial listing of the indicated directory.
   *
   * <p>Entries are added until one of these holds:
   * <ol>
   *   <li>{@code fsd.getLsLimit()} entries have been added, or the directory
   *       is exhausted;</li>
   *   <li>located statuses were produced and the approximate number of block
   *       locations in the response (located block count times replication,
   *       summed over entries) has reached {@code fsd.getLsLimit()}.</li>
   * </ol>
   *
   * <p>A path that is exactly the reserved name yields the reserved listing,
   * a ".snapshot" path yields the snapshot listing, and a non-directory
   * target yields a single-element listing holding that file's status.
   *
   * @param fsd FSDirectory
   * @param iip the INodesInPath instance containing all the INodes along the
   *            path
   * @param startAfter the name to start listing after
   * @param needLocation if block locations are returned
   * @return a partial listing starting after startAfter, or null when the
   *         path has no last inode
   * @throws IOException if building a file status fails
   */
  private static DirectoryListing getListing(FSDirectory fsd, INodesInPath iip,
      byte[] startAfter, boolean needLocation)
      throws IOException {
    if (FSDirectory.isExactReservedName(iip.getPathComponents())) {
      return getReservedListing(fsd);
    }

    fsd.readLock();
    try {
      if (iip.isDotSnapshotDir()) {
        return getSnapshotsListing(fsd, iip, startAfter);
      }
      final int snapshot = iip.getPathSnapshotId();
      final INode targetNode = iip.getLastINode();
      if (targetNode == null) {
        return null;
      }

      final byte parentStoragePolicy = targetNode.getStoragePolicyID();

      if (!targetNode.isDirectory()) {
        // return the file's status. note that the iip already includes the
        // target INode
        final HdfsFileStatus fileStatus = createFileStatus(
            fsd, iip, null, parentStoragePolicy, needLocation, false);
        return new DirectoryListing(new HdfsFileStatus[] {fileStatus}, 0);
      }

      final ReadOnlyList<INode> contents =
          targetNode.asDirectory().getChildrenList(snapshot);
      final int startChild = INodeDirectory.nextChild(contents, startAfter);
      final int totalNumChildren = contents.size();
      final int numOfListing = Math.min(totalNumChildren - startChild,
          fsd.getLsLimit());
      final HdfsFileStatus[] listing = new HdfsFileStatus[numOfListing];

      int locationBudget = fsd.getLsLimit();
      int filled = 0;
      while (filled < numOfListing && locationBudget > 0) {
        final INode child = contents.get(startChild + filled);
        final byte childStoragePolicy;
        if (child.isSymlink()) {
          childStoragePolicy = parentStoragePolicy;
        } else {
          childStoragePolicy = getStoragePolicyID(
              child.getLocalStoragePolicyID(), parentStoragePolicy);
        }
        final HdfsFileStatus status = createFileStatus(
            fsd, iip, child, childStoragePolicy, needLocation, false);
        listing[filled] = status;
        filled++;
        if (status instanceof HdfsLocatedFileStatus) {
          // Once we hit lsLimit locations, stop.
          // This helps to prevent excessively large response payloads.
          // Approximate #locations with locatedBlockCount() * repl_factor
          final LocatedBlocks blks =
              ((HdfsLocatedFileStatus) status).getLocatedBlocks();
          if (blks != null) {
            locationBudget -=
                blks.locatedBlockCount() * status.getReplication();
          }
        }
      }

      // truncate return array if the location budget cut the loop short
      final HdfsFileStatus[] result = filled < numOfListing
          ? Arrays.copyOf(listing, filled)
          : listing;
      final int remaining = totalNumChildren - startChild - filled;
      return new DirectoryListing(result, remaining);
    } finally {
      fsd.readUnlock();
    }
  }

  /**
   * Get a listing of all the snapshots of a snapshottable directory.
   *
   * <p>The caller must hold the directory read lock and {@code iip} must end
   * with the ".snapshot" component. At most {@code fsd.getLsLimit()}
   * snapshot roots following {@code startAfter} are returned.
   *
   * @param fsd FSDirectory
   * @param iip path ending in the ".snapshot" component
   * @param startAfter the snapshot name to start listing after
   * @return a partial listing of snapshot roots with the count of remaining
   *         entries
   * @throws SnapshotException if the parent directory is not snapshottable
   * @throws IOException if the parent is not a directory or a status cannot
   *         be created
   */
  private static DirectoryListing getSnapshotsListing(
      FSDirectory fsd, INodesInPath iip, byte[] startAfter)
      throws IOException {
    Preconditions.checkState(fsd.hasReadLock());
    Preconditions.checkArgument(iip.isDotSnapshotDir(),
        "%s does not end with %s",
        iip.getPath(), HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);

    // drop off the null .snapshot component
    final INodesInPath parentIip = iip.getParentINodesInPath();
    final String dirPath = parentIip.getPath();
    final INodeDirectory dirNode =
        INodeDirectory.valueOf(parentIip.getLastINode(), dirPath);
    final DirectorySnapshottableFeature sf =
        dirNode.getDirectorySnapshottableFeature();
    if (sf == null) {
      throw new SnapshotException(
          "Directory is not a snapshottable directory: " + dirPath);
    }

    final ReadOnlyList<Snapshot> snapshots = sf.getSnapshotList();
    final int searchResult =
        ReadOnlyList.Util.binarySearch(snapshots, startAfter);
    // A negative result encodes the insertion point; a hit means we resume
    // right after the matching snapshot.
    final int skipSize;
    if (searchResult < 0) {
      skipSize = -searchResult - 1;
    } else {
      skipSize = searchResult + 1;
    }

    final int available = snapshots.size() - skipSize;
    final int numOfListing = Math.min(available, fsd.getLsLimit());
    final HdfsFileStatus[] listing = new HdfsFileStatus[numOfListing];
    for (int idx = 0; idx < numOfListing; idx++) {
      final Snapshot.Root sRoot = snapshots.get(skipSize + idx).getRoot();
      listing[idx] = createFileStatus(fsd, parentIip, sRoot,
          HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, false, false);
    }
    return new DirectoryListing(listing, available - numOfListing);
  }

  /**
   * Build the listing of the /.reserved directory from the statuses held by
   * the FSDirectory; no entries are reported as remaining.
   *
   * @param fsd FSDirectory
   * @return listing containing child directories of /.reserved
   */
  private static DirectoryListing getReservedListing(FSDirectory fsd) {
    final int remainingEntries = 0;
    return new DirectoryListing(fsd.getReservedStatuses(), remainingEntries);
  }

  /**
   * Get the file info for the last inode of an already resolved path.
   * Runs under the directory read lock.
   *
   * @param fsd FSDirectory
   * @param iip The path to the file, the file is included
   * @param includeStoragePolicy whether to include storage policy; symlinks
   *        always report the unspecified policy
   * @param needLocation Include {@link LocatedBlocks} in response
   * @param needBlockToken Generate block tokens for {@link LocatedBlocks}
   * @return object containing information regarding the file
   *         or null if file not found
   * @throws IOException if the status cannot be created
   */
  static HdfsFileStatus getFileInfo(FSDirectory fsd, INodesInPath iip,
      boolean includeStoragePolicy, boolean needLocation,
      boolean needBlockToken) throws IOException {
    fsd.readLock();
    try {
      final INode node = iip.getLastINode();
      if (node == null) {
        return null;
      }
      byte policy = HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
      if (includeStoragePolicy && !node.isSymlink()) {
        policy = node.getStoragePolicyID();
      }
      return createFileStatus(fsd, iip, null, policy, needLocation,
          needBlockToken);
    } finally {
      fsd.readUnlock();
    }
  }

  /**
   * Get the file info for a resolved path, handling the special paths first.
   *
   * <p>A path that is exactly the reserved name maps to
   * {@code FSDirectory.DOT_RESERVED_STATUS}. A ".snapshot" path maps to
   * {@code FSDirectory.DOT_SNAPSHOT_DIR_STATUS} when
   * {@code fsd.getINode4DotSnapshot(iip)} finds an inode, otherwise to null.
   * Every other path is delegated with the storage policy included.
   *
   * @param fsd FSDirectory
   * @param iip The path to the file, the file is included
   * @param needLocation Include {@link LocatedBlocks} in response
   * @param needBlockToken Generate block tokens for {@link LocatedBlocks}
   * @return the file status, or null if nothing was found
   * @throws IOException if the lookup fails
   */
  static HdfsFileStatus getFileInfo(FSDirectory fsd, INodesInPath iip,
      boolean needLocation, boolean needBlockToken) throws IOException {
    fsd.readLock();
    try {
      if (FSDirectory.isExactReservedName(iip.getPathComponents())) {
        return FSDirectory.DOT_RESERVED_STATUS;
      }
      if (!iip.isDotSnapshotDir()) {
        return getFileInfo(fsd, iip, true, needLocation, needBlockToken);
      }
      if (fsd.getINode4DotSnapshot(iip) != null) {
        return FSDirectory.DOT_SNAPSHOT_DIR_STATUS;
      }
      return null;
    } finally {
      fsd.readUnlock();
    }
  }

  /**
   * Create an hdfs file status from an iip for edit-log use: no child entry,
   * unspecified storage policy, no block locations and no block tokens.
   *
   * @param fsd FSDirectory
   * @param iip The INodesInPath containing the INodeFile and its ancestors
   * @return HdfsFileStatus without locations or storage policy
   * @throws IOException if the status cannot be created
   */
  static HdfsFileStatus createFileStatusForEditLog(
      FSDirectory fsd, INodesInPath iip) throws IOException {
    final byte unspecifiedPolicy =
        HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
    return createFileStatus(fsd, iip, null, unspecifiedPolicy, false, false);
  }

  /**
   * create a hdfs file status from an iip.
   *
   * <p>When {@code child} is non-null the status describes that child (the
   * iip is extended by it and the status carries the child's local name);
   * otherwise it describes the last inode of {@code iip} with an empty name.
   * The caller is expected to hold the directory read lock (asserted).
   *
   * @param fsd FSDirectory
   * @param iip The INodesInPath containing the INodeFile and its ancestors.
   * @param child for a directory listing of the iip, else null
   * @param storagePolicy for the path or closest ancestor
   * @param needLocation if block locations need to be included or not
   * @param needBlockToken forwarded to the block manager when located blocks
   *        are created; only used when needLocation is true and the node is
   *        a file
   * @return a file status
   * @throws java.io.IOException if any error occurs
   */
  private static HdfsFileStatus createFileStatus(
      FSDirectory fsd, INodesInPath iip, INode child, byte storagePolicy,
      boolean needLocation, boolean needBlockToken) throws IOException {
    assert fsd.hasReadLock();
    // only directory listing sets the status name.
    byte[] name = HdfsFileStatus.EMPTY_NAME;
    if (child != null) {
      name = child.getLocalNameBytes();
      // have to do this for EC and EZ lookups...
      iip = INodesInPath.append(iip, child, name);
    }

    // Defaults below are what a non-file node (e.g. a directory) reports.
    long size = 0;     // length is zero for directories
    short replication = 0;
    long blocksize = 0;
    // From here on "node" is the inode being described: the child when one
    // was appended above, otherwise the last inode of the original iip.
    final INode node = iip.getLastINode();
    final int snapshot = iip.getPathSnapshotId();
    LocatedBlocks loc = null;

    final boolean isEncrypted = FSDirEncryptionZoneOp.isInAnEZ(fsd, iip);
    FileEncryptionInfo feInfo = null;

    final ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp
        .unprotectedGetErasureCodingPolicy(fsd.getFSNamesystem(), iip);
    final boolean isErasureCoded = (ecPolicy != null);

    boolean isSnapShottable = false;

    if (node.isFile()) {
      final INodeFile fileNode = node.asFile();
      size = fileNode.computeFileSize(snapshot);
      replication = fileNode.getFileReplication(snapshot);
      blocksize = fileNode.getPreferredBlockSize();
      // Encryption info is only looked up for files inside an encryption zone.
      if (isEncrypted) {
        feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(fsd, iip);
      }
      if (needLocation) {
        final boolean inSnapshot = snapshot != Snapshot.CURRENT_STATE_ID;
        // A snapshot view is never treated as under construction.
        final boolean isUc = !inSnapshot && fileNode.isUnderConstruction();
        // For a live under-construction file the size handed to the block
        // manager excludes the last under-construction block.
        final long fileSize = !inSnapshot && isUc
            ? fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
        // Locations are requested for the range [0, size).
        loc = fsd.getBlockManager().createLocatedBlocks(
            fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size,
            needBlockToken, inSnapshot, feInfo, ecPolicy);
        // Substitute an empty LocatedBlocks so a located status is never
        // built with null locations.
        if (loc == null) {
          loc = new LocatedBlocks();
        }
      }
    } else if (node.isDirectory()) {
      isSnapShottable = node.asDirectory().isSnapshottable();
    }

    // Children count is reported for directories only.
    int childrenNum = node.isDirectory() ?
        node.asDirectory().getChildrenNum(snapshot) : 0;

    // Permission, owner, group and ACL presence come from the attributes
    // returned by the FSDirectory for this path rather than from the inode.
    INodeAttributes nodeAttrs = fsd.getAttributes(iip);
    boolean hasAcl = nodeAttrs.getAclFeature() != null;

    EnumSet<HdfsFileStatus.Flags> flags =
        DFSUtil.getFlags(isEncrypted, isErasureCoded, isSnapShottable, hasAcl);

    return createFileStatus(
        size,
        node.isDirectory(),
        replication,
        blocksize,
        node.getModificationTime(snapshot),
        node.getAccessTime(snapshot),
        nodeAttrs.getFsPermission(),
        flags,
        nodeAttrs.getUserName(),
        nodeAttrs.getGroupName(),
        node.isSymlink() ? node.asSymlink().getSymlink() : null,
        name,
        node.getId(),
        childrenNum,
        feInfo,
        storagePolicy,
        ecPolicy,
        loc);
  }

  /**
   * Assemble an {@link HdfsFileStatus} from already computed field values
   * using {@code HdfsFileStatus.Builder}. Builder calls are made in the same
   * order as the parameters are declared.
   *
   * @param length value for the builder's length
   * @param isdir value for the builder's isdir flag
   * @param replication value for the builder's replication
   * @param blocksize value for the builder's blocksize
   * @param mtime value for the builder's mtime
   * @param atime value for the builder's atime
   * @param permission value for the builder's perm
   * @param flags status flags
   * @param owner owner name
   * @param group group name
   * @param symlink symlink target bytes, may be null
   * @param path value for the builder's path
   * @param fileId value for the builder's fileId
   * @param childrenNum value for the builder's children count
   * @param feInfo file encryption info, may be null
   * @param storagePolicy storage policy id
   * @param ecPolicy erasure coding policy, may be null
   * @param locations located blocks, may be null
   * @return the built file status
   */
  private static HdfsFileStatus createFileStatus(
      long length, boolean isdir,
      int replication, long blocksize, long mtime, long atime,
      FsPermission permission, EnumSet<HdfsFileStatus.Flags> flags,
      String owner, String group, byte[] symlink, byte[] path, long fileId,
      int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy,
      ErasureCodingPolicy ecPolicy, LocatedBlocks locations) {
    // Size and time attributes.
    final HdfsFileStatus.Builder basics = new HdfsFileStatus.Builder()
        .length(length)
        .isdir(isdir)
        .replication(replication)
        .blocksize(blocksize)
        .mtime(mtime)
        .atime(atime);

    // Permission, flags and ownership.
    final HdfsFileStatus.Builder withOwnership = basics
        .perm(permission)
        .flags(flags)
        .owner(owner)
        .group(group);

    // Naming and identity.
    final HdfsFileStatus.Builder withIdentity = withOwnership
        .symlink(symlink)
        .path(path)
        .fileId(fileId)
        .children(childrenNum);

    // Encryption, storage policy, erasure coding and block locations.
    final HdfsFileStatus.Builder complete = withIdentity
        .feInfo(feInfo)
        .storagePolicy(storagePolicy)
        .ecPolicy(ecPolicy)
        .locations(locations);

    return complete.build();
  }

  /**
   * Compute the content summary of the last inode of {@code iip} under the
   * directory read lock and record the yield count of the computation on
   * the FSDirectory.
   *
   * @param fsd FSDirectory
   * @param pc permission checker handed to the computation context; callers
   *        in this class may pass null
   * @param iip the resolved path
   * @return the content summary
   * @throws FileNotFoundException if the path has no last inode
   * @throws IOException if the computation fails
   */
  private static ContentSummary getContentSummaryInt(FSDirectory fsd,
      FSPermissionChecker pc, INodesInPath iip) throws IOException {
    fsd.readLock();
    try {
      final INode targetNode = iip.getLastINode();
      if (targetNode == null) {
        throw new FileNotFoundException("File does not exist: " + iip.getPath());
      }

      // Make it relinquish locks everytime contentCountLimit entries are
      // processed. 0 means disabled. I.e. blocking for the entire duration.
      final ContentSummaryComputationContext context =
          new ContentSummaryComputationContext(fsd, fsd.getFSNamesystem(),
              fsd.getContentCountLimit(), fsd.getContentSleepMicroSec(), pc);
      final ContentSummary summary =
          targetNode.computeAndConvertContentSummary(
              iip.getPathSnapshotId(), context);
      fsd.addYieldCount(context.getYieldCount());
      return summary;
    } finally {
      fsd.readUnlock();
    }
  }

  /**
   * Get the quota usage of {@code src}.
   *
   * <p>The path is resolved and, when permissions are enabled, checked for
   * READ_EXECUTE under the read lock. The usage is then taken from the
   * directory's quota feature; if that yields nothing, the content summary
   * is computed and returned instead.
   *
   * @param fsd FSDirectory
   * @param pc The permission checker
   * @param src The string representation of the path
   * @return the quota usage, or the content summary as a fallback
   * @throws IOException if resolution, the permission check or the
   *         computation fails
   */
  static QuotaUsage getQuotaUsage(
      FSDirectory fsd, FSPermissionChecker pc, String src) throws IOException {
    final INodesInPath iip;
    fsd.readLock();
    try {
      iip = fsd.resolvePath(pc, src, DirOp.READ_LINK);
      if (fsd.isPermissionEnabled()) {
        fsd.checkPermission(pc, iip, false, null, null, null,
            FsAction.READ_EXECUTE);
      }
    } finally {
      fsd.readUnlock();
    }

    final QuotaUsage fromQuotaFeature = getQuotaUsageInt(fsd, iip);
    if (fromQuotaFeature == null) {
      //If quota isn't set, fall back to getContentSummary.
      return getContentSummaryInt(fsd, pc, iip);
    }
    return fromQuotaFeature;
  }

  /**
   * Build the quota usage of the last inode of {@code iip} from its
   * {@code DirectoryWithQuotaFeature}, under the directory read lock.
   *
   * @param fsd FSDirectory
   * @param iip the resolved path
   * @return the quota usage, or null when the target is not a directory or
   *         has no quota feature
   * @throws FileNotFoundException if the path has no last inode
   * @throws IOException declared for callers; see above
   */
  private static QuotaUsage getQuotaUsageInt(FSDirectory fsd, INodesInPath iip)
    throws IOException {
    fsd.readLock();
    try {
      final INode targetNode = iip.getLastINode();
      if (targetNode == null) {
        throw new FileNotFoundException(
            "File/Directory does not exist: " + iip.getPath());
      }
      if (!targetNode.isDirectory()) {
        return null;
      }
      final DirectoryWithQuotaFeature feature =
          targetNode.asDirectory().getDirectoryWithQuotaFeature();
      if (feature == null) {
        return null;
      }

      final QuotaCounts consumed = feature.getSpaceConsumed();
      final QuotaCounts limits = feature.getQuota();
      return new QuotaUsage.Builder()
          .fileAndDirectoryCount(consumed.getNameSpace())
          .quota(limits.getNameSpace())
          .spaceConsumed(consumed.getStorageSpace())
          .spaceQuota(limits.getStorageSpace())
          .typeConsumed(consumed.getTypeSpaces().asArray())
          .typeQuota(limits.getTypeSpaces().asArray())
          .build();
    } finally {
      fsd.readUnlock();
    }
  }

  /**
   * Immutable result of {@code getBlockLocations}: the located blocks, the
   * resolved path they belong to, and whether the access time should be
   * updated.
   */
  static class GetBlockLocationsResult {
    /** Resolved path the blocks were looked up for. */
    private final INodesInPath iip;
    /** Located blocks for the requested range. */
    final LocatedBlocks blocks;
    /** Whether the lookup decided the access time should be updated. */
    final boolean updateAccessTime;

    private GetBlockLocationsResult(
        boolean updateAccessTime, LocatedBlocks blocks, INodesInPath iip) {
      this.iip = iip;
      this.blocks = blocks;
      this.updateAccessTime = updateAccessTime;
    }

    /** @return whether the access time should be updated */
    boolean updateAccessTime() {
      return updateAccessTime;
    }

    /** @return the resolved path the blocks were looked up for */
    public INodesInPath getIIp() {
      return iip;
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AclEntryStatusFormat 源码

hadoop AclFeature 源码

hadoop AclStorage 源码

hadoop AclTransformation 源码

hadoop AuditLogger 源码

hadoop BackupImage 源码

hadoop BackupJournalManager 源码

hadoop BackupNode 源码

hadoop BackupState 源码

hadoop CacheManager 源码

0  赞