hadoop PseudoLocalFs 源码

  • 2022-10-20
  • 浏览 (209)

haddop PseudoLocalFs 代码

文件路径:/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/PseudoLocalFs.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.gridmix;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
import java.net.URI;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.util.Progressable;

/**
 * Pseudo local file system that generates random data for any file on the fly
 * instead of storing files on disk. So opening same file multiple times will
 * not give same file content. There are no directories in this file system
 * other than the root and all the files are under root i.e. "/". All file URIs
 * on pseudo local file system should be of the format <code>
 * pseudo:///&lt;name&gt;.&lt;fileSize&gt;</code> where name is a unique name
 * and &lt;fileSize&gt; is a number representing the size of the file in bytes.
 */
class PseudoLocalFs extends FileSystem {
  Path home;
  /**
   * The creation time and modification time of all files in
   * {@link PseudoLocalFs} is same.
   */
  private static final long TIME = System.currentTimeMillis();
  private static final String HOME_DIR = "/";
  private static final long BLOCK_SIZE  = 4 * 1024 * 1024L; // 4 MB
  private static final int DEFAULT_BUFFER_SIZE = 1024  * 1024; // 1MB

  static final URI NAME = URI.create("pseudo:///");

  PseudoLocalFs() {
    this(new Path(HOME_DIR));
  }

  PseudoLocalFs(Path home) {
    super();
    this.home = home;
  }

  @Override
  public URI getUri() {
    return NAME;
  }

  @Override
  public Path getHomeDirectory() {
    return home;
  }

  @Override
  public Path getWorkingDirectory() {
    return getHomeDirectory();
  }

  /**
   * Generates a valid pseudo local file path from the given <code>fileId</code>
   * and <code>fileSize</code>.
   * @param fileId unique file id string
   * @param fileSize file size
   * @return the generated relative path
   */
  static Path generateFilePath(String fileId, long fileSize) {
    return new Path(fileId + "." + fileSize);
  }

  /**
   * Creating a pseudo local file is nothing but validating the file path.
   * Actual data of the file is generated on the fly when client tries to open
   * the file for reading.
   * @param path file path to be created
   */
  @Override
  public FSDataOutputStream create(Path path) throws IOException {
    try {
      validateFileNameFormat(path);
    } catch (FileNotFoundException e) {
      throw new IOException("File creation failed for " + path);
    }
    return null;
  }

  /**
   * Validate if the path provided is of expected format of Pseudo Local File
   * System based files.
   * @param path file path
   * @return the file size
   * @throws FileNotFoundException
   */
  long validateFileNameFormat(Path path) throws FileNotFoundException {
    path = this.makeQualified(path);
    boolean valid = true;
    long fileSize = 0;
    if (!path.toUri().getScheme().equals(getUri().getScheme())) {
      valid = false;
    } else {
      String[] parts = path.toUri().getPath().split("\\.");
      try {
        fileSize = Long.parseLong(parts[parts.length - 1]);
        valid = (fileSize >= 0);
      } catch (NumberFormatException e) {
        valid = false;
      }
    }
    if (!valid) {
      throw new FileNotFoundException("File " + path
          + " does not exist in pseudo local file system");
    }
    return fileSize;
  }

