hadoop ViewDistributedFileSystem 源码

  • 2022-10-20
  • 浏览 (205)

hadoop ViewDistributedFileSystem 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.PartialListing;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.viewfs.ViewFileSystem;
import org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsPathHandle;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

/**
 * The ViewDistributedFileSystem is an extended class to DistributedFileSystem
 * with additional mounting functionality. The goal is to have better API
 * compatibility for HDFS users when using mounting
 * filesystem(ViewFileSystemOverloadScheme).
 * The ViewFileSystemOverloadScheme{@link ViewFileSystemOverloadScheme} is a new
 * filesystem with inherited mounting functionality from ViewFileSystem.
 * For the user who is using ViewFileSystemOverloadScheme by setting
 * fs.hdfs.impl=org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme, now
 * they can set fs.hdfs.impl=org.apache.hadoop.hdfs.ViewDistributedFileSystem.
 * So, that the hdfs users will get closely compatible API with mount
 * functionality. For the rest of all other schemes can continue to use
 * ViewFileSystemOverloadScheme class directly for mount functionality. Please
 * note that ViewFileSystemOverloadScheme provides only
 * ViewFileSystem{@link ViewFileSystem} APIs.
 * If the user configured this class but no mount points, then it will
 * simply work as the existing DistributedFileSystem class. If the user
 * configured both fs.hdfs.impl to this class and mount configurations, then
 * users will be able to call the APIs available in this class; they are
 * nothing but DFS APIs, but they will be delegated to viewfs functionality.
 * Please note, APIs without any path in arguments (ex: isInSafeMode) will be
 * delegated to the default filesystem only, that is the configured fallback
 * link. If you want to make these API calls on a specific child filesystem,
 * you may want to initialize it separately and call. In
 * ViewDistributedFileSystem, we strongly recommend configuring linkFallBack
 * when you add mount links, and it's recommended to point it to your base
 * cluster, usually your current fs.defaultFS if that's pointing to hdfs.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ViewDistributedFileSystem extends DistributedFileSystem {
  // Class-wide SLF4J logger.
  private static final Logger LOGGER =
      LoggerFactory.getLogger(ViewDistributedFileSystem.class);

  // The mounting file system. It stays null when mount tree initialization
  // fails in initialize(), in which case the methods of this class delegate
  // to the regular DFS implementation.
  private ViewFileSystemOverloadScheme vfs;
  // The default DFS, which should have been set via linkFallback. It is
  // obtained from the fallback file system of vfs and may be null.
  private DistributedFileSystem defaultDFS;

  /**
   * Initializes this file system. After the base class initialization, this
   * tries to initialize the mounting ViewFileSystemOverloadScheme. If that
   * fails with an IOException, the DFS client and working directory are
   * initialized the regular DFS way and vfs is left null.
   *
   * @param uri the URI this file system is initialized with.
   * @param conf the configuration to use.
   * @throws IOException if the base or fallback DFS initialization fails.
   */
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    try {
      this.vfs = tryInitializeMountingViewFs(uri, conf);
    } catch (IOException ioe) {
      // The failure is not propagated: only its message is logged at debug
      // level and this instance continues as a regular DFS.
      LOGGER.debug(new StringBuilder("Mount tree initialization failed with ")
          .append("the reason => {}. Falling back to regular DFS")
          .append(" initialization. Please re-initialize the fs after updating")
          .append(" mount point.").toString(), ioe.getMessage());
      // Previous super.initialize would have skipped the dfsclient init and
      // setWorkingDirectory as we planned to initialize vfs. Since vfs init
      // failed, let's init dfsClient now.
      super.initDFSClient(uri, conf);
      super.setWorkingDirectory(super.getHomeDirectory());
      return;
    }

    setConf(conf);
    // A child DFS with the current initialized URI. This must be same as
    // fallback fs. The fallback must point to root of your filesystems.
    // Some APIs(without path in argument, for example isInSafeMode) will
    // support only for base cluster filesystem. Only that APIs will use this
    // fs.
    // NOTE(review): this cast assumes the fallback file system is a
    // DistributedFileSystem - confirm against the fallback configuration.
    defaultDFS = (DistributedFileSystem) this.vfs.getFallbackFileSystem();
    // Please don't access internal dfs client directly except in tests.
    dfs = (defaultDFS != null) ? defaultDFS.dfs : null;
    super.setWorkingDirectory(this.vfs.getHomeDirectory());
  }

  /**
   * Intentionally a no-op override of the DFS client initialization.
   *
   * @param uri unused.
   * @param conf unused.
   * @throws IOException never thrown by this implementation.
   */
  @Override
  void initDFSClient(URI uri, Configuration conf) throws IOException {
    // Since we plan to initialize vfs in this class, we will not need to
    // initialize DFS client. When vfs initialization fails, initialize()
    // calls super.initDFSClient explicitly instead.
  }

  /**
   * Creates an instance without any setup; the mounting file system is set up
   * later in initialize(URI, Configuration).
   */
  public ViewDistributedFileSystem() {
  }

  /**
   * Builds and initializes the mounting file system, with automatic fallback
   * adding on no mounts switched off.
   *
   * @param theUri the URI to initialize the mounting file system with.
   * @param conf the configuration to initialize the mounting file system with.
   * @return the initialized ViewFileSystemOverloadScheme.
   * @throws IOException if the mounting file system fails to initialize.
   */
  private ViewFileSystemOverloadScheme tryInitializeMountingViewFs(URI theUri,
      Configuration conf) throws IOException {
    final ViewFileSystemOverloadScheme mountingFs =
        new ViewFileSystemOverloadScheme();
    mountingFs.setSupportAutoAddingFallbackOnNoMounts(false);
    mountingFs.initialize(theUri, conf);
    return mountingFs;
  }

  /**
   * Returns the URI of the mounting file system when it is initialized,
   * otherwise the URI of the regular DFS.
   */
  @Override
  public URI getUri() {
    return (this.vfs != null) ? this.vfs.getUri() : super.getUri();
  }

  /**
   * Returns the scheme of the mounting file system when it is initialized,
   * otherwise the scheme of the regular DFS.
   */
  @Override
  public String getScheme() {
    return (this.vfs != null) ? this.vfs.getScheme() : super.getScheme();
  }

  /**
   * Returns the working directory of the mounting file system when it is
   * initialized, otherwise the working directory of the regular DFS.
   */
  @Override
  public Path getWorkingDirectory() {
    if (this.vfs != null) {
      return this.vfs.getWorkingDirectory();
    }
    return super.getWorkingDirectory();
  }

  /**
   * Sets the working directory on the mounting file system when it is
   * initialized, otherwise on the regular DFS.
   *
   * @param dir the new working directory.
   */
  @Override
  public void setWorkingDirectory(Path dir) {
    if (this.vfs != null) {
      this.vfs.setWorkingDirectory(dir);
    } else {
      super.setWorkingDirectory(dir);
    }
  }

  /**
   * Returns the home directory.
   *
   * @return null while the internal DFS client is not set; otherwise the home
   *         directory of the mounting file system when it is initialized, or
   *         of the regular DFS when it is not.
   */
  @Override
  public Path getHomeDirectory() {
    // Without a DFS client there is nothing to ask for a home directory.
    if (super.dfs == null) {
      return null;
    }
    return (this.vfs != null)
        ? this.vfs.getHomeDirectory()
        : super.getHomeDirectory();
  }

  /**
   * Returns the hedged read metrics of the default cluster only.
   *
   * @return the metrics of the default DFS when mounting is initialized,
   *         otherwise the metrics of the regular DFS.
   */
  @Override
  public DFSHedgedReadMetrics getHedgedReadMetrics() {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getHedgedReadMetrics");
      return defaultDFS.getHedgedReadMetrics();
    }
    return super.getHedgedReadMetrics();
  }

  /**
   * Returns block locations for the given file status, through the mounting
   * file system when it is initialized, otherwise through the regular DFS.
   */
  @Override
  public BlockLocation[] getFileBlockLocations(FileStatus fs, long start,
      long len) throws IOException {
    return (this.vfs != null)
        ? this.vfs.getFileBlockLocations(fs, start, len)
        : super.getFileBlockLocations(fs, start, len);
  }

  /**
   * Returns block locations for the given path, through the mounting file
   * system when it is initialized, otherwise through the regular DFS.
   */
  @Override
  public BlockLocation[] getFileBlockLocations(Path p, final long start,
      final long len) throws IOException {
    return (this.vfs != null)
        ? this.vfs.getFileBlockLocations(p, start, len)
        : super.getFileBlockLocations(p, start, len);
  }

  /**
   * Sets the verify-checksum flag on the mounting file system when it is
   * initialized, otherwise on the regular DFS.
   *
   * @param verifyChecksum the flag value to set.
   */
  @Override
  public void setVerifyChecksum(final boolean verifyChecksum) {
    if (this.vfs != null) {
      this.vfs.setVerifyChecksum(verifyChecksum);
    } else {
      super.setVerifyChecksum(verifyChecksum);
    }
  }

  /**
   * DFS specific API. With mounting initialized, the path is resolved through
   * the mount table and the call goes to the target file system, which must
   * pass checkDFS.
   */
  @Override
  public boolean recoverLease(final Path f) throws IOException {
    if (this.vfs != null) {
      final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
          this.vfs.getMountPathInfo(f, getConf());
      final FileSystem targetFs = resolved.getTargetFs();
      checkDFS(targetFs, "recoverLease");
      return ((DistributedFileSystem) targetFs)
          .recoverLease(resolved.getPathOnTarget());
    }
    return super.recoverLease(f);
  }

  /**
   * Opens the given path through the mounting file system when it is
   * initialized, otherwise through the regular DFS.
   */
  @Override
  public FSDataInputStream open(final Path f, final int bufferSize)
      throws AccessControlException, FileNotFoundException, IOException {
    if (this.vfs != null) {
      return this.vfs.open(f, bufferSize);
    }
    return super.open(f, bufferSize);
  }

  /**
   * Opens the given path handle through the mounting file system when it is
   * initialized, otherwise through the regular DFS.
   */
  @Override
  public FSDataInputStream open(PathHandle fd, int bufferSize)
      throws IOException {
    if (this.vfs != null) {
      return this.vfs.open(fd, bufferSize);
    }
    return super.open(fd, bufferSize);
  }

  /**
   * Creates a path handle with regular DFS behavior.
   *
   * @throws UnsupportedOperationException when mounting is initialized.
   */
  @Override
  protected HdfsPathHandle createPathHandle(FileStatus st,
      Options.HandleOpt... opts) {
    if (this.vfs != null) {
      throw new UnsupportedOperationException();
    }
    return super.createPathHandle(st, opts);
  }

  /**
   * Appends to the given path through the mounting file system when it is
   * initialized, otherwise through the regular DFS.
   */
  @Override
  public FSDataOutputStream append(final Path f, final int bufferSize,
      final Progressable progress) throws IOException {
    if (this.vfs != null) {
      return this.vfs.append(f, bufferSize, progress);
    }
    return super.append(f, bufferSize, progress);
  }

  /**
   * DFS specific append with create flags. With mounting initialized, the
   * path is resolved through the mount table and the call goes to the target
   * file system, which must pass checkDFS.
   */
  @Override
  public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
      final int bufferSize, final Progressable progress) throws IOException {
    if (this.vfs != null) {
      final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
          this.vfs.getMountPathInfo(f, getConf());
      final FileSystem targetFs = resolved.getTargetFs();
      checkDFS(targetFs, "append");
      return ((DistributedFileSystem) targetFs)
          .append(resolved.getPathOnTarget(), flag, bufferSize, progress);
    }
    return super.append(f, flag, bufferSize, progress);
  }

  /**
   * DFS specific append with create flags and favored nodes. With mounting
   * initialized, the path is resolved through the mount table and the call
   * goes to the target file system, which must pass checkDFS.
   */
  @Override
  public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
      final int bufferSize, final Progressable progress,
      final InetSocketAddress[] favoredNodes) throws IOException {
    if (this.vfs != null) {
      final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
          this.vfs.getMountPathInfo(f, getConf());
      final FileSystem targetFs = resolved.getTargetFs();
      checkDFS(targetFs, "append");
      return ((DistributedFileSystem) targetFs)
          .append(resolved.getPathOnTarget(), flag, bufferSize, progress,
              favoredNodes);
    }
    return super.append(f, flag, bufferSize, progress, favoredNodes);
  }

  /**
   * Creates a file through the mounting file system when it is initialized,
   * otherwise through the regular DFS.
   */
  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    if (this.vfs != null) {
      return this.vfs.create(f, permission, overwrite, bufferSize,
          replication, blockSize, progress);
    }
    return super.create(f, permission, overwrite, bufferSize, replication,
        blockSize, progress);
  }

  /**
   * DFS specific create with favored nodes. With mounting initialized, the
   * path is resolved through the mount table and the call goes to the target
   * file system, which must pass checkDFS.
   */
  @Override
  public HdfsDataOutputStream create(final Path f,
      final FsPermission permission, final boolean overwrite,
      final int bufferSize, final short replication, final long blockSize,
      final Progressable progress, final InetSocketAddress[] favoredNodes)
      throws IOException {
    if (this.vfs != null) {
      final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
          this.vfs.getMountPathInfo(f, getConf());
      final FileSystem targetFs = resolved.getTargetFs();
      checkDFS(targetFs, "create");
      return ((DistributedFileSystem) targetFs)
          .create(resolved.getPathOnTarget(), permission, overwrite,
              bufferSize, replication, blockSize, progress, favoredNodes);
    }
    return super.create(f, permission, overwrite, bufferSize, replication,
        blockSize, progress, favoredNodes);
  }

  /**
   * Creates a file with create flags and a checksum option, through the
   * mounting file system when it is initialized, otherwise through the
   * regular DFS.
   */
  @Override
  public FSDataOutputStream create(final Path f, final FsPermission permission,
      final EnumSet<CreateFlag> cflags, final int bufferSize,
      final short replication, final long blockSize,
      final Progressable progress, final Options.ChecksumOpt checksumOpt)
      throws IOException {
    if (this.vfs != null) {
      return this.vfs.create(f, permission, cflags, bufferSize, replication,
          blockSize, progress, checksumOpt);
    }
    return super.create(f, permission, cflags, bufferSize, replication,
        blockSize, progress, checksumOpt);
  }

  /**
   * Verifies that the given file system is a DistributedFileSystem.
   *
   * @param fs the file system to verify.
   * @param methodName the API name reported in the exception message.
   * @throws UnsupportedOperationException if fs is not a
   *         DistributedFileSystem.
   */
  void checkDFS(FileSystem fs, String methodName) {
    if (fs instanceof DistributedFileSystem) {
      return;
    }
    throw new UnsupportedOperationException("This API:" + methodName
        + " is specific to DFS. Can't run on other fs:" + fs.getUri());
  }

  /**
   * Verifies that a default file system is available.
   *
   * @param fs the default file system, possibly null.
   * @param methodName the API name reported in the exception message.
   * @throws UnsupportedOperationException if fs is null.
   */
  void checkDefaultDFS(FileSystem fs, String methodName) {
    if (fs != null) {
      return;
    }
    throw new UnsupportedOperationException("This API:" + methodName
        + " cannot be supported without default cluster(that is linkFallBack).");
  }

  /**
   * DFS specific API. With mounting initialized, the path is resolved through
   * the mount table and the call goes to the target file system, which must
   * pass checkDFS; otherwise the regular DFS implementation is used.
   *
   * @throws IOException if mount resolution or the create call fails.
   * @throws UnsupportedOperationException if the target file system is not a
   *         DistributedFileSystem.
   */
  @Override
  protected HdfsDataOutputStream primitiveCreate(Path f,
      FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
      short replication, long blockSize, Progressable progress,
      Options.ChecksumOpt checksumOpt) throws IOException {
    if (this.vfs == null) {
      return super
          .primitiveCreate(f, absolutePermission, flag, bufferSize, replication,
              blockSize, progress, checksumOpt);
    }
    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
        this.vfs.getMountPathInfo(f, getConf());
    checkDFS(mountPathInfo.getTargetFs(), "primitiveCreate");
    // Hand the target file system the path resolved against it, not the
    // mount-table path, like the other DFS specific delegations here do.
    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
        .primitiveCreate(mountPathInfo.getPathOnTarget(), absolutePermission,
            flag, bufferSize, replication, blockSize, progress, checksumOpt);
  }

  /**
   * Creates a file without creating missing parents, through the mounting
   * file system when it is initialized, otherwise through the regular DFS.
   *
   * @throws IOException if the create call fails.
   */
  @Override
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      EnumSet<CreateFlag> flags, int bufferSize, short replication,
      long blockSize, Progressable progress) throws IOException {
    // blockSize must be forwarded in the block size position; passing
    // bufferSize there would create the file with the wrong block size.
    if (this.vfs == null) {
      return super
          .createNonRecursive(f, permission, flags, bufferSize, replication,
              blockSize, progress);
    }
    return this.vfs
        .createNonRecursive(f, permission, flags, bufferSize, replication,
            blockSize, progress);
  }

  /**
   * Sets the replication of the given path through the mounting file system
   * when it is initialized, otherwise through the regular DFS.
   */
  @Override
  public boolean setReplication(final Path f, final short replication)
      throws AccessControlException, FileNotFoundException, IOException {
    if (this.vfs != null) {
      return this.vfs.setReplication(f, replication);
    }
    return super.setReplication(f, replication);
  }

  /**
   * Sets the storage policy of the given path through the mounting file
   * system when it is initialized, otherwise through the regular DFS.
   */
  @Override
  public void setStoragePolicy(Path src, String policyName) throws IOException {
    if (this.vfs != null) {
      this.vfs.setStoragePolicy(src, policyName);
    } else {
      super.setStoragePolicy(src, policyName);
    }
  }

  /**
   * Unsets the storage policy of the given path through the mounting file
   * system when it is initialized, otherwise through the regular DFS.
   */
  @Override
  public void unsetStoragePolicy(Path src) throws IOException {
    if (this.vfs != null) {
      this.vfs.unsetStoragePolicy(src);
    } else {
      super.unsetStoragePolicy(src);
    }
  }

  /**
   * Returns the storage policy of the given path through the mounting file
   * system when it is initialized, otherwise through the regular DFS.
   */
  @Override
  public BlockStoragePolicySpi getStoragePolicy(Path src) throws IOException {
    if (this.vfs != null) {
      return this.vfs.getStoragePolicy(src);
    }
    return super.getStoragePolicy(src);
  }

  /**
   * Returns all storage policies, from the mounting file system when it is
   * initialized, otherwise from the regular DFS. The result obtained from the
   * mounting file system is cast, unchecked, to a collection of
   * BlockStoragePolicy.
   */
  @Override
  public Collection<BlockStoragePolicy> getAllStoragePolicies()
      throws IOException {
    if (this.vfs != null) {
      final Collection<? extends BlockStoragePolicySpi> viewPolicies =
          this.vfs.getAllStoragePolicies();
      return (Collection<BlockStoragePolicy>) viewPolicies;
    }
    return super.getAllStoragePolicies();
  }

  /**
   * Returns the value reported by the default DFS when mounting is
   * initialized, otherwise by the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public long getBytesWithFutureGenerationStamps() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getBytesWithFutureGenerationStamps");
      return defaultDFS.getBytesWithFutureGenerationStamps();
    }
    return super.getBytesWithFutureGenerationStamps();
  }

  /**
   * Returns the storage policies of the default DFS when mounting is
   * initialized, otherwise of the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Deprecated
  @Override
  public BlockStoragePolicy[] getStoragePolicies() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getStoragePolicies");
      return defaultDFS.getStoragePolicies();
    }
    return super.getStoragePolicies();
  }

  /**
   * Concatenates the sources into the target. Make sure your target fs
   * supports this API, otherwise you will get an unsupported operation
   * exception. With mounting initialized, only the target path is resolved
   * through the mount table; the source paths are passed on unchanged.
   */
  @Override
  public void concat(Path trg, Path[] psrcs) throws IOException {
    if (this.vfs != null) {
      final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
          this.vfs.getMountPathInfo(trg, getConf());
      final FileSystem targetFs = resolved.getTargetFs();
      targetFs.concat(resolved.getPathOnTarget(), psrcs);
    } else {
      super.concat(trg, psrcs);
    }
  }

  /**
   * Renames src to dst through the mounting file system when it is
   * initialized, otherwise through the regular DFS.
   */
  @SuppressWarnings("deprecation")
  @Override
  public boolean rename(final Path src, final Path dst) throws IOException {
    if (this.vfs != null) {
      return this.vfs.rename(src, dst);
    }
    return super.rename(src, dst);
  }

  /**
   * Renames src to dst with rename options. With mounting initialized, both
   * paths are resolved through the mount table and the rename is issued on
   * the source's target file system.
   *
   * @throws HadoopIllegalArgumentException if the two paths resolve to target
   *         file systems with different URIs.
   */
  @SuppressWarnings("deprecation")
  @Override
  public void rename(Path src, Path dst, final Options.Rename... options)
      throws IOException {
    if (this.vfs == null) {
      super.rename(src, dst, options);
      return;
    }

    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> srcInfo =
        this.vfs.getMountPathInfo(src, getConf());
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> dstInfo =
        this.vfs.getMountPathInfo(dst, getConf());

    // Both ends must live in the same cluster.
    final boolean sameFs = srcInfo.getTargetFs().getUri()
        .equals(dstInfo.getTargetFs().getUri());
    if (!sameFs) {
      throw new HadoopIllegalArgumentException(
          "Can't rename across file systems.");
    }

    FileUtil.rename(srcInfo.getTargetFs(), srcInfo.getPathOnTarget(),
        dstInfo.getPathOnTarget(), options);
  }

  /**
   * Truncates the given path through the mounting file system when it is
   * initialized, otherwise through the regular DFS.
   */
  @Override
  public boolean truncate(final Path f, final long newLength)
      throws IOException {
    if (this.vfs != null) {
      return this.vfs.truncate(f, newLength);
    }
    return super.truncate(f, newLength);
  }

  /**
   * Deletes the given path through the mounting file system when it is
   * initialized, otherwise through the regular DFS.
   *
   * @param f the path to delete.
   * @param recursive whether to delete recursively.
   * @return the result reported by the delegated delete call.
   * @throws IOException if the delegated delete call fails.
   */
  @Override
  public boolean delete(final Path f, final boolean recursive)
      throws AccessControlException, FileNotFoundException, IOException {
    if (this.vfs == null) {
      return super.delete(f, recursive);
    }
    return this.vfs.delete(f, recursive);
  }

  /**
   * Returns the content summary of the given path through the mounting file
   * system when it is initialized, otherwise through the regular DFS.
   */
  @Override
  public ContentSummary getContentSummary(Path f) throws IOException {
    if (this.vfs != null) {
      return this.vfs.getContentSummary(f);
    }
    return super.getContentSummary(f);
  }

  /**
   * Returns the quota usage of the given path through the mounting file
   * system when it is initialized, otherwise through the regular DFS.
   */
  @Override
  public QuotaUsage getQuotaUsage(Path f) throws IOException {
    if (this.vfs != null) {
      return this.vfs.getQuotaUsage(f);
    }
    return super.getQuotaUsage(f);
  }

  /**
   * Sets namespace and storage space quotas. With mounting initialized, the
   * path is resolved through the mount table and the call goes to the target
   * file system.
   */
  @Override
  public void setQuota(Path src, final long namespaceQuota,
      final long storagespaceQuota) throws IOException {
    if (this.vfs != null) {
      final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
          this.vfs.getMountPathInfo(src, getConf());
      final FileSystem targetFs = resolved.getTargetFs();
      targetFs.setQuota(resolved.getPathOnTarget(), namespaceQuota,
          storagespaceQuota);
    } else {
      super.setQuota(src, namespaceQuota, storagespaceQuota);
    }
  }

  /**
   * Sets a quota for one storage type. With mounting initialized, the path is
   * resolved through the mount table and the call goes to the target file
   * system.
   */
  @Override
  public void setQuotaByStorageType(Path src, final StorageType type,
      final long quota) throws IOException {
    if (this.vfs != null) {
      final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
          this.vfs.getMountPathInfo(src, getConf());
      final FileSystem targetFs = resolved.getTargetFs();
      targetFs.setQuotaByStorageType(resolved.getPathOnTarget(), type, quota);
    } else {
      super.setQuotaByStorageType(src, type, quota);
    }
  }

  /**
   * Lists the statuses under the given path through the mounting file system
   * when it is initialized, otherwise through the regular DFS.
   */
  @Override
  public FileStatus[] listStatus(Path p) throws IOException {
    return (this.vfs != null) ? this.vfs.listStatus(p) : super.listStatus(p);
  }

  /**
   * Lists the located statuses under the given path, filtered, through the
   * mounting file system when it is initialized, otherwise through the
   * regular DFS.
   */
  @Override
  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
      final PathFilter filter) throws FileNotFoundException, IOException {
    if (this.vfs != null) {
      return this.vfs.listLocatedStatus(f, filter);
    }
    return super.listLocatedStatus(f, filter);
  }

  /**
   * Returns a status iterator for the given path. With mounting initialized,
   * the path is resolved through the mount table and the iterator comes from
   * the target file system.
   */
  @Override
  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
      throws IOException {
    if (this.vfs != null) {
      final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
          this.vfs.getMountPathInfo(p, getConf());
      final FileSystem targetFs = resolved.getTargetFs();
      return targetFs.listStatusIterator(resolved.getPathOnTarget());
    }
    return super.listStatusIterator(p);
  }

  /**
   * Returns a batched status iterator for the given paths. With mounting
   * initialized the call currently goes to the default DFS only.
   *
   * @param paths the paths to list.
   * @return the iterator from the default DFS or from the regular DFS.
   * @throws IOException if the delegated call fails.
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public RemoteIterator<PartialListing<FileStatus>> batchedListStatusIterator(
      final List<Path> paths) throws IOException {
    if (this.vfs == null) {
      return super.batchedListStatusIterator(paths);
    }
    // TODO: revisit for correct implementation.
    // Fail with a descriptive error instead of an NPE when no fallback link
    // is configured, like the other default-DFS-only APIs in this class.
    checkDefaultDFS(defaultDFS, "batchedListStatusIterator");
    return this.defaultDFS.batchedListStatusIterator(paths);
  }

  /**
   * Returns a batched located status iterator for the given paths. With
   * mounting initialized the call currently goes to the default DFS only.
   *
   * @param paths the paths to list.
   * @return the iterator from the default DFS or from the regular DFS.
   * @throws IOException if the delegated call fails.
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public RemoteIterator<PartialListing<LocatedFileStatus>> batchedListLocatedStatusIterator(
      final List<Path> paths) throws IOException {
    if (this.vfs == null) {
      return super.batchedListLocatedStatusIterator(paths);
    }
    // TODO: revisit for correct implementation.
    // Fail with a descriptive error instead of an NPE when no fallback link
    // is configured, like the other default-DFS-only APIs in this class.
    checkDefaultDFS(defaultDFS, "batchedListLocatedStatusIterator");
    return this.defaultDFS.batchedListLocatedStatusIterator(paths);
  }

  /**
   * DFS specific API. With mounting initialized, the path is resolved through
   * the mount table and the call goes to the target file system, which must
   * pass checkDFS; otherwise the regular DFS implementation is used.
   *
   * @param f the directory to create.
   * @param permission the permission to apply.
   * @return the result reported by the delegated mkdir call.
   * @throws IOException if mount resolution or the mkdir call fails.
   * @throws UnsupportedOperationException if the target file system is not a
   *         DistributedFileSystem.
   */
  @Override
  public boolean mkdir(Path f, FsPermission permission) throws IOException {
    if (this.vfs == null) {
      return super.mkdir(f, permission);
    }
    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
        this.vfs.getMountPathInfo(f, getConf());
    checkDFS(mountPathInfo.getTargetFs(), "mkdir");
    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
        .mkdir(mountPathInfo.getPathOnTarget(), permission);
  }

  /**
   * Creates the directory and any missing parents through the mounting file
   * system when it is initialized, otherwise through the regular DFS.
   */
  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return (this.vfs != null)
        ? this.vfs.mkdirs(f, permission)
        : super.mkdirs(f, permission);
  }

  /**
   * DFS specific API. With mounting initialized, the path is resolved through
   * the mount table and the call goes to the target file system, which must
   * pass checkDFS.
   */
  @SuppressWarnings("deprecation")
  @Override
  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
      throws IOException {
    if (this.vfs != null) {
      final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
          this.vfs.getMountPathInfo(f, getConf());
      final FileSystem targetFs = resolved.getTargetFs();
      checkDFS(targetFs, "primitiveMkdir");
      return ((DistributedFileSystem) targetFs)
          .primitiveMkdir(resolved.getPathOnTarget(), absolutePermission);
    }
    return super.primitiveMkdir(f, absolutePermission);
  }

  /**
   * Closes the mounting file system, if it is initialized, and then the base
   * DFS. The base close is always attempted, even when closing the mounting
   * file system fails.
   *
   * @throws IOException if closing either file system fails.
   */
  @Override
  public void close() throws IOException {
    try {
      if (this.vfs != null) {
        this.vfs.close();
      }
    } finally {
      // Run the base close even if vfs.close() threw, so that its resources
      // are not leaked.
      super.close();
    }
  }

  /**
   * Returns the DFS client of the default DFS when mounting is initialized,
   * otherwise the client of the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @InterfaceAudience.Private
  @Override
  public DFSClient getClient() {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getClient");
      return defaultDFS.getClient();
    }
    return super.getClient();
  }

  /**
   * Returns the file system status for the given path through the mounting
   * file system when it is initialized, otherwise through the regular DFS.
   */
  @Override
  public FsStatus getStatus(Path p) throws IOException {
    return (this.vfs != null) ? this.vfs.getStatus(p) : super.getStatus(p);
  }

  /**
   * Returns the missing blocks count of the default DFS when mounting is
   * initialized, otherwise of the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public long getMissingBlocksCount() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getMissingBlocksCount");
      return defaultDFS.getMissingBlocksCount();
    }
    return super.getMissingBlocksCount();
  }

  /**
   * Returns the pending deletion blocks count of the default DFS when
   * mounting is initialized, otherwise of the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public long getPendingDeletionBlocksCount() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getPendingDeletionBlocksCount");
      return defaultDFS.getPendingDeletionBlocksCount();
    }
    return super.getPendingDeletionBlocksCount();
  }

  /**
   * Returns the missing replication-one blocks count of the default DFS when
   * mounting is initialized, otherwise of the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public long getMissingReplOneBlocksCount() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getMissingReplOneBlocksCount");
      return defaultDFS.getMissingReplOneBlocksCount();
    }
    return super.getMissingReplOneBlocksCount();
  }

  /**
   * Returns the low redundancy blocks count of the default DFS when mounting
   * is initialized, otherwise of the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public long getLowRedundancyBlocksCount() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getLowRedundancyBlocksCount");
      return defaultDFS.getLowRedundancyBlocksCount();
    }
    return super.getLowRedundancyBlocksCount();
  }

  /**
   * Returns the corrupt blocks count of the default DFS when mounting is
   * initialized, otherwise of the regular DFS.
   *
   * @return the corrupt blocks count.
   * @throws IOException if the delegated call fails.
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public long getCorruptBlocksCount() throws IOException {
    if (this.vfs == null) {
      return super.getCorruptBlocksCount();
    }
    checkDefaultDFS(defaultDFS, "getCorruptBlocksCount");
    // Delegate to the matching API; the low redundancy count is a different
    // metric and must not be returned here.
    return defaultDFS.getCorruptBlocksCount();
  }

  /**
   * Lists corrupt file blocks under the given path. With mounting
   * initialized, the path is resolved through the mount table and the call
   * goes to the target file system.
   */
  @Override
  public RemoteIterator<Path> listCorruptFileBlocks(final Path path)
      throws IOException {
    if (this.vfs != null) {
      final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
          this.vfs.getMountPathInfo(path, getConf());
      final FileSystem targetFs = resolved.getTargetFs();
      return targetFs.listCorruptFileBlocks(resolved.getPathOnTarget());
    }
    return super.listCorruptFileBlocks(path);
  }

  /**
   * Returns the datanode stats of the default DFS when mounting is
   * initialized, otherwise of the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public DatanodeInfo[] getDataNodeStats() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getDataNodeStats");
      return defaultDFS.getDataNodeStats();
    }
    return super.getDataNodeStats();
  }

  /**
   * Returns the datanode stats for the given report type, from the default
   * DFS when mounting is initialized, otherwise from the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public DatanodeInfo[] getDataNodeStats(
      final HdfsConstants.DatanodeReportType type) throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getDataNodeStats");
      return defaultDFS.getDataNodeStats(type);
    }
    return super.getDataNodeStats(type);
  }

  /**
   * Applies the safe mode action on the default DFS when mounting is
   * initialized, otherwise on the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public boolean setSafeMode(HdfsConstants.SafeModeAction action)
      throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "setSafeMode");
      return defaultDFS.setSafeMode(action);
    }
    return super.setSafeMode(action);
  }

  /**
   * Applies the safe mode action, with the isChecked flag, on the default DFS
   * when mounting is initialized, otherwise on the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public boolean setSafeMode(HdfsConstants.SafeModeAction action,
      boolean isChecked) throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "setSafeMode");
      return defaultDFS.setSafeMode(action, isChecked);
    }
    return super.setSafeMode(action, isChecked);
  }

  /**
   * Saves the namespace, with the given time window and transaction gap, on
   * the default DFS when mounting is initialized, otherwise on the regular
   * DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "saveNamespace");
      return defaultDFS.saveNamespace(timeWindow, txGap);
    }
    return super.saveNamespace(timeWindow, txGap);
  }

  /**
   * Saves the namespace on the default DFS when mounting is initialized,
   * otherwise on the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public void saveNamespace() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "saveNamespace");
      defaultDFS.saveNamespace();
    } else {
      super.saveNamespace();
    }
  }

  /**
   * Rolls the edit log on the default DFS when mounting is initialized,
   * otherwise on the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public long rollEdits() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "rollEdits");
      return defaultDFS.rollEdits();
    }
    return super.rollEdits();
  }

  /**
   * Invokes restoreFailedStorage on the default DFS when mounting is
   * initialized, otherwise on the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public boolean restoreFailedStorage(String arg) throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "restoreFailedStorage");
      return defaultDFS.restoreFailedStorage(arg);
    }
    return super.restoreFailedStorage(arg);
  }

  /**
   * Refreshes nodes on the default DFS when mounting is initialized,
   * otherwise on the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public void refreshNodes() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "refreshNodes");
      defaultDFS.refreshNodes();
    } else {
      super.refreshNodes();
    }
  }

  /**
   * Finalizes the upgrade on the default DFS when mounting is initialized,
   * otherwise on the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public void finalizeUpgrade() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "finalizeUpgrade");
      defaultDFS.finalizeUpgrade();
    } else {
      super.finalizeUpgrade();
    }
  }

  /**
   * Returns the upgrade status of the default DFS when mounting is
   * initialized, otherwise of the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public boolean upgradeStatus() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "upgradeStatus");
      return defaultDFS.upgradeStatus();
    }
    return super.upgradeStatus();
  }

  /**
   * Performs the rolling upgrade action on the default DFS when mounting is
   * initialized, otherwise on the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public RollingUpgradeInfo rollingUpgrade(
      HdfsConstants.RollingUpgradeAction action) throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "rollingUpgrade");
      return defaultDFS.rollingUpgrade(action);
    }
    return super.rollingUpgrade(action);
  }

  /**
   * Invokes metaSave with the given pathname on the default DFS when mounting
   * is initialized, otherwise on the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public void metaSave(String pathname) throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "metaSave");
      defaultDFS.metaSave(pathname);
    } else {
      super.metaSave(pathname);
    }
  }

  /**
   * Returns the server defaults of the default DFS when mounting is
   * initialized, otherwise of the regular DFS.
   *
   * @throws UnsupportedOperationException if mounting is initialized but no
   *         default DFS is available.
   */
  @Override
  public FsServerDefaults getServerDefaults() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getServerDefaults");
      //TODO: Need to revisit.
      return defaultDFS.getServerDefaults();
    }
    return super.getServerDefaults();
  }

  /**
   * Returns the file status of the given path through the mounting file
   * system when it is initialized, otherwise through the regular DFS.
   */
  @Override
  public FileStatus getFileStatus(final Path f)
      throws AccessControlException, FileNotFoundException, IOException {
    if (this.vfs != null) {
      return this.vfs.getFileStatus(f);
    }
    return super.getFileStatus(f);
  }

  /**
   * Creates a symlink only in regular DFS mode ({@code vfs} is null).
   *
   * @throws UnsupportedOperationException when {@code vfs} is set, because
   *         symlink creation is not supported in ViewHDFS.
   */
  @SuppressWarnings("deprecation")
  @Override
  public void createSymlink(final Path target, final Path link,
      final boolean createParent) throws IOException {
    if (this.vfs != null) {
      throw new UnsupportedOperationException(
          "createSymlink is not supported in ViewHDFS");
    }
    // Regular DFS behavior
    super.createSymlink(target, link, createParent);
  }

  /**
   * Reports symlink support: the regular DFS answer when {@code vfs} is
   * null, and always {@code false} otherwise.
   */
  @Override
  public boolean supportsSymlinks() {
    // This can be enabled later if we want to support symlinks.
    return this.vfs == null && super.supportsSymlinks();
  }

  /**
   * Returns the link status of {@code f}. When {@code vfs} is set, the path
   * is resolved through the mount table and the call is made on the target
   * file system with the path on that target.
   */
  @Override
  public FileStatus getFileLinkStatus(final Path f) throws IOException {
    if (this.vfs == null) {
      return super.getFileLinkStatus(f);
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(f, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    return targetFs.getFileLinkStatus(resolved.getPathOnTarget());
  }

  /**
   * Returns the link target of {@code path}, using the regular DFS
   * implementation when {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public Path getLinkTarget(Path path) throws IOException {
    return this.vfs == null
        ? super.getLinkTarget(path)
        : this.vfs.getLinkTarget(path);
  }

  /**
   * Resolves the link at {@code f}. When {@code vfs} is set, the path is
   * resolved through the mount table and, after the {@code checkDFS} guard,
   * the call is made on the target DFS with the path on that target.
   */
  @Override
  protected Path resolveLink(Path f) throws IOException {
    if (this.vfs == null) {
      return super.resolveLink(f);
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(f, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "resolveLink");
    return ((DistributedFileSystem) targetFs)
        .resolveLink(resolved.getPathOnTarget());
  }

  /**
   * Returns the checksum of {@code f}, using the regular DFS implementation
   * when {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public FileChecksum getFileChecksum(final Path f)
      throws AccessControlException, FileNotFoundException, IOException {
    return this.vfs == null
        ? super.getFileChecksum(f)
        : this.vfs.getFileChecksum(f);
  }

  /**
   * Sets the permission of {@code f}, using the regular DFS implementation
   * when {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public void setPermission(final Path f, final FsPermission permission)
      throws AccessControlException, FileNotFoundException, IOException {
    if (this.vfs != null) {
      this.vfs.setPermission(f, permission);
    } else {
      super.setPermission(f, permission);
    }
  }

  /**
   * Sets the owner and group of {@code f}, using the regular DFS
   * implementation when {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public void setOwner(final Path f, final String username,
      final String groupname)
      throws AccessControlException, FileNotFoundException, IOException {
    if (this.vfs != null) {
      this.vfs.setOwner(f, username, groupname);
    } else {
      super.setOwner(f, username, groupname);
    }
  }

  /**
   * Sets the modification and access times of {@code f}, using the regular
   * DFS implementation when {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public void setTimes(final Path f, final long mtime, final long atime)
      throws AccessControlException, FileNotFoundException, IOException {
    if (this.vfs != null) {
      this.vfs.setTimes(f, mtime, atime);
    } else {
      super.setTimes(f, mtime, atime);
    }
  }

  /**
   * DFS specific API: always returns the default port reported by the
   * parent implementation, whether or not {@code vfs} is set.
   */
  @Override
  protected int getDefaultPort() {
    return super.getDefaultPort();
  }

  /**
   * Obtains a delegation token for {@code renewer}.
   * <ul>
   *   <li>No mount points configured: behaves exactly like
   *   {@link DistributedFileSystem#getDelegationToken(String)}.</li>
   *   <li>Mount points configured and a default fs (linkFallback) present:
   *   returns the default fs delegation token.</li>
   *   <li>Mount points configured without a default fs: returns null.</li>
   * </ul>
   */
  @Override
  public Token<DelegationTokenIdentifier> getDelegationToken(String renewer)
      throws IOException {
    if (this.vfs == null) {
      return super.getDelegationToken(renewer);
    }
    return defaultDFS == null
        ? null
        : defaultDFS.getDelegationToken(renewer);
  }

  /**
   * Sets the balancer bandwidth on the regular DFS implementation when
   * {@code vfs} is null; otherwise on the default (fallback) DFS, after the
   * {@code checkDefaultDFS} guard.
   */
  @Override
  public void setBalancerBandwidth(long bandwidth) throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "setBalancerBandwidth");
      defaultDFS.setBalancerBandwidth(bandwidth);
    } else {
      super.setBalancerBandwidth(bandwidth);
    }
  }

  /**
   * Returns the canonical service name from the regular DFS implementation
   * when {@code vfs} is null; otherwise from the default (fallback) DFS,
   * after the {@code checkDefaultDFS} guard.
   */
  @Override
  public String getCanonicalServiceName() {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getCanonicalServiceName");
      return defaultDFS.getCanonicalServiceName();
    }
    return super.getCanonicalServiceName();
  }

  /**
   * Canonicalizes {@code uri}, using the regular DFS implementation when
   * {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  protected URI canonicalizeUri(URI uri) {
    return this.vfs == null
        ? super.canonicalizeUri(uri)
        : vfs.canonicalizeUri(uri);
  }

  /**
   * Reports safe mode from the regular DFS implementation when {@code vfs}
   * is null; otherwise from the default (fallback) DFS, after the
   * {@code checkDefaultDFS} guard.
   */
  @Override
  public boolean isInSafeMode() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "isInSafeMode");
      return defaultDFS.isInSafeMode();
    }
    return super.isInSafeMode();
  }

  /**
   * DFS specific API. Allows snapshots on {@code path}. When {@code vfs} is
   * set, the path is resolved through the mount table and, after the
   * {@code checkDFS} guard, the call is made on the target DFS.
   */
  @Override
  public void allowSnapshot(Path path) throws IOException {
    if (this.vfs == null) {
      super.allowSnapshot(path);
      return;
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "allowSnapshot");
    ((DistributedFileSystem) targetFs)
        .allowSnapshot(resolved.getPathOnTarget());
  }

  /**
   * Disallows snapshots on {@code path}. When {@code vfs} is set, the path
   * is resolved through the mount table and, after the {@code checkDFS}
   * guard, the call is made on the target DFS.
   */
  @Override
  public void disallowSnapshot(final Path path) throws IOException {
    if (this.vfs == null) {
      super.disallowSnapshot(path);
      return;
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "disallowSnapshot");
    ((DistributedFileSystem) targetFs)
        .disallowSnapshot(resolved.getPathOnTarget());
  }

  /**
   * Creates a snapshot of {@code path}, using the regular DFS implementation
   * when {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public Path createSnapshot(Path path, String snapshotName)
      throws IOException {
    return this.vfs == null
        ? super.createSnapshot(path, snapshotName)
        : this.vfs.createSnapshot(path, snapshotName);
  }

  /**
   * Renames a snapshot of {@code path}, using the regular DFS implementation
   * when {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public void renameSnapshot(Path path, String snapshotOldName,
      String snapshotNewName) throws IOException {
    if (this.vfs != null) {
      this.vfs.renameSnapshot(path, snapshotOldName, snapshotNewName);
    } else {
      super.renameSnapshot(path, snapshotOldName, snapshotNewName);
    }
  }

  /**
   * Only for HDFS users. Lists snapshottable directories from the regular
   * DFS implementation when {@code vfs} is null; otherwise from the default
   * (fallback) DFS, after the {@code checkDefaultDFS} guard.
   */
  @Override
  public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
      throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getSnapshottableDirListing");
      return defaultDFS.getSnapshottableDirListing();
    }
    return super.getSnapshottableDirListing();
  }

  /**
   * Deletes a snapshot of {@code path}, using the regular DFS implementation
   * when {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public void deleteSnapshot(Path path, String snapshotName)
      throws IOException {
    if (this.vfs != null) {
      this.vfs.deleteSnapshot(path, snapshotName);
    } else {
      super.deleteSnapshot(path, snapshotName);
    }
  }

  /**
   * Returns an iterator over snapshot diff report listings for
   * {@code snapshotDir}. When {@code vfs} is set, the directory is resolved
   * through the mount table and, after the {@code checkDFS} guard, the call
   * is made on the target DFS with the path on that target.
   */
  @Override
  public RemoteIterator<SnapshotDiffReportListing>
      snapshotDiffReportListingRemoteIterator(final Path snapshotDir,
      final String fromSnapshot, final String toSnapshot) throws IOException {
    if (this.vfs == null) {
      return super.snapshotDiffReportListingRemoteIterator(
          snapshotDir, fromSnapshot, toSnapshot);
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(snapshotDir, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "snapshotDiffReportListingRemoteIterator");
    return ((DistributedFileSystem) targetFs)
        .snapshotDiffReportListingRemoteIterator(
            resolved.getPathOnTarget(), fromSnapshot, toSnapshot);
  }

  /**
   * Returns the snapshot diff report for {@code snapshotDir}. When
   * {@code vfs} is set, the directory is resolved through the mount table
   * and, after the {@code checkDFS} guard, the call is made on the target
   * DFS with the path on that target.
   */
  @Override
  public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir,
      final String fromSnapshot, final String toSnapshot) throws IOException {
    if (this.vfs == null) {
      return super.getSnapshotDiffReport(snapshotDir, fromSnapshot, toSnapshot);
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(snapshotDir, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "getSnapshotDiffReport");
    return ((DistributedFileSystem) targetFs).getSnapshotDiffReport(
        resolved.getPathOnTarget(), fromSnapshot, toSnapshot);
  }

  /**
   * Reports whether the file at {@code src} is closed. When {@code vfs} is
   * set, the path is resolved through the mount table and, after the
   * {@code checkDFS} guard, the call is made on the target DFS.
   */
  @Override
  public boolean isFileClosed(final Path src) throws IOException {
    if (this.vfs == null) {
      return super.isFileClosed(src);
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(src, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "isFileClosed");
    return ((DistributedFileSystem) targetFs)
        .isFileClosed(resolved.getPathOnTarget());
  }

  /**
   * Adds a cache directive. When {@code vfs} is set, the directive's path is
   * resolved through the mount table and, after the {@code checkDFS} guard,
   * a copy of {@code info} with its path replaced by the path on the target
   * is added on the target DFS.
   */
  @Override
  public long addCacheDirective(CacheDirectiveInfo info) throws IOException {
    if (this.vfs == null) {
      return super.addCacheDirective(info);
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(info.getPath(), getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "addCacheDirective");

    final DistributedFileSystem targetDfs = (DistributedFileSystem) targetFs;
    return targetDfs.addCacheDirective(
        new CacheDirectiveInfo.Builder(info)
            .setPath(resolved.getPathOnTarget()).build());
  }

  /**
   * Adds a cache directive with the given flags. When {@code vfs} is set,
   * the directive's path is resolved through the mount table and, after the
   * {@code checkDFS} guard, a copy of {@code info} with its path replaced by
   * the path on the target is added on the target DFS.
   */
  @Override
  public long addCacheDirective(CacheDirectiveInfo info,
      EnumSet<CacheFlag> flags) throws IOException {
    if (this.vfs == null) {
      return super.addCacheDirective(info, flags);
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(info.getPath(), getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "addCacheDirective");

    final DistributedFileSystem targetDfs = (DistributedFileSystem) targetFs;
    return targetDfs.addCacheDirective(
        new CacheDirectiveInfo.Builder(info)
            .setPath(resolved.getPathOnTarget()).build(), flags);
  }

  /**
   * Modifies a cache directive.
   * <p>
   * With {@code vfs} set and a path present in {@code info}, the path is
   * resolved through the mount table and the modification is applied on
   * that single target DFS with the path rewritten to the target. Without
   * a path, the modification is attempted on every child file system that
   * is a {@link DistributedFileSystem}; per-child IOExceptions are collected
   * and thrown together once all children were tried.
   *
   * @throws UnsupportedOperationException if no child is a DFS.
   */
  @Override
  public void modifyCacheDirective(CacheDirectiveInfo info) throws IOException {
    if (this.vfs == null) {
      super.modifyCacheDirective(info);
      return;
    }
    if (info.getPath() != null) {
      final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
          this.vfs.getMountPathInfo(info.getPath(), getConf());
      final FileSystem targetFs = resolved.getTargetFs();
      checkDFS(targetFs, "modifyCacheDirective");
      ((DistributedFileSystem) targetFs).modifyCacheDirective(
          new CacheDirectiveInfo.Builder(info)
              .setPath(resolved.getPathOnTarget()).build());
      return;
    }

    // No path available in CacheDirectiveInfo, so try every child fs.
    final List<IOException> failures = new ArrayList<>();
    boolean sawDfsChild = false;

    for (FileSystem child : getChildFileSystems()) {
      if (child instanceof DistributedFileSystem) {
        sawDfsChild = true;
        try {
          ((DistributedFileSystem) child).modifyCacheDirective(info);
        } catch (IOException e) {
          failures.add(e);
        }
      }
    }
    if (!sawDfsChild) {
      throw new UnsupportedOperationException(
          "No DFS available in child file systems.");
    }
    if (!failures.isEmpty()) {
      throw MultipleIOException.createIOException(failures);
    }
  }

  /**
   * Modifies a cache directive with the given flags.
   * <p>
   * With {@code vfs} set and a path present in {@code info}, the path is
   * resolved through the mount table and the modification is applied on
   * that single target DFS with the path rewritten to the target. Without
   * a path, the modification is attempted on every child file system that
   * is a {@link DistributedFileSystem}; per-child IOExceptions are collected
   * and thrown together once all children were tried.
   *
   * @throws UnsupportedOperationException if no child is a DFS.
   */
  @Override
  public void modifyCacheDirective(CacheDirectiveInfo info,
      EnumSet<CacheFlag> flags) throws IOException {
    if (this.vfs == null) {
      super.modifyCacheDirective(info, flags);
      return;
    }
    if (info.getPath() != null) {
      final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
          this.vfs.getMountPathInfo(info.getPath(), getConf());
      final FileSystem targetFs = resolved.getTargetFs();
      checkDFS(targetFs, "modifyCacheDirective");
      ((DistributedFileSystem) targetFs).modifyCacheDirective(
          new CacheDirectiveInfo.Builder(info)
              .setPath(resolved.getPathOnTarget()).build(), flags);
      return;
    }
    // No path available in CacheDirectiveInfo, so try every child fs.
    final List<IOException> failures = new ArrayList<>();
    boolean sawDfsChild = false;
    for (FileSystem child : getChildFileSystems()) {
      if (child instanceof DistributedFileSystem) {
        sawDfsChild = true;
        try {
          ((DistributedFileSystem) child).modifyCacheDirective(info, flags);
        } catch (IOException e) {
          failures.add(e);
        }
      }
    }
    if (!sawDfsChild) {
      throw new UnsupportedOperationException(
          "No DFS available in child file systems.");
    }
    if (!failures.isEmpty()) {
      throw MultipleIOException.createIOException(failures);
    }
  }

  /**
   * Removes the cache directive with the given id. With {@code vfs} set, the
   * removal is attempted on every child file system that is a
   * {@link DistributedFileSystem}; per-child IOExceptions are collected and
   * thrown together once all children were tried.
   *
   * @throws UnsupportedOperationException if no child is a DFS.
   */
  @Override
  public void removeCacheDirective(long id) throws IOException {
    if (this.vfs == null) {
      super.removeCacheDirective(id);
      return;
    }
    final List<IOException> failures = new ArrayList<>();
    boolean sawDfsChild = false;

    for (FileSystem child : getChildFileSystems()) {
      if (child instanceof DistributedFileSystem) {
        sawDfsChild = true;
        try {
          ((DistributedFileSystem) child).removeCacheDirective(id);
        } catch (IOException e) {
          failures.add(e);
        }
      }
    }
    if (!sawDfsChild) {
      throw new UnsupportedOperationException(
          "No DFS available in child file systems.");
    }
    if (!failures.isEmpty()) {
      throw MultipleIOException.createIOException(failures);
    }
  }

  /**
   * Lists cache directives matching {@code filter}.
   * <p>
   * With {@code vfs} set and a path present in the filter, the path is
   * resolved through the mount table and only that target DFS is queried,
   * with the filter path rewritten to the path on the target. Otherwise an
   * iterator is obtained up front from every child file system that is a
   * {@link DistributedFileSystem}, and the returned iterator walks them one
   * after another in child order.
   *
   * @throws UnsupportedOperationException if no child is a DFS.
   */
  @Override
  public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
      CacheDirectiveInfo filter) throws IOException {
    if (this.vfs == null) {
      return super.listCacheDirectives(filter);
    }

    if (filter != null && filter.getPath() != null) {
      final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
          this.vfs.getMountPathInfo(filter.getPath(), getConf());
      final FileSystem targetFs = resolved.getTargetFs();
      checkDFS(targetFs, "listCacheDirectives");
      return ((DistributedFileSystem) targetFs).listCacheDirectives(
          new CacheDirectiveInfo.Builder(filter)
              .setPath(resolved.getPathOnTarget()).build());
    }

    // No path available in filter, so query every child fs that is a DFS.
    final List<RemoteIterator<CacheDirectiveEntry>> iters = new ArrayList<>();
    for (FileSystem child : getChildFileSystems()) {
      if (child instanceof DistributedFileSystem) {
        iters.add(((DistributedFileSystem) child).listCacheDirectives(filter));
      }
    }
    if (iters.isEmpty()) {
      throw new UnsupportedOperationException(
          "No DFS found in child fs. This API can't be supported in non DFS");
    }

    return new RemoteIterator<CacheDirectiveEntry>() {
      // Index of the next per-child iterator to switch to.
      private int nextIdx = 0;
      private RemoteIterator<CacheDirectiveEntry> active = iters.get(nextIdx++);

      @Override
      public boolean hasNext() throws IOException {
        // Skip over exhausted iterators until one has an entry or none remain.
        while (!active.hasNext()) {
          if (nextIdx >= iters.size()) {
            return false;
          }
          active = iters.get(nextIdx++);
        }
        return true;
      }

      @Override
      public CacheDirectiveEntry next() throws IOException {
        if (!hasNext()) {
          throw new NoSuchElementException("No more elements");
        }
        return active.next();
      }
    };
  }

  // Cache pool APIs are applied to every child DistributedFileSystem.
  /**
   * Adds a cache pool. With {@code vfs} set, the pool is added on every
   * child file system that is a {@link DistributedFileSystem}; per-child
   * IOExceptions are collected and thrown together once all children were
   * tried.
   *
   * @throws UnsupportedOperationException if no child is a DFS.
   */
  @Override
  public void addCachePool(CachePoolInfo info) throws IOException {
    if (this.vfs == null) {
      super.addCachePool(info);
      return;
    }
    final List<IOException> failures = new ArrayList<>();
    boolean sawDfsChild = false;

    for (FileSystem child : getChildFileSystems()) {
      if (child instanceof DistributedFileSystem) {
        sawDfsChild = true;
        try {
          ((DistributedFileSystem) child).addCachePool(info);
        } catch (IOException e) {
          failures.add(e);
        }
      }
    }
    if (!sawDfsChild) {
      throw new UnsupportedOperationException(
          "No DFS available in child file systems.");
    }
    if (!failures.isEmpty()) {
      throw MultipleIOException.createIOException(failures);
    }
  }

  /**
   * Modifies a cache pool. With {@code vfs} set, the modification is
   * attempted on every child file system that is a
   * {@link DistributedFileSystem}; per-child IOExceptions are collected and
   * thrown together once all children were tried.
   *
   * @throws UnsupportedOperationException if no child is a DFS.
   */
  @Override
  public void modifyCachePool(CachePoolInfo info) throws IOException {
    if (this.vfs == null) {
      super.modifyCachePool(info);
      return;
    }
    final List<IOException> failures = new ArrayList<>();
    boolean sawDfsChild = false;

    for (FileSystem child : getChildFileSystems()) {
      if (child instanceof DistributedFileSystem) {
        sawDfsChild = true;
        try {
          ((DistributedFileSystem) child).modifyCachePool(info);
        } catch (IOException e) {
          failures.add(e);
        }
      }
    }
    if (!sawDfsChild) {
      throw new UnsupportedOperationException(
          "No DFS available in child file systems.");
    }
    if (!failures.isEmpty()) {
      throw MultipleIOException.createIOException(failures);
    }
  }

  /**
   * Removes the named cache pool. With {@code vfs} set, the removal is
   * attempted on every child file system that is a
   * {@link DistributedFileSystem}; per-child IOExceptions are collected and
   * thrown together once all children were tried.
   *
   * @throws UnsupportedOperationException if no child is a DFS.
   */
  @Override
  public void removeCachePool(String poolName) throws IOException {
    if (this.vfs == null) {
      super.removeCachePool(poolName);
      return;
    }
    final List<IOException> failures = new ArrayList<>();
    boolean sawDfsChild = false;

    for (FileSystem child : getChildFileSystems()) {
      if (child instanceof DistributedFileSystem) {
        sawDfsChild = true;
        try {
          ((DistributedFileSystem) child).removeCachePool(poolName);
        } catch (IOException e) {
          failures.add(e);
        }
      }
    }
    if (!sawDfsChild) {
      throw new UnsupportedOperationException(
          "No DFS available in child file systems.");
    }
    if (!failures.isEmpty()) {
      throw MultipleIOException.createIOException(failures);
    }
  }

  /**
   * Lists cache pools. With {@code vfs} set, the returned iterator walks the
   * cache pools of every child file system that is a
   * {@link DistributedFileSystem}, in child order. The first child's listing
   * is requested when the iterator is created; each later child's listing
   * is requested only when the iterator advances to it.
   *
   * @throws UnsupportedOperationException if no child is a DFS.
   */
  @Override
  public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
    if (this.vfs == null) {
      return super.listCachePools();
    }

    final List<DistributedFileSystem> childDFSs = new ArrayList<>();
    for (FileSystem child : getChildFileSystems()) {
      if (child instanceof DistributedFileSystem) {
        childDFSs.add((DistributedFileSystem) child);
      }
    }
    if (childDFSs.isEmpty()) {
      throw new UnsupportedOperationException(
          "No DFS found in child fs. This API can't be supported in non DFS");
    }
    return new RemoteIterator<CachePoolEntry>() {
      // Index of the next child DFS whose pools have not been requested yet.
      private int nextDfsIdx = 0;
      private RemoteIterator<CachePoolEntry> active =
          childDFSs.get(nextDfsIdx++).listCachePools();

      @Override
      public boolean hasNext() throws IOException {
        // Skip over exhausted listings until one has an entry or none remain.
        while (!active.hasNext()) {
          if (nextDfsIdx >= childDFSs.size()) {
            return false;
          }
          active = childDFSs.get(nextDfsIdx++).listCachePools();
        }
        return true;
      }

      @Override
      public CachePoolEntry next() throws IOException {
        if (!hasNext()) {
          throw new java.util.NoSuchElementException("No more entries");
        }
        return active.next();
      }
    };
  }

  /**
   * Modifies ACL entries of {@code path}, using the regular DFS
   * implementation when {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
      throws IOException {
    if (this.vfs != null) {
      this.vfs.modifyAclEntries(path, aclSpec);
    } else {
      super.modifyAclEntries(path, aclSpec);
    }
  }

  /**
   * Removes ACL entries of {@code path}, using the regular DFS
   * implementation when {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
      throws IOException {
    if (this.vfs != null) {
      this.vfs.removeAclEntries(path, aclSpec);
    } else {
      super.removeAclEntries(path, aclSpec);
    }
  }

  /**
   * Removes the default ACL of {@code path}, using the regular DFS
   * implementation when {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public void removeDefaultAcl(Path path) throws IOException {
    if (this.vfs != null) {
      this.vfs.removeDefaultAcl(path);
    } else {
      super.removeDefaultAcl(path);
    }
  }

  /**
   * Removes the ACL of {@code path}, using the regular DFS implementation
   * when {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public void removeAcl(Path path) throws IOException {
    if (this.vfs != null) {
      this.vfs.removeAcl(path);
    } else {
      super.removeAcl(path);
    }
  }

  /**
   * Sets the ACL of {@code path}, using the regular DFS implementation when
   * {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
    if (this.vfs != null) {
      this.vfs.setAcl(path, aclSpec);
    } else {
      super.setAcl(path, aclSpec);
    }
  }

  /**
   * Returns the ACL status of {@code path}, using the regular DFS
   * implementation when {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public AclStatus getAclStatus(Path path) throws IOException {
    return this.vfs == null
        ? super.getAclStatus(path)
        : this.vfs.getAclStatus(path);
  }

  /**
   * Creates an encryption zone at {@code path} with key {@code keyName}.
   * When {@code vfs} is set, the path is resolved through the mount table
   * and, after the {@code checkDFS} guard, the call is made on the target
   * DFS with the path on that target.
   */
  @Override
  public void createEncryptionZone(final Path path, final String keyName)
      throws IOException {
    if (this.vfs == null) {
      super.createEncryptionZone(path, keyName);
      return;
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "createEncryptionZone");
    ((DistributedFileSystem) targetFs)
        .createEncryptionZone(resolved.getPathOnTarget(), keyName);
  }

  /**
   * Returns the encryption zone for {@code path}. When {@code vfs} is set,
   * the path is resolved through the mount table and, after the
   * {@code checkDFS} guard, the call is made on the target DFS with the
   * path on that target.
   */
  @Override
  public EncryptionZone getEZForPath(final Path path) throws IOException {
    if (this.vfs == null) {
      return super.getEZForPath(path);
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "getEZForPath");
    return ((DistributedFileSystem) targetFs)
        .getEZForPath(resolved.getPathOnTarget());
  }

  /**
   * Lists encryption zones of the default DFS (fallback) only. To obtain the
   * zones of a specific cluster, invoke this API on that child fs instance
   * directly. With no mount points configured ({@code vfs} is null) the
   * regular DFS implementation is used.
   */
  @Override
  public RemoteIterator<EncryptionZone> listEncryptionZones()
      throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "listEncryptionZones");
      return defaultDFS.listEncryptionZones();
    }
    return super.listEncryptionZones();
  }

  /**
   * Applies a re-encryption action to the encryption zone at {@code zone}.
   * When {@code vfs} is set, the path is resolved through the mount table
   * and, after the {@code checkDFS} guard, the call is made on the target
   * DFS with the path on that target.
   */
  @Override
  public void reencryptEncryptionZone(final Path zone,
      final HdfsConstants.ReencryptAction action) throws IOException {
    if (this.vfs == null) {
      super.reencryptEncryptionZone(zone, action);
      return;
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(zone, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "reencryptEncryptionZone");
    ((DistributedFileSystem) targetFs)
        .reencryptEncryptionZone(resolved.getPathOnTarget(), action);
  }

  /**
   * Lists re-encryption status of the default DFS (fallback) only. To obtain
   * the status of a specific cluster, invoke this API on that child fs
   * instance directly. With no mount points configured ({@code vfs} is null)
   * the regular DFS implementation is used.
   */
  @Override
  public RemoteIterator<ZoneReencryptionStatus> listReencryptionStatus()
      throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "listReencryptionStatus");
      return defaultDFS.listReencryptionStatus();
    }
    return super.listReencryptionStatus();
  }

  /**
   * Returns the encryption info of the file at {@code path}. When
   * {@code vfs} is set, the path is resolved through the mount table and,
   * after the {@code checkDFS} guard, the call is made on the target DFS
   * with the path on that target.
   */
  @Override
  public FileEncryptionInfo getFileEncryptionInfo(final Path path)
      throws IOException {
    if (this.vfs == null) {
      return super.getFileEncryptionInfo(path);
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "getFileEncryptionInfo");
    return ((DistributedFileSystem) targetFs)
        .getFileEncryptionInfo(resolved.getPathOnTarget());
  }

  /**
   * Provisions the encryption zone trash for {@code path}. When {@code vfs}
   * is set, the path is resolved through the mount table and, after the
   * {@code checkDFS} guard, the call is made on the target DFS with the
   * path on that target.
   */
  @Override
  public void provisionEZTrash(final Path path,
      final FsPermission trashPermission) throws IOException {
    if (this.vfs == null) {
      super.provisionEZTrash(path, trashPermission);
      return;
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "provisionEZTrash");
    ((DistributedFileSystem) targetFs)
        .provisionEZTrash(resolved.getPathOnTarget(), trashPermission);
  }

  /**
   * Provisions the snapshot trash for {@code path}. When {@code vfs} is set,
   * the path is resolved through the mount table and, after the
   * {@code checkDFS} guard, the call is made on the target DFS with the
   * path on that target.
   */
  @Override
  public Path provisionSnapshotTrash(final Path path,
      final FsPermission trashPermission) throws IOException {
    if (this.vfs == null) {
      return super.provisionSnapshotTrash(path, trashPermission);
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "provisionSnapshotTrash");
    return ((DistributedFileSystem) targetFs)
        .provisionSnapshotTrash(resolved.getPathOnTarget(), trashPermission);
  }

  /**
   * Sets an extended attribute on {@code path}. When {@code vfs} is set, the
   * path is resolved through the mount table and the call is made on the
   * target file system with the path on that target.
   */
  @Override
  public void setXAttr(Path path, String name, byte[] value,
      EnumSet<XAttrSetFlag> flag) throws IOException {
    if (this.vfs == null) {
      super.setXAttr(path, name, value, flag);
      return;
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    targetFs.setXAttr(resolved.getPathOnTarget(), name, value, flag);
  }

  /**
   * Returns the named extended attribute of {@code path}. When {@code vfs}
   * is set, the path is resolved through the mount table and the call is
   * made on the target file system with the path on that target.
   */
  @Override
  public byte[] getXAttr(Path path, String name) throws IOException {
    if (this.vfs == null) {
      return super.getXAttr(path, name);
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    return targetFs.getXAttr(resolved.getPathOnTarget(), name);
  }

  /**
   * Returns the extended attributes of {@code path}. When {@code vfs} is
   * set, the path is resolved through the mount table and the call is made
   * on the target file system with the path on that target.
   */
  @Override
  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
    if (this.vfs == null) {
      return super.getXAttrs(path);
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    return targetFs.getXAttrs(resolved.getPathOnTarget());
  }

  /**
   * Returns the extended attributes of {@code path} restricted to
   * {@code names}. When {@code vfs} is set, the path is resolved through the
   * mount table and the call is made on the target file system with the
   * path on that target.
   */
  @Override
  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
      throws IOException {
    if (this.vfs == null) {
      return super.getXAttrs(path, names);
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    return targetFs.getXAttrs(resolved.getPathOnTarget(), names);
  }

  /**
   * Lists the extended attribute names of {@code path}. When {@code vfs} is
   * set, the path is resolved through the mount table and the call is made
   * on the target file system with the path on that target.
   */
  @Override
  public List<String> listXAttrs(Path path) throws IOException {
    if (this.vfs == null) {
      return super.listXAttrs(path);
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    return targetFs.listXAttrs(resolved.getPathOnTarget());
  }

  /**
   * Removes the named extended attribute from {@code path}. When {@code vfs}
   * is set, the path is resolved through the mount table and the call is
   * made on the target file system with the path on that target.
   */
  @Override
  public void removeXAttr(Path path, String name) throws IOException {
    if (this.vfs == null) {
      super.removeXAttr(path, name);
      return;
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    targetFs.removeXAttr(resolved.getPathOnTarget(), name);
  }

  /**
   * Checks access to {@code path} for {@code mode}, using the regular DFS
   * implementation when {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public void access(Path path, FsAction mode)
      throws AccessControlException, FileNotFoundException, IOException {
    if (this.vfs != null) {
      this.vfs.access(path, mode);
    } else {
      super.access(path, mode);
    }
  }

  /**
   * Returns the key provider URI from the regular DFS implementation when
   * {@code vfs} is null; otherwise from the default (fallback) DFS, after
   * the {@code checkDefaultDFS} guard.
   */
  @Override
  public URI getKeyProviderUri() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getKeyProviderUri");
      return defaultDFS.getKeyProviderUri();
    }
    return super.getKeyProviderUri();
  }

  /**
   * Returns the key provider from the regular DFS implementation when
   * {@code vfs} is null; otherwise from the default (fallback) DFS, after
   * the {@code checkDefaultDFS} guard.
   */
  @Override
  public KeyProvider getKeyProvider() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getKeyProvider");
      return defaultDFS.getKeyProvider();
    }
    return super.getKeyProvider();
  }

  /**
   * Returns the additional delegation token issuers.
   * <p>
   * With no mount points configured ({@code vfs} is null) this defers to
   * the parent's {@code getAdditionalTokenIssuers()}, i.e. regular DFS
   * behavior. Otherwise the child file systems of {@code vfs} are returned
   * as the issuers.
   *
   * @return the additional token issuers; whether this can be null depends
   *         on the delegate that is called.
   * @throws IOException if the delegate fails.
   */
  @Override
  public DelegationTokenIssuer[] getAdditionalTokenIssuers()
      throws IOException {
    if (this.vfs == null) {
      // Regular DFS behavior: delegate to the same-named parent method.
      // getChildFileSystems() is a different API and would not report the
      // issuers the parent DFS exposes.
      return super.getAdditionalTokenIssuers();
    }

    return this.vfs.getChildFileSystems();
  }

  /**
   * Opens an inotify event stream on the regular DFS implementation when
   * {@code vfs} is null; otherwise on the default (fallback) DFS, after the
   * {@code checkDefaultDFS} guard.
   */
  @Override
  public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getInotifyEventStream");
      return defaultDFS.getInotifyEventStream();
    }
    return super.getInotifyEventStream();
  }

  /**
   * Opens an inotify event stream for the given {@code lastReadTxid}.
   * <p>
   * With no mount points configured ({@code vfs} is null) the regular DFS
   * implementation is used; otherwise the default (fallback) DFS is used,
   * after the {@code checkDefaultDFS} guard.
   *
   * @param lastReadTxid passed through unchanged to the delegate's
   *        {@code getInotifyEventStream(long)} overload.
   * @return the event stream opened by the delegate.
   * @throws IOException if the delegate fails to open the stream.
   */
  @Override
  public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
      throws IOException {
    // Forward lastReadTxid in both branches; calling the no-argument
    // overload would silently discard the caller's requested position.
    if (this.vfs == null) {
      return super.getInotifyEventStream(lastReadTxid);
    }
    checkDefaultDFS(defaultDFS, "getInotifyEventStream");
    return defaultDFS.getInotifyEventStream(lastReadTxid);
  }

  /**
   * DFS only API. Sets the erasure coding policy of {@code path}. When
   * {@code vfs} is set, the path is resolved through the mount table and,
   * after the {@code checkDFS} guard, the call is made on the target DFS
   * with the path on that target.
   */
  @Override
  public void setErasureCodingPolicy(final Path path, final String ecPolicyName)
      throws IOException {
    if (this.vfs == null) {
      super.setErasureCodingPolicy(path, ecPolicyName);
      return;
    }

    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "setErasureCodingPolicy");
    ((DistributedFileSystem) targetFs)
        .setErasureCodingPolicy(resolved.getPathOnTarget(), ecPolicyName);
  }

  /**
   * Requests storage policy satisfaction for {@code src}, using the regular
   * DFS implementation when {@code vfs} is null and {@code vfs} otherwise.
   */
  @Override
  public void satisfyStoragePolicy(Path src) throws IOException {
    if (this.vfs != null) {
      this.vfs.satisfyStoragePolicy(src);
    } else {
      super.satisfyStoragePolicy(src);
    }
  }

  /**
   * Returns the erasure coding policy of {@code path}. When {@code vfs} is
   * set, the path is resolved through the mount table and, after the
   * {@code checkDFS} guard, the call is made on the target DFS with the
   * path on that target.
   */
  @Override
  public ErasureCodingPolicy getErasureCodingPolicy(final Path path)
      throws IOException {
    if (this.vfs == null) {
      return super.getErasureCodingPolicy(path);
    }

    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    final FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "getErasureCodingPolicy");
    return ((DistributedFileSystem) targetFs)
        .getErasureCodingPolicy(resolved.getPathOnTarget());
  }

  /**
   * Gets all erasure coding policies from all available child file systems.
   * <p>
   * With {@code vfs} set, the policies of every child that is a
   * {@link DistributedFileSystem} are appended to one list in child order;
   * per-child IOExceptions are collected and thrown together once all
   * children were tried.
   *
   * @throws UnsupportedOperationException if no child is a DFS.
   */
  @Override
  public Collection<ErasureCodingPolicyInfo> getAllErasureCodingPolicies()
      throws IOException {
    if (this.vfs == null) {
      return super.getAllErasureCodingPolicies();
    }
    final FileSystem[] children = getChildFileSystems();
    final List<ErasureCodingPolicyInfo> policies = new ArrayList<>();
    final List<IOException> failures = new ArrayList<>();
    boolean sawDfsChild = false;
    for (FileSystem child : children) {
      if (child instanceof DistributedFileSystem) {
        sawDfsChild = true;
        try {
          policies.addAll(
              ((DistributedFileSystem) child).getAllErasureCodingPolicies());
        } catch (IOException e) {
          failures.add(e);
        }
      }
    }

    if (!sawDfsChild) {
      throw new UnsupportedOperationException(
          "No DFS available in child file systems.");
    }
    if (!failures.isEmpty()) {
      throw MultipleIOException.createIOException(failures);
    }
    return policies;
  }

  /**
   * Gets all erasure coding codecs. With {@code vfs} set, the codec maps of
   * every child that is a {@link DistributedFileSystem} are merged into one
   * map in child order (a later child's entry replaces an earlier one with
   * the same key); per-child IOExceptions are collected and thrown together
   * once all children were tried.
   *
   * @throws UnsupportedOperationException if no child is a DFS.
   */
  @Override
  public Map<String, String> getAllErasureCodingCodecs() throws IOException {
    if (this.vfs == null) {
      return super.getAllErasureCodingCodecs();
    }
    final FileSystem[] children = getChildFileSystems();
    final Map<String, String> codecs = new HashMap<>();
    final List<IOException> failures = new ArrayList<>();
    boolean sawDfsChild = false;
    for (FileSystem child : children) {
      if (child instanceof DistributedFileSystem) {
        sawDfsChild = true;
        try {
          codecs.putAll(
              ((DistributedFileSystem) child).getAllErasureCodingCodecs());
        } catch (IOException e) {
          failures.add(e);
        }
      }
    }
    if (!sawDfsChild) {
      throw new UnsupportedOperationException(
          "No DFS available in child file systems.");
    }
    if (!failures.isEmpty()) {
      throw MultipleIOException.createIOException(failures);
    }
    return codecs;
  }

  /**
   * Adds the given erasure coding policies to every child file system that
   * is a {@link DistributedFileSystem}.
   * <p>
   * When {@code vfs} is null the call is served by the superclass. The
   * responses of the individual children are appended to one array in child
   * order, so the result holds one group of responses per DFS child.
   *
   * @param policies the policies to add.
   * @return the responses of all DFS children.
   * @throws UnsupportedOperationException if no child is a
   *           DistributedFileSystem.
   * @throws IOException if at least one DFS child failed; the failures are
   *           combined through MultipleIOException.createIOException.
   */
  @Override
  public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
      ErasureCodingPolicy[] policies) throws IOException {
    if (this.vfs == null) {
      return super.addErasureCodingPolicies(policies);
    }
    List<AddErasureCodingPolicyResponse> responses = new ArrayList<>();
    List<IOException> errors = new ArrayList<>();
    boolean sawDfs = false;
    for (FileSystem child : getChildFileSystems()) {
      if (child instanceof DistributedFileSystem) {
        sawDfs = true;
        try {
          AddErasureCodingPolicyResponse[] childResponses =
              ((DistributedFileSystem) child)
                  .addErasureCodingPolicies(policies);
          responses.addAll(Arrays.asList(childResponses));
        } catch (IOException e) {
          // Remember the failure and keep going with the remaining children.
          errors.add(e);
        }
      }
    }
    if (!sawDfs) {
      throw new UnsupportedOperationException(
          "No DFS available in child file systems.");
    }
    if (!errors.isEmpty()) {
      throw MultipleIOException.createIOException(errors);
    }
    return responses.toArray(new AddErasureCodingPolicyResponse[0]);
  }

  /**
   * Removes the named erasure coding policy from every child file system
   * that is a {@link DistributedFileSystem}.
   * <p>
   * When {@code vfs} is null the call is served by the superclass. All DFS
   * children are tried before any failure is reported.
   *
   * @param ecPolicyName name of the policy to remove.
   * @throws UnsupportedOperationException if no child is a
   *           DistributedFileSystem.
   * @throws IOException if at least one DFS child failed; the failures are
   *           combined through MultipleIOException.createIOException.
   */
  @Override
  public void removeErasureCodingPolicy(String ecPolicyName)
      throws IOException {
    if (this.vfs == null) {
      super.removeErasureCodingPolicy(ecPolicyName);
      return;
    }

    boolean sawDfs = false;
    List<IOException> errors = new ArrayList<>();
    for (FileSystem child : getChildFileSystems()) {
      if (child instanceof DistributedFileSystem) {
        sawDfs = true;
        try {
          ((DistributedFileSystem) child)
              .removeErasureCodingPolicy(ecPolicyName);
        } catch (IOException e) {
          // Remember the failure and keep going with the remaining children.
          errors.add(e);
        }
      }
    }
    if (!sawDfs) {
      throw new UnsupportedOperationException(
          "No DFS available in child file systems.");
    }
    if (!errors.isEmpty()) {
      throw MultipleIOException.createIOException(errors);
    }
  }

  /**
   * Enables the named erasure coding policy on every child file system that
   * is a {@link DistributedFileSystem}.
   * <p>
   * When {@code vfs} is null the call is served by the superclass. All DFS
   * children are tried before any failure is reported.
   *
   * @param ecPolicyName name of the policy to enable.
   * @throws UnsupportedOperationException if no child is a
   *           DistributedFileSystem.
   * @throws IOException if at least one DFS child failed; the failures are
   *           combined through MultipleIOException.createIOException.
   */
  @Override
  public void enableErasureCodingPolicy(String ecPolicyName)
      throws IOException {
    if (this.vfs == null) {
      super.enableErasureCodingPolicy(ecPolicyName);
      return;
    }

    boolean sawDfs = false;
    List<IOException> errors = new ArrayList<>();
    for (FileSystem child : getChildFileSystems()) {
      if (child instanceof DistributedFileSystem) {
        sawDfs = true;
        try {
          ((DistributedFileSystem) child)
              .enableErasureCodingPolicy(ecPolicyName);
        } catch (IOException e) {
          // Remember the failure and keep going with the remaining children.
          errors.add(e);
        }
      }
    }
    if (!sawDfs) {
      throw new UnsupportedOperationException(
          "No DFS available in child file systems.");
    }
    if (!errors.isEmpty()) {
      throw MultipleIOException.createIOException(errors);
    }
  }

  /**
   * Disables the named erasure coding policy on every child file system
   * that is a {@link DistributedFileSystem}.
   * <p>
   * When {@code vfs} is null the call is served by the superclass. All DFS
   * children are tried before any failure is reported.
   *
   * @param ecPolicyName name of the policy to disable.
   * @throws UnsupportedOperationException if no child is a
   *           DistributedFileSystem.
   * @throws IOException if at least one DFS child failed; the failures are
   *           combined through MultipleIOException.createIOException.
   */
  @Override
  public void disableErasureCodingPolicy(String ecPolicyName)
      throws IOException {
    if (this.vfs == null) {
      super.disableErasureCodingPolicy(ecPolicyName);
      return;
    }

    boolean sawDfs = false;
    List<IOException> errors = new ArrayList<>();
    for (FileSystem child : getChildFileSystems()) {
      if (child instanceof DistributedFileSystem) {
        sawDfs = true;
        try {
          ((DistributedFileSystem) child)
              .disableErasureCodingPolicy(ecPolicyName);
        } catch (IOException e) {
          // Remember the failure and keep going with the remaining children.
          errors.add(e);
        }
      }
    }
    if (!sawDfs) {
      throw new UnsupportedOperationException(
          "No DFS available in child file systems.");
    }
    if (!errors.isEmpty()) {
      throw MultipleIOException.createIOException(errors);
    }
  }

  /**
   * Unsets the erasure coding policy on the given path.
   * <p>
   * When {@code vfs} is null the call is served by the superclass. Otherwise
   * the path is resolved to its mount target, the target is passed through
   * {@code checkDFS}, and the call is forwarded to it with the path
   * translated to the target file system.
   *
   * @param path the path whose policy is unset.
   * @throws IOException if mount resolution or the forwarded call fails.
   */
  @Override
  public void unsetErasureCodingPolicy(final Path path) throws IOException {
    if (this.vfs == null) {
      super.unsetErasureCodingPolicy(path);
      return;
    }

    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(path, getConf());
    FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "unsetErasureCodingPolicy");
    DistributedFileSystem targetDfs = (DistributedFileSystem) targetFs;
    targetDfs.unsetErasureCodingPolicy(resolved.getPathOnTarget());
  }

  /**
   * Verifies the given erasure coding policies against the topology of every
   * child file system that is a {@link DistributedFileSystem}.
   * <p>
   * When {@code vfs} is null the call is served by the superclass. Otherwise
   * the DFS children are queried one by one. The first result that reports
   * {@code isSupported() == false} is returned immediately, even if an
   * earlier child failed with an exception. If every child answers
   * positively, the result of the last DFS child is returned.
   *
   * @param policyNames names of the policies to verify.
   * @return the first negative result, or the last positive one.
   * @throws UnsupportedOperationException if no child is a
   *           DistributedFileSystem.
   * @throws IOException if at least one DFS child failed and no child
   *           returned a negative result; the failures are combined through
   *           MultipleIOException.createIOException.
   */
  @Override
  public ECTopologyVerifierResult getECTopologyResultForPolicies(
      final String... policyNames) throws IOException {
    if (this.vfs == null) {
      return super.getECTopologyResultForPolicies(policyNames);
    }

    List<IOException> failedExceptions = new ArrayList<>();
    ECTopologyVerifierResult result = null;
    // Tracked separately from 'result': when every DFS child throws, 'result'
    // stays null although DFS children do exist, and the collected
    // exceptions must be reported instead of "No DFS available".
    boolean isDFSExistsInChilds = false;
    for (FileSystem fs : getChildFileSystems()) {
      if (!(fs instanceof DistributedFileSystem)) {
        continue;
      }
      isDFSExistsInChilds = true;
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      try {
        result = dfs.getECTopologyResultForPolicies(policyNames);
        if (!result.isSupported()) {
          // whenever we see negative result.
          return result;
        }
      } catch (IOException ioe) {
        failedExceptions.add(ioe);
      }
    }
    if (!isDFSExistsInChilds) {
      throw new UnsupportedOperationException(
          "No DFS available in child filesystems");
    }
    if (!failedExceptions.isEmpty()) {
      throw MultipleIOException.createIOException(failedExceptions);
    }
    // Let's just return the last one.
    return result;
  }

  /**
   * Returns the trash root for the given path, taken from {@code vfs} when
   * it is set and from the superclass otherwise.
   *
   * @param path the path whose trash root is requested.
   * @return the trash root for {@code path}.
   */
  @Override
  public Path getTrashRoot(Path path) {
    return this.vfs == null
        ? super.getTrashRoot(path)
        : this.vfs.getTrashRoot(path);
  }

  /**
   * Returns the trash roots of all child file systems, appended in child
   * order. When {@code vfs} is null the superclass answers instead.
   *
   * @param allUsers forwarded unchanged to each child's
   *          {@code getTrashRoots}.
   * @return the combined trash roots of all children.
   */
  @Override
  public Collection<FileStatus> getTrashRoots(boolean allUsers) {
    if (this.vfs != null) {
      List<FileStatus> combinedRoots = new ArrayList<>();
      for (FileSystem child : getChildFileSystems()) {
        Collection<FileStatus> childRoots = child.getTrashRoots(allUsers);
        combinedRoots.addAll(childRoots);
      }
      return combinedRoots;
    }
    return super.getTrashRoots(allUsers);
  }

  // Provides the same implementation as the default in DFS, since that one
  // simply delegates to the FileSystem parent class.
  @Override
  protected Path fixRelativePart(Path p) {
    return super.fixRelativePart(p);
  }

  /**
   * Returns the statistics object of this file system: the superclass value
   * when {@code vfs} is null, otherwise the {@code statistics} field.
   */
  Statistics getFsStatistics() {
    return this.vfs == null ? super.getFsStatistics() : statistics;
  }

  /**
   * Returns the DFS operation count statistics: the superclass value when
   * {@code vfs} is null, otherwise the value reported by {@code defaultDFS}.
   */
  DFSOpsCountStatistics getDFSOpsCountStatistics() {
    return this.vfs == null
        ? super.getDFSOpsCountStatistics()
        : defaultDFS.getDFSOpsCountStatistics();
  }

  /**
   * Creates a builder for a new file at the given path. Works only for HDFS:
   * the mount target resolved for {@code path} is passed through
   * {@code checkDFS} before the call is forwarded to it.
   * <p>
   * When {@code vfs} is null the call is served by the superclass.
   *
   * @param path the path of the file to create.
   * @return the builder obtained from the target file system, or
   *         {@code null} when the path could not be resolved to a mount
   *         point.
   */
  @Override
  public HdfsDataOutputStreamBuilder createFile(Path path) {
    if (this.vfs == null) {
      return super.createFile(path);
    }
    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo;
    try {
      mountPathInfo = this.vfs.getMountPathInfo(path, getConf());
    } catch (IOException e) {
      // This method declares no checked exception, so the failure cannot be
      // rethrown as is. Log it (as appendFile does) instead of dropping it
      // silently.
      // TODO: can we return null here?
      LOGGER.warn("Failed to resolve the path as mount path", e);
      return null;
    }
    checkDFS(mountPathInfo.getTargetFs(), "createFile");
    return (HdfsDataOutputStreamBuilder) mountPathInfo.getTargetFs()
        .createFile(mountPathInfo.getPathOnTarget());
  }

  /**
   * Lists open files. When {@code vfs} is set the request is answered by
   * {@code defaultDFS} after it has been passed through
   * {@code checkDefaultDFS}; otherwise the superclass answers.
   *
   * @return an iterator over the open file entries.
   * @throws IOException if the underlying call fails.
   */
  @Deprecated
  @Override
  public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "listOpenFiles");
      return defaultDFS.listOpenFiles();
    }
    return super.listOpenFiles();
  }

  /**
   * Lists open files of the given types. When {@code vfs} is set the request
   * is answered by {@code defaultDFS} after it has been passed through
   * {@code checkDefaultDFS}; otherwise the superclass answers.
   *
   * @param openFilesTypes the types of open files to list.
   * @return an iterator over the matching open file entries.
   * @throws IOException if the underlying call fails.
   */
  @Deprecated
  @Override
  public RemoteIterator<OpenFileEntry> listOpenFiles(
      EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes)
      throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "listOpenFiles");
      return defaultDFS.listOpenFiles(openFilesTypes);
    }
    return super.listOpenFiles(openFilesTypes);
  }

  /**
   * Lists open files of the given types under the given path.
   * <p>
   * When {@code vfs} is null the call is served by the superclass. Otherwise
   * the path is made absolute with {@code fixRelativePart}, resolved to its
   * mount target, and the call is forwarded to that target (which must pass
   * {@code checkDFS}) with the path translated to the target file system.
   *
   * @param openFilesTypes the types of open files to list.
   * @param path the path under which open files are listed.
   * @return an iterator over the matching open file entries.
   * @throws IOException if mount resolution or the forwarded call fails.
   */
  @Override
  public RemoteIterator<OpenFileEntry> listOpenFiles(
      EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
      throws IOException {
    if (this.vfs == null) {
      return super.listOpenFiles(openFilesTypes, path);
    }
    Path absolutePath = fixRelativePart(new Path(path));
    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved =
        this.vfs.getMountPathInfo(absolutePath, getConf());
    FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "listOpenFiles");
    String pathOnTarget = resolved.getPathOnTarget().toString();
    return ((DistributedFileSystem) targetFs)
        .listOpenFiles(openFilesTypes, pathOnTarget);
  }

  /**
   * Creates a builder for appending to the file at the given path.
   * <p>
   * When {@code vfs} is null the call is served by the superclass. Otherwise
   * the path is resolved to its mount target, the target is passed through
   * {@code checkDFS}, and the builder is obtained from it.
   *
   * @param path the path of the file to append to.
   * @return the builder obtained from the target file system, or
   *         {@code null} when the path could not be resolved to a mount
   *         point (the failure is logged).
   */
  @Override
  public HdfsDataOutputStreamBuilder appendFile(Path path) {
    if (this.vfs == null) {
      return super.appendFile(path);
    }
    final ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> resolved;
    try {
      resolved = this.vfs.getMountPathInfo(path, getConf());
    } catch (IOException e) {
      LOGGER.warn("Failed to resolve the path as mount path", e);
      return null;
    }
    FileSystem targetFs = resolved.getTargetFs();
    checkDFS(targetFs, "appendFile");
    return (HdfsDataOutputStreamBuilder) targetFs
        .appendFile(resolved.getPathOnTarget());
  }

  /**
   * Probes a path capability through {@code vfs} when it is set, and through
   * the superclass otherwise.
   *
   * @param path the path to probe.
   * @param capability the capability to probe for.
   * @return the answer of the delegate.
   * @throws IOException if the delegate fails.
   */
  @Override
  public boolean hasPathCapability(Path path, String capability)
      throws IOException {
    return this.vfs == null
        ? super.hasPathCapability(path, capability)
        : this.vfs.hasPathCapability(path, capability);
  }

  // The APIs below have implementations in ViewFS but not in DFS.
  /**
   * Resolves the given path through {@code vfs} when it is set, and through
   * the superclass otherwise.
   *
   * @param f the path to resolve.
   * @return the resolved path.
   * @throws IOException if the delegate fails.
   */
  @Override
  public Path resolvePath(final Path f) throws IOException {
    return this.vfs == null
        ? super.resolvePath(f)
        : this.vfs.resolvePath(f);
  }

  /**
   * Deletes the given path through {@code vfs} when it is set, and through
   * the superclass otherwise.
   *
   * @param f the path to delete.
   * @return the result reported by the delegate.
   * @throws IOException if the delegate fails.
   */
  @Override
  @SuppressWarnings("deprecation")
  public boolean delete(final Path f)
      throws AccessControlException, FileNotFoundException, IOException {
    if (this.vfs != null) {
      return this.vfs.delete(f);
    }
    return super.delete(f);
  }

  /**
   * Returns the checksum of the given file for the given length, obtained
   * from {@code vfs} when it is set and from the superclass otherwise.
   *
   * @param f the file path.
   * @param length forwarded unchanged to the delegate.
   * @return the checksum reported by the delegate.
   * @throws IOException if the delegate fails.
   */
  @Override
  public FileChecksum getFileChecksum(final Path f, final long length)
      throws AccessControlException, FileNotFoundException, IOException {
    if (this.vfs != null) {
      return this.vfs.getFileChecksum(f, length);
    }
    return super.getFileChecksum(f, length);
  }

  /**
   * Creates the given directory through {@code vfs} when it is set, and
   * through the superclass otherwise.
   *
   * @param dir the directory to create.
   * @return the result reported by the delegate.
   * @throws IOException if the delegate fails.
   */
  @Override
  public boolean mkdirs(Path dir) throws IOException {
    return this.vfs == null ? super.mkdirs(dir) : this.vfs.mkdirs(dir);
  }

  /**
   * Returns the default block size for the given path, taken from
   * {@code vfs} when it is set and from the superclass otherwise.
   *
   * @param f the path in question.
   * @return the default block size reported by the delegate.
   */
  @Override
  public long getDefaultBlockSize(Path f) {
    return this.vfs == null
        ? super.getDefaultBlockSize(f)
        : this.vfs.getDefaultBlockSize(f);
  }

  /**
   * Returns the default replication for the given path, taken from
   * {@code vfs} when it is set and from the superclass otherwise.
   *
   * @param f the path in question.
   * @return the default replication reported by the delegate.
   */
  @Override
  public short getDefaultReplication(Path f) {
    return this.vfs == null
        ? super.getDefaultReplication(f)
        : this.vfs.getDefaultReplication(f);
  }

  /**
   * Returns the server defaults for the given path, taken from {@code vfs}
   * when it is set and from the superclass otherwise.
   *
   * @param f the path in question.
   * @return the server defaults reported by the delegate.
   * @throws IOException if the delegate fails.
   */
  @Override
  public FsServerDefaults getServerDefaults(Path f) throws IOException {
    return this.vfs == null
        ? super.getServerDefaults(f)
        : this.vfs.getServerDefaults(f);
  }

  /**
   * Forwards the write-checksum setting to {@code vfs} when it is set, and
   * to the superclass otherwise.
   *
   * @param writeChecksum the value to forward.
   */
  @Override
  public void setWriteChecksum(final boolean writeChecksum) {
    if (this.vfs != null) {
      this.vfs.setWriteChecksum(writeChecksum);
    } else {
      super.setWriteChecksum(writeChecksum);
    }
  }

  /**
   * Returns the child file systems, taken from {@code vfs} when it is set
   * and from the superclass otherwise.
   *
   * @return the child file systems reported by the delegate.
   */
  @Override
  public FileSystem[] getChildFileSystems() {
    return this.vfs == null
        ? super.getChildFileSystems()
        : this.vfs.getChildFileSystems();
  }

  /**
   * Returns the mount points known to {@code vfs}.
   *
   * @return the mount points, or {@code null} when {@code vfs} is null;
   *         callers must handle the null case.
   */
  public ViewFileSystem.MountPoint[] getMountPoints() {
    return this.vfs == null ? null : this.vfs.getMountPoints();
  }

  /**
   * Returns the file system status, taken from {@code vfs} when it is set
   * and from the superclass otherwise.
   *
   * @return the status reported by the delegate.
   * @throws IOException if the delegate fails.
   */
  @Override
  public FsStatus getStatus() throws IOException {
    return this.vfs == null ? super.getStatus() : this.vfs.getStatus();
  }

  /**
   * Returns the used space, taken from {@code vfs} when it is set and from
   * the superclass otherwise.
   *
   * @return the value reported by the delegate.
   * @throws IOException if the delegate fails.
   */
  @Override
  public long getUsed() throws IOException {
    return this.vfs == null ? super.getUsed() : this.vfs.getUsed();
  }

  /**
   * Returns the slow datanode statistics. When {@code vfs} is set the
   * request is answered by {@code defaultDFS} after it has been passed
   * through {@code checkDefaultDFS}; otherwise the superclass answers.
   *
   * @return the datanodes reported by the delegate.
   * @throws IOException if the delegate fails.
   */
  @Override
  public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
    if (this.vfs != null) {
      checkDefaultDFS(defaultDFS, "getSlowDatanodeStats");
      return defaultDFS.getSlowDatanodeStats();
    }
    return super.getSlowDatanodeStats();
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop AddBlockFlag 源码

hadoop BlockMissingException 源码

hadoop BlockReader 源码

hadoop CannotObtainBlockLengthException 源码

hadoop ClientContext 源码

hadoop ClientGSIContext 源码

hadoop DFSClient 源码

hadoop DFSClientFaultInjector 源码

hadoop DFSHedgedReadMetrics 源码

hadoop DFSInotifyEventInputStream 源码

0  赞