hadoop SecondaryNameNode source code

  • 2022-10-20

hadoop SecondaryNameNode code

File path: /hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.util.ExitUtil.terminate;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.*;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.VersionInfo;

import javax.management.ObjectName;

/**********************************************************
 * The Secondary NameNode is a helper to the primary NameNode.
 * The Secondary is responsible for supporting periodic checkpoints 
 * of the HDFS metadata. The current design allows only one Secondary
 * NameNode per HDFS cluster.
 *
 * The Secondary NameNode is a daemon that periodically wakes
 * up (determined by the schedule specified in the configuration),
 * triggers a periodic checkpoint and then goes back to sleep.
 * The Secondary NameNode uses the NamenodeProtocol to talk to the
 * primary NameNode.
 *
 **********************************************************/
@InterfaceAudience.Private
public class SecondaryNameNode implements Runnable,
        SecondaryNameNodeInfoMXBean {
    
  static{
    HdfsConfiguration.init();
  }
  public static final Logger LOG =
      LoggerFactory.getLogger(SecondaryNameNode.class.getName());

  private final long starttime = Time.now();
  private volatile long lastCheckpointTime = 0;
  private volatile long lastCheckpointWallclockTime = 0;

  private URL fsName;
  private CheckpointStorage checkpointImage;

  private NamenodeProtocol namenode;
  private Configuration conf;
  private InetSocketAddress nameNodeAddr;
  private volatile boolean shouldRun;
  private HttpServer2 infoServer;

  private Collection<URI> checkpointDirs;
  private List<URI> checkpointEditsDirs;

  private CheckpointConf checkpointConf;
  private FSNamesystem namesystem;

  private Thread checkpointThread;
  private ObjectName nameNodeStatusBeanName;
  private String legacyOivImageDir;

  @Override
  public String toString() {
    return getClass().getSimpleName() + " Status" 
      + "\nName Node Address      : " + nameNodeAddr
      + "\nStart Time             : " + new Date(starttime)
      + "\nLast Checkpoint        : " + (lastCheckpointTime == 0? "--":
        new Date(lastCheckpointWallclockTime))
      + " (" + ((Time.monotonicNow() - lastCheckpointTime) / 1000)
      + " seconds ago)"
      + "\nCheckpoint Period      : " + checkpointConf.getPeriod() + " seconds"
      + "\nCheckpoint Transactions: " + checkpointConf.getTxnCount()
      + "\nCheckpoint Dirs        : " + checkpointDirs
      + "\nCheckpoint Edits Dirs  : " + checkpointEditsDirs;
  }

  @VisibleForTesting
  FSImage getFSImage() {
    return checkpointImage;
  }

  @VisibleForTesting
  int getMergeErrorCount() {
    return checkpointImage.getMergeErrorCount();
  }

  @VisibleForTesting
  public FSNamesystem getFSNamesystem() {
    return namesystem;
  }
  
  @VisibleForTesting
  void setFSImage(CheckpointStorage image) {
    this.checkpointImage = image;
  }
  
  @VisibleForTesting
  NamenodeProtocol getNameNode() {
    return namenode;
  }
  
  @VisibleForTesting
  void setNameNode(NamenodeProtocol namenode) {
    this.namenode = namenode;
  }

  /**
   * Create a connection to the primary namenode.
   */
  public SecondaryNameNode(Configuration conf)  throws IOException {
    this(conf, new CommandLineOpts());
  }
  
  public SecondaryNameNode(Configuration conf,
      CommandLineOpts commandLineOpts) throws IOException {
    try {
      String nsId = DFSUtil.getSecondaryNameServiceId(conf);
      if (HAUtil.isHAEnabled(conf, nsId)) {
        throw new IOException(
            "Cannot use SecondaryNameNode in an HA cluster." +
            " The Standby Namenode will perform checkpointing.");
      }
      NameNode.initializeGenericKeys(conf, nsId, null);
      initialize(conf, commandLineOpts);
    } catch (IOException e) {
      shutdown();
      throw e;
    } catch (HadoopIllegalArgumentException e) {
      shutdown();
      throw e;
    }
  }
  
  public static InetSocketAddress getHttpAddress(Configuration conf) {
    return NetUtils.createSocketAddr(conf.getTrimmed(
        DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
        DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT));
  }
  
  /**
   * Initialize SecondaryNameNode.
   */
  private void initialize(final Configuration conf,
      CommandLineOpts commandLineOpts) throws IOException {
    final InetSocketAddress infoSocAddr = getHttpAddress(conf);
    final String infoBindAddress = infoSocAddr.getHostName();
    UserGroupInformation.setConfiguration(conf);
    if (UserGroupInformation.isSecurityEnabled()) {
      SecurityUtil.login(conf,
          DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY,
          DFSConfigKeys.DFS_SECONDARY_NAMENODE_KERBEROS_PRINCIPAL_KEY, infoBindAddress);
    }
    // initiate Java VM metrics
    DefaultMetricsSystem.initialize("SecondaryNameNode");
    JvmMetrics.create("SecondaryNameNode",
        conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
        DefaultMetricsSystem.instance());

    // Create connection to the namenode.
    shouldRun = true;
    nameNodeAddr = NameNode.getServiceAddress(conf, true);

    this.conf = conf;
    this.namenode = NameNodeProxies.createNonHAProxy(conf, nameNodeAddr, 
        NamenodeProtocol.class, UserGroupInformation.getCurrentUser(),
        true).getProxy();

    // initialize checkpoint directories
    fsName = getInfoServer();
    checkpointDirs = FSImage.getCheckpointDirs(conf,
                                  "/tmp/hadoop/dfs/namesecondary");
    checkpointEditsDirs = FSImage.getCheckpointEditsDirs(conf, 
                                  "/tmp/hadoop/dfs/namesecondary");    
    checkpointImage = new CheckpointStorage(conf, checkpointDirs, checkpointEditsDirs);
    checkpointImage.recoverCreate(commandLineOpts.shouldFormat());
    checkpointImage.deleteTempEdits();
    
    namesystem = new FSNamesystem(conf, checkpointImage, true);

    // Disable quota checks
    namesystem.dir.disableQuotaChecks();

    // Initialize other scheduling parameters from the configuration
    checkpointConf = new CheckpointConf(conf);
    nameNodeStatusBeanName = MBeans.register("SecondaryNameNode",
            "SecondaryNameNodeInfo", this);

    legacyOivImageDir = conf.get(
        DFSConfigKeys.DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY);

    LOG.info("Checkpoint Period   :" + checkpointConf.getPeriod() + " secs "
        + "(" + checkpointConf.getPeriod() / 60 + " min)");
    LOG.info("Log Size Trigger    :" + checkpointConf.getTxnCount() + " txns");
  }

  /**
   * Wait for the service to finish.
   * (Normally, it runs forever.)
   */
  private void join() {
    try {
      infoServer.join();
    } catch (InterruptedException ie) {
      LOG.debug("Exception ", ie);
    }
  }

  /**
   * Shut down this instance of the secondary namenode.
   * Returns only after shutdown is complete.
   */
  public void shutdown() {
    shouldRun = false;
    if (checkpointThread != null) {
      checkpointThread.interrupt();
      try {
        checkpointThread.join(10000);
      } catch (InterruptedException e) {
        LOG.info("Interrupted waiting to join on checkpointer thread");
        Thread.currentThread().interrupt(); // maintain status
      }
    }
    try {
      if (infoServer != null) {
        infoServer.stop();
        infoServer = null;
      }
    } catch (Exception e) {
      LOG.warn("Exception shutting down SecondaryNameNode", e);
    }
    if (nameNodeStatusBeanName != null) {
      MBeans.unregister(nameNodeStatusBeanName);
      nameNodeStatusBeanName = null;
    }
    try {
      if (checkpointImage != null) {
        checkpointImage.close();
        checkpointImage = null;
      }
    } catch(IOException e) {
      LOG.warn("Exception while closing CheckpointStorage", e);
    }
    if (namesystem != null) {
      namesystem.shutdown();
      namesystem = null;
    }
  }

  @Override
  public void run() {
    SecurityUtil.doAsLoginUserOrFatal(
        new PrivilegedAction<Object>() {
        @Override
        public Object run() {
          doWork();
          return null;
        }
      });
  }
  //
  // The main work loop
  //
  public void doWork() {
    //
    // Poll the Namenode (once every checkpointCheckPeriod seconds) to find the
    // number of transactions in the edit log that haven't yet been checkpointed.
    //
    long period = checkpointConf.getCheckPeriod();
    int maxRetries = checkpointConf.getMaxRetriesOnMergeError();

    while (shouldRun) {
      try {
        Thread.sleep(1000 * period);
      } catch (InterruptedException ie) {
        // do nothing
      }
      if (!shouldRun) {
        break;
      }
      try {
        // We may have lost our ticket since last checkpoint, log in again, just in case
        if(UserGroupInformation.isSecurityEnabled())
          UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
        
        final long monotonicNow = Time.monotonicNow();
        final long now = Time.now();

        if (shouldCheckpointBasedOnCount() ||
            monotonicNow >= lastCheckpointTime + 1000 * checkpointConf.getPeriod()) {
          doCheckpoint();
          lastCheckpointTime = monotonicNow;
          lastCheckpointWallclockTime = now;
        }
      } catch (IOException e) {
        LOG.error("Exception in doCheckpoint", e);
        e.printStackTrace();
        // Prevent a huge number of edits from being created due to
        // unrecoverable conditions and endless retries.
        if (checkpointImage.getMergeErrorCount() > maxRetries) {
          LOG.error("Merging failed " +
              checkpointImage.getMergeErrorCount() + " times.");
          terminate(1);
        }
      } catch (Throwable e) {
        LOG.error("Throwable Exception in doCheckpoint", e);
        e.printStackTrace();
        terminate(1, e);
      }
    }
  }
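
  // [annotation — not in the upstream source] The loop above wakes every
  // dfs.namenode.checkpoint.check.period seconds (CheckpointConf.getCheckPeriod())
  // and checkpoints when either trigger fires: dfs.namenode.checkpoint.txns
  // uncheckpointed transactions (shouldCheckpointBasedOnCount()) or
  // dfs.namenode.checkpoint.period seconds since the last checkpoint. Repeated
  // merge failures are tolerated only up to getMaxRetriesOnMergeError() before
  // the process terminates.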

  /**
   * Download <code>fsimage</code> and <code>edits</code>
   * files from the name-node.
   * @return true if a new image has been downloaded and needs to be loaded
   * @throws IOException
   */
  static boolean downloadCheckpointFiles(
      final URL nnHostPort,
      final FSImage dstImage,
      final CheckpointSignature sig,
      final RemoteEditLogManifest manifest
  ) throws IOException {
    
    // Sanity check manifest - these could happen if, eg, someone on the
    // NN side accidentally rmed the storage directories
    if (manifest.getLogs().isEmpty()) {
      throw new IOException("Found no edit logs to download on NN since txid " 
          + sig.mostRecentCheckpointTxId);
    }
    
    long expectedTxId = sig.mostRecentCheckpointTxId + 1;
    if (manifest.getLogs().get(0).getStartTxId() != expectedTxId) {
      throw new IOException("Bad edit log manifest (expected txid = " +
          expectedTxId + ": " + manifest);
    }

    try {
        Boolean b = UserGroupInformation.getCurrentUser().doAs(
            new PrivilegedExceptionAction<Boolean>() {
  
          @Override
          public Boolean run() throws Exception {
            dstImage.getStorage().cTime = sig.cTime;

            // get fsimage
            if (sig.mostRecentCheckpointTxId ==
                dstImage.getStorage().getMostRecentCheckpointTxId()) {
              LOG.info("Image has not changed. Will not download image.");
            } else {
              LOG.info("Image has changed. Downloading updated image from NN.");
              MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
                  nnHostPort, sig.mostRecentCheckpointTxId,
                  dstImage.getStorage(), true, false);
              dstImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
                  sig.mostRecentCheckpointTxId, downloadedHash);
            }
        
            // get edits file
            for (RemoteEditLog log : manifest.getLogs()) {
              TransferFsImage.downloadEditsToStorage(
                  nnHostPort, log, dstImage.getStorage());
            }
        
            // true if we haven't loaded all the transactions represented by the
            // downloaded fsimage.
            return dstImage.getLastAppliedTxId() < sig.mostRecentCheckpointTxId;
          }
        });
        return b.booleanValue();
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
  }
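
  // [annotation — not in the upstream source] Note the asymmetry above: the
  // fsimage is downloaded only when the NameNode's most recent checkpoint txid
  // differs from the one already held locally, whereas every edits segment in
  // the manifest is always fetched. The return value tells the caller whether
  // the downloaded image must be reloaded before the edits can be applied.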
  
  InetSocketAddress getNameNodeAddress() {
    return nameNodeAddr;
  }

  /**
   * Returns the Jetty server that the Namenode is listening on.
   */
  private URL getInfoServer() throws IOException {
    URI fsName = FileSystem.getDefaultUri(conf);
    if (!HdfsConstants.HDFS_URI_SCHEME.equalsIgnoreCase(fsName.getScheme())) {
      throw new IOException("This is not a DFS");
    }

    final String scheme = DFSUtil.getHttpClientScheme(conf);
    URI address = DFSUtil.getInfoServerWithDefaultHost(fsName.getHost(), conf,
        scheme);
    LOG.debug("Will connect to NameNode at " + address);
    return address.toURL();
  }

  /**
   * Start the web server.
   */
  @VisibleForTesting
  public void startInfoServer() throws IOException {
    final InetSocketAddress httpAddr = getHttpAddress(conf);
    final String httpsAddrString = conf.getTrimmed(
        DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY,
        DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_DEFAULT);
    InetSocketAddress httpsAddr = NetUtils.createSocketAddr(httpsAddrString);

    HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
        httpAddr, httpsAddr, "secondary", DFSConfigKeys.
            DFS_SECONDARY_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
        DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY);

    final boolean xFrameEnabled = conf.getBoolean(
        DFSConfigKeys.DFS_XFRAME_OPTION_ENABLED,
        DFSConfigKeys.DFS_XFRAME_OPTION_ENABLED_DEFAULT);

    final String xFrameOptionValue = conf.getTrimmed(
        DFSConfigKeys.DFS_XFRAME_OPTION_VALUE,
        DFSConfigKeys.DFS_XFRAME_OPTION_VALUE_DEFAULT);

    builder.configureXFrame(xFrameEnabled).setXFrameOption(xFrameOptionValue);

    infoServer = builder.build();
    infoServer.setAttribute("secondary.name.node", this);
    infoServer.setAttribute("name.system.image", checkpointImage);
    infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
    infoServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
        ImageServlet.class, true);
    infoServer.start();

    LOG.info("Web server init done");

    HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
    int connIdx = 0;
    if (policy.isHttpEnabled()) {
      InetSocketAddress httpAddress =
          infoServer.getConnectorAddress(connIdx++);
      conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
          NetUtils.getHostPortString(httpAddress));
    }

    if (policy.isHttpsEnabled()) {
      InetSocketAddress httpsAddress =
          infoServer.getConnectorAddress(connIdx);
      conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY,
          NetUtils.getHostPortString(httpsAddress));
    }
  }

  /**
   * Create a new checkpoint
   * @return if the image is fetched from primary or not
   */
  @VisibleForTesting
  @SuppressWarnings("deprecated")
  public boolean doCheckpoint() throws IOException {
    checkpointImage.ensureCurrentDirExists();
    NNStorage dstStorage = checkpointImage.getStorage();
    
    // Tell the namenode to start logging transactions in a new edit file
    // Returns a token that would be used to upload the merged image.
    CheckpointSignature sig = namenode.rollEditLog();
    
    boolean loadImage = false;
    boolean isFreshCheckpointer = (checkpointImage.getNamespaceID() == 0);
    boolean isSameCluster =
        (dstStorage.versionSupportsFederation(NameNodeLayoutVersion.FEATURES)
            && sig.isSameCluster(checkpointImage)) ||
        (!dstStorage.versionSupportsFederation(NameNodeLayoutVersion.FEATURES)
            && sig.namespaceIdMatches(checkpointImage));
    if (isFreshCheckpointer ||
        (isSameCluster &&
         !sig.storageVersionMatches(checkpointImage.getStorage()))) {
      // if we're a fresh 2NN, or if we're on the same cluster and our storage
      // needs an upgrade, just take the storage info from the server.
      dstStorage.setStorageInfo(sig);
      dstStorage.setClusterID(sig.getClusterID());
      dstStorage.setBlockPoolID(sig.getBlockpoolID());
      loadImage = true;
    }
    sig.validateStorageInfo(checkpointImage);

    // error simulation code for junit test
    CheckpointFaultInjector.getInstance().afterSecondaryCallsRollEditLog();

    RemoteEditLogManifest manifest =
      namenode.getEditLogManifest(sig.mostRecentCheckpointTxId + 1);

    // Fetch fsimage and edits. Reload the image if previous merge failed.
    loadImage |= downloadCheckpointFiles(
        fsName, checkpointImage, sig, manifest) |
        checkpointImage.hasMergeError();
    try {
      doMerge(sig, manifest, loadImage, checkpointImage, namesystem);
    } catch (IOException ioe) {
      // A merge error occurred. The in-memory file system state may be
      // inconsistent, so the image and edits need to be reloaded.
      checkpointImage.setMergeError();
      throw ioe;
    }
    // Clear any error since merge was successful.
    checkpointImage.clearMergeError();

    
    //
    // Upload the new image into the NameNode. Then tell the Namenode
    // to make this new uploaded image as the most current image.
    //
    long txid = checkpointImage.getLastAppliedTxId();
    TransferFsImage.uploadImageFromStorage(fsName, conf, dstStorage,
        NameNodeFile.IMAGE, txid);

    // error simulation code for junit test
    CheckpointFaultInjector.getInstance().afterSecondaryUploadsNewImage();

    LOG.warn("Checkpoint done. New Image Size: " 
             + dstStorage.getFsImageName(txid).length());

    if (legacyOivImageDir != null && !legacyOivImageDir.isEmpty()) {
      try {
        checkpointImage.saveLegacyOIVImage(namesystem, legacyOivImageDir,
            new Canceler());
      } catch (IOException e) {
        LOG.warn("Failed to write legacy OIV image: ", e);
      }
    }
    return loadImage;
  }
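
  // [annotation — not in the upstream source] doCheckpoint() above is the core
  // five-step cycle: (1) rollEditLog() on the NameNode starts a new edits
  // segment and returns a CheckpointSignature; (2) getEditLogManifest() lists
  // the edit log segments since the last checkpoint txid; (3) the fsimage
  // (only if changed) and the edits are downloaded via TransferFsImage;
  // (4) doMerge() replays the edits on top of the image and saves a new
  // fsimage locally; (5) uploadImageFromStorage() pushes the merged image
  // back to the NameNode.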

  /**
   * @param opts The parameters passed to this program.
   * @exception Exception if the filesystem does not exist.
   * @return 0 on success, non zero on error.
   */
  private int processStartupCommand(CommandLineOpts opts) throws Exception {
    if (opts.getCommand() == null) {
      return 0;
    }
    
    String cmd = StringUtils.toLowerCase(opts.getCommand().toString());
    
    int exitCode = 0;
    try {
      switch (opts.getCommand()) {
      case CHECKPOINT:
        long count = countUncheckpointedTxns();
        if (count > checkpointConf.getTxnCount() ||
            opts.shouldForceCheckpoint()) {
          doCheckpoint();
        } else {
          System.err.println("EditLog size " + count + " transactions is " +
                             "smaller than configured checkpoint " +
                             "interval " + checkpointConf.getTxnCount() + " transactions.");
          System.err.println("Skipping checkpoint.");
        }
        break;
      case GETEDITSIZE:
        long uncheckpointed = countUncheckpointedTxns();
        System.out.println("NameNode has " + uncheckpointed +
            " uncheckpointed transactions");
        break;
      default:
        throw new AssertionError("bad command enum: " + opts.getCommand());
      }
      
    } catch (RemoteException e) {
      //
      // This is an error returned by the hadoop server. Print
      // out the first line of the error message, ignore the stack trace.
      exitCode = 1;
      try {
        String[] content;
        content = e.getLocalizedMessage().split("\n");
        LOG.error(cmd + ": " + content[0]);
      } catch (Exception ex) {
        LOG.error(cmd + ": " + ex.getLocalizedMessage());
      }
    } catch (IOException e) {
      //
      // IO exception encountered locally.
      //
      exitCode = 1;
      LOG.error(cmd + ": " + e.getLocalizedMessage());
    } finally {
      // Does the RPC connection need to be closed?
    }
    return exitCode;
  }

  private long countUncheckpointedTxns() throws IOException {
    long curTxId = namenode.getTransactionID();
    long uncheckpointedTxns = curTxId -
      checkpointImage.getStorage().getMostRecentCheckpointTxId();
    assert uncheckpointedTxns >= 0;
    return uncheckpointedTxns;
  }

  boolean shouldCheckpointBasedOnCount() throws IOException {
    return countUncheckpointedTxns() >= checkpointConf.getTxnCount();
  }

  /**
   * main() has some simple utility methods.
   * @param argv Command line parameters.
   * @exception Exception if the filesystem does not exist.
   */
  public static void main(String[] argv) throws Exception {
    CommandLineOpts opts = SecondaryNameNode.parseArgs(argv);
    if (opts == null) {
      LOG.error("Failed to parse options");
      terminate(1);
    } else if (opts.shouldPrintHelp()) {
      opts.usage();
      System.exit(0);
    }

    try {
      StringUtils.startupShutdownMessage(SecondaryNameNode.class, argv, LOG);
      Configuration tconf = new HdfsConfiguration();
      SecondaryNameNode secondary = null;
      secondary = new SecondaryNameNode(tconf, opts);

      // SecondaryNameNode can be started in 2 modes:
      // 1. run a command (i.e. checkpoint or geteditsize) then terminate
      // 2. run as a daemon when {@link #parseArgs} yields no commands
      if (opts != null && opts.getCommand() != null) {
        // mode 1
        int ret = secondary.processStartupCommand(opts);
        terminate(ret);
      } else {
        // mode 2
        secondary.startInfoServer();
        secondary.startCheckpointThread();
        secondary.join();
      }
    } catch (Throwable e) {
      LOG.error("Failed to start secondary namenode", e);
      terminate(1);
    }
  }

  public void startCheckpointThread() {
    Preconditions.checkState(checkpointThread == null,
        "Should not already have a thread");
    Preconditions.checkState(shouldRun, "shouldRun should be true");
    
    checkpointThread = new Daemon(this);
    checkpointThread.start();
  }

  @Override // SecondaryNameNodeInfoMXBean
  public String getHostAndPort() {
    return NetUtils.getHostPortString(nameNodeAddr);
  }

  @Override
  public boolean isSecurityEnabled() {
    return UserGroupInformation.isSecurityEnabled();
  }

  @Override // SecondaryNameNodeInfoMXBean
  public long getStartTime() {
    return starttime;
  }

  @Override // SecondaryNameNodeInfoMXBean
  public long getLastCheckpointTime() {
    return lastCheckpointWallclockTime;
  }

  @Override // SecondaryNameNodeInfoMXBean
  public long getLastCheckpointDeltaMs() {
    if (lastCheckpointTime == 0) {
      return -1;
    } else {
      return (Time.monotonicNow() - lastCheckpointTime);
    }
  }

  @Override // SecondaryNameNodeInfoMXBean
  public String[] getCheckpointDirectories() {
    ArrayList<String> r = Lists.newArrayListWithCapacity(checkpointDirs.size());
    for (URI d : checkpointDirs) {
      r.add(d.toString());
    }
    return r.toArray(new String[r.size()]);
  }

  @Override // SecondaryNameNodeInfoMXBean
  public String[] getCheckpointEditlogDirectories() {
    ArrayList<String> r = Lists.newArrayListWithCapacity(checkpointEditsDirs.size());
    for (URI d : checkpointEditsDirs) {
      r.add(d.toString());
    }
    return r.toArray(new String[r.size()]);
  }

  @Override // VersionInfoMXBean
  public String getCompileInfo() {
    return VersionInfo.getDate() + " by " + VersionInfo.getUser() +
            " from " + VersionInfo.getBranch();
  }

  @Override // VersionInfoMXBean
  public String getSoftwareVersion() {
    return VersionInfo.getVersion();
  }


  /**
   * Container for parsed command-line options.
   */
  @SuppressWarnings("static-access")
  static class CommandLineOpts {
    private final Options options = new Options();
    
    private final Option geteditsizeOpt;
    private final Option checkpointOpt;
    private final Option formatOpt;
    private final Option helpOpt;


    Command cmd;
    enum Command {
      GETEDITSIZE,
      CHECKPOINT;
    }
    
    private boolean shouldForce;
    private boolean shouldFormat;
    private boolean shouldPrintHelp;

    CommandLineOpts() {
      geteditsizeOpt = new Option("geteditsize",
        "return the number of uncheckpointed transactions on the NameNode");
      checkpointOpt = OptionBuilder.withArgName("force")
        .hasOptionalArg().withDescription("checkpoint on startup").create("checkpoint");
      formatOpt = new Option("format", "format the local storage during startup");
      helpOpt = new Option("h", "help", false, "get help information");
      
      options.addOption(geteditsizeOpt);
      options.addOption(checkpointOpt);
      options.addOption(formatOpt);
      options.addOption(helpOpt);
    }
    
    public boolean shouldFormat() {
      return shouldFormat;
    }

    public boolean shouldPrintHelp() {
      return shouldPrintHelp;
    }
    
    public void parse(String ... argv) throws ParseException {
      CommandLineParser parser = new PosixParser();
      CommandLine cmdLine = parser.parse(options, argv);
      
      if (cmdLine.hasOption(helpOpt.getOpt())
          || cmdLine.hasOption(helpOpt.getLongOpt())) {
        shouldPrintHelp = true;
        return;
      }
      
      boolean hasGetEdit = cmdLine.hasOption(geteditsizeOpt.getOpt());
      boolean hasCheckpoint = cmdLine.hasOption(checkpointOpt.getOpt()); 
      if (hasGetEdit && hasCheckpoint) {
        throw new ParseException("May not pass both "
            + geteditsizeOpt.getOpt() + " and "
            + checkpointOpt.getOpt());
      }
      
      if (hasGetEdit) {
        cmd = Command.GETEDITSIZE;
      } else if (hasCheckpoint) {
        cmd = Command.CHECKPOINT;
        
        String arg = cmdLine.getOptionValue(checkpointOpt.getOpt());
        if ("force".equals(arg)) {
          shouldForce = true;
        } else if (arg != null) {
          throw new ParseException("-checkpoint may only take 'force' as an "
              + "argument");
        }
      }
      
      if (cmdLine.hasOption(formatOpt.getOpt())) {
        shouldFormat = true;
      }
    }
    
    public Command getCommand() {
      return cmd;
    }
    
    public boolean shouldForceCheckpoint() {
      return shouldForce;
    }
    
    void usage() {
      String header = "The Secondary NameNode is a helper "
          + "to the primary NameNode. The Secondary is responsible "
          + "for supporting periodic checkpoints of the HDFS metadata. "
          + "The current design allows only one Secondary NameNode "
          + "per HDFS cluster.";
      HelpFormatter formatter = new HelpFormatter();
      formatter.printHelp("secondarynamenode", header, options, "", false);
    }
  }
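
  // [annotation — not in the upstream source] These options correspond to the
  // command-line forms
  //   hdfs secondarynamenode [-checkpoint [force]] [-geteditsize] [-format] [-h|--help]
  // parse() rejects -geteditsize combined with -checkpoint, and -checkpoint
  // accepts only the literal argument "force".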

  private static CommandLineOpts parseArgs(String[] argv) {
    CommandLineOpts opts = new CommandLineOpts();
    try {
      opts.parse(argv);
    } catch (ParseException pe) {
      LOG.error(pe.getMessage());
      opts.usage();
      return null;
    }
    return opts;
  }
  
  static class CheckpointStorage extends FSImage {
    
    private int mergeErrorCount;
    private static class CheckpointLogPurger implements LogsPurgeable {
      
      private final NNStorage storage;
      private final StoragePurger purger
          = new NNStorageRetentionManager.DeletionStoragePurger();
      
      public CheckpointLogPurger(NNStorage storage) {
        this.storage = storage;
      }

      @Override
      public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
        Iterator<StorageDirectory> iter = storage.dirIterator();
        while (iter.hasNext()) {
          StorageDirectory dir = iter.next();
          List<EditLogFile> editFiles = FileJournalManager.matchEditLogs(
              dir.getCurrentDir());
          for (EditLogFile f : editFiles) {
            if (f.getLastTxId() < minTxIdToKeep) {
              purger.purgeLog(f);
            }
          }
        }
      }

      @Override
      public void selectInputStreams(Collection<EditLogInputStream> streams,
          long fromTxId, boolean inProgressOk, boolean onlyDurableTxns) {
        Iterator<StorageDirectory> iter = storage.dirIterator();
        while (iter.hasNext()) {
          StorageDirectory dir = iter.next();
          List<EditLogFile> editFiles;
          try {
            editFiles = FileJournalManager.matchEditLogs(
                dir.getCurrentDir());
          } catch (IOException ioe) {
            throw new RuntimeException(ioe);
          }
          FileJournalManager.addStreamsToCollectionFromFiles(editFiles, streams,
              fromTxId, Long.MAX_VALUE, inProgressOk);
        }
      }
      
    }
    
    /**
     * Construct a checkpoint image.
     * @param conf Node configuration.
     * @param imageDirs URIs of storage for image.
     * @param editsDirs URIs of storage for edit logs.
     * @throws IOException If storage cannot be accessed.
     */
    CheckpointStorage(Configuration conf, 
                      Collection<URI> imageDirs,
                      List<URI> editsDirs) throws IOException {
      super(conf, imageDirs, editsDirs);

      // the 2NN never writes edits -- it only downloads them. So
      // we shouldn't have any editLog instance. Setting to null
      // makes sure we don't accidentally depend on it.
      editLog = null;
      mergeErrorCount = 0;
      
      // Replace the archival manager with one that can actually work on the
      // 2NN's edits storage.
      this.archivalManager = new NNStorageRetentionManager(conf, storage,
          new CheckpointLogPurger(storage));
    }

    /**
     * Analyze checkpoint directories.
     * Create directories if they do not exist.
     * Recover from an unsuccessful checkpoint if necessary.
     *
     * @throws IOException
     */
    void recoverCreate(boolean format) throws IOException {
      storage.attemptRestoreRemovedStorage();
      storage.unlockAll();

      for (Iterator<StorageDirectory> it = 
                   storage.dirIterator(); it.hasNext();) {
        StorageDirectory sd = it.next();
        boolean isAccessible = true;
        try { // create directories if don't exist yet
          if(!sd.getRoot().mkdirs()) {
            // do nothing, directory is already created
          }
        } catch(SecurityException se) {
          isAccessible = false;
        }
        if(!isAccessible)
          throw new InconsistentFSStateException(sd.getRoot(),
              "cannot access checkpoint directory.");
        
        if (format) {
          // Don't confirm, since this is just the secondary namenode.
          LOG.info("Formatting storage directory " + sd);
          sd.clearDirectory();
        }
        
        StorageState curState;
        try {
          curState = sd.analyzeStorage(HdfsServerConstants.StartupOption.REGULAR, storage);
          // sd is locked but not opened
          switch(curState) {
          case NON_EXISTENT:
            // fail if any of the configured checkpoint dirs are inaccessible 
            throw new InconsistentFSStateException(sd.getRoot(),
                  "checkpoint directory does not exist or is not accessible.");
          case NOT_FORMATTED:
            break;  // it's ok since initially there is no current and VERSION
          case NORMAL:
            // Read the VERSION file. This verifies that:
            // (a) the VERSION file for each of the directories is the same,
            // and (b) when we connect to a NN, we can verify that the remote
            // node matches the same namespace that we ran on previously.
            storage.readProperties(sd);
            break;
          default:  // recovery is possible
            sd.doRecover(curState);
          }
        } catch (IOException ioe) {
          sd.unlock();
          throw ioe;
        }
      }
    }


    boolean hasMergeError() {
      return (mergeErrorCount > 0);
    }

    int getMergeErrorCount() {
      return mergeErrorCount;
    }

    void setMergeError() {
      mergeErrorCount++;
    }

    void clearMergeError() {
      mergeErrorCount = 0;
    }
 
    /**
     * Ensure that the current/ directory exists in all storage
     * directories
     */
    void ensureCurrentDirExists() throws IOException {
      for (Iterator<StorageDirectory> it
             = storage.dirIterator(); it.hasNext();) {
        StorageDirectory sd = it.next();
        File curDir = sd.getCurrentDir();
        if (!curDir.exists() && !curDir.mkdirs()) {
          throw new IOException("Could not create directory " + curDir);
        }
      }
    }

    void deleteTempEdits() throws IOException {
      FilenameFilter filter = new FilenameFilter() {
        @Override
        public boolean accept(File dir, String name) {
          return name.matches(NameNodeFile.EDITS_TMP.getName()
              + "_(\\d+)-(\\d+)_(\\d+)");
        }
      };
      Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
      for (;it.hasNext();) {
        StorageDirectory dir = it.next();
        File[] tempEdits = dir.getCurrentDir().listFiles(filter);
        if (tempEdits != null) {
          for (File t : tempEdits) {
            boolean success = t.delete();
            if (!success) {
              LOG.warn("Failed to delete temporary edits file: "
                  + t.getAbsolutePath());
            }
          }
        }
      }
    }

  }
    
  void doMerge(
      CheckpointSignature sig, RemoteEditLogManifest manifest,
      boolean loadImage, FSImage dstImage, FSNamesystem dstNamesystem)
      throws IOException {   
    NNStorage dstStorage = dstImage.getStorage();
    
    dstStorage.setStorageInfo(sig);
    if (loadImage) {
      File file = dstStorage.findImageFile(NameNodeFile.IMAGE,
          sig.mostRecentCheckpointTxId);
      if (file == null) {
        throw new IOException("Couldn't find image file at txid " + 
            sig.mostRecentCheckpointTxId + " even though it should have " +
            "just been downloaded");
      }
      dstNamesystem.writeLock();
      try {
        dstImage.reloadFromImageFile(file, dstNamesystem);
      } finally {
        dstNamesystem.writeUnlock("reloadFromImageFile");
      }
      dstNamesystem.imageLoadComplete();
    }
    // error simulation code for junit test
    CheckpointFaultInjector.getInstance().duringMerge();   

    Checkpointer.rollForwardByApplyingLogs(manifest, dstImage, dstNamesystem);
    // The following has the side effect of purging old fsimages/edit logs.
    dstImage.saveFSImageInAllDirs(dstNamesystem, dstImage.getLastAppliedTxId());
    if (!namenode.isRollingUpgrade()) {
      dstImage.updateStorageVersion();
    }
  }
}
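
For context, here is a minimal sketch, not part of the upstream file, of the call sequence that main() performs in daemon mode. It assumes fs.defaultFS points at a running non-HA NameNode; the configuration keys are the ones read by CheckpointConf, and the values shown are the stock defaults (3600 s period, 1,000,000 transactions, 60 s polling). The class name SecondaryNameNodeLauncher is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;

public class SecondaryNameNodeLauncher {
  public static void main(String[] args) throws Exception {
    // These settings would normally live in hdfs-site.xml; set here to show the keys.
    Configuration conf = new HdfsConfiguration();
    conf.set("dfs.namenode.checkpoint.period", "3600");     // seconds between time-based checkpoints
    conf.set("dfs.namenode.checkpoint.txns", "1000000");    // transaction-count trigger
    conf.set("dfs.namenode.checkpoint.check.period", "60"); // how often doWork() polls, in seconds

    // Mirrors the daemon branch of SecondaryNameNode.main():
    SecondaryNameNode secondary = new SecondaryNameNode(conf);
    secondary.startInfoServer();       // HTTP endpoint used for image transfer
    secondary.startCheckpointThread(); // runs doWork() on a Daemon thread

    // main() blocks on the private join(); a standalone launcher can simply park.
    Thread.currentThread().join();
  }
}

In a real deployment the daemon is started with the stock launcher (typically hdfs --daemon start secondarynamenode) rather than custom code; the sketch only makes the startup sequence explicit.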