  /**
   * @See create(Path) for details
   */
  @Override
  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
    long fileSize = validateFileNameFormat(path);
    InputStream in = new RandomInputStream(fileSize, bufferSize);
    return new FSDataInputStream(in);
  }

  /**
   * @See create(Path) for details
   */
  @Override
  public FSDataInputStream open(Path path) throws IOException {
    return open(path, DEFAULT_BUFFER_SIZE);
  }

  @Override
  public FileStatus getFileStatus(Path path) throws IOException {
    long fileSize = validateFileNameFormat(path);
    return new FileStatus(fileSize, false, 1, BLOCK_SIZE, TIME, path);
  }

  @Override
  public boolean exists(Path path) {
    try{
      validateFileNameFormat(path);
    } catch (FileNotFoundException e) {
      return false;
    }
    return true;
  }

  @Override
  public FSDataOutputStream create(Path path, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(path);
  }

  @Override
  public FileStatus[] listStatus(Path path) throws FileNotFoundException,
      IOException {
    return new FileStatus[] {getFileStatus(path)};
  }

  /**
   * Input Stream that generates specified number of random bytes.
   */
  static class RandomInputStream extends InputStream
      implements Seekable, PositionedReadable {

    private final Random r = new Random();
    private BytesWritable val = null;
    private int positionInVal = 0;// current position in the buffer 'val'

    private long totalSize = 0;// total number of random bytes to be generated
    private long curPos = 0;// current position in this stream

    /**
     * @param size total number of random bytes to be generated in this stream
     * @param bufferSize the buffer size. An internal buffer array of length
     * <code>bufferSize</code> is created. If <code>bufferSize</code> is not a
     * positive number, then a default value of 1MB is used.
     */
    RandomInputStream(long size, int bufferSize) {
      totalSize = size;
      if (bufferSize <= 0) {
        bufferSize = DEFAULT_BUFFER_SIZE;
      }
      val = new BytesWritable(new byte[bufferSize]);
    }

    @Override
    public int read() throws IOException {
      byte[] b = new byte[1];
      if (curPos < totalSize) {
        if (positionInVal < val.getLength()) {// use buffered byte
          b[0] = val.getBytes()[positionInVal++];
          ++curPos;
        } else {// generate data
          int num = read(b);
          if (num < 0) {
            return num;
          }
        }
      } else {
        return -1;
      }
      return b[0];
    }

    @Override
    public int read(byte[] bytes) throws IOException {
      return read(bytes, 0, bytes.length);
    }

    @Override
    public int read(byte[] bytes, int off, int len) throws IOException {
      if (curPos == totalSize) {
        return -1;// EOF
      }
      int numBytes = len;
      if (numBytes > (totalSize - curPos)) {// position in file is close to EOF
        numBytes = (int)(totalSize - curPos);
      }
      if (numBytes > (val.getLength() - positionInVal)) {
        // need to generate data into val
        r.nextBytes(val.getBytes());
        positionInVal = 0;
      }

      System.arraycopy(val.getBytes(), positionInVal, bytes, off, numBytes);
      curPos += numBytes;
      positionInVal += numBytes;
      return numBytes;
    }

    @Override
    public int available() {
      return (int)(val.getLength() - positionInVal);
    }

    @Override
    public int read(long position, byte[] buffer, int offset, int length)
        throws IOException {
      throw new UnsupportedOperationException();
    }

    @Override
    public void readFully(long position, byte[] buffer) throws IOException {
      throw new UnsupportedOperationException();
    }

    @Override
    public void readFully(long position, byte[] buffer, int offset, int length)
        throws IOException {
      throw new UnsupportedOperationException();
    }

    /**
     * Get the current position in this stream/pseudo-file
     * @return the position in this stream/pseudo-file
     * @throws IOException
     */
    @Override
    public long getPos() throws IOException {
      return curPos;
    }

    @Override
    public void seek(long pos) throws IOException {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      throw new UnsupportedOperationException();
    }
  }

  @Override
  public FSDataOutputStream append(Path path, int bufferSize,
      Progressable progress) throws IOException {
    throw new UnsupportedOperationException("Append is not supported"
        + " in pseudo local file system.");
  }

  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    throw new UnsupportedOperationException("Mkdirs is not supported"
        + " in pseudo local file system.");
  }

  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    throw new UnsupportedOperationException("Rename is not supported"
        + " in pseudo local file system.");
  }

  @Override
  public boolean delete(Path path, boolean recursive) {
    throw new UnsupportedOperationException("File deletion is not supported "
        + "in pseudo local file system.");
  }

  @Override
  public void setWorkingDirectory(Path newDir) {
    throw new UnsupportedOperationException("SetWorkingDirectory "
        + "is not supported in pseudo local file system.");
  }

  @Override
  public Path makeQualified(Path path) {
    // skip FileSystem#checkPath() to validate some other Filesystems
    return path.makeQualified(this.getUri(), this.getWorkingDirectory());
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AvgRecordFactory 源码

hadoop ClusterSummarizer 源码

hadoop CompressionEmulationUtil 源码

hadoop DistributedCacheEmulator 源码

hadoop EchoUserResolver 源码

hadoop ExecutionSummarizer 源码

hadoop FilePool 源码

hadoop FileQueue 源码

hadoop GenerateData 源码

hadoop GenerateDistCacheData 源码

0  赞