hadoop ViewFileSystemOverloadScheme 源码

  • 2022-10-20
  • 浏览 (287)

haddop ViewFileSystemOverloadScheme 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.viewfs;

import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME;
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_MOUNTTABLE_LOADER_IMPL;
import static org.apache.hadoop.fs.viewfs.Constants.DEFAULT_MOUNT_TABLE_CONFIG_LOADER_IMPL;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * <p> This class is extended from the ViewFileSystem for the overloaded
 * scheme file system. Mount link configurations and in-memory mount table
 * building behaviors are inherited from ViewFileSystem. Unlike
 * ViewFileSystem scheme (viewfs://), the users would be able to use
 * any scheme. </p>
 *
 * <p> To use this class, the following configurations need to be added in
 * core-site.xml file. <br>
 * 1) fs.{@literal <scheme>}.impl
 *    = org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme <br>
 * 2) fs.viewfs.overload.scheme.target.{@literal <scheme>}.impl
 *    = {@literal <hadoop compatible file system implementation class name
 *    for the <scheme>>} </p>
 *
 * <p> Here {@literal <scheme>} can be any scheme, but with that scheme there
 * should be a hadoop compatible file system available. Second configuration
 * value should be the respective scheme's file system implementation class.
 * Example: if scheme is configured with "hdfs", then the 2nd configuration
 * class name will be org.apache.hadoop.hdfs.DistributedFileSystem.
 * if scheme is configured with "s3a", then the 2nd configuration class name
 * will be org.apache.hadoop.fs.s3a.S3AFileSystem. </p>
 *
 * <p> Use Case 1: <br>
 * =========== <br>
 * If users want some of their existing cluster (hdfs://Cluster)
 * data to mount with other hdfs and object store clusters(hdfs://NN1,
 * o3fs://bucket1.volume1/, s3a://bucket1/) </p>
 *
 * <p>
 * fs.viewfs.mounttable.Cluster.link./user = hdfs://NN1/user <br>
 * fs.viewfs.mounttable.Cluster.link./data = o3fs://bucket1.volume1/data <br>
 * fs.viewfs.mounttable.Cluster.link./backup = s3a://bucket1/backup/
 * </p>
 *
 * <p>
 * Op1: Create file hdfs://Cluster/user/fileA will go to hdfs://NN1/user/fileA
 * <br>
 * Op2: Create file hdfs://Cluster/data/datafile will go to
 *      o3fs://bucket1.volume1/data/datafile<br>
 * Op3: Create file hdfs://Cluster/backup/data.zip will go to
 *      s3a://bucket1/backup/data.zip
 * </p>
 *
 * <p> Use Case 2:<br>
 * ===========<br>
 * If users want some of their existing cluster (s3a://bucketA/)
 * data to mount with other hdfs and object store clusters
 * (hdfs://NN1, o3fs://bucket1.volume1/) </p>
 *
 * <p>
 * fs.viewfs.mounttable.bucketA.link./user = hdfs://NN1/user<br>
 * fs.viewfs.mounttable.bucketA.link./data = o3fs://bucket1.volume1/data<br>
 * fs.viewfs.mounttable.bucketA.link./salesDB = s3a://bucketA/salesDB/
 * </p>
 *
 * <p>
 * Op1: Create file s3a://bucketA/user/fileA will go to hdfs://NN1/user/fileA
 * <br>
 * Op2: Create file s3a://bucketA/data/datafile will go to
 *      o3fs://bucket1.volume1/data/datafile<br>
 * Op3: Create file s3a://bucketA/salesDB/dbfile will go to
 *      s3a://bucketA/salesDB/dbfile
 * </p>
 *
 * <p> Note:<br>
 * (1) In ViewFileSystemOverloadScheme, by default the mount links will be
 * represented as non-symlinks. If you want to change this behavior, please see
 * {@link ViewFileSystem#listStatus(Path)}<br>
 * (2) In ViewFileSystemOverloadScheme, only the initialized uri's hostname will
 * be considered as the mount table name. When the passed uri has hostname:port,
 * it will simply ignore the port number and only hostname will be considered as
 * the mount table name.<br>
 * (3) If there are no mount links configured with the initializing uri's
 * hostname as the mount table name, then it will automatically consider the
 * current uri as fallback( ex:
 * {@literal fs.viewfs.mounttable.<mycluster>.linkFallback}) target fs uri.
 * </p>
 */
@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase", "Hive" })
@InterfaceStability.Evolving
public class ViewFileSystemOverloadScheme extends ViewFileSystem {
  private URI myUri;
  private boolean supportAutoAddingFallbackOnNoMounts = true;
  public ViewFileSystemOverloadScheme() throws IOException {
    super();
  }

  @Override
  public String getScheme() {
    return myUri.getScheme();
  }

  /**
   * By default returns false as ViewFileSystemOverloadScheme supports auto
   * adding fallback on no mounts.
   */
  public boolean supportAutoAddingFallbackOnNoMounts() {
    return this.supportAutoAddingFallbackOnNoMounts;
  }

  /**
   * Sets whether to add fallback automatically when no mount points found.
   *
   * @param addAutoFallbackOnNoMounts addAutoFallbackOnNoMounts.
   */
  public void setSupportAutoAddingFallbackOnNoMounts(
      boolean addAutoFallbackOnNoMounts) {
    this.supportAutoAddingFallbackOnNoMounts = addAutoFallbackOnNoMounts;
  }

  @Override
  public void initialize(URI theUri, Configuration conf) throws IOException {
    this.myUri = theUri;
    if (LOG.isDebugEnabled()) {
      LOG.debug("Initializing the ViewFileSystemOverloadScheme with the uri: "
          + theUri);
    }
    String mountTableConfigPath =
        conf.get(Constants.CONFIG_VIEWFS_MOUNTTABLE_PATH);
    /* The default value to false in ViewFSOverloadScheme */
    conf.setBoolean(Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS,
        conf.getBoolean(Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS,
            false));
    /* the default value to true in ViewFSOverloadScheme */
    conf.setBoolean(CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME,
        conf.getBoolean(Constants.CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME,
            true));
    if (null != mountTableConfigPath) {
      MountTableConfigLoader loader = getMountTableConfigLoader(conf);
      loader.load(mountTableConfigPath, conf);
    } else {
      // TODO: Should we fail here.?
      if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Missing configuration for fs.viewfs.mounttable.path. Proceeding"
                + "with core-site.xml mount-table information if avaialable.");
      }
    }
    super.initialize(theUri, conf);
  }

  private MountTableConfigLoader getMountTableConfigLoader(
      final Configuration conf) {
    Class<? extends MountTableConfigLoader> clazz =
        conf.getClass(CONFIG_VIEWFS_MOUNTTABLE_LOADER_IMPL,
            DEFAULT_MOUNT_TABLE_CONFIG_LOADER_IMPL,
                MountTableConfigLoader.class);

    if (clazz == null) {
      throw new RuntimeException(
          String.format("Errors on getting mount table loader class. "
              + "The fs.viewfs.mounttable.config.loader.impl conf is %s. ",
                  conf.get(CONFIG_VIEWFS_MOUNTTABLE_LOADER_IMPL,
                      DEFAULT_MOUNT_TABLE_CONFIG_LOADER_IMPL.getName())));
    }

    try {
      MountTableConfigLoader mountTableConfigLoader =
          ReflectionUtils.newInstance(clazz, conf);
      return mountTableConfigLoader;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * This method is overridden because in ViewFileSystemOverloadScheme if
   * overloaded scheme matches with mounted target fs scheme, file system
   * should be created without going into {@literal fs.<scheme>.impl} based
   * resolution. Otherwise it will end up in an infinite loop as the target
   * will be resolved again to ViewFileSystemOverloadScheme as
   * {@literal fs.<scheme>.impl} points to ViewFileSystemOverloadScheme.
   * So, below method will initialize the
   * {@literal fs.viewfs.overload.scheme.target.<scheme>.impl}.
   * Other schemes can follow fs.newInstance
   */
  @Override
  protected FsGetter fsGetter() {
    return new ChildFsGetter(getScheme());
  }

  /**
   * This class checks whether the rooScheme is same as URI scheme. If both are
   * same, then it will initialize file systems by using the configured
   * {@literal fs.viewfs.overload.scheme.target.<scheme>.impl} class.
   */
  static class ChildFsGetter extends FsGetter {

    private final String rootScheme;

    ChildFsGetter(String rootScheme) {
      this.rootScheme = rootScheme;
    }

    @Override
    public FileSystem getNewInstance(URI uri, Configuration conf)
        throws IOException {
      if (uri.getScheme().equals(this.rootScheme)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              "The file system initialized uri scheme is matching with the "
                  + "given target uri scheme. The target uri is: " + uri);
        }
        /*
         * Avoid looping when target fs scheme is matching to overloaded scheme.
         */
        return createFileSystem(uri, conf);
      } else {
        return FileSystem.newInstance(uri, conf);
      }
    }

    /**
     * When ViewFileSystemOverloadScheme scheme and target uri scheme are
     * matching, it will not take advantage of FileSystem cache as it will
     * create instance directly. For caching needs please set
     * "fs.viewfs.enable.inner.cache" to true.
     */
    @Override
    public FileSystem get(URI uri, Configuration conf) throws IOException {
      if (uri.getScheme().equals(this.rootScheme)) {
        // Avoid looping when target fs scheme is matching to overloaded
        // scheme.
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              "The file system initialized uri scheme is matching with the "
                  + "given target uri scheme. So, the target file system "
                  + "instances will not be cached. To cache fs instances, "
                  + "please set fs.viewfs.enable.inner.cache to true. "
                  + "The target uri is: " + uri);
        }
        return createFileSystem(uri, conf);
      } else {
        return FileSystem.get(uri, conf);
      }
    }

    private FileSystem createFileSystem(URI uri, Configuration conf)
        throws IOException {
      final String fsImplConf = String.format(
          FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
          uri.getScheme());
      Class<?> clazz = conf.getClass(fsImplConf, null);
      if (clazz == null) {
        throw new UnsupportedFileSystemException(
            String.format("%s=null: %s: %s", fsImplConf,
                "No overload scheme fs configured", uri.getScheme()));
      }
      FileSystem fs = (FileSystem) newInstance(clazz, uri, conf);
      fs.initialize(uri, conf);
      return fs;
    }

    private <T> T newInstance(Class<T> theClass, URI uri, Configuration conf) {
      T result;
      try {
        Constructor<T> meth = theClass.getConstructor();
        meth.setAccessible(true);
        result = meth.newInstance();
      } catch (InvocationTargetException e) {
        Throwable cause = e.getCause();
        if (cause instanceof RuntimeException) {
          throw (RuntimeException) cause;
        } else {
          throw new RuntimeException(cause);
        }
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
      return result;
    }

  }

  /**
   * This is an admin only API to give access to its child raw file system, if
   * the path is link. If the given path is an internal directory(path is from
   * mount paths tree), it will initialize the file system of given path uri
   * directly. If path cannot be resolved to any internal directory or link, it
   * will throw NotInMountpointException. Please note, this API will not return
   * chrooted file system. Instead, this API will get actual raw file system
   * instances.
   *
   * @param path - fs uri path
   * @param conf - configuration
   * @throws IOException raised on errors performing I/O.
   * @return file system.
   */
  public FileSystem getRawFileSystem(Path path, Configuration conf)
      throws IOException {
    InodeTree.ResolveResult<FileSystem> res;
    try {
      res = fsState.resolve(getUriPath(path), true);
      return res.isInternalDir() ? fsGetter().get(path.toUri(), conf)
          : ((ChRootedFileSystem) res.targetFileSystem).getMyFs();
    } catch (FileNotFoundException e) {
      // No link configured with passed path.
      throw new NotInMountpointException(path,
          "No link found for the given path.");
    }
  }

  /**
   * Gets the mount path info, which contains the target file system and
   * remaining path to pass to the target file system.
   *
   * @param path the path.
   * @param conf configuration.
   * @return mount path info.
   * @throws IOException raised on errors performing I/O.
   */
  public MountPathInfo<FileSystem> getMountPathInfo(Path path,
      Configuration conf) throws IOException {
    InodeTree.ResolveResult<FileSystem> res;
    try {
      res = fsState.resolve(getUriPath(path), true);
      FileSystem fs = res.isInternalDir() ?
          (fsState.getRootFallbackLink() != null ?
              fsState.getRootFallbackLink().getTargetFileSystem() :
              fsGetter().get(path.toUri(), conf)) :
          res.targetFileSystem;
      if (fs instanceof ChRootedFileSystem) {
        ChRootedFileSystem chFs = (ChRootedFileSystem) fs;
        return new MountPathInfo<>(chFs.fullPath(res.remainingPath),
            chFs.getMyFs());
      }
      return new MountPathInfo<FileSystem>(res.remainingPath, fs);
    } catch (FileNotFoundException e) {
      // No link configured with passed path.
      throw new NotInMountpointException(path,
          "No link found for the given path.");
    }
  }

  /**
   * A class to maintain the target file system and a path to pass to the target
   * file system.
   */
  public static class MountPathInfo<T> {
    private Path pathOnTarget;
    private T targetFs;

    public MountPathInfo(Path pathOnTarget, T targetFs) {
      this.pathOnTarget = pathOnTarget;
      this.targetFs = targetFs;
    }

    public Path getPathOnTarget() {
      return this.pathOnTarget;
    }

    public T getTargetFs() {
      return this.targetFs;
    }
  }

  /**
   * @return Gets the fallback file system configured. Usually, this will be the
   * default cluster.
   */
  public FileSystem getFallbackFileSystem() {
    if (fsState.getRootFallbackLink() == null) {
      return null;
    }
    try {
      return ((ChRootedFileSystem) fsState.getRootFallbackLink()
          .getTargetFileSystem()).getMyFs();
    } catch (IOException ex) {
      LOG.error("Could not get fallback filesystem ");
    }
    return null;
  }

  @Override
  @InterfaceAudience.LimitedPrivate("HDFS")
  public URI canonicalizeUri(URI uri) {
    return super.canonicalizeUri(uri);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ChRootedFileSystem 源码

hadoop ChRootedFs 源码

hadoop ConfigUtil 源码

hadoop Constants 源码

hadoop FsGetter 源码

hadoop HCFSMountTableConfigLoader 源码

hadoop InodeTree 源码

hadoop MountTableConfigLoader 源码

hadoop NflyFSystem 源码

hadoop NotInMountpointException 源码

0  赞