hadoop FSImageFormatProtobuf source code

  • 2022-10-20

hadoop FSImageFormatProtobuf code: a utility class for reading and writing the fsimage in protobuf format.

File path: /hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.ErasureCodingSection;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FSImageFormatPBSnapshot;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.LimitInputStream;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Lists;

import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.thirdparty.protobuf.CodedOutputStream;

/**
 * Utility class to read / write fsimage in protobuf format.
 */
@InterfaceAudience.Private
public final class FSImageFormatProtobuf {
  private static final Logger LOG = LoggerFactory
      .getLogger(FSImageFormatProtobuf.class);

  private static volatile boolean enableParallelLoad = false;

  public static final class LoaderContext {
    private SerialNumberManager.StringTable stringTable;
    private final ArrayList<INodeReference> refList = Lists.newArrayList();

    public SerialNumberManager.StringTable getStringTable() {
      return stringTable;
    }

    public ArrayList<INodeReference> getRefList() {
      return refList;
    }
  }

  public static final class SaverContext {
    public static class DeduplicationMap<E> {
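      // Assigns compact integer ids to distinct values in first-seen order:
      // the first distinct value gets id 1, the next new value id 2, and so
      // on. A null value always maps to the reserved id 0 (see getId below).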
      private final Map<E, Integer> map = Maps.newHashMap();
      private DeduplicationMap() {}

      static <T> DeduplicationMap<T> newMap() {
        return new DeduplicationMap<T>();
      }

      int getId(E value) {
        if (value == null) {
          return 0;
        }
        Integer v = map.get(value);
        if (v == null) {
          int nv = map.size() + 1;
          map.put(value, nv);
          return nv;
        }
        return v;
      }

      int size() {
        return map.size();
      }

      Set<Entry<E, Integer>> entrySet() {
        return map.entrySet();
      }
    }
    private final ArrayList<INodeReference> refList = Lists.newArrayList();

    public ArrayList<INodeReference> getRefList() {
      return refList;
    }
  }

  public static final class Loader implements FSImageFormat.AbstractLoader {
    static final int MINIMUM_FILE_LENGTH = 8;
    private final Configuration conf;
    private final FSNamesystem fsn;
    private final LoaderContext ctx;
    /** The MD5 sum of the loaded file */
    private MD5Hash imgDigest;
    /** The transaction ID of the last edit represented by the loaded file */
    private long imgTxId;
    /**
     * Whether the image's layout version must be the same as
     * {@link HdfsServerConstants#NAMENODE_LAYOUT_VERSION}. This is only set to true
     * when we're doing a rollingUpgrade rollback.
     */
    private final boolean requireSameLayoutVersion;

    private File filename;

    Loader(Configuration conf, FSNamesystem fsn,
        boolean requireSameLayoutVersion) {
      this.conf = conf;
      this.fsn = fsn;
      this.ctx = new LoaderContext();
      this.requireSameLayoutVersion = requireSameLayoutVersion;
    }

    @Override
    public MD5Hash getLoadedImageMd5() {
      return imgDigest;
    }

    @Override
    public long getLoadedImageTxId() {
      return imgTxId;
    }

    public LoaderContext getLoaderContext() {
      return ctx;
    }

    /**
     * Thread that computes the MD5 of a file, so the digest can be calculated
     * in parallel with loading the image without interfering much.
     */
    private static class DigestThread extends Thread {

      /**
       * Exception raised if the digest could not be computed.
       */
      private volatile IOException ioe = null;

      /**
       * Calculated digest if there were no errors.
       */
      private volatile MD5Hash digest = null;

      /**
       * The FsImage file whose MD5 is computed.
       */
      private final File file;

      DigestThread(File inFile) {
        file = inFile;
        setName(inFile.getName() + " MD5 compute");
        setDaemon(true);
      }

      public MD5Hash getDigest() throws IOException {
        if (ioe != null) {
          throw ioe;
        }
        return digest;
      }

      public IOException getException() {
        return ioe;
      }

      @Override
      public void run() {
        try {
          digest = MD5FileUtils.computeMd5ForFile(file);
        } catch (IOException e) {
          ioe = e;
        } catch (Throwable t) {
          ioe = new IOException(t);
        }
      }

      @Override
      public String toString() {
        return "DigestThread{ ThreadName=" + getName() + ", digest=" + digest
            + ", file=" + file + '}';
      }
    }

    void load(File file) throws IOException {
      filename = file;
      long start = Time.monotonicNow();
      DigestThread dt = new DigestThread(file);
      dt.start();
      RandomAccessFile raFile = new RandomAccessFile(file, "r");
      FileInputStream fin = new FileInputStream(file);
      try {
        loadInternal(raFile, fin);
        try {
          dt.join();
          imgDigest = dt.getDigest();
        } catch (InterruptedException ie) {
          throw new IOException(ie);
        }
        long end = Time.monotonicNow();
        LOG.info("Loaded FSImage in {} seconds.", (end - start) / 1000);
      } finally {
        fin.close();
        raFile.close();
      }
    }

    /**
     * Given an FSImage FileSummary.Section, return a LimitInputStream set to
     * the starting position of the section and limited to the section length.
     * @param section The FileSummary.Section containing the offset and length
     * @param compressionCodec The compression codec in use, if any
     * @return An InputStream for the given section
     * @throws IOException
     */
    public InputStream getInputStreamForSection(FileSummary.Section section,
                                                String compressionCodec)
        throws IOException {
      FileInputStream fin = new FileInputStream(filename);
      try {
        FileChannel channel = fin.getChannel();
        channel.position(section.getOffset());
        InputStream in = new BufferedInputStream(new LimitInputStream(fin,
            section.getLength()));
        in = FSImageUtil.wrapInputStreamForCompression(conf,
            compressionCodec, in);
        return in;
      } catch (IOException e) {
        fin.close();
        throw e;
      }
    }

    /**
     * Takes an ArrayList of Sections and removes all Sections whose
     * name ends in _SUB, indicating they are sub-sections. The original
     * ArrayList is modified and a new list of the removed Sections is
     * returned.
     * @param sections ArrayList containing all Sections and sub-Sections
     *                 in the image.
     * @return ArrayList of the sections removed, or an empty list if none are
     *         removed.
     */
    private ArrayList<FileSummary.Section> getAndRemoveSubSections(
        ArrayList<FileSummary.Section> sections) {
      ArrayList<FileSummary.Section> subSections = new ArrayList<>();
      Iterator<FileSummary.Section> iter = sections.iterator();
      while (iter.hasNext()) {
        FileSummary.Section s = iter.next();
        String name = s.getName();
        if (name.matches(".*_SUB$")) {
          subSections.add(s);
          iter.remove();
        }
      }
      return subSections;
    }

    /**
     * Given an ArrayList of Sections, return all Sections with the given
     * name, or an empty list if none are found.
     * @param sections ArrayList of the Sections to search through
     * @param name The name of the Sections to search for
     * @return ArrayList of the sections matching the given name
     */
    private ArrayList<FileSummary.Section> getSubSectionsOfName(
        ArrayList<FileSummary.Section> sections, SectionName name) {
      ArrayList<FileSummary.Section> subSec = new ArrayList<>();
      for (FileSummary.Section s : sections) {
        String n = s.getName();
        SectionName sectionName = SectionName.fromString(n);
        if (sectionName == name) {
          subSec.add(s);
        }
      }
      return subSec;
    }

    /**
     * Checks the number of threads configured for parallel loading and
     * returns an ExecutorService with that number of threads. If the
     * thread count is set to less than 1, it is reset to the default
     * value.
     * @return ExecutorService with the correct number of threads
     */
    private ExecutorService getParallelExecutorService() {
      int threads = conf.getInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY,
          DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT);
      if (threads < 1) {
        LOG.warn("Parallel is enabled and {} is set to {}. Setting to the " +
            "default value {}", DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY,
            threads, DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT);
        threads = DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT;
      }
      ExecutorService executorService = Executors.newFixedThreadPool(
          threads);
      LOG.info("The fsimage will be loaded in parallel using {} threads",
          threads);
      return executorService;
    }

    private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
        throws IOException {
      if (!FSImageUtil.checkFileFormat(raFile)) {
        throw new IOException("Unrecognized file format");
      }
      FileSummary summary = FSImageUtil.loadSummary(raFile);
      if (requireSameLayoutVersion && summary.getLayoutVersion() !=
          HdfsServerConstants.NAMENODE_LAYOUT_VERSION) {
        throw new IOException("Image version " + summary.getLayoutVersion() +
            " is not equal to the software version " +
            HdfsServerConstants.NAMENODE_LAYOUT_VERSION);
      }

      FileChannel channel = fin.getChannel();

      FSImageFormatPBINode.Loader inodeLoader = new FSImageFormatPBINode.Loader(
          fsn, this);
      FSImageFormatPBSnapshot.Loader snapshotLoader = new FSImageFormatPBSnapshot.Loader(
          fsn, this);

      ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
          .getSectionsList());
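      // Sort the sections into the fixed load order defined by the SectionName
      // enum ordinal (per the enum's javadoc, its order is the load order);
      // sections with unrecognized names are rejected further below.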
      Collections.sort(sections, new Comparator<FileSummary.Section>() {
        @Override
        public int compare(FileSummary.Section s1, FileSummary.Section s2) {
          SectionName n1 = SectionName.fromString(s1.getName());
          SectionName n2 = SectionName.fromString(s2.getName());
          if (n1 == null) {
            return n2 == null ? 0 : -1;
          } else if (n2 == null) {
            return -1;
          } else {
            return n1.ordinal() - n2.ordinal();
          }
        }
      });

      StartupProgress prog = NameNode.getStartupProgress();
      /**
       * The beginStep() and endStep() calls do not match the boundaries of the
       * sections. This is because the current implementation only allows
       * a particular step to be started once.
       */
      Step currentStep = null;
      boolean loadInParallel = enableParallelSaveAndLoad(conf);

      ExecutorService executorService = null;
      ArrayList<FileSummary.Section> subSections =
          getAndRemoveSubSections(sections);
      if (loadInParallel) {
        executorService = getParallelExecutorService();
      }

      for (FileSummary.Section s : sections) {
        channel.position(s.getOffset());
        InputStream in = new BufferedInputStream(new LimitInputStream(fin,
            s.getLength()));

        in = FSImageUtil.wrapInputStreamForCompression(conf,
            summary.getCodec(), in);

        String n = s.getName();
        SectionName sectionName = SectionName.fromString(n);
        if (sectionName == null) {
          throw new IOException("Unrecognized section " + n);
        }

        ArrayList<FileSummary.Section> stageSubSections;
        switch (sectionName) {
        case NS_INFO:
          loadNameSystemSection(in);
          break;
        case STRING_TABLE:
          loadStringTableSection(in);
          break;
        case INODE: {
          currentStep = new Step(StepType.INODES);
          prog.beginStep(Phase.LOADING_FSIMAGE, currentStep);
          stageSubSections = getSubSectionsOfName(
              subSections, SectionName.INODE_SUB);
          if (loadInParallel && (stageSubSections.size() > 0)) {
            inodeLoader.loadINodeSectionInParallel(executorService,
                stageSubSections, summary.getCodec(), prog, currentStep);
          } else {
            inodeLoader.loadINodeSection(in, prog, currentStep);
          }
        }
          break;
        case INODE_REFERENCE:
          snapshotLoader.loadINodeReferenceSection(in);
          break;
        case INODE_DIR:
          stageSubSections = getSubSectionsOfName(
              subSections, SectionName.INODE_DIR_SUB);
          if (loadInParallel && stageSubSections.size() > 0) {
            inodeLoader.loadINodeDirectorySectionInParallel(executorService,
                stageSubSections, summary.getCodec());
          } else {
            inodeLoader.loadINodeDirectorySection(in);
          }
          inodeLoader.waitBlocksMapAndNameCacheUpdateFinished();
          break;
        case FILES_UNDERCONSTRUCTION:
          inodeLoader.loadFilesUnderConstructionSection(in);
          break;
        case SNAPSHOT:
          snapshotLoader.loadSnapshotSection(in);
          break;
        case SNAPSHOT_DIFF:
          snapshotLoader.loadSnapshotDiffSection(in);
          break;
        case SECRET_MANAGER: {
          prog.endStep(Phase.LOADING_FSIMAGE, currentStep);
          Step step = new Step(StepType.DELEGATION_TOKENS);
          prog.beginStep(Phase.LOADING_FSIMAGE, step);
          loadSecretManagerSection(in, prog, step);
          prog.endStep(Phase.LOADING_FSIMAGE, step);
        }
          break;
        case CACHE_MANAGER: {
          Step step = new Step(StepType.CACHE_POOLS);
          prog.beginStep(Phase.LOADING_FSIMAGE, step);
          loadCacheManagerSection(in, prog, step);
          prog.endStep(Phase.LOADING_FSIMAGE, step);
        }
          break;
        case ERASURE_CODING:
          Step step = new Step(StepType.ERASURE_CODING_POLICIES);
          prog.beginStep(Phase.LOADING_FSIMAGE, step);
          loadErasureCodingSection(in);
          prog.endStep(Phase.LOADING_FSIMAGE, step);
          break;
        default:
          LOG.warn("Unrecognized section {}", n);
          break;
        }
      }
      if (executorService != null) {
        executorService.shutdown();
      }
    }

    private void loadNameSystemSection(InputStream in) throws IOException {
      NameSystemSection s = NameSystemSection.parseDelimitedFrom(in);
      BlockIdManager blockIdManager = fsn.getBlockManager().getBlockIdManager();
      blockIdManager.setLegacyGenerationStamp(s.getGenstampV1());
      blockIdManager.setGenerationStamp(s.getGenstampV2());
      blockIdManager.setLegacyGenerationStampLimit(s.getGenstampV1Limit());
      blockIdManager.setLastAllocatedContiguousBlockId(s.getLastAllocatedBlockId());
      if (s.hasLastAllocatedStripedBlockId()) {
        blockIdManager.setLastAllocatedStripedBlockId(
            s.getLastAllocatedStripedBlockId());
      }
      imgTxId = s.getTransactionId();
      if (s.hasRollingUpgradeStartTime()
          && fsn.getFSImage().hasRollbackFSImage()) {
        // we set the rollingUpgradeInfo only when we make sure we have the
        // rollback image
        fsn.setRollingUpgradeInfo(true, s.getRollingUpgradeStartTime());
      }
    }

    private void loadStringTableSection(InputStream in) throws IOException {
      StringTableSection s = StringTableSection.parseDelimitedFrom(in);
      ctx.stringTable =
          SerialNumberManager.newStringTable(s.getNumEntry(), s.getMaskBits());
      for (int i = 0; i < s.getNumEntry(); ++i) {
        StringTableSection.Entry e = StringTableSection.Entry
            .parseDelimitedFrom(in);
        ctx.stringTable.put(e.getId(), e.getStr());
      }
    }

    private void loadSecretManagerSection(InputStream in, StartupProgress prog,
        Step currentStep) throws IOException {
      SecretManagerSection s = SecretManagerSection.parseDelimitedFrom(in);
      int numKeys = s.getNumKeys(), numTokens = s.getNumTokens();
      ArrayList<SecretManagerSection.DelegationKey> keys = Lists
          .newArrayListWithCapacity(numKeys);
      ArrayList<SecretManagerSection.PersistToken> tokens = Lists
          .newArrayListWithCapacity(numTokens);

      for (int i = 0; i < numKeys; ++i)
        keys.add(SecretManagerSection.DelegationKey.parseDelimitedFrom(in));

      prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numTokens);
      Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
      for (int i = 0; i < numTokens; ++i) {
        tokens.add(SecretManagerSection.PersistToken.parseDelimitedFrom(in));
      }

      fsn.loadSecretManagerState(s, keys, tokens, counter);
    }

    private void loadCacheManagerSection(InputStream in, StartupProgress prog,
        Step currentStep) throws IOException {
      CacheManagerSection s = CacheManagerSection.parseDelimitedFrom(in);
      int numPools = s.getNumPools();
      ArrayList<CachePoolInfoProto> pools = Lists
          .newArrayListWithCapacity(numPools);
      ArrayList<CacheDirectiveInfoProto> directives = Lists
          .newArrayListWithCapacity(s.getNumDirectives());
      prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numPools);
      Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
      for (int i = 0; i < numPools; ++i) {
        pools.add(CachePoolInfoProto.parseDelimitedFrom(in));
        counter.increment();
      }
      for (int i = 0; i < s.getNumDirectives(); ++i)
        directives.add(CacheDirectiveInfoProto.parseDelimitedFrom(in));
      fsn.getCacheManager().loadState(
          new CacheManager.PersistState(s, pools, directives));
    }

    private void loadErasureCodingSection(InputStream in)
        throws IOException {
      ErasureCodingSection s = ErasureCodingSection.parseDelimitedFrom(in);
      List<ErasureCodingPolicyInfo> ecPolicies = Lists
          .newArrayListWithCapacity(s.getPoliciesCount());
      for (int i = 0; i < s.getPoliciesCount(); ++i) {
        ecPolicies.add(PBHelperClient.convertErasureCodingPolicyInfo(
            s.getPolicies(i)));
      }
      fsn.getErasureCodingPolicyManager().loadPolicies(ecPolicies, conf);
    }
  }

  private static boolean enableParallelSaveAndLoad(Configuration conf) {
    boolean loadInParallel = enableParallelLoad;
    boolean compressionEnabled = conf.getBoolean(
        DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
        DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);

    if (loadInParallel) {
      if (compressionEnabled) {
        LOG.warn("Parallel Image loading and saving is not supported when {}" +
                " is set to true. Parallel will be disabled.",
            DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY);
        loadInParallel = false;
      }
    }
    return loadInParallel;
  }

  public static void initParallelLoad(Configuration conf) {
    enableParallelLoad =
        conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY,
            DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT);
  }

  public static void refreshParallelSaveAndLoad(boolean enable) {
    enableParallelLoad = enable;
  }

  public static boolean getEnableParallelLoad() {
    return enableParallelLoad;
  }
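
  // Illustrative usage (not part of the Hadoop source): parallel loading is
  // driven entirely by configuration. A minimal sketch, using only the
  // DFSConfigKeys constants referenced in this file, of how a caller might
  // turn it on before an image is loaded. Note that enableParallelSaveAndLoad()
  // disables parallelism again (with a warning) when image compression is
  // enabled.
  //
  //   Configuration conf = new Configuration();
  //   conf.setBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, true);
  //   conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, 4);
  //   conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
  //   FSImageFormatProtobuf.initParallelLoad(conf);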

  public static final class Saver {
    public static final int CHECK_CANCEL_INTERVAL = 4096;
    private boolean writeSubSections = false;
    private int inodesPerSubSection = Integer.MAX_VALUE;

    private final SaveNamespaceContext context;
    private final SaverContext saverContext;
    private long currentOffset = FSImageUtil.MAGIC_HEADER.length;
    private long subSectionOffset = currentOffset;
    private MD5Hash savedDigest;

    private FileChannel fileChannel;
    // OutputStream for the section data
    private OutputStream sectionOutputStream;
    private CompressionCodec codec;
    private OutputStream underlyingOutputStream;
    private Configuration conf;

    Saver(SaveNamespaceContext context, Configuration conf) {
      this.context = context;
      this.saverContext = new SaverContext();
      this.conf = conf;
    }

    public MD5Hash getSavedDigest() {
      return savedDigest;
    }

    public SaveNamespaceContext getContext() {
      return context;
    }

    public SaverContext getSaverContext() {
      return saverContext;
    }

    public int getInodesPerSubSection() {
      return inodesPerSubSection;
    }

    /**
     * Commit the length and offset of an fsimage section to the summary index,
     * including the sub-section, which is committed before the section itself
     * is committed.
     * @param summary The image summary object
     * @param name The name of the section to commit
     * @param subSectionName The name of the sub-section to commit
     * @throws IOException
     */
    public void commitSectionAndSubSection(FileSummary.Builder summary,
        SectionName name, SectionName subSectionName) throws IOException {
      commitSubSection(summary, subSectionName);
      commitSection(summary, name);
    }

    public void commitSection(FileSummary.Builder summary, SectionName name)
        throws IOException {
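      // Flushing (and, for compressed images, finishing) the section stream
      // below makes fileChannel.position() reflect the true end of the
      // section, so the byte range [currentOffset, currentOffset + length)
      // recorded in the summary is accurate.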
      long oldOffset = currentOffset;
      flushSectionOutputStream();

      if (codec != null) {
        sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
      } else {
        sectionOutputStream = underlyingOutputStream;
      }
      long length = fileChannel.position() - oldOffset;
      summary.addSections(FileSummary.Section.newBuilder().setName(name.name)
          .setLength(length).setOffset(currentOffset));
      currentOffset += length;
      subSectionOffset = currentOffset;
    }

    /**
     * Commit the length and offset of an fsimage sub-section to the summary
     * index.
     * @param summary The image summary object
     * @param name The name of the sub-section to commit
     * @throws IOException
     */
    public void commitSubSection(FileSummary.Builder summary, SectionName name)
        throws IOException {
      if (!writeSubSections) {
        return;
      }

      LOG.debug("Saving a subsection for {}", name.toString());
      // The output stream must be flushed before the length is obtained
      // as the flush can move the length forward.
      sectionOutputStream.flush();
      long length = fileChannel.position() - subSectionOffset;
      if (length == 0) {
        LOG.warn("The requested section for {} is empty. It will not be " +
            "output to the image", name.toString());
        return;
      }
      summary.addSections(FileSummary.Section.newBuilder().setName(name.name)
          .setLength(length).setOffset(subSectionOffset));
      subSectionOffset += length;
    }

    private void flushSectionOutputStream() throws IOException {
      if (codec != null) {
        ((CompressionOutputStream) sectionOutputStream).finish();
      }
      sectionOutputStream.flush();
    }

    /**
     * @return number of non-fatal errors detected while writing the image.
     * @throws IOException on fatal error.
     */
    long save(File file, FSImageCompression compression) throws IOException {
      enableSubSectionsIfRequired();
      FileOutputStream fout = new FileOutputStream(file);
      fileChannel = fout.getChannel();
      try {
        LOG.info("Saving image file {} using {}", file, compression);
        long startTime = monotonicNow();
        long numErrors = saveInternal(
            fout, compression, file.getAbsolutePath());
        LOG.info("Image file {} of size {} bytes saved in {} seconds {}.", file,
            file.length(), (monotonicNow() - startTime) / 1000,
            (numErrors > 0 ? (" with " + numErrors + " errors") : ""));
        return numErrors;
      } finally {
        fout.close();
      }
    }

    private void enableSubSectionsIfRequired() {
      boolean parallelEnabled = enableParallelSaveAndLoad(conf);
      int inodeThreshold = conf.getInt(
          DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY,
          DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT);
      int targetSections = conf.getInt(
          DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY,
          DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT);

      if (parallelEnabled) {
        if (targetSections <= 0) {
          LOG.warn("{} is set to {}. It must be greater than zero. Setting to" +
              " default of {}",
              DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY,
              targetSections,
              DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT);
          targetSections =
              DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT;
        }
        if (inodeThreshold <= 0) {
          LOG.warn("{} is set to {}. It must be greater than zero. Setting to" +
                  " default of {}",
              DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY,
              inodeThreshold,
              DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT);
          inodeThreshold =
              DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT;
        }
        int inodeCount = context.getSourceNamesystem().dir.getInodeMapSize();
        // Only enable parallel sections if there are enough inodes
        if (inodeCount >= inodeThreshold) {
          writeSubSections = true;
          // Calculate the inodes per section rounded up to the nearest int
          inodesPerSubSection = (inodeCount + targetSections - 1) /
              targetSections;
        }
      } else {
        writeSubSections = false;
      }
    }

    private static void saveFileSummary(OutputStream out, FileSummary summary)
        throws IOException {
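      // The summary is written length-delimited; its total on-disk size
      // (varint length prefix plus message bytes, per getOndiskTrunkSize) is
      // then appended as a fixed 4-byte big-endian int so a reader can locate
      // the summary from the end of the file.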
      summary.writeDelimitedTo(out);
      int length = getOndiskTrunkSize(summary);
      byte[] lengthBytes = new byte[4];
      ByteBuffer.wrap(lengthBytes).asIntBuffer().put(length);
      out.write(lengthBytes);
    }

    private long saveInodes(FileSummary.Builder summary) throws IOException {
      FSImageFormatPBINode.Saver saver = new FSImageFormatPBINode.Saver(this,
          summary);

      saver.serializeINodeSection(sectionOutputStream);
      saver.serializeINodeDirectorySection(sectionOutputStream);
      saver.serializeFilesUCSection(sectionOutputStream);

      return saver.getNumImageErrors();
    }

    /**
     * @return number of non-fatal errors detected while saving the image.
     * @throws IOException on fatal error.
     */
    private long saveSnapshots(FileSummary.Builder summary) throws IOException {
      FSImageFormatPBSnapshot.Saver snapshotSaver = new FSImageFormatPBSnapshot.Saver(
          this, summary, context, context.getSourceNamesystem());

      snapshotSaver.serializeSnapshotSection(sectionOutputStream);
      // Skip snapshot-related sections when there is no snapshot.
      if (context.getSourceNamesystem().getSnapshotManager()
          .getNumSnapshots() > 0) {
        snapshotSaver.serializeSnapshotDiffSection(sectionOutputStream);
      }
      snapshotSaver.serializeINodeReferenceSection(sectionOutputStream);
      return snapshotSaver.getNumImageErrors();
    }

    /**
     * @return number of non-fatal errors detected while writing the FsImage.
     * @throws IOException on fatal error.
     */
    private long saveInternal(FileOutputStream fout,
        FSImageCompression compression, String filePath) throws IOException {
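      // Layout written below: the MAGIC_HEADER, then each section's bytes back
      // to back (offsets and lengths are recorded in the FileSummary builder
      // via commitSection), and finally the FileSummary trailer appended by
      // saveFileSummary() once all sections have been flushed.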
      StartupProgress prog = NameNode.getStartupProgress();
      MessageDigest digester = MD5Hash.getDigester();
      int layoutVersion =
          context.getSourceNamesystem().getEffectiveLayoutVersion();

      underlyingOutputStream = new DigestOutputStream(new BufferedOutputStream(
          fout), digester);
      underlyingOutputStream.write(FSImageUtil.MAGIC_HEADER);

      fileChannel = fout.getChannel();

      FileSummary.Builder b = FileSummary.newBuilder()
          .setOndiskVersion(FSImageUtil.FILE_VERSION)
          .setLayoutVersion(
              context.getSourceNamesystem().getEffectiveLayoutVersion());

      codec = compression.getImageCodec();
      if (codec != null) {
        b.setCodec(codec.getClass().getCanonicalName());
        sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
      } else {
        sectionOutputStream = underlyingOutputStream;
      }

      saveNameSystemSection(b);
      // Check for cancellation right after serializing the name system section.
      // Some unit tests, such as TestSaveNamespace#testCancelSaveNameSpace,
      // depend on this behavior.
      context.checkCancelled();

      Step step;

      // Erasure coding policies should be saved before inodes
      if (NameNodeLayoutVersion.supports(
          NameNodeLayoutVersion.Feature.ERASURE_CODING, layoutVersion)) {
        step = new Step(StepType.ERASURE_CODING_POLICIES, filePath);
        prog.beginStep(Phase.SAVING_CHECKPOINT, step);
        saveErasureCodingSection(b);
        prog.endStep(Phase.SAVING_CHECKPOINT, step);
      }

      step = new Step(StepType.INODES, filePath);
      prog.beginStep(Phase.SAVING_CHECKPOINT, step);
      // Count number of non-fatal errors when saving inodes and snapshots.
      long numErrors = saveInodes(b);
      numErrors += saveSnapshots(b);
      prog.endStep(Phase.SAVING_CHECKPOINT, step);

      step = new Step(StepType.DELEGATION_TOKENS, filePath);
      prog.beginStep(Phase.SAVING_CHECKPOINT, step);
      saveSecretManagerSection(b);
      prog.endStep(Phase.SAVING_CHECKPOINT, step);

      step = new Step(StepType.CACHE_POOLS, filePath);
      prog.beginStep(Phase.SAVING_CHECKPOINT, step);
      saveCacheManagerSection(b);
      prog.endStep(Phase.SAVING_CHECKPOINT, step);

      saveStringTableSection(b);

      // We use the underlyingOutputStream to write the FileSummary. Therefore
      // flush the buffered stream (which is potentially compressed) first.
      flushSectionOutputStream();

      FileSummary summary = b.build();
      saveFileSummary(underlyingOutputStream, summary);
      underlyingOutputStream.close();
      savedDigest = new MD5Hash(digester.digest());
      return numErrors;
    }

    private void saveSecretManagerSection(FileSummary.Builder summary)
        throws IOException {
      final FSNamesystem fsn = context.getSourceNamesystem();
      DelegationTokenSecretManager.SecretManagerState state = fsn
          .saveSecretManagerState();
      state.section.writeDelimitedTo(sectionOutputStream);
      for (SecretManagerSection.DelegationKey k : state.keys)
        k.writeDelimitedTo(sectionOutputStream);

      for (SecretManagerSection.PersistToken t : state.tokens)
        t.writeDelimitedTo(sectionOutputStream);

      commitSection(summary, SectionName.SECRET_MANAGER);
    }

    private void saveCacheManagerSection(FileSummary.Builder summary)
        throws IOException {
      final FSNamesystem fsn = context.getSourceNamesystem();
      CacheManager.PersistState state = fsn.getCacheManager().saveState();
      state.section.writeDelimitedTo(sectionOutputStream);

      for (CachePoolInfoProto p : state.pools)
        p.writeDelimitedTo(sectionOutputStream);

      for (CacheDirectiveInfoProto p : state.directives)
        p.writeDelimitedTo(sectionOutputStream);

      commitSection(summary, SectionName.CACHE_MANAGER);
    }

    private void saveErasureCodingSection(
        FileSummary.Builder summary) throws IOException {
      final FSNamesystem fsn = context.getSourceNamesystem();
      ErasureCodingPolicyInfo[] ecPolicies =
          fsn.getErasureCodingPolicyManager().getPersistedPolicies();
      ArrayList<ErasureCodingPolicyProto> ecPolicyProtoes =
          new ArrayList<ErasureCodingPolicyProto>();
      for (ErasureCodingPolicyInfo p : ecPolicies) {
        ecPolicyProtoes.add(PBHelperClient.convertErasureCodingPolicy(p));
      }

      ErasureCodingSection section = ErasureCodingSection.newBuilder().
          addAllPolicies(ecPolicyProtoes).build();
      section.writeDelimitedTo(sectionOutputStream);
      commitSection(summary, SectionName.ERASURE_CODING);
    }

    private void saveNameSystemSection(FileSummary.Builder summary)
        throws IOException {
      final FSNamesystem fsn = context.getSourceNamesystem();
      OutputStream out = sectionOutputStream;
      BlockIdManager blockIdManager = fsn.getBlockManager().getBlockIdManager();
      NameSystemSection.Builder b = NameSystemSection.newBuilder()
          .setGenstampV1(blockIdManager.getLegacyGenerationStamp())
          .setGenstampV1Limit(blockIdManager.getLegacyGenerationStampLimit())
          .setGenstampV2(blockIdManager.getGenerationStamp())
          .setLastAllocatedBlockId(blockIdManager.getLastAllocatedContiguousBlockId())
          .setLastAllocatedStripedBlockId(blockIdManager.getLastAllocatedStripedBlockId())
          .setTransactionId(context.getTxId());

      // We use the non-locked version of getNamespaceInfo here since
      // the coordinating thread of saveNamespace already has read-locked
      // the namespace for us. If we attempt to take another readlock
      // from the actual saver thread, there's a potential of a
      // fairness-related deadlock. See the comments on HDFS-2223.
      b.setNamespaceId(fsn.unprotectedGetNamespaceInfo().getNamespaceID());
      if (fsn.isRollingUpgrade()) {
        b.setRollingUpgradeStartTime(fsn.getRollingUpgradeInfo().getStartTime());
      }
      NameSystemSection s = b.build();
      s.writeDelimitedTo(out);

      commitSection(summary, SectionName.NS_INFO);
    }

    private void saveStringTableSection(FileSummary.Builder summary)
        throws IOException {
      OutputStream out = sectionOutputStream;

      SerialNumberManager.StringTable stringTable =
          SerialNumberManager.getStringTable();
      StringTableSection.Builder b = StringTableSection.newBuilder()
          .setNumEntry(stringTable.size())
          .setMaskBits(stringTable.getMaskBits());
      b.build().writeDelimitedTo(out);
      for (Entry<Integer, String> e : stringTable) {
        StringTableSection.Entry.Builder eb = StringTableSection.Entry
            .newBuilder().setId(e.getKey()).setStr(e.getValue());
        eb.build().writeDelimitedTo(out);
      }
      commitSection(summary, SectionName.STRING_TABLE);
    }
  }

  /**
   * Supported section name. The order of the enum determines the order of
   * loading.
   */
  public enum SectionName {
    NS_INFO("NS_INFO"),
    STRING_TABLE("STRING_TABLE"),
    EXTENDED_ACL("EXTENDED_ACL"),
    ERASURE_CODING("ERASURE_CODING"),
    INODE("INODE"),
    INODE_SUB("INODE_SUB"),
    INODE_REFERENCE("INODE_REFERENCE"),
    INODE_REFERENCE_SUB("INODE_REFERENCE_SUB"),
    SNAPSHOT("SNAPSHOT"),
    INODE_DIR("INODE_DIR"),
    INODE_DIR_SUB("INODE_DIR_SUB"),
    FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"),
    SNAPSHOT_DIFF("SNAPSHOT_DIFF"),
    SNAPSHOT_DIFF_SUB("SNAPSHOT_DIFF_SUB"),
    SECRET_MANAGER("SECRET_MANAGER"),
    CACHE_MANAGER("CACHE_MANAGER");

    private static final SectionName[] values = SectionName.values();

    public static SectionName fromString(String name) {
      for (SectionName n : values) {
        if (n.name.equals(name))
          return n;
      }
      return null;
    }

    private final String name;

    private SectionName(String name) {
      this.name = name;
    }
  }
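
  // Note: the *_SUB entries above are the sub-sections written by
  // Saver.commitSubSection() when parallel image writing is enabled; the
  // Loader strips them out via getAndRemoveSubSections() and hands them to the
  // parallel INode/INodeDirectory loaders.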

  private static int getOndiskTrunkSize(
      org.apache.hadoop.thirdparty.protobuf.GeneratedMessageV3 s) {
    return CodedOutputStream.computeUInt32SizeNoTag(s.getSerializedSize())
        + s.getSerializedSize();
  }

  private FSImageFormatProtobuf() {
  }
}
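
For orientation, the on-disk layout produced by Saver.saveInternal() above is: FSImageUtil.MAGIC_HEADER, then each section's bytes back to back (offsets and lengths recorded in the FileSummary), and finally the delimited FileSummary followed by a 4-byte length written by saveFileSummary(). The sketch below is not part of the Hadoop source; it assumes only the FsImageProto.FileSummary class imported above (the class name FsImageSummaryPrinter is made up for illustration) and mirrors what FSImageUtil.loadSummary() does for the Loader: it locates and parses the FileSummary trailer of an fsimage file.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;

import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;

// Hypothetical standalone reader of the FileSummary trailer, for illustration.
public class FsImageSummaryPrinter {

  public static FileSummary readSummary(String imagePath) throws IOException {
    try (RandomAccessFile raf = new RandomAccessFile(imagePath, "r")) {
      long fileLength = raf.length();
      // The last 4 bytes hold the on-disk size of the delimited FileSummary,
      // as written by Saver.saveFileSummary().
      raf.seek(fileLength - 4);
      int summaryLength = raf.readInt();
      // The delimited FileSummary sits immediately before that trailing int.
      raf.seek(fileLength - 4 - summaryLength);
      byte[] buf = new byte[summaryLength];
      raf.readFully(buf);
      return FileSummary.parseDelimitedFrom(new ByteArrayInputStream(buf));
    }
  }

  public static void main(String[] args) throws IOException {
    FileSummary summary = readSummary(args[0]);
    System.out.println("layoutVersion=" + summary.getLayoutVersion()
        + ", codec=" + summary.getCodec()
        + ", sections=" + summary.getSectionsCount());
  }
}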

Related information

hadoop source code directory

Related articles

hadoop AclEntryStatusFormat source code

hadoop AclFeature source code

hadoop AclStorage source code

hadoop AclTransformation source code

hadoop AuditLogger source code

hadoop BackupImage source code

hadoop BackupJournalManager source code

hadoop BackupNode source code

hadoop BackupState source code

hadoop CacheManager source code
