hadoop FSDirConcatOp source code

  • 2022-10-20

hadoop FSDirConcatOp code

File path: /hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import org.apache.hadoop.util.Preconditions;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;

import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.List;

import static org.apache.hadoop.util.Time.now;

/**
 * Restrictions for a concat operation:
 * <pre>
 * 1. the src file and the target file are in the same dir
 * 2. all the source files are not in snapshot
 * 3. any source file cannot be the same with the target file
 * 4. source files cannot be under construction or empty
 * 5. source file's preferred block size cannot be greater than the target file
 * </pre>
 */
class FSDirConcatOp {

  static FileStatus concat(FSDirectory fsd, FSPermissionChecker pc,
      String target, String[] srcs, boolean logRetryCache) throws IOException {
    validatePath(target, srcs);
    assert srcs != null;
    NameNode.stateChangeLog.debug("DIR* NameSystem.concat: {} to {}",
        Arrays.toString(srcs), target);

    final INodesInPath targetIIP = fsd.resolvePath(pc, target, DirOp.WRITE);
    // write permission for the target
    if (fsd.isPermissionEnabled()) {
      fsd.checkPathAccess(pc, targetIIP, FsAction.WRITE);
    }

    // check the target
    verifyTargetFile(fsd, target, targetIIP);
    // check the srcs
    INodeFile[] srcFiles = verifySrcFiles(fsd, srcs, targetIIP, pc);

    long timestamp = now();
    fsd.writeLock();
    try {
      unprotectedConcat(fsd, targetIIP, srcFiles, timestamp);
    } finally {
      fsd.writeUnlock();
    }
    fsd.getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
    return fsd.getAuditFileInfo(targetIIP);
  }

  private static void validatePath(String target, String[] srcs)
      throws IOException {
    Preconditions.checkArgument(!target.isEmpty(), "Target file name is empty");
    Preconditions.checkArgument(srcs != null && srcs.length > 0,
        "No sources given");
    if (FSDirectory.isReservedRawName(target)
        || FSDirectory.isReservedInodesName(target)) {
      throw new IOException("Concat operation doesn't support "
          + FSDirectory.DOT_RESERVED_STRING + " relative path : " + target);
    }
    for (String srcPath : srcs) {
      if (FSDirectory.isReservedRawName(srcPath)
          || FSDirectory.isReservedInodesName(srcPath)) {
        throw new IOException("Concat operation doesn't support "
            + FSDirectory.DOT_RESERVED_STRING + " relative path : " + srcPath);
      }
    }
  }

  private static void verifyTargetFile(FSDirectory fsd, final String target,
      final INodesInPath targetIIP) throws IOException {
    // check the target
    if (FSDirEncryptionZoneOp.getEZForPath(fsd, targetIIP) != null) {
      throw new HadoopIllegalArgumentException(
          "concat can not be called for files in an encryption zone.");
    }
    final INodeFile targetINode = INodeFile.valueOf(targetIIP.getLastINode(),
        target);
    if(targetINode.isUnderConstruction()) {
      throw new HadoopIllegalArgumentException("concat: target file "
          + target + " is under construction");
    }
  }

  private static INodeFile[] verifySrcFiles(FSDirectory fsd, String[] srcs,
      INodesInPath targetIIP, FSPermissionChecker pc) throws IOException {
    // to make sure no two files are the same
    Set<INodeFile> si = new LinkedHashSet<>();
    final INodeFile targetINode = targetIIP.getLastINode().asFile();
    final INodeDirectory targetParent = targetINode.getParent();
    // now check the srcs
    for(String src : srcs) {
      final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
      // permission check for srcs
      if (pc != null) {
        fsd.checkPathAccess(pc, iip, FsAction.READ); // read the file
        fsd.checkParentAccess(pc, iip, FsAction.WRITE); // for delete
      }
      final INode srcINode = iip.getLastINode();
      final INodeFile srcINodeFile = INodeFile.valueOf(srcINode, src);
      // make sure the src file and the target file are in the same dir
      if (srcINodeFile.getParent() != targetParent) {
        throw new HadoopIllegalArgumentException("Source file " + src
            + " is not in the same directory with the target "
            + targetIIP.getPath());
      }
      // make sure all the source files are not in snapshot
      if (srcINode.isInLatestSnapshot(iip.getLatestSnapshotId())) {
        throw new SnapshotException("Concat: the source file " + src
            + " is in snapshot");
      }
      // check if the file has other references.
      if (srcINode.isReference() && ((INodeReference.WithCount)
          srcINode.asReference().getReferredINode()).getReferenceCount() > 1) {
        throw new SnapshotException("Concat: the source file " + src
            + " is referred by some other reference in some snapshot.");
      }
      // source file cannot be the same with the target file
      if (srcINode.equals(targetINode)) {
        throw new HadoopIllegalArgumentException("concat: the src file " + src
            + " is the same with the target file " + targetIIP.getPath());
      }
      // source file cannot be under construction or empty
      if(srcINodeFile.isUnderConstruction() || srcINodeFile.numBlocks() == 0) {
        throw new HadoopIllegalArgumentException("concat: source file " + src
            + " is invalid or empty or underConstruction");
      }

      // source file's preferred block size cannot be greater than the target
      // file
      if (srcINodeFile.getPreferredBlockSize() >
          targetINode.getPreferredBlockSize()) {
        throw new HadoopIllegalArgumentException("concat: source file " + src
            + " has preferred block size " + srcINodeFile.getPreferredBlockSize()
            + " which is greater than the target file's preferred block size "
            + targetINode.getPreferredBlockSize());
      }
      if(srcINodeFile.getErasureCodingPolicyID() !=
          targetINode.getErasureCodingPolicyID()) {
        throw new HadoopIllegalArgumentException("Source file " + src
            + " and target file " + targetIIP.getPath()
            + " have different erasure coding policy");
      }
      si.add(srcINodeFile);
    }

    // make sure no two files are the same
    if(si.size() < srcs.length) {
      // it means at least two files are the same
      throw new HadoopIllegalArgumentException(
          "concat: at least two of the source files are the same");
    }
    return si.toArray(new INodeFile[si.size()]);
  }

  private static QuotaCounts computeQuotaDeltas(FSDirectory fsd,
      INodeFile target, INodeFile[] srcList) {
    QuotaCounts deltas = new QuotaCounts.Builder().build();
    final short targetRepl = target.getPreferredBlockReplication();
    for (INodeFile src : srcList) {
      short srcRepl = src.getFileReplication();
      long fileSize = src.computeFileSize();
      if (targetRepl != srcRepl) {
        deltas.addStorageSpace(fileSize * (targetRepl - srcRepl));
        BlockStoragePolicy bsp =
            fsd.getBlockStoragePolicySuite().getPolicy(src.getStoragePolicyID());
        if (bsp != null) {
          List<StorageType> srcTypeChosen = bsp.chooseStorageTypes(srcRepl);
          for (StorageType t : srcTypeChosen) {
            if (t.supportTypeQuota()) {
              deltas.addTypeSpace(t, -fileSize);
            }
          }
          List<StorageType> targetTypeChosen = bsp.chooseStorageTypes(targetRepl);
          for (StorageType t : targetTypeChosen) {
            if (t.supportTypeQuota()) {
              deltas.addTypeSpace(t, fileSize);
            }
          }
        }
      }
    }
    deltas.addNameSpace(-srcList.length);
    return deltas;
  }

  private static void verifyQuota(FSDirectory fsd, INodesInPath targetIIP,
      QuotaCounts deltas) throws QuotaExceededException {
    if (!fsd.getFSNamesystem().isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
      // Do not check quota if editlog is still being processed
      return;
    }
    FSDirectory.verifyQuota(targetIIP, targetIIP.length() - 1, deltas, null);
  }

  /**
   * Concat all the blocks from srcs to trg and delete the srcs files
   * @param fsd FSDirectory
   */
  static void unprotectedConcat(FSDirectory fsd, INodesInPath targetIIP,
      INodeFile[] srcList, long timestamp) throws IOException {
    assert fsd.hasWriteLock();
    NameNode.stateChangeLog.debug("DIR* NameSystem.concat to {}",
        targetIIP.getPath());

    final INodeFile trgInode = targetIIP.getLastINode().asFile();
    QuotaCounts deltas = computeQuotaDeltas(fsd, trgInode, srcList);
    verifyQuota(fsd, targetIIP, deltas);

    // the target file can be included in a snapshot
    trgInode.recordModification(targetIIP.getLatestSnapshotId());
    INodeDirectory trgParent = targetIIP.getINode(-2).asDirectory();
    trgInode.concatBlocks(srcList, fsd.getBlockManager());

    // since we are in the same dir - we can use same parent to remove files
    int count = 0;
    for (INodeFile nodeToRemove : srcList) {
      if(nodeToRemove != null) {
        nodeToRemove.clearBlocks();
        // Ensure the nodeToRemove is cleared from snapshot diff list
        nodeToRemove.getParent().removeChild(nodeToRemove,
            targetIIP.getLatestSnapshotId());
        fsd.getINodeMap().remove(nodeToRemove);
        count++;
      }
    }

    trgInode.setModificationTime(timestamp, targetIIP.getLatestSnapshotId());
    trgParent.updateModificationTime(timestamp, targetIIP.getLatestSnapshotId());
    // update quota on the parent directory with deltas
    FSDirectory.unprotectedUpdateCount(targetIIP, targetIIP.length() - 1, deltas);
  }
}
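
Usage note: concat is exposed to clients through the FileSystem API; DistributedFileSystem.concat() sends a ClientProtocol.concat() RPC, which the NameNode ultimately handles in the FSDirConcatOp.concat() method shown above. The following is a minimal client-side sketch; the cluster URI and file paths are illustrative placeholders, not values taken from this source file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ConcatExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder NameNode address; adjust for a real cluster.
    conf.set("fs.defaultFS", "hdfs://namenode:8020");

    try (FileSystem fs = FileSystem.get(conf)) {
      // Per the restrictions documented above: the target and all sources
      // must live in the same directory, sources must be non-empty closed
      // files, and their preferred block size must not exceed the target's.
      Path target = new Path("/data/part-merged");
      Path[] srcs = {
          new Path("/data/part-0001"),
          new Path("/data/part-0002")
      };
      // Moves all blocks of the source files onto the target and deletes
      // the sources; only DistributedFileSystem implements this operation.
      fs.concat(target, srcs);
    }
  }
}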

Related information

hadoop source code directory

Related articles

hadoop AclEntryStatusFormat source code

hadoop AclFeature source code

hadoop AclStorage source code

hadoop AclTransformation source code

hadoop AuditLogger source code

hadoop BackupImage source code

hadoop BackupJournalManager source code

hadoop BackupNode source code

hadoop BackupState source code

hadoop CacheManager source code
