hadoop FsVolumeImpl source code

  • 2022-10-20

hadoop FsVolumeImpl code

File path: /hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * The underlying volume used to store replicas.
 *
 * It uses the {@link FsDatasetImpl} object for synchronization.
 */
@InterfaceAudience.Private
@VisibleForTesting
public class FsVolumeImpl implements FsVolumeSpi {
  public static final Logger LOG =
      LoggerFactory.getLogger(FsVolumeImpl.class);
  private static final ObjectWriter WRITER =
      new ObjectMapper().writerWithDefaultPrettyPrinter();
  private static final ObjectReader READER =
      new ObjectMapper().readerFor(BlockIteratorState.class);

  private final FsDatasetImpl dataset;
  private final String storageID;
  private final StorageType storageType;
  private final Map<String, BlockPoolSlice> bpSlices
      = new ConcurrentHashMap<String, BlockPoolSlice>();

  // Refers to the base StorageLocation used to construct this volume
  // (i.e., does not include STORAGE_DIR_CURRENT in
  // <location>/STORAGE_DIR_CURRENT/)
  private final StorageLocation storageLocation;

  private final File currentDir;    // <StorageDirectory>/current
  private final DF usage;
  private final ReservedSpaceCalculator reserved;
  private long cachedCapacity;
  private CloseableReferenceCount reference = new CloseableReferenceCount();

  // Disk space reserved for blocks (RBW or Re-replicating) open for write.
  private AtomicLong reservedForReplicas;
  private long recentReserved = 0;
  private final Configuration conf;
  // Capacity configured. This is useful when we want to
  // limit the visible capacity for tests. If negative, then we just
  // query from the filesystem.
  protected volatile long configuredCapacity;
  private final FileIoProvider fileIoProvider;
  private final DataNodeVolumeMetrics metrics;
  private URI baseURI;
  private boolean enableSameDiskTiering;
  private final String mount;
  private double reservedForArchive;

  /**
   * Per-volume worker pool that processes new blocks to cache.
   * The maximum number of workers per volume is bounded (configurable via
   * dfs.datanode.fsdatasetcache.max.threads.per.volume) to limit resource
   * contention.
   */
  protected ThreadPoolExecutor cacheExecutor;

  FsVolumeImpl(FsDatasetImpl dataset, String storageID, StorageDirectory sd,
      FileIoProvider fileIoProvider, Configuration conf) throws IOException {
    // outside tests, usage created in ReservedSpaceCalculator.Builder
    this(dataset, storageID, sd, fileIoProvider, conf, null);
  }

  FsVolumeImpl(FsDatasetImpl dataset, String storageID, StorageDirectory sd,
      FileIoProvider fileIoProvider, Configuration conf, DF usage)
      throws IOException {

    if (sd.getStorageLocation() == null) {
      throw new IOException("StorageLocation specified for storage directory " +
          sd + " is null");
    }
    this.dataset = dataset;
    this.storageID = storageID;
    this.reservedForReplicas = new AtomicLong(0L);
    this.storageLocation = sd.getStorageLocation();
    this.currentDir = sd.getCurrentDir();
    this.storageType = storageLocation.getStorageType();
    this.configuredCapacity = -1;
    this.usage = usage;
    if (this.usage != null) {
      reserved = new ReservedSpaceCalculator.Builder(conf)
          .setUsage(this.usage).setStorageType(storageType).build();
      boolean fixedSizeVolume = conf.getBoolean(
          DFSConfigKeys.DFS_DATANODE_FIXED_VOLUME_SIZE_KEY,
          DFSConfigKeys.DFS_DATANODE_FIXED_VOLUME_SIZE_DEFAULT);
      if (fixedSizeVolume) {
        cachedCapacity = this.usage.getCapacity();
      }
    } else {
      reserved = null;
      LOG.warn("Setting reserved to null as usage is null");
      cachedCapacity = -1;
    }
    if (currentDir != null) {
      File parent = currentDir.getParentFile();
      cacheExecutor = initializeCacheExecutor(parent);
      this.metrics = DataNodeVolumeMetrics.create(conf, parent.getPath());
      this.baseURI = new File(currentDir.getParent()).toURI();
    } else {
      cacheExecutor = null;
      this.metrics = null;
    }
    this.conf = conf;
    this.fileIoProvider = fileIoProvider;
    this.enableSameDiskTiering =
        conf.getBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
            DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT);
    if (enableSameDiskTiering && usage != null) {
      this.mount = usage.getMount();
    } else {
      mount = "";
    }
  }

  String getMount() {
    return mount;
  }

  protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
    if (storageType.isRAM()) {
      return null;
    }
    if (dataset.datanode == null) {
      // FsVolumeImpl is used in test.
      return null;
    }

    final int maxNumThreads = dataset.datanode.getConf().getInt(
        DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY,
        DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT);

    String escapedPath = parent.toString().replaceAll("%", "%%");
    ThreadFactory workerFactory = new ThreadFactoryBuilder()
        .setDaemon(true)
        .setNameFormat("FsVolumeImplWorker-" + escapedPath + "-%d")
        .build();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        1, maxNumThreads,
        60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(),
        workerFactory);
    executor.allowCoreThreadTimeOut(true);
    return executor;
  }

  private void printReferenceTraceInfo(String op) {
    StackTraceElement[] stack = Thread.currentThread().getStackTrace();
    for (StackTraceElement ste : stack) {
      switch (ste.getMethodName()) {
      case "getDfsUsed":
      case "getBlockPoolUsed":
      case "getAvailable":
      case "getVolumeMap":
        return;
      default:
        break;
      }
    }
    FsDatasetImpl.LOG.trace("Reference count: " + op + " " + this + ": " +
        this.reference.getReferenceCount());
    FsDatasetImpl.LOG.trace(
        Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
  }

  /**
   * Increase the reference count. The caller must increase the reference count
   * before issuing IOs.
   *
   * @throws IOException if the volume is already closed.
   */
  private void reference() throws ClosedChannelException {
    this.reference.reference();
    if (FsDatasetImpl.LOG.isTraceEnabled()) {
      printReferenceTraceInfo("incr");
    }
  }

  /**
   * Decrease the reference count.
   */
  private void unreference() {
    if (FsDatasetImpl.LOG.isTraceEnabled()) {
      printReferenceTraceInfo("desc");
    }
    if (FsDatasetImpl.LOG.isDebugEnabled()) {
      if (reference.getReferenceCount() <= 0) {
        FsDatasetImpl.LOG.debug("Decrease reference count <= 0 on " + this +
          Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
      }
    }
    checkReference();
    this.reference.unreference();
  }

  private static class FsVolumeReferenceImpl implements FsVolumeReference {
    private FsVolumeImpl volume;

    FsVolumeReferenceImpl(FsVolumeImpl volume) throws ClosedChannelException {
      this.volume = volume;
      volume.reference();
    }

    /**
     * Decreases the reference count.
     * @throws IOException never actually thrown by this implementation.
     */
    @Override
    public void close() throws IOException {
      if (volume != null) {
        volume.unreference();
        volume = null;
      }
    }

    @Override
    public FsVolumeSpi getVolume() {
      return this.volume;
    }
  }

  @Override
  public FsVolumeReference obtainReference() throws ClosedChannelException {
    return new FsVolumeReferenceImpl(this);
  }

  private void checkReference() {
    Preconditions.checkState(reference.getReferenceCount() > 0);
  }

  @VisibleForTesting
  public int getReferenceCount() {
    return this.reference.getReferenceCount();
  }

  /**
   * Close this volume.
   * @throws IOException if the volume is already closed.
   */
  void setClosed() throws IOException {
    try {
      this.reference.setClosed();
      dataset.stopAllDataxceiverThreads(this);
    } catch (ClosedChannelException e) {
      throw new IOException("The volume has already closed.", e);
    }
  }

  /**
   * Check whether this volume has successfully been closed.
   */
  boolean checkClosed() {
    if (this.reference.getReferenceCount() > 0) {
      FsDatasetImpl.LOG.debug("The reference count for {} is {}, wait to be 0.",
          this, reference.getReferenceCount());
      return false;
    }
    return true;
  }

  @VisibleForTesting
  File getCurrentDir() {
    return currentDir;
  }

  protected File getRbwDir(String bpid) throws IOException {
    return getBlockPoolSlice(bpid).getRbwDir();
  }

  protected File getLazyPersistDir(String bpid) throws IOException {
    return getBlockPoolSlice(bpid).getLazypersistDir();
  }

  protected File getTmpDir(String bpid) throws IOException {
    return getBlockPoolSlice(bpid).getTmpDir();
  }

  void onBlockFileDeletion(String bpid, long value) {
    decDfsUsedAndNumBlocks(bpid, value, true);
    if (isTransientStorage()) {
      dataset.releaseLockedMemory(value, true);
    }
  }

  void onMetaFileDeletion(String bpid, long value) {
    decDfsUsedAndNumBlocks(bpid, value, false);
  }

  private void decDfsUsedAndNumBlocks(String bpid, long value,
                                      boolean blockFileDeleted) {
    // The BlockPoolSlice map is thread-safe, and updates to the space used
    // or the number of blocks are atomic operations, so there is no need to
    // hold the dataset lock.
    BlockPoolSlice bp = bpSlices.get(bpid);
    if (bp != null) {
      bp.decDfsUsed(value);
      if (blockFileDeleted) {
        bp.decrNumBlocks();
      }
    }
  }

  void incDfsUsedAndNumBlocks(String bpid, long value) {
    BlockPoolSlice bp = bpSlices.get(bpid);
    if (bp != null) {
      bp.incDfsUsed(value);
      bp.incrNumBlocks();
    }
  }

  void incDfsUsed(String bpid, long value) {
    BlockPoolSlice bp = bpSlices.get(bpid);
    if (bp != null) {
      bp.incDfsUsed(value);
    }
  }

  @VisibleForTesting
  public long getDfsUsed() throws IOException {
    long dfsUsed = 0;
    for (BlockPoolSlice s : bpSlices.values()) {
      dfsUsed += s.getDfsUsed();
    }
    return dfsUsed;
  }

  long getBlockPoolUsed(String bpid) throws IOException {
    return getBlockPoolSlice(bpid).getDfsUsed();
  }

  /**
   * Return the configured capacity if one is set; otherwise return the
   * capacity of the file system, excluding space reserved for non-HDFS use.
   *
   * When same-disk-tiering is turned on, the reported capacity also takes
   * the reservedForArchive value into consideration.
   *
   * @return the unreserved number of bytes left in this filesystem. May be
   *         zero.
   */
  @VisibleForTesting
  public long getCapacity() {
    long capacity;
    if (configuredCapacity < 0L) {
      long remaining;
      if (cachedCapacity > 0L) {
        remaining = cachedCapacity - getReserved();
      } else {
        remaining = usage.getCapacity() - getReserved();
      }
      capacity = Math.max(remaining, 0L);
    } else {
      capacity = configuredCapacity;
    }

    if (enableSameDiskTiering && dataset.getMountVolumeMap() != null) {
      double capacityRatio = dataset.getMountVolumeMap()
          .getCapacityRatioByMountAndStorageType(mount, storageType);
      capacity = (long) (capacity * capacityRatio);
    }

    return capacity;
  }
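
  // Worked example: with a fixed cached capacity of 1000 GB and
  // getReserved() == 100 GB, the base capacity is 900 GB; if same-disk
  // tiering assigns this storage type a capacity ratio of 0.4, the
  // reported capacity is (1000 - 100) * 0.4 = 360 GB.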

  /**
   * This function MUST NOT be used outside of tests.
   *
   * @param capacity the capacity, in bytes, to report for testing
   */
  @VisibleForTesting
  public void setCapacityForTesting(long capacity) {
    this.configuredCapacity = capacity;
  }

  /**
   * Calculate the available space of the filesystem, excluding space reserved
   * for non-HDFS and space reserved for RBW.
   *
   * @return the available number of bytes left in this filesystem. May be zero.
   */
  @Override
  public long getAvailable() throws IOException {
    long remaining = getCapacity() - getDfsUsed() - getReservedForReplicas();
    long available = usage.getAvailable()  - getRemainingReserved()
        - getReservedForReplicas();
    if (remaining > available) {
      remaining = available;
    }
    return Math.max(remaining, 0L);
  }
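
  // Worked example: with capacity 900 GB, DFS used 300 GB and 10 GB
  // reserved for replicas, remaining = 590 GB; if the underlying filesystem
  // reports 550 GB free and none of the non-HDFS reserve remains to be
  // honored, available = 550 - 0 - 10 = 540 GB, and the smaller of the two
  // values, 540 GB, is returned (floored at zero).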

  long getActualNonDfsUsed() throws IOException {
    // DISK and ARCHIVAL on same disk
    // should share the same amount of reserved capacity.
    // When calculating actual non dfs used,
    // exclude DFS used capacity by another volume.
    if (enableSameDiskTiering
        && StorageType.allowSameDiskTiering(storageType)) {
      StorageType counterpartStorageType = storageType == StorageType.DISK
          ? StorageType.ARCHIVE : StorageType.DISK;
      FsVolumeReference counterpartRef = dataset
          .getMountVolumeMap()
          .getVolumeRefByMountAndStorageType(mount, counterpartStorageType);
      if (counterpartRef != null) {
        FsVolumeImpl counterpartVol = (FsVolumeImpl) counterpartRef.getVolume();
        long used = getDfUsed() - getDfsUsed() - counterpartVol.getDfsUsed();
        counterpartRef.close();
        return used;
      }
    }
    return getDfUsed() - getDfsUsed();
  }

  /**
   * This function is only used for mocking in tests.
   */
  @VisibleForTesting
  public long getDfUsed() {
    return usage.getUsed();
  }

  private long getRemainingReserved() throws IOException {
    long actualNonDfsUsed = getActualNonDfsUsed();
    long actualReserved = getReserved();
    if (actualNonDfsUsed < actualReserved) {
      return actualReserved - actualNonDfsUsed;
    }
    return 0L;
  }

  /**
   * Unplanned non-DFS usage, i.e. extra usage beyond reserved.
   *
   * @return Disk usage excluding space used by HDFS and excluding space
   * reserved for blocks open for write.
   * @throws IOException
   */
  public long getNonDfsUsed() throws IOException {
    long actualNonDfsUsed = getActualNonDfsUsed();
    long actualReserved = getReserved();
    long nonDfsUsed = actualNonDfsUsed - actualReserved;
    return Math.max(nonDfsUsed, 0L);
  }

  @VisibleForTesting
  long getDfAvailable() {
    return usage.getAvailable();
  }

  @VisibleForTesting
  public long getReservedForReplicas() {
    return reservedForReplicas.get();
  }

  @VisibleForTesting
  long getRecentReserved() {
    return recentReserved;
  }

  public Map<String, BlockPoolSlice> getBlockPoolSlices() {
    return bpSlices;
  }

  long getReserved() {
    return reserved != null ? reserved.getReserved() : 0;
  }

  @VisibleForTesting
  BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException {
    BlockPoolSlice bp = bpSlices.get(bpid);
    if (bp == null) {
      throw new IOException("block pool " + bpid + " is not found");
    }
    return bp;
  }

  @Override
  public URI getBaseURI() {
    return baseURI;
  }

  @Override
  public DF getUsageStats(Configuration conf) {
    if (currentDir != null) {
      try {
        return new DF(new File(currentDir.getParent()), conf);
      } catch (IOException e) {
        LOG.error("Unable to get disk statistics for volume {}", this, e);
      }
    }
    return null;
  }

  @Override
  public StorageLocation getStorageLocation() {
    return storageLocation;
  }

  @Override
  public boolean isTransientStorage() {
    return storageType.isTransient();
  }

  @Override
  public boolean isRAMStorage() {
    return storageType.isRAM();
  }

  @VisibleForTesting
  public File getFinalizedDir(String bpid) throws IOException {
    return getBlockPoolSlice(bpid).getFinalizedDir();
  }

  /**
   * Make a deep copy of the list of currently active BPIDs.
   */
  @Override
  public String[] getBlockPoolList() {
    return bpSlices.keySet().toArray(new String[0]);
  }

  /**
   * Temporary files. They get moved to the finalized block directory when
   * the block is finalized.
   */
  File createTmpFile(String bpid, Block b) throws IOException {
    checkReference();
    reserveSpaceForReplica(b.getNumBytes());
    try {
      return getBlockPoolSlice(bpid).createTmpFile(b);
    } catch (IOException exception) {
      releaseReservedSpace(b.getNumBytes());
      throw exception;
    }
  }

  @Override
  public void reserveSpaceForReplica(long bytesToReserve) {
    if (bytesToReserve != 0L) {
      reservedForReplicas.addAndGet(bytesToReserve);
      recentReserved = bytesToReserve;
    }
  }

  @Override
  public void releaseReservedSpace(long bytesToRelease) {
    if (bytesToRelease != 0L) {
      long oldReservation, newReservation;
      do {
        oldReservation = reservedForReplicas.get();
        newReservation = oldReservation - bytesToRelease;

        // Fail-safe: this should never be less than zero in practice, but if
        // it is, do not advertise more space than is actually available.
        newReservation = Math.max(newReservation, 0L);
      } while (!reservedForReplicas.compareAndSet(oldReservation,
          newReservation));
    }
  }

  @Override
  public void releaseLockedMemory(long bytesToRelease) {
    if (isTransientStorage()) {
      dataset.releaseLockedMemory(bytesToRelease, false);
    }
  }

  private enum SubdirFilter implements FilenameFilter {
    INSTANCE;

    @Override
    public boolean accept(File dir, String name) {
      return name.startsWith("subdir");
    }
  }

  private enum BlockFileFilter implements FilenameFilter {
    INSTANCE;

    @Override
    public boolean accept(File dir, String name) {
      return !name.endsWith(".meta") &&
              name.startsWith(Block.BLOCK_FILE_PREFIX);
    }
  }

  @VisibleForTesting
  public static String nextSorted(List<String> arr, String prev) {
    int res = 0;
    if (prev != null) {
      res = Collections.binarySearch(arr, prev);
      if (res < 0) {
        res = -1 - res;
      } else {
        res++;
      }
    }
    if (res >= arr.size()) {
      return null;
    }
    return arr.get(res);
  }
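
  // Worked examples, for arr = ["a", "c", "e"]:
  //   nextSorted(arr, null) -> "a"   (start from the beginning)
  //   nextSorted(arr, "c")  -> "e"   (exact match: step past it)
  //   nextSorted(arr, "b")  -> "c"   (no match: use the insertion point)
  //   nextSorted(arr, "e")  -> null  (no entries after the last one)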

  private static class BlockIteratorState {
    BlockIteratorState() {
      lastSavedMs = iterStartMs = Time.now();
      curFinalizedDir = null;
      curFinalizedSubDir = null;
      curEntry = null;
      atEnd = false;
    }

    // The wall-clock ms since the epoch at which this iterator was last saved.
    @JsonProperty
    private long lastSavedMs;

    // The wall-clock ms since the epoch at which this iterator was created.
    @JsonProperty
    private long iterStartMs;

    @JsonProperty
    private String curFinalizedDir;

    @JsonProperty
    private String curFinalizedSubDir;

    @JsonProperty
    private String curEntry;

    @JsonProperty
    private boolean atEnd;
  }

  /**
   * A BlockIterator implementation for FsVolumeImpl.
   */
  private class BlockIteratorImpl implements FsVolumeSpi.BlockIterator {
    private final File bpidDir;
    private final String name;
    private final String bpid;
    private long maxStalenessMs = 0;

    private List<String> cache;
    private long cacheMs;

    private BlockIteratorState state;

    BlockIteratorImpl(String bpid, String name) {
      this.bpidDir = new File(currentDir, bpid);
      this.name = name;
      this.bpid = bpid;
      rewind();
    }

    /**
     * Get the next subdirectory within the block pool slice.
     *
     * @return         The next subdirectory within the block pool slice, or
     *                   null if there are no more.
     */
    private String getNextSubDir(String prev, File dir)
          throws IOException {
      List<String> children = fileIoProvider.listDirectory(
          FsVolumeImpl.this, dir, SubdirFilter.INSTANCE);
      cache = null;
      cacheMs = 0;
      if (children.isEmpty()) {
        LOG.trace("getNextSubDir({}, {}): no subdirectories found in {}",
            storageID, bpid, dir.getAbsolutePath());
        return null;
      }
      Collections.sort(children);
      String nextSubDir = nextSorted(children, prev);
      LOG.trace("getNextSubDir({}, {}): picking next subdirectory {} within {}",
          storageID, bpid, nextSubDir, dir.getAbsolutePath());
      return nextSubDir;
    }

    private String getNextFinalizedDir() throws IOException {
      File dir = Paths.get(
          bpidDir.getAbsolutePath(), "current", "finalized").toFile();
      return getNextSubDir(state.curFinalizedDir, dir);
    }

    private String getNextFinalizedSubDir() throws IOException {
      if (state.curFinalizedDir == null) {
        return null;
      }
      File dir = Paths.get(
          bpidDir.getAbsolutePath(), "current", "finalized",
              state.curFinalizedDir).toFile();
      return getNextSubDir(state.curFinalizedSubDir, dir);
    }

    private List<String> getSubdirEntries() throws IOException {
      if (state.curFinalizedSubDir == null) {
        return null; // There are no entries in the null subdir.
      }
      long now = Time.monotonicNow();
      if (cache != null) {
        long delta = now - cacheMs;
        if (delta < maxStalenessMs) {
          return cache;
        } else {
          LOG.trace("getSubdirEntries({}, {}): purging entries cache for {} " +
            "after {} ms.", storageID, bpid, state.curFinalizedSubDir, delta);
          cache = null;
        }
      }
      File dir = Paths.get(bpidDir.getAbsolutePath(), "current", "finalized",
                    state.curFinalizedDir, state.curFinalizedSubDir).toFile();
      List<String> entries = fileIoProvider.listDirectory(
          FsVolumeImpl.this, dir, BlockFileFilter.INSTANCE);
      if (entries.isEmpty()) {
        entries = null;
        LOG.trace("getSubdirEntries({}, {}): no entries found in {}", storageID,
            bpid, dir.getAbsolutePath());
      } else {
        Collections.sort(entries);
        LOG.trace("getSubdirEntries({}, {}): listed {} entries in {}",
            storageID, bpid, entries.size(), dir.getAbsolutePath());
      }
      cache = entries;
      cacheMs = now;
      return cache;
    }

    /**
     * Get the next block.<p/>
     *
     * Each volume has a hierarchical structure.<p/>
     *
     * <code>
     * BPID B0
     *   finalized/
     *     subdir0
     *       subdir0
     *         blk_000
     *         blk_001
     *       ...
     *     subdir1
     *       subdir0
     *         ...
     *   rbw/
     * </code>
     *
     * When we run out of entries at one level of the structure, we search
     * progressively higher levels.  For example, when we run out of blk_
     * entries in a subdirectory, we search for the next subdirectory.
     * And so on.
     */
    @Override
    public ExtendedBlock nextBlock() throws IOException {
      if (state.atEnd) {
        return null;
      }
      try {
        while (true) {
          List<String> entries = getSubdirEntries();
          if (entries != null) {
            state.curEntry = nextSorted(entries, state.curEntry);
            if (state.curEntry == null) {
              LOG.trace("nextBlock({}, {}): advancing from {} to next " +
                  "subdirectory.", storageID, bpid, state.curFinalizedSubDir);
            } else {
              ExtendedBlock block =
                  new ExtendedBlock(bpid, Block.filename2id(state.curEntry));
              File expectedBlockDir = DatanodeUtil.idToBlockDir(
                  new File("."), block.getBlockId());
              File actualBlockDir = Paths.get(".",
                  state.curFinalizedDir, state.curFinalizedSubDir).toFile();
              if (!expectedBlockDir.equals(actualBlockDir)) {
                LOG.error("nextBlock({}, {}): block id {} found in invalid " +
                    "directory.  Expected directory: {}.  " +
                    "Actual directory: {}", storageID, bpid,
                    block.getBlockId(), expectedBlockDir.getPath(),
                    actualBlockDir.getPath());
                continue;
              }

              File blkFile = getBlockFile(bpid, block);
              File metaFile;
              try {
                metaFile = FsDatasetUtil.findMetaFile(blkFile);
              } catch (FileNotFoundException e) {
                LOG.warn("nextBlock({}, {}): {}", storageID, bpid,
                    e.getMessage());
                continue;
              }

              block.setGenerationStamp(
                  Block.getGenerationStamp(metaFile.getName()));
              block.setNumBytes(blkFile.length());

              LOG.trace("nextBlock({}, {}): advancing to {}",
                  storageID, bpid, block);
              return block;
            }
          }
          state.curFinalizedSubDir = getNextFinalizedSubDir();
          if (state.curFinalizedSubDir == null) {
            state.curFinalizedDir = getNextFinalizedDir();
            if (state.curFinalizedDir == null) {
              state.atEnd = true;
              return null;
            }
          }
        }
      } catch (IOException e) {
        state.atEnd = true;
        LOG.error("nextBlock({}, {}): I/O error", storageID, bpid, e);
        throw e;
      }
    }

    private File getBlockFile(String bpid, ExtendedBlock blk)
        throws IOException {
      return new File(DatanodeUtil.idToBlockDir(getFinalizedDir(bpid),
          blk.getBlockId()).toString() + "/" + blk.getBlockName());
    }

    @Override
    public boolean atEnd() {
      return state.atEnd;
    }

    @Override
    public void rewind() {
      cache = null;
      cacheMs = 0;
      state = new BlockIteratorState();
    }

    @Override
    public void save() throws IOException {
      state.lastSavedMs = Time.now();
      boolean success = false;
      try (BufferedWriter writer = new BufferedWriter(
          new OutputStreamWriter(fileIoProvider.getFileOutputStream(
              FsVolumeImpl.this, getTempSaveFile()), "UTF-8"))) {
        WRITER.writeValue(writer, state);
        success = true;
      } finally {
        if (!success) {
          fileIoProvider.delete(FsVolumeImpl.this, getTempSaveFile());
        }
      }
      fileIoProvider.move(FsVolumeImpl.this,
          getTempSaveFile().toPath(), getSaveFile().toPath(),
          StandardCopyOption.ATOMIC_MOVE);
      if (LOG.isTraceEnabled()) {
        LOG.trace("save({}, {}): saved {}", storageID, bpid,
            WRITER.writeValueAsString(state));
      }
    }

    public void load() throws IOException {
      File file = getSaveFile();
      this.state = READER.readValue(file);
      if (LOG.isTraceEnabled()) {
        LOG.trace("load({}, {}): loaded iterator {} from {}: {}", storageID,
            bpid, name, file.getAbsoluteFile(),
            WRITER.writeValueAsString(state));
      }
    }

    File getSaveFile() {
      return new File(bpidDir, name + ".cursor");
    }

    File getTempSaveFile() {
      return new File(bpidDir, name + ".cursor.tmp");
    }

    @Override
    public void setMaxStalenessMs(long maxStalenessMs) {
      this.maxStalenessMs = maxStalenessMs;
    }

    @Override
    public void close() throws IOException {
      // No action needed for this volume implementation.
    }

    @Override
    public long getIterStartMs() {
      return state.iterStartMs;
    }

    @Override
    public long getLastSavedMs() {
      return state.lastSavedMs;
    }

    @Override
    public String getBlockPoolId() {
      return bpid;
    }
  }

  @Override
  public BlockIterator newBlockIterator(String bpid, String name) {
    return new BlockIteratorImpl(bpid, name);
  }

  @Override
  public BlockIterator loadBlockIterator(String bpid, String name)
      throws IOException {
    BlockIteratorImpl iter = new BlockIteratorImpl(bpid, name);
    iter.load();
    return iter;
  }

  @Override
  public FsDatasetSpi<? extends FsVolumeSpi> getDataset() {
    return dataset;
  }

  /**
   * RBW files. They get moved to the finalized block directory when
   * the block is finalized.
   */
  File createRbwFile(String bpid, Block b) throws IOException {
    checkReference();
    reserveSpaceForReplica(b.getNumBytes());
    try {
      return getBlockPoolSlice(bpid).createRbwFile(b);
    } catch (IOException exception) {
      releaseReservedSpace(b.getNumBytes());
      throw exception;
    }
  }

  /**
   * Release the space that was reserved for a block while it was being
   * written, and build the finalized replica.
   *
   * @param bytesReserved Space that was reserved during
   *     block creation. Now that the block is being finalized we
   *     can free up this space.
   * @return the finalized ReplicaInfo
   * @throws IOException if the block cannot be finalized
   */
  ReplicaInfo addFinalizedBlock(String bpid, Block b, ReplicaInfo replicaInfo,
      long bytesReserved) throws IOException {
    releaseReservedSpace(bytesReserved);
    File dest = getBlockPoolSlice(bpid).addFinalizedBlock(b, replicaInfo);
    final byte[] checksum;
    // copy the last partial checksum if the replica is originally
    // in finalized or rbw state.
    switch (replicaInfo.getState()) {
    case FINALIZED:
      FinalizedReplica finalized = (FinalizedReplica) replicaInfo;
      checksum = finalized.getLastPartialChunkChecksum();
      break;
    case RBW:
      ReplicaBeingWritten rbw = (ReplicaBeingWritten) replicaInfo;
      checksum = rbw.getLastChecksumAndDataLen().getChecksum();
      break;
    default:
      checksum = null;
      break;
    }

    return new ReplicaBuilder(ReplicaState.FINALIZED)
        .setBlock(replicaInfo)
        .setFsVolume(this)
        .setDirectoryToUse(dest.getParentFile())
        .setLastPartialChunkChecksum(checksum)
        .build();
  }

  Executor getCacheExecutor() {
    return cacheExecutor;
  }

  @Override
  public VolumeCheckResult check(VolumeCheckContext ignored)
      throws DiskErrorException {
    // TODO:FEDERATION valid synchronization
    for (BlockPoolSlice s : bpSlices.values()) {
      s.checkDirs();
    }
    return VolumeCheckResult.HEALTHY;
  }

  void getVolumeMap(ReplicaMap volumeMap,
      final RamDiskReplicaTracker ramDiskReplicaMap) throws IOException {
    for (BlockPoolSlice s : bpSlices.values()) {
      s.getVolumeMap(volumeMap, ramDiskReplicaMap);
    }
  }

  void getVolumeMap(String bpid, ReplicaMap volumeMap,
      final RamDiskReplicaTracker ramDiskReplicaMap) throws IOException {
    getBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
  }

  long getNumBlocks() {
    long numBlocks = 0L;
    for (BlockPoolSlice s : bpSlices.values()) {
      numBlocks += s.getNumOfBlocks();
    }
    return numBlocks;
  }

  @Override
  public String toString() {
    return currentDir != null ? currentDir.getParent() : "NULL";
  }

  void shutdown() {
    if (cacheExecutor != null) {
      cacheExecutor.shutdown();
    }
    Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
    for (Entry<String, BlockPoolSlice> entry : set) {
      entry.getValue().shutdown(null);
    }
    if (metrics != null) {
      metrics.unRegister();
    }
  }

  void addBlockPool(String bpid, Configuration c) throws IOException {
    addBlockPool(bpid, c, null);
  }

  void addBlockPool(String bpid, Configuration c, Timer timer)
      throws IOException {
    File bpdir = new File(currentDir, bpid);
    BlockPoolSlice bp;
    if (timer == null) {
      timer = new Timer();
    }
    bp = new BlockPoolSlice(bpid, this, bpdir, c, timer);
    bpSlices.put(bpid, bp);
  }

  void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) {
    BlockPoolSlice bp = bpSlices.get(bpid);
    if (bp != null) {
      bp.shutdown(blocksListsAsLongs);
    }
    bpSlices.remove(bpid);
  }

  boolean isBPDirEmpty(String bpid) throws IOException {
    File volumeCurrentDir = this.getCurrentDir();
    File bpDir = new File(volumeCurrentDir, bpid);
    File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
    File finalizedDir = new File(bpCurrentDir,
        DataStorage.STORAGE_DIR_FINALIZED);
    File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
    if (fileIoProvider.exists(this, finalizedDir) &&
        !DatanodeUtil.dirNoFilesRecursive(this, finalizedDir, fileIoProvider)) {
      return false;
    }
    if (fileIoProvider.exists(this, rbwDir) &&
        fileIoProvider.list(this, rbwDir).length != 0) {
      return false;
    }
    return true;
  }

  void deleteBPDirectories(String bpid, boolean force) throws IOException {
    File volumeCurrentDir = this.getCurrentDir();
    File bpDir = new File(volumeCurrentDir, bpid);
    if (!bpDir.isDirectory()) {
      // nothing to be deleted
      return;
    }
    File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
    File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
    File finalizedDir = new File(bpCurrentDir,
        DataStorage.STORAGE_DIR_FINALIZED);
    File lazypersistDir = new File(bpCurrentDir,
        DataStorage.STORAGE_DIR_LAZY_PERSIST);
    File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
    if (force) {
      fileIoProvider.fullyDelete(this, bpDir);
    } else {
      if (!fileIoProvider.delete(this, rbwDir)) {
        throw new IOException("Failed to delete " + rbwDir);
      }
      if (!DatanodeUtil.dirNoFilesRecursive(
              this, finalizedDir, fileIoProvider) ||
          !fileIoProvider.fullyDelete(
              this, finalizedDir)) {
        throw new IOException("Failed to delete " + finalizedDir);
      }
      if (lazypersistDir.exists() &&
          ((!DatanodeUtil.dirNoFilesRecursive(
              this, lazypersistDir, fileIoProvider) ||
              !fileIoProvider.fullyDelete(this, lazypersistDir)))) {
        throw new IOException("Failed to delete " + lazypersistDir);
      }
      fileIoProvider.fullyDelete(this, tmpDir);
      for (File f : fileIoProvider.listFiles(this, bpCurrentDir)) {
        if (!fileIoProvider.delete(this, f)) {
          throw new IOException("Failed to delete " + f);
        }
      }
      if (!fileIoProvider.delete(this, bpCurrentDir)) {
        throw new IOException("Failed to delete " + bpCurrentDir);
      }
      for (File f : fileIoProvider.listFiles(this, bpDir)) {
        if (!fileIoProvider.delete(this, f)) {
          throw new IOException("Failed to delete " + f);
        }
      }
      if (!fileIoProvider.delete(this, bpDir)) {
        throw new IOException("Failed to delete " + bpDir);
      }
    }
  }

  @Override
  public String getStorageID() {
    return storageID;
  }

  @Override
  public StorageType getStorageType() {
    return storageType;
  }

  DatanodeStorage toDatanodeStorage() {
    return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
  }

  @Override
  public byte[] loadLastPartialChunkChecksum(
      File blockFile, File metaFile) throws IOException {
    // readHeader closes the temporary FileInputStream.
    DataChecksum dcs;
    try (FileInputStream fis = fileIoProvider.getFileInputStream(
        this, metaFile)) {
      dcs = BlockMetadataHeader.readHeader(fis).getChecksum();
    }
    final int checksumSize = dcs.getChecksumSize();
    final long onDiskLen = blockFile.length();
    final int bytesPerChecksum = dcs.getBytesPerChecksum();

    if (onDiskLen % bytesPerChecksum == 0) {
      // the last chunk is a complete one. No need to preserve its checksum
      // because it will not be modified.
      return null;
    }

    long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
        (onDiskLen / bytesPerChecksum) * checksumSize;
    byte[] lastChecksum = new byte[checksumSize];
    try (RandomAccessFile raf = fileIoProvider.getRandomAccessFile(
        this, metaFile, "r")) {
      raf.seek(offsetInChecksum);
      int readBytes = raf.read(lastChecksum, 0, checksumSize);
      if (readBytes == -1) {
        throw new IOException("Expected to read " + checksumSize +
            " bytes from offset " + offsetInChecksum +
            " but reached end of file.");
      } else if (readBytes != checksumSize) {
        throw new IOException("Expected to read " + checksumSize +
            " bytes from offset " + offsetInChecksum + " but read " +
            readBytes + " bytes.");
      }
    }
    return lastChecksum;
  }
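
  // Worked example: with bytesPerChecksum = 512, checksumSize = 4 and an
  // on-disk block length of 1300 bytes, 1300 % 512 != 0, so the last chunk
  // is partial. Two complete chunks precede it, so its checksum is read at
  // offset headerSize + (1300 / 512) * 4 = headerSize + 8 in the meta file.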

  public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo,
      long newGS, long estimateBlockLen) throws IOException {

    long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
    if (getAvailable() < bytesReserved) {
      throw new DiskOutOfSpaceException("Insufficient space for appending to "
          + replicaInfo);
    }

    assert replicaInfo.getVolume() == this:
      "The volume of the replica should be the same as this volume";

    // construct a RBW replica with the new GS
    File newBlkFile = new File(getRbwDir(bpid), replicaInfo.getBlockName());
    LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
        .setBlockId(replicaInfo.getBlockId())
        .setLength(replicaInfo.getNumBytes())
        .setGenerationStamp(newGS)
        .setFsVolume(this)
        .setDirectoryToUse(newBlkFile.getParentFile())
        .setWriterThread(Thread.currentThread())
        .setBytesToReserve(bytesReserved)
        .buildLocalReplicaInPipeline();

    // Only a finalized replica can be appended.
    FinalizedReplica finalized = (FinalizedReplica)replicaInfo;
    // load last checksum and datalen
    newReplicaInfo.setLastChecksumAndDataLen(
        finalized.getVisibleLength(), finalized.getLastPartialChunkChecksum());

    // rename meta file to rbw directory
    // rename block file to rbw directory
    newReplicaInfo.moveReplicaFrom(replicaInfo, newBlkFile);

    reserveSpaceForReplica(bytesReserved);
    return newReplicaInfo;
  }

  public ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException {

    File f = createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
    LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
        .setBlockId(b.getBlockId())
        .setGenerationStamp(b.getGenerationStamp())
        .setFsVolume(this)
        .setDirectoryToUse(f.getParentFile())
        .setBytesToReserve(b.getNumBytes())
        .buildLocalReplicaInPipeline();
    return newReplicaInfo;
  }

  public ReplicaInPipeline convertTemporaryToRbw(ExtendedBlock b,
      ReplicaInfo temp) throws IOException {

    final long blockId = b.getBlockId();
    final long expectedGs = b.getGenerationStamp();
    final long visible = b.getNumBytes();
    final long numBytes = temp.getNumBytes();

    // move block files to the rbw directory
    BlockPoolSlice bpslice = getBlockPoolSlice(b.getBlockPoolId());
    final File dest = FsDatasetImpl.moveBlockFiles(b.getLocalBlock(), temp,
        bpslice.getRbwDir());
    // create RBW
    final LocalReplicaInPipeline rbw = new ReplicaBuilder(ReplicaState.RBW)
        .setBlockId(blockId)
        .setLength(numBytes)
        .setGenerationStamp(expectedGs)
        .setFsVolume(this)
        .setDirectoryToUse(dest.getParentFile())
        .setWriterThread(Thread.currentThread())
        .setBytesToReserve(0)
        .buildLocalReplicaInPipeline();
    rbw.setBytesAcked(visible);

    // load last checksum and datalen
    final File destMeta = FsDatasetUtil.getMetaFile(dest,
        b.getGenerationStamp());
    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(dest, destMeta);
    rbw.setLastChecksumAndDataLen(numBytes, lastChunkChecksum);
    return rbw;
  }

  public ReplicaInPipeline createTemporary(ExtendedBlock b) throws IOException {
    // create a temporary file to hold block in the designated volume
    File f = createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
    LocalReplicaInPipeline newReplicaInfo =
        new ReplicaBuilder(ReplicaState.TEMPORARY)
          .setBlockId(b.getBlockId())
          .setGenerationStamp(b.getGenerationStamp())
          .setDirectoryToUse(f.getParentFile())
          .setBytesToReserve(b.getLocalBlock().getNumBytes())
          .setFsVolume(this)
          .buildLocalReplicaInPipeline();
    return newReplicaInfo;
  }

  public ReplicaInPipeline updateRURCopyOnTruncate(ReplicaInfo rur,
      String bpid, long newBlockId, long recoveryId, long newlength)
      throws IOException {

    rur.breakHardLinksIfNeeded();
    File[] copiedReplicaFiles =
        copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId);
    File blockFile = copiedReplicaFiles[1];
    File metaFile = copiedReplicaFiles[0];
    LocalReplica.truncateBlock(rur.getVolume(), blockFile, metaFile,
        rur.getNumBytes(), newlength, fileIoProvider);

    LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
        .setBlockId(newBlockId)
        .setGenerationStamp(recoveryId)
        .setFsVolume(this)
        .setDirectoryToUse(blockFile.getParentFile())
        .setBytesToReserve(newlength)
        .buildLocalReplicaInPipeline();
    // In theory, this rbw replica needs to reload last chunk checksum,
    // but it is immediately converted to finalized state within the same lock,
    // so no need to update it.
    return newReplicaInfo;
  }

  private File[] copyReplicaWithNewBlockIdAndGS(
      ReplicaInfo replicaInfo, String bpid, long newBlkId, long newGS)
      throws IOException {
    String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId;
    FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
    final File tmpDir = v.getBlockPoolSlice(bpid).getTmpDir();
    final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId);
    final File dstBlockFile = new File(destDir, blockFileName);
    final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
    return FsDatasetImpl.copyBlockFiles(replicaInfo, dstMetaFile,
        dstBlockFile, true, DFSUtilClient.getSmallBufferSize(conf), conf);
  }

  @Override
  public void compileReport(String bpid, Collection<ScanInfo> report,
      ReportCompiler reportCompiler) throws InterruptedException, IOException {
    compileReport(getFinalizedDir(bpid), getFinalizedDir(bpid), report,
        reportCompiler);
  }

  @Override
  public FileIoProvider getFileIoProvider() {
    return fileIoProvider;
  }

  @Override
  public DataNodeVolumeMetrics getMetrics() {
    return metrics;
  }

  /**
   * Filter for block file names stored on the file system volumes.
   */
  public enum BlockDirFilter implements FilenameFilter {
    INSTANCE;

    @Override
    public boolean accept(File dir, String name) {
      return name.startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)
          || name.startsWith(DataStorage.STORAGE_DIR_FINALIZED)
          || name.startsWith(Block.BLOCK_FILE_PREFIX);
    }
  }

  private void compileReport(File bpFinalizedDir, File dir,
      Collection<ScanInfo> report, ReportCompiler reportCompiler)
      throws InterruptedException {

    reportCompiler.throttle();

    List<String> fileNames;
    try {
      fileNames =
          fileIoProvider.listDirectory(this, dir, BlockDirFilter.INSTANCE);
    } catch (IOException ioe) {
      LOG.warn("Exception occurred while compiling report", ioe);
      // Volume error check moved to FileIoProvider.
      // Ignore this directory and proceed.
      return;
    }
    Collections.sort(fileNames);

    /*
     * Assumption: In the sorted list of files block file appears immediately
     * before block metadata file. This is true for the current naming
     * convention for block file blk_<blockid> and meta file
     * blk_<blockid>_<genstamp>.meta
     */
    for (int i = 0; i < fileNames.size(); i++) {
      // Make sure this thread can make a timely exit. With a low throttle
      // rate, completing a run can take a looooong time.
      if (Thread.interrupted()) {
        throw new InterruptedException();
      }

      File file = new File(dir, fileNames.get(i));
      if (file.isDirectory()) {
        compileReport(bpFinalizedDir, file, report, reportCompiler);
        continue;
      }
      if (!Block.isBlockFilename(file)) {
        if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, file.getName())) {
          long blockId = Block.getBlockId(file.getName());
          verifyFileLocation(file, bpFinalizedDir,
              blockId);
          report.add(new ScanInfo(blockId, dir, null, fileNames.get(i), this));
        }
        continue;
      }
      File blockFile = file;
      long blockId = Block.filename2id(file.getName());
      File metaFile = null;

      // Skip all the files that start with block name until
      // getting to the metafile for the block
      while (i + 1 < fileNames.size()) {
        File blkMetaFile = new File(dir, fileNames.get(i + 1));
        if (!(blkMetaFile.isFile()
            && blkMetaFile.getName().startsWith(blockFile.getName()))) {
          break;
        }
        i++;
        if (isBlockMetaFile(blockFile.getName(), blkMetaFile.getName())) {
          metaFile = blkMetaFile;
          break;
        }
      }
      verifyFileLocation(blockFile, bpFinalizedDir, blockId);
      report.add(new ScanInfo(blockId, dir, blockFile.getName(),
          metaFile == null ? null : metaFile.getName(), this));
    }
  }
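
  // Worked example: a sorted listing such as [blk_1073741825,
  // blk_1073741825_1001.meta, blk_1073741826, blk_1073741826_1002.meta]
  // pairs each block file with the meta file that immediately follows it,
  // which is exactly the ordering the loop above relies on.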

  /**
   * Helper method to determine if a file name is consistent with a block
   * meta-data file name.
   *
   * @param blockId the block file name prefix to match
   * @param metaFile the file name to check
   * @return whether the file name is a block meta-data file name
   */
  private static boolean isBlockMetaFile(String blockId, String metaFile) {
    return metaFile.startsWith(blockId)
        && metaFile.endsWith(Block.METADATA_EXTENSION);
  }

  /**
   * Verify whether the actual directory location of block file has the
   * expected directory path computed using its block ID.
   */
  private void verifyFileLocation(File actualBlockFile,
      File bpFinalizedDir, long blockId) {
    File expectedBlockDir =
        DatanodeUtil.idToBlockDir(bpFinalizedDir, blockId);
    File actualBlockDir = actualBlockFile.getParentFile();
    if (actualBlockDir.compareTo(expectedBlockDir) != 0) {
      LOG.warn("Block: " + blockId +
          " found in invalid directory.  Expected directory: " +
          expectedBlockDir + ".  Actual directory: " + actualBlockDir);
    }
  }

  public ReplicaInfo moveBlockToTmpLocation(ExtendedBlock block,
      ReplicaInfo replicaInfo,
      int smallBufferSize,
      Configuration conf) throws IOException {

    File[] blockFiles = FsDatasetImpl.copyBlockFiles(block.getBlockId(),
        block.getGenerationStamp(), replicaInfo,
        getTmpDir(block.getBlockPoolId()),
        replicaInfo.isOnTransientStorage(), smallBufferSize, conf);

    ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY)
        .setBlockId(replicaInfo.getBlockId())
        .setGenerationStamp(replicaInfo.getGenerationStamp())
        .setFsVolume(this)
        .setDirectoryToUse(blockFiles[0].getParentFile())
        .setBytesToReserve(0)
        .build();
    newReplicaInfo.setNumBytes(blockFiles[1].length());
    return newReplicaInfo;
  }

  public ReplicaInfo hardLinkBlockToTmpLocation(ExtendedBlock block,
      ReplicaInfo replicaInfo) throws IOException {

    File[] blockFiles = FsDatasetImpl.hardLinkBlockFiles(block.getBlockId(),
        block.getGenerationStamp(), replicaInfo,
        getTmpDir(block.getBlockPoolId()));

    ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY)
        .setBlockId(replicaInfo.getBlockId())
        .setGenerationStamp(replicaInfo.getGenerationStamp())
        .setFsVolume(this)
        .setDirectoryToUse(blockFiles[0].getParentFile())
        .setBytesToReserve(0)
        .build();
    newReplicaInfo.setNumBytes(blockFiles[1].length());
    return newReplicaInfo;
  }

  public File[] copyBlockToLazyPersistLocation(String bpId, long blockId,
      long genStamp,
      ReplicaInfo replicaInfo,
      int smallBufferSize,
      Configuration conf) throws IOException {

    File lazyPersistDir  = getLazyPersistDir(bpId);
    if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) {
      FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
      throw new IOException("LazyWriter fail to find or " +
          "create lazy persist dir: " + lazyPersistDir.toString());
    }

    // No FsDatasetImpl lock for the file copy
    File[] targetFiles = FsDatasetImpl.copyBlockFiles(
        blockId, genStamp, replicaInfo, lazyPersistDir, true,
        smallBufferSize, conf);
    return targetFiles;
  }

  public void incrNumBlocks(String bpid) throws IOException {
    getBlockPoolSlice(bpid).incrNumBlocks();
  }

  public void resolveDuplicateReplicas(String bpid, ReplicaInfo memBlockInfo,
      ReplicaInfo diskBlockInfo, ReplicaMap volumeMap) throws IOException {
    getBlockPoolSlice(bpid).resolveDuplicateReplicas(
        memBlockInfo, diskBlockInfo, volumeMap);
  }

  public ReplicaInfo activateSavedReplica(String bpid,
      ReplicaInfo replicaInfo, RamDiskReplica replicaState) throws IOException {
    return getBlockPoolSlice(bpid).activateSavedReplica(replicaInfo,
        replicaState);
  }
}
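
A minimal usage sketch of the two volume-level APIs above, reference counting and block iteration. This is not part of FsVolumeImpl.java; the volume instance, the block pool id, and the iterator name "example-scanner" are placeholders:

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;

public class FsVolumeUsageSketch {
  /** Walk all finalized blocks of one block pool on one volume. */
  static void scanFinalizedBlocks(FsVolumeImpl volume, String bpid)
      throws IOException {
    // Hold a reference for the duration of the I/O: obtainReference()
    // throws ClosedChannelException (an IOException) if the volume has
    // already been closed.
    try (FsVolumeReference ref = volume.obtainReference();
         FsVolumeSpi.BlockIterator iter =
             volume.newBlockIterator(bpid, "example-scanner")) {
      while (!iter.atEnd()) {
        ExtendedBlock block = iter.nextBlock();
        if (block == null) {
          break; // nextBlock() returns null once the iterator is exhausted
        }
        // ... process the block ...
      }
      // Persist the cursor to <block pool dir>/example-scanner.cursor so a
      // later loadBlockIterator(bpid, "example-scanner") can resume here.
      iter.save();
    }
  }
}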

Related information

hadoop source code directory

Related articles

hadoop AddBlockPoolException source code

hadoop BlockPoolSlice source code

hadoop CacheStats source code

hadoop FsDatasetAsyncDiskService source code

hadoop FsDatasetCache source code

hadoop FsDatasetFactory source code

hadoop FsDatasetImpl source code

hadoop FsDatasetUtil source code

hadoop FsVolumeImplBuilder source code

hadoop FsVolumeList source code
