hadoop ImageWriter source code

  • 2022-10-20

hadoop ImageWriter code. ImageWriter is a utility in the hadoop-fs2img tool that crawls an existing hierarchical FileSystem and emits a valid FSImage/NameNode storage directory.

File path: /hadoop-tools/hadoop-fs2img/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageWriter.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.OutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
import org.apache.hadoop.thirdparty.protobuf.CodedOutputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FilesUnderConstructionSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection.DirEntry;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INode;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressorStream;
import org.apache.hadoop.thirdparty.protobuf.GeneratedMessageV3;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.hdfs.server.namenode.FSImageUtil.MAGIC_HEADER;

/**
 * Utility crawling an existing hierarchical FileSystem and emitting
 * a valid FSImage/NN storage.
 */
// TODO: generalize to types beyond FileRegion
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ImageWriter implements Closeable {

  private static final int ONDISK_VERSION = 1;
  private static final int LAYOUT_VERSION =
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION;

  private final Path outdir;
  private final FileSystem outfs;
  private final File dirsTmp;
  private final OutputStream dirs;
  private final File inodesTmp;
  private final OutputStream inodes;
  private final MessageDigest digest;
  private final FSImageCompression compress;
  private final long startBlock;
  private final long startInode;
  private final UGIResolver ugis;
  private final BlockAliasMap.Writer<FileRegion> blocks;
  private final BlockResolver blockIds;
  private final Map<Long, DirEntry.Builder> dircache;
  private final TrackedOutputStream<DigestOutputStream> raw;

  private boolean closed = false;
  private long curSec;
  private long curBlock;
  private final AtomicLong curInode;
  private final FileSummary.Builder summary = FileSummary.newBuilder()
      .setOndiskVersion(ONDISK_VERSION)
      .setLayoutVersion(LAYOUT_VERSION);

  private final String blockPoolID;

  public static Options defaults() {
    return new Options();
  }

  @SuppressWarnings("unchecked")
  public ImageWriter(Options opts) throws IOException {
    final OutputStream out;
    if (null == opts.outStream) {
      FileSystem fs = opts.outdir.getFileSystem(opts.getConf());
      outfs = (fs instanceof LocalFileSystem)
          ? ((LocalFileSystem)fs).getRaw()
          : fs;
      Path tmp = opts.outdir;
      if (!outfs.mkdirs(tmp)) {
        throw new IOException("Failed to create output dir: " + tmp);
      }
      try (NNStorage stor = new NNStorage(opts.getConf(),
          Arrays.asList(tmp.toUri()), Arrays.asList(tmp.toUri()))) {
        NamespaceInfo info = NNStorage.newNamespaceInfo();
        if (info.getLayoutVersion() != LAYOUT_VERSION) {
          throw new IllegalStateException("Incompatible layout " +
              info.getLayoutVersion() + " (expected " + LAYOUT_VERSION + ")");
        }
        // set the cluster id, if given
        if (opts.clusterID.length() > 0) {
          info.setClusterID(opts.clusterID);
        }
        // if block pool id is given
        if (opts.blockPoolID.length() > 0) {
          info.setBlockPoolID(opts.blockPoolID);
        }

        stor.format(info);
        blockPoolID = info.getBlockPoolID();
      }
      outdir = new Path(tmp, "current");
      out = outfs.create(new Path(outdir, "fsimage_0000000000000000000"));
    } else {
      outdir = null;
      outfs = null;
      out = opts.outStream;
      blockPoolID = "";
    }
    digest = MD5Hash.getDigester();
    raw = new TrackedOutputStream<>(new DigestOutputStream(
            new BufferedOutputStream(out), digest));
    compress = opts.compress;
    CompressionCodec codec = compress.getImageCodec();
    if (codec != null) {
      summary.setCodec(codec.getClass().getCanonicalName());
    }
    startBlock = opts.startBlock;
    curBlock = startBlock;
    startInode = opts.startInode;
    curInode = new AtomicLong(startInode);
    dircache = Collections.synchronizedMap(new DirEntryCache(opts.maxdircache));

    ugis = null == opts.ugis
        ? ReflectionUtils.newInstance(opts.ugisClass, opts.getConf())
        : opts.ugis;
    BlockAliasMap<FileRegion> fmt = null == opts.blocks
        ? ReflectionUtils.newInstance(opts.aliasMap, opts.getConf())
        : opts.blocks;
    blocks = fmt.getWriter(null, blockPoolID);
    blockIds = null == opts.blockIds
        ? ReflectionUtils.newInstance(opts.blockIdsClass, opts.getConf())
        : opts.blockIds;

    // create directory and inode sections as side-files.
    // The details are written to files to avoid keeping them in memory.
    FileOutputStream dirsTmpStream = null;
    try {
      dirsTmp = File.createTempFile("fsimg_dir", null);
      dirsTmp.deleteOnExit();
      dirsTmpStream = new FileOutputStream(dirsTmp);
      dirs = beginSection(dirsTmpStream);
    } catch (IOException e) {
      IOUtils.cleanupWithLogger(null, raw, dirsTmpStream);
      throw e;
    }

    try {
      inodesTmp = File.createTempFile("fsimg_inode", null);
      inodesTmp.deleteOnExit();
      inodes = new FileOutputStream(inodesTmp);
    } catch (IOException e) {
      IOUtils.cleanupWithLogger(null, raw, dirsTmpStream, dirs);
      throw e;
    }

    raw.write(MAGIC_HEADER);
    curSec = raw.pos;
    assert raw.pos == MAGIC_HEADER.length;
  }

  public void accept(TreePath e) throws IOException {
    assert e.getParentId() < curInode.get();
    // allocate ID
    long id = curInode.getAndIncrement();
    e.accept(id);
    assert e.getId() < curInode.get();
    INode n = e.toINode(ugis, blockIds, blocks);
    writeInode(n);

    if (e.getParentId() > 0) {
      // add DirEntry to map, which may page out entries
      DirEntry.Builder de = DirEntry.newBuilder()
          .setParent(e.getParentId())
          .addChildren(e.getId());
      dircache.put(e.getParentId(), de);
    }
  }

  @SuppressWarnings("serial")
  class DirEntryCache extends LinkedHashMap<Long, DirEntry.Builder> {

    // should cache path to root, not evict LRCached
    private final int nEntries;

    DirEntryCache(int nEntries) {
      this.nEntries = nEntries;
    }

    @Override
    public DirEntry.Builder put(Long p, DirEntry.Builder b) {
      DirEntry.Builder e = get(p);
      if (null == e) {
        return super.put(p, b);
      }
      // merge
      e.addAllChildren(b.getChildrenList());
      // not strictly conforming
      return e;
    }

    @Override
    protected boolean removeEldestEntry(Entry<Long, DirEntry.Builder> be) {
      if (size() > nEntries) {
        DirEntry d = be.getValue().build();
        try {
          writeDirEntry(d);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
        return true;
      }
      return false;
    }
  }

  synchronized void writeInode(INode n) throws IOException {
    n.writeDelimitedTo(inodes);
  }

  synchronized void writeDirEntry(DirEntry e) throws IOException {
    e.writeDelimitedTo(dirs);
  }

  private static int getOndiskSize(GeneratedMessageV3 s) {
    return CodedOutputStream.computeUInt32SizeNoTag(s.getSerializedSize())
        + s.getSerializedSize();
  }

  @Override
  public synchronized void close() throws IOException {
    if (closed) {
      return;
    }
    for (DirEntry.Builder b : dircache.values()) {
      DirEntry e = b.build();
      writeDirEntry(e);
    }
    dircache.clear();

    // close side files
    IOUtils.cleanupWithLogger(null, dirs, inodes, blocks);
    if (null == dirs || null == inodes) {
      // init failed
      if (raw != null) {
        raw.close();
      }
      return;
    }
    try {
      writeNameSystemSection();
      writeINodeSection();
      writeDirSection();
      writeStringTableSection();

      // write summary directly to raw
      FileSummary s = summary.build();
      s.writeDelimitedTo(raw);
      int length = getOndiskSize(s);
      byte[] lengthBytes = new byte[4];
      ByteBuffer.wrap(lengthBytes).asIntBuffer().put(length);
      raw.write(lengthBytes);
    } finally {
      raw.close();
    }
    writeMD5("fsimage_0000000000000000000");
    closed = true;
  }

  /**
   * Write checksum for image file. Pulled from MD5Utils/internals. Awkward to
   * reuse existing tools/utils.
   */
  void writeMD5(String imagename) throws IOException {
    if (null == outdir) {
      return;
    }
    MD5Hash md5 = new MD5Hash(digest.digest());
    String digestString = StringUtils.byteToHexString(md5.getDigest());
    Path chk = new Path(outdir, imagename + ".md5");
    try (OutputStream out = outfs.create(chk)) {
      String md5Line = digestString + " *" + imagename + "\n";
      out.write(md5Line.getBytes(Charsets.UTF_8));
    }
  }

  OutputStream beginSection(OutputStream out) throws IOException {
    CompressionCodec codec = compress.getImageCodec();
    if (null == codec) {
      return out;
    }
    return codec.createOutputStream(out);
  }

  void endSection(OutputStream out, SectionName name) throws IOException {
    CompressionCodec codec = compress.getImageCodec();
    if (codec != null) {
      ((CompressorStream)out).finish();
    }
    out.flush();
    long length = raw.pos - curSec;
    summary.addSections(FileSummary.Section.newBuilder()
        .setName(name.toString()) // not strictly correct, but name not visible
        .setOffset(curSec).setLength(length));
    curSec += length;
  }

  void writeNameSystemSection() throws IOException {
    NameSystemSection.Builder b = NameSystemSection.newBuilder()
        .setGenstampV1(1000)
        .setGenstampV1Limit(0)
        .setGenstampV2(1001)
        .setLastAllocatedBlockId(blockIds.lastId())
        .setTransactionId(0);
    NameSystemSection s = b.build();

    OutputStream sec = beginSection(raw);
    s.writeDelimitedTo(sec);
    endSection(sec, SectionName.NS_INFO);
  }

  void writeINodeSection() throws IOException {
    // could reset dict to avoid compression cost in close
    INodeSection.Builder b = INodeSection.newBuilder()
        .setNumInodes(curInode.get() - startInode)
        .setLastInodeId(curInode.get());
    INodeSection s = b.build();

    OutputStream sec = beginSection(raw);
    s.writeDelimitedTo(sec);
    // copy inodes
    try (FileInputStream in = new FileInputStream(inodesTmp)) {
      IOUtils.copyBytes(in, sec, 4096, false);
    }
    endSection(sec, SectionName.INODE);
  }

  void writeDirSection() throws IOException {
    // No header, so dirs can be written/compressed independently
    OutputStream sec = raw;
    // copy dirs
    try (FileInputStream in = new FileInputStream(dirsTmp)) {
      IOUtils.copyBytes(in, sec, 4096, false);
    }
    endSection(sec, SectionName.INODE_DIR);
  }

  void writeFilesUCSection() throws IOException {
    FilesUnderConstructionSection.Builder b =
        FilesUnderConstructionSection.newBuilder();
    FilesUnderConstructionSection s = b.build();

    OutputStream sec = beginSection(raw);
    s.writeDelimitedTo(sec);
    endSection(sec, SectionName.FILES_UNDERCONSTRUCTION);
  }

  void writeSnapshotDiffSection() throws IOException {
    SnapshotDiffSection.Builder b = SnapshotDiffSection.newBuilder();
    SnapshotDiffSection s = b.build();

    OutputStream sec = beginSection(raw);
    s.writeDelimitedTo(sec);
    endSection(sec, SectionName.SNAPSHOT_DIFF);
  }

  void writeSecretManagerSection() throws IOException {
    SecretManagerSection.Builder b = SecretManagerSection.newBuilder()
        .setCurrentId(0)
        .setTokenSequenceNumber(0);
    SecretManagerSection s = b.build();

    OutputStream sec = beginSection(raw);
    s.writeDelimitedTo(sec);
    endSection(sec, SectionName.SECRET_MANAGER);
  }

  void writeCacheManagerSection() throws IOException {
    CacheManagerSection.Builder b = CacheManagerSection.newBuilder()
        .setNumPools(0)
        .setNumDirectives(0)
        .setNextDirectiveId(1);
    CacheManagerSection s = b.build();

    OutputStream sec = beginSection(raw);
    s.writeDelimitedTo(sec);
    endSection(sec, SectionName.CACHE_MANAGER);
  }

  void writeStringTableSection() throws IOException {
    StringTableSection.Builder b = StringTableSection.newBuilder();
    Map<Integer, String> u = ugis.ugiMap();
    b.setNumEntry(u.size());
    StringTableSection s = b.build();

    OutputStream sec = beginSection(raw);
    s.writeDelimitedTo(sec);
    for (Map.Entry<Integer, String> e : u.entrySet()) {
      StringTableSection.Entry.Builder x =
          StringTableSection.Entry.newBuilder()
              .setId(e.getKey())
              .setStr(e.getValue());
      x.build().writeDelimitedTo(sec);
    }
    endSection(sec, SectionName.STRING_TABLE);
  }

  @Override
  public synchronized String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("{ codec=\"").append(compress.getImageCodec());
    sb.append("\", startBlock=").append(startBlock);
    sb.append(", curBlock=").append(curBlock);
    sb.append(", startInode=").append(startInode);
    sb.append(", curInode=").append(curInode);
    sb.append(", ugi=").append(ugis);
    sb.append(", blockIds=").append(blockIds);
    sb.append(", offset=").append(raw.pos);
    sb.append(" }");
    return sb.toString();
  }

  static class TrackedOutputStream<T extends OutputStream>
      extends FilterOutputStream {

    private long pos = 0L;

    TrackedOutputStream(T out) {
      super(out);
    }

    @SuppressWarnings("unchecked")
    public T getInner() {
      return (T) out;
    }

    @Override
    public void write(int b) throws IOException {
      out.write(b);
      ++pos;
    }

    @Override
    public void write(byte[] b) throws IOException {
      write(b, 0, b.length);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
      pos += len;
    }

    @Override
    public void flush() throws IOException {
      super.flush();
    }

    @Override
    public void close() throws IOException {
      super.close();
    }

  }

  /**
   * Configurable options for image generation mapping pluggable components.
   */
  public static class Options implements Configurable {

    public static final String START_INODE = "hdfs.image.writer.start.inode";
    public static final String CACHE_ENTRY = "hdfs.image.writer.cache.entries";
    public static final String UGI_CLASS   = "hdfs.image.writer.ugi.class";
    public static final String BLOCK_RESOLVER_CLASS =
        "hdfs.image.writer.blockresolver.class";

    private Path outdir;
    private Configuration conf;
    private OutputStream outStream;
    private int maxdircache;
    private long startBlock;
    private long startInode;
    private UGIResolver ugis;
    private Class<? extends UGIResolver> ugisClass;
    private BlockAliasMap<FileRegion> blocks;
    private String clusterID;
    private String blockPoolID;

    @SuppressWarnings("rawtypes")
    private Class<? extends BlockAliasMap> aliasMap;
    private BlockResolver blockIds;
    private Class<? extends BlockResolver> blockIdsClass;
    private FSImageCompression compress =
        FSImageCompression.createNoopCompression();

    protected Options() {
    }

    @Override
    public void setConf(Configuration conf) {
      this.conf = conf;
      String def = new File("hdfs/name").toURI().toString();
      outdir = new Path(conf.get(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, def));
      startBlock = conf.getLong(FixedBlockResolver.START_BLOCK, (1L << 30) + 1);
      startInode = conf.getLong(START_INODE, (1L << 14) + 1);
      maxdircache = conf.getInt(CACHE_ENTRY, 100);
      ugisClass = conf.getClass(UGI_CLASS,
          SingleUGIResolver.class, UGIResolver.class);
      aliasMap = conf.getClass(
          DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
          NullBlockAliasMap.class, BlockAliasMap.class);
      blockIdsClass = conf.getClass(BLOCK_RESOLVER_CLASS,
          FixedBlockResolver.class, BlockResolver.class);
      clusterID = "";
      blockPoolID = "";
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    public Options output(String out) {
      this.outdir = new Path(out);
      return this;
    }

    public Options outStream(OutputStream outStream) {
      this.outStream = outStream;
      return this;
    }

    public Options codec(String codec) throws IOException {
      this.compress = FSImageCompression.createCompression(getConf(), codec);
      return this;
    }

    public Options cache(int nDirEntries) {
      this.maxdircache = nDirEntries;
      return this;
    }

    public Options ugi(UGIResolver ugis) {
      this.ugis = ugis;
      return this;
    }

    public Options ugi(Class<? extends UGIResolver> ugisClass) {
      this.ugisClass = ugisClass;
      return this;
    }

    public Options blockIds(BlockResolver blockIds) {
      this.blockIds = blockIds;
      return this;
    }

    public Options blockIds(Class<? extends BlockResolver> blockIdsClass) {
      this.blockIdsClass = blockIdsClass;
      return this;
    }

    public Options blocks(BlockAliasMap<FileRegion> blocks) {
      this.blocks = blocks;
      return this;
    }

    @SuppressWarnings("rawtypes")
    public Options blocks(Class<? extends BlockAliasMap> blocksClass) {
      this.aliasMap = blocksClass;
      return this;
    }

    public Options clusterID(String clusterID) {
      this.clusterID = clusterID;
      return this;
    }

    public Options blockPoolID(String blockPoolID) {
      this.blockPoolID = blockPoolID;
      return this;
    }
  }

}
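
For orientation, the following is a minimal usage sketch showing how ImageWriter can be driven through its Options builder. It is not part of the Hadoop tree: the ImageWriterExample class, the writeImage method, the treePaths iterable, and the /tmp output directory are hypothetical, and the sketch assumes the helper classes it references (TreePath, SingleUGIResolver, FixedBlockResolver) are publicly visible; only methods defined in the listing above are called.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.FixedBlockResolver;
import org.apache.hadoop.hdfs.server.namenode.ImageWriter;
import org.apache.hadoop.hdfs.server.namenode.SingleUGIResolver;
import org.apache.hadoop.hdfs.server.namenode.TreePath;

// Hypothetical example class; not part of the Hadoop source.
public class ImageWriterExample {

  // treePaths is a hypothetical source of TreePath entries, e.g. produced by a
  // TreeWalk over the remote FileSystem; parents must be visited before their
  // children (accept() asserts getParentId() < the next inode id).
  static void writeImage(Iterable<TreePath> treePaths) throws Exception {
    Configuration conf = new Configuration();
    ImageWriter.Options opts = ImageWriter.defaults();
    opts.setConf(conf);                        // applies defaults such as hdfs.image.writer.start.inode
    opts.output("file:///tmp/fs2img-name")     // hypothetical output dir for the generated NN storage
        .cache(1024)                           // max DirEntry builders kept in memory before paging out
        .ugi(SingleUGIResolver.class)          // resolve all owners/groups through a single UGI
        .blockIds(FixedBlockResolver.class);   // fixed-size block id resolution
    try (ImageWriter w = new ImageWriter(opts)) {
      for (TreePath e : treePaths) {
        w.accept(e);                           // assigns an inode id, writes INode and DirEntry records
      }
    }                                          // close() flushes the side files and writes the FileSummary
  }
}

In the actual tool this wiring is done by FileSystemImage (listed under related articles below), which walks the remote FileSystem and feeds each TreePath to accept().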

Related information

hadoop source code directory

Related articles

hadoop BlockResolver source code

hadoop FSTreeWalk source code

hadoop FileSystemImage source code

hadoop FixedBlockMultiReplicaResolver source code

hadoop FixedBlockResolver source code

hadoop FsUGIResolver source code

hadoop NullBlockAliasMap source code

hadoop SingleUGIResolver source code

hadoop TreePath source code

hadoop TreeWalk source code
