hadoop ZKFailoverController 源码

  • 2022-10-20
  • 浏览 (275)

hadoop ZKFailoverController 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.ha;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.data.ACL;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.LimitedPrivate("HDFS")
public abstract class ZKFailoverController {

  static final Logger LOG = LoggerFactory.getLogger(ZKFailoverController.class);
  
  public static final String ZK_QUORUM_KEY = "ha.zookeeper.quorum";
  private static final String ZK_SESSION_TIMEOUT_KEY = "ha.zookeeper.session-timeout.ms";
  private static final int ZK_SESSION_TIMEOUT_DEFAULT = 10*1000;
  private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
  public static final String ZK_ACL_KEY = "ha.zookeeper.acl";
  private static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";
  public static final String ZK_AUTH_KEY = "ha.zookeeper.auth";
  static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";

  /**
   * All of the conf keys used by the ZKFC. This is used in order to allow
   * them to be overridden on a per-nameservice or per-namenode basis.
   */
  protected static final String[] ZKFC_CONF_KEYS = new String[] {
    ZK_QUORUM_KEY,
    ZK_SESSION_TIMEOUT_KEY,
    ZK_PARENT_ZNODE_KEY,
    ZK_ACL_KEY,
    ZK_AUTH_KEY
  };
  
  protected static final String USAGE =
      "Usage: hdfs zkfc [ -formatZK [-force] [-nonInteractive] ]\n"
      + "\t-force: formats the znode if the znode exists.\n"
      + "\t-nonInteractive: formats the znode aborts if the znode exists,\n"
      + "\tunless -force option is specified.";

  /** Unable to format the parent znode in ZK */
  static final int ERR_CODE_FORMAT_DENIED = 2;
  /** The parent znode doesn't exist in ZK */
  static final int ERR_CODE_NO_PARENT_ZNODE = 3;
  /** Fencing is not properly configured */
  static final int ERR_CODE_NO_FENCER = 4;
  /** Automatic failover is not enabled */
  static final int ERR_CODE_AUTO_FAILOVER_NOT_ENABLED = 5;
  /** Cannot connect to ZooKeeper */
  static final int ERR_CODE_NO_ZK = 6;
  
  protected Configuration conf;
  private String zkQuorum;
  protected final HAServiceTarget localTarget;

  private HealthMonitor healthMonitor;
  private ActiveStandbyElector elector;
  protected ZKFCRpcServer rpcServer;

  private State lastHealthState = State.INITIALIZING;

  private volatile HAServiceState serviceState = HAServiceState.INITIALIZING;

  /** Set if a fatal error occurs */
  private String fatalError = null;

  /**
   * A future nanotime before which the ZKFC will not join the election.
   * This is used during graceful failover.
   */
  private long delayJoiningUntilNanotime = 0;

  /** Executor on which {@link #scheduleRecheck(long)} schedules events */
  private ScheduledExecutorService delayExecutor =
    Executors.newScheduledThreadPool(1,
        new ThreadFactoryBuilder().setDaemon(true)
            .setNameFormat("ZKFC Delay timer #%d")
            .build());

  private ActiveAttemptRecord lastActiveAttemptRecord;
  private Object activeAttemptRecordLock = new Object();

  protected ZKFailoverController(Configuration conf, HAServiceTarget localTarget) {
    this.localTarget = localTarget;
    this.conf = conf;
  }
  

  protected abstract byte[] targetToData(HAServiceTarget target);
  protected abstract HAServiceTarget dataToTarget(byte[] data);
  protected abstract void loginAsFCUser() throws IOException;
  protected abstract void checkRpcAdminAccess()
      throws AccessControlException, IOException;
  protected abstract InetSocketAddress getRpcAddressToBindTo();
  protected abstract PolicyProvider getPolicyProvider();
  protected abstract List<HAServiceTarget> getAllOtherNodes();

  /**
   * Return the name of a znode inside the configured parent znode in which
   * the ZKFC will do all of its work. This is so that multiple federated
   * nameservices can run on the same ZK quorum without having to manually
   * configure them to separate subdirectories.
   *
   * @return ScopeInsideParentNode.
   */
  protected abstract String getScopeInsideParentNode();

  public HAServiceTarget getLocalTarget() {
    return localTarget;
  }

  @VisibleForTesting
  public HAServiceState getServiceState() {
    return serviceState;
  }

  public int run(final String[] args) throws Exception {
    if (!localTarget.isAutoFailoverEnabled()) {
      LOG.error("Automatic failover is not enabled for " + localTarget + "." +
          " Please ensure that automatic failover is enabled in the " +
          "configuration before running the ZK failover controller.");
      return ERR_CODE_AUTO_FAILOVER_NOT_ENABLED;
    }
    loginAsFCUser();
    try {
      return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
        @Override
        public Integer run() {
          try {
            return doRun(args);
          } catch (Exception t) {
            throw new RuntimeException(t);
          } finally {
            if (elector != null) {
              elector.terminateConnection();
            }
          }
        }
      });
    } catch (RuntimeException rte) {
      throw (Exception)rte.getCause();
    }
  }
  

  private int doRun(String[] args)
      throws Exception {
    try {
      initZK();
    } catch (KeeperException ke) {
      LOG.error("Unable to start failover controller. Unable to connect "
          + "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
          + "configured value for " + ZK_QUORUM_KEY + " and ensure that "
          + "ZooKeeper is running.", ke);
      return ERR_CODE_NO_ZK;
    }
    try {
      if (args.length > 0) {
        if ("-formatZK".equals(args[0])) {
          boolean force = false;
          boolean interactive = true;
          for (int i = 1; i < args.length; i++) {
            if ("-force".equals(args[i])) {
              force = true;
            } else if ("-nonInteractive".equals(args[i])) {
              interactive = false;
            } else {
              badArg(args[i]);
            }
          }
          return formatZK(force, interactive);
        }
        else {
          badArg(args[0]);
        }
      }
    } catch (Exception e){
      LOG.error("The failover controller encounters runtime error", e);
      throw e;
    }

    if (!elector.parentZNodeExists()) {
      LOG.error("Unable to start failover controller. "
          + "Parent znode does not exist.\n"
          + "Run with -formatZK flag to initialize ZooKeeper.");
      return ERR_CODE_NO_PARENT_ZNODE;
    }

    try {
      localTarget.checkFencingConfigured();
    } catch (BadFencingConfigurationException e) {
      LOG.error("Fencing is not configured for " + localTarget + ".\n" +
          "You must configure a fencing method before using automatic " +
          "failover.", e);
      return ERR_CODE_NO_FENCER;
    }

    try {
      initRPC();
      initHM();
      startRPC();
      mainLoop();
    } catch (Exception e) {
      LOG.error("The failover controller encounters runtime error: ", e);
      throw e;
    } finally {
      rpcServer.stopAndJoin();
      
      elector.quitElection(true);
      healthMonitor.shutdown();
      healthMonitor.join();
    }
    return 0;
  }

  private void badArg(String arg) {
    printUsage();
    throw new HadoopIllegalArgumentException(
        "Bad argument: " + arg);
  }

  private void printUsage() {
    System.err.println(USAGE + "\n");
  }

  private int formatZK(boolean force, boolean interactive)
      throws IOException, InterruptedException, KeeperException {
    if (elector.parentZNodeExists()) {
      if (!force && (!interactive || !confirmFormat())) {
        return ERR_CODE_FORMAT_DENIED;
      }
      
      try {
        elector.clearParentZNode();
      } catch (IOException e) {
        LOG.error("Unable to clear zk parent znode", e);
        return 1;
      }
    }
    
    elector.ensureParentZNode();
    return 0;
  }

  private boolean confirmFormat() {
    String parentZnode = getParentZnode();
    System.err.println(
        "===============================================\n" +
        "The configured parent znode " + parentZnode + " already exists.\n" +
        "Are you sure you want to clear all failover information from\n" +
        "ZooKeeper?\n" +
        "WARNING: Before proceeding, ensure that all HDFS services and\n" +
        "failover controllers are stopped!\n" +
        "===============================================");
    try {
      return ToolRunner.confirmPrompt("Proceed formatting " + parentZnode + "?");
    } catch (IOException e) {
      LOG.debug("Failed to confirm", e);
      return false;
    }
  }

  // ------------------------------------------
  // Begin actual guts of failover controller
  // ------------------------------------------
  
  private void initHM() {
    healthMonitor = new HealthMonitor(conf, localTarget);
    healthMonitor.addCallback(new HealthCallbacks());
    healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
    healthMonitor.start();
  }

  protected void initRPC() throws IOException {
    InetSocketAddress bindAddr = getRpcAddressToBindTo();
    LOG.info("ZKFC RpcServer binding to {}", bindAddr);
    rpcServer = new ZKFCRpcServer(conf, bindAddr, this, getPolicyProvider());
  }

  protected void startRPC() throws IOException {
    rpcServer.start();
  }


  private void initZK() throws HadoopIllegalArgumentException, IOException,
      KeeperException {
    zkQuorum = conf.get(ZK_QUORUM_KEY);
    int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
        ZK_SESSION_TIMEOUT_DEFAULT);
    // Parse ACLs from configuration.
    String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
    zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
    List<ACL> zkAcls = ZKUtil.parseACLs(zkAclConf);
    if (zkAcls.isEmpty()) {
      zkAcls = Ids.CREATOR_ALL_ACL;
    }
    
    // Parse authentication from configuration. Exclude any Credential providers
    // using the hdfs scheme to avoid a circular dependency. As HDFS is likely
    // not started when ZKFC is started, we cannot read the credentials from it.
    Configuration c = conf;
    try {
      c = ProviderUtils.excludeIncompatibleCredentialProviders(
          conf, FileSystem.getFileSystemClass("hdfs", conf));
    } catch (UnsupportedFileSystemException e) {
      // Should not happen in a real cluster, as the hdfs FS will always be
      // present. Inside tests, the hdfs filesystem will not be present
      LOG.debug("No filesystem found for the hdfs scheme", e);
    }
    List<ZKAuthInfo> zkAuths = SecurityUtil.getZKAuthInfos(c, ZK_AUTH_KEY);

    // Sanity check configuration.
    Preconditions.checkArgument(zkQuorum != null,
        "Missing required configuration '%s' for ZooKeeper quorum",
        ZK_QUORUM_KEY);
    Preconditions.checkArgument(zkTimeout > 0,
        "Invalid ZK session timeout %s", zkTimeout);
    
    int maxRetryNum = conf.getInt(
        CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
        CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
    elector = new ActiveStandbyElector(zkQuorum,
        zkTimeout, getParentZnode(), zkAcls, zkAuths,
        new ElectorCallbacks(), maxRetryNum);
  }
  
  private String getParentZnode() {
    String znode = conf.get(ZK_PARENT_ZNODE_KEY,
        ZK_PARENT_ZNODE_DEFAULT);
    if (!znode.endsWith("/")) {
      znode += "/";
    }
    return znode + getScopeInsideParentNode();
  }

  private synchronized void mainLoop() throws InterruptedException {
    while (fatalError == null) {
      wait();
    }
    assert fatalError != null; // only get here on fatal
    throw new RuntimeException(
        "ZK Failover Controller failed: " + fatalError);
  }
  
  private synchronized void fatalError(String err) {
    LOG.error("Fatal error occurred:" + err);
    fatalError = err;
    notifyAll();
  }
  
  private synchronized void becomeActive() throws ServiceFailedException {
    LOG.info("Trying to make " + localTarget + " active...");
    try {
      HAServiceProtocolHelper.transitionToActive(localTarget.getProxy(
          conf, FailoverController.getRpcTimeoutToNewActive(conf)),
          createReqInfo());
      String msg = "Successfully transitioned " + localTarget +
          " to active state";
      LOG.info(msg);
      serviceState = HAServiceState.ACTIVE;
      recordActiveAttempt(new ActiveAttemptRecord(true, msg));

    } catch (Throwable t) {
      String msg = "Couldn't make " + localTarget + " active";
      LOG.error(msg, t);
      
      recordActiveAttempt(new ActiveAttemptRecord(false, msg + "\n" +
          StringUtils.stringifyException(t)));

      if (t instanceof ServiceFailedException) {
        throw (ServiceFailedException)t;
      } else {
        throw new ServiceFailedException("Couldn't transition to active",
            t);
      }
/*
* TODO:
* we need to make sure that if we get fenced and then quickly restarted,
* none of these calls will retry across the restart boundary
* perhaps the solution is that, whenever the nn starts, it gets a unique
* ID, and when we start becoming active, we record it, and then any future
* calls use the same ID
*/
      
    }
  }

  /**
   * Store the results of the last attempt to become active.
   * This is used so that, during manually initiated failover,
   * we can report back the results of the attempt to become active
   * to the initiator of the failover.
   */
  private void recordActiveAttempt(
      ActiveAttemptRecord record) {
    synchronized (activeAttemptRecordLock) {
      lastActiveAttemptRecord = record;
      activeAttemptRecordLock.notifyAll();
    }
  }

  /**
   * Wait until one of the following events:
   * <ul>
   * <li>Another thread publishes the results of an attempt to become active
   * using {@link #recordActiveAttempt(ActiveAttemptRecord)}</li>
   * <li>The node enters bad health status</li>
   * <li>The specified timeout elapses</li>
   * </ul>
   * 
   * @param timeoutMillis number of millis to wait
   * @param onlyAfterNanoTime accept attempt records only after a given
   * timestamp. Use this parameter to ignore the old attempt records from a
   * previous fail-over attempt.
   * @return the published record, or null if the timeout elapses or the
   * service becomes unhealthy 
   * @throws InterruptedException if the thread is interrupted.
   */
  private ActiveAttemptRecord waitForActiveAttempt(int timeoutMillis,
      long onlyAfterNanoTime) throws InterruptedException {
    long waitUntil = onlyAfterNanoTime + TimeUnit.NANOSECONDS.convert(
        timeoutMillis, TimeUnit.MILLISECONDS);
    
    do {
      // periodically check health state, because entering an
      // unhealthy state could prevent us from ever attempting to
      // become active. We can detect this and respond to the user
      // immediately.
      synchronized (this) {
        if (lastHealthState != State.SERVICE_HEALTHY) {
          // early out if service became unhealthy
          return null;
        }
      }

      synchronized (activeAttemptRecordLock) {
        if ((lastActiveAttemptRecord != null &&
            lastActiveAttemptRecord.nanoTime >= onlyAfterNanoTime)) {
          return lastActiveAttemptRecord;
        }
        // Only wait 1sec so that we periodically recheck the health state
        // above.
        activeAttemptRecordLock.wait(1000);
      }
    } while (System.nanoTime() < waitUntil);
    
    // Timeout elapsed.
    LOG.warn(timeoutMillis + "ms timeout elapsed waiting for an attempt " +
        "to become active");
    return null;
  }

  private StateChangeRequestInfo createReqInfo() {
    return new StateChangeRequestInfo(RequestSource.REQUEST_BY_ZKFC);
  }

  private synchronized void becomeStandby() {
    LOG.info("ZK Election indicated that " + localTarget +
        " should become standby");
    try {
      int timeout = FailoverController.getGracefulFenceTimeout(conf);
      localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
      LOG.info("Successfully transitioned " + localTarget +
          " to standby state");
    } catch (Exception e) {
      LOG.error("Couldn't transition " + localTarget + " to standby state",
          e);
      // TODO handle this. It's a likely case since we probably got fenced
      // at the same time.
    }
    serviceState = HAServiceState.STANDBY;
  }
  

  private synchronized void fenceOldActive(byte[] data) {
    HAServiceTarget target = dataToTarget(data);
    
    try {
      doFence(target);
    } catch (Throwable t) {
      recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active: " + StringUtils.stringifyException(t)));
      throw t;
    }
  }
  
  private void doFence(HAServiceTarget target) {
    LOG.info("Should fence: " + target);
    boolean gracefulWorked = new FailoverController(conf,
        RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target);
    if (gracefulWorked) {
      // It's possible that it's in standby but just about to go into active,
      // no? Is there some race here?
      LOG.info("Successfully transitioned " + target + " to standby " +
          "state without fencing");
      return;
    }
    
    try {
      target.checkFencingConfigured();
    } catch (BadFencingConfigurationException e) {
      LOG.error("Couldn't fence old active " + target, e);
      recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active"));
      throw new RuntimeException(e);
    }
    
    if (!target.getFencer().fence(target)) {
      throw new RuntimeException("Unable to fence " + target);
    }
  }


  /**
   * Request from graceful failover to cede active role. Causes
   * this ZKFC to transition its local node to standby, then quit
   * the election for the specified period of time, after which it
   * will rejoin iff it is healthy.
   */
  void cedeActive(final int millisToCede)
      throws AccessControlException, ServiceFailedException, IOException {
    try {
      UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws Exception {
          doCedeActive(millisToCede);
          return null;
        }
      });
    } catch (InterruptedException e) {
      throw new IOException(e);
    }
  }
  
  private void doCedeActive(int millisToCede) 
      throws AccessControlException, ServiceFailedException, IOException {
    int timeout = FailoverController.getGracefulFenceTimeout(conf);

    // Lock elector to maintain lock ordering of elector -> ZKFC
    synchronized (elector) {
      synchronized (this) {
        if (millisToCede <= 0) {
          delayJoiningUntilNanotime = 0;
          recheckElectability();
          return;
        }
  
        LOG.info("Requested by " + UserGroupInformation.getCurrentUser() +
            " at " + Server.getRemoteAddress() + " to cede active role.");
        boolean needFence = false;
        try {
          localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
          LOG.info("Successfully ensured local node is in standby mode");
        } catch (IOException ioe) {
          LOG.warn("Unable to transition local node to standby: " +
              ioe.getLocalizedMessage());
          LOG.warn("Quitting election but indicating that fencing is " +
              "necessary");
          needFence = true;
        }
        delayJoiningUntilNanotime = System.nanoTime() +
            TimeUnit.MILLISECONDS.toNanos(millisToCede);
        elector.quitElection(needFence);
        serviceState = HAServiceState.INITIALIZING;
      }
    }
    recheckElectability();
  }
  
  /**
   * Coordinate a graceful failover to this node.
   * @throws ServiceFailedException if the node fails to become active
   * @throws IOException some other error occurs
   */
  void gracefulFailoverToYou() throws ServiceFailedException, IOException {
    try {
      UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws Exception {
          doGracefulFailover();
          return null;
        }
        
      });
    } catch (InterruptedException e) {
      throw new IOException(e);
    }
  }

  /**
   * Coordinate a graceful failover. This proceeds in several phases:
   * 1) Pre-flight checks: ensure that the local node is healthy, and
   * thus a candidate for failover.
   * 2a) Determine the current active node. If it is the local node, no
   * need to failover - return success.
   * 2b) Get the other nodes
   * 3a) Ask the other nodes to yield from election for a number of seconds
   * 3b) Ask the active node to yield from the election for a number of seconds.
   * 4) Allow the normal election path to run in other threads. Wait until
   * we either become unhealthy or we see an election attempt recorded by
   * the normal code path.
   * 5) Allow the old active to rejoin the election, so a future
   * failback is possible.
   */
  private void doGracefulFailover()
      throws ServiceFailedException, IOException, InterruptedException {
    int timeout = FailoverController.getGracefulFenceTimeout(conf) * 2;
    
    // Phase 1: pre-flight checks
    checkEligibleForFailover();
    
    // Phase 2: determine old/current active node. Check that we're not
    // ourselves active, etc.
    HAServiceTarget oldActive = getCurrentActive();
    if (oldActive == null) {
      // No node is currently active. So, if we aren't already
      // active ourselves by means of a normal election, then there's
      // probably something preventing us from becoming active.
      throw new ServiceFailedException(
          "No other node is currently active.");
    }
    
    if (oldActive.getAddress().equals(localTarget.getAddress())) {
      LOG.info("Local node " + localTarget + " is already active. " +
          "No need to failover. Returning success.");
      return;
    }

    // Phase 2b: get the other nodes
    List<HAServiceTarget> otherNodes = getAllOtherNodes();
    List<ZKFCProtocol> otherZkfcs = new ArrayList<ZKFCProtocol>(otherNodes.size());

    // Phase 3: ask the other nodes to yield from the election.
    long st = System.nanoTime();
    HAServiceTarget activeNode = null;
    for (HAServiceTarget remote : otherNodes) {
      // same location, same node - may not always be == equality
      if (remote.getAddress().equals(oldActive.getAddress())) {
        activeNode = remote;
        continue;
      }
      otherZkfcs.add(cedeRemoteActive(remote, timeout));
    }

    assert
      activeNode != null : "Active node does not match any known remote node";

    // Phase 3b: ask the old active to yield
    otherZkfcs.add(cedeRemoteActive(activeNode, timeout));

    // Phase 4: wait for the normal election to make the local node
    // active.
    ActiveAttemptRecord attempt = waitForActiveAttempt(timeout + 60000, st);
    
    if (attempt == null) {
      // We didn't even make an attempt to become active.
      synchronized(this) {
        if (lastHealthState != State.SERVICE_HEALTHY) {
          throw new ServiceFailedException("Unable to become active. " +
            "Service became unhealthy while trying to failover.");          
        }
      }
      
      throw new ServiceFailedException("Unable to become active. " +
          "Local node did not get an opportunity to do so from ZooKeeper, " +
          "or the local node took too long to transition to active.");
    }

    // Phase 5. At this point, we made some attempt to become active. So we
    // can tell the old active to rejoin if it wants. This allows a quick
    // fail-back if we immediately crash.
    for (ZKFCProtocol zkfc : otherZkfcs) {
      zkfc.cedeActive(-1);
    }

    if (attempt.succeeded) {
      LOG.info("Successfully became active. " + attempt.status);
    } else {
      // Propagate failure
      String msg = "Failed to become active. " + attempt.status;
      throw new ServiceFailedException(msg);
    }
  }

  /**
   * Ask the remote zkfc to cede its active status and wait for the specified
   * timeout before attempting to claim leader status.
   * @param remote node to ask
   * @param timeout amount of time to cede
   * @return the {@link ZKFCProtocol} used to talk to the ndoe
   * @throws IOException
   */
  private ZKFCProtocol cedeRemoteActive(HAServiceTarget remote, int timeout)
    throws IOException {
    LOG.info("Asking " + remote + " to cede its active state for "
               + timeout + "ms");
    ZKFCProtocol oldZkfc = remote.getZKFCProxy(conf, timeout);
    oldZkfc.cedeActive(timeout);
    return oldZkfc;
  }

  /**
   * If the local node is an observer or is unhealthy it
   * is not eligible for graceful failover.
   * @throws ServiceFailedException if the node is an observer or unhealthy
   */
  private synchronized void checkEligibleForFailover()
      throws ServiceFailedException {
    // Check health
    if (this.getLastHealthState() != State.SERVICE_HEALTHY) {
      throw new ServiceFailedException(
          localTarget + " is not currently healthy. " +
          "Cannot be failover target");
    }
    if (serviceState == HAServiceState.OBSERVER) {
      throw new ServiceFailedException(
          localTarget + " is in observer state. " +
          "Cannot be failover target");
    }
  }

  /**
   * @return an {@link HAServiceTarget} for the current active node
   * in the cluster, or null if no node is active.
   * @throws IOException if a ZK-related issue occurs
   * @throws InterruptedException if thread is interrupted 
   */
  private HAServiceTarget getCurrentActive()
      throws IOException, InterruptedException {
    synchronized (elector) {
      synchronized (this) {
        byte[] activeData;
        try {
          activeData = elector.getActiveData();
        } catch (ActiveNotFoundException e) {
          return null;
        } catch (KeeperException ke) {
          throw new IOException(
              "Unexpected ZooKeeper issue fetching active node info", ke);
        }
        
        HAServiceTarget oldActive = dataToTarget(activeData);
        return oldActive;
      }
    }
  }

  /**
   * Check the current state of the service, and join the election
   * if it should be in the election.
   */
  private void recheckElectability() {
    // Maintain lock ordering of elector -> ZKFC
    synchronized (elector) {
      synchronized (this) {
        boolean healthy = lastHealthState == State.SERVICE_HEALTHY;
    
        long remainingDelay = delayJoiningUntilNanotime - System.nanoTime(); 
        if (remainingDelay > 0) {
          if (healthy) {
            LOG.info("Would have joined master election, but this node is " +
                "prohibited from doing so for " +
                TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms");
          }
          scheduleRecheck(remainingDelay);
          return;
        }
    
        switch (lastHealthState) {
        case SERVICE_HEALTHY:
          if(serviceState != HAServiceState.OBSERVER) {
            elector.joinElection(targetToData(localTarget));
          }
          if (quitElectionOnBadState) {
            quitElectionOnBadState = false;
          }
          break;
          
        case INITIALIZING:
          LOG.info("Ensuring that " + localTarget + " does not " +
              "participate in active master election");
          elector.quitElection(false);
          serviceState = HAServiceState.INITIALIZING;
          break;
    
        case SERVICE_UNHEALTHY:
        case SERVICE_NOT_RESPONDING:
          LOG.info("Quitting master election for " + localTarget +
              " and marking that fencing is necessary");
          elector.quitElection(true);
          serviceState = HAServiceState.INITIALIZING;
          break;
          
        case HEALTH_MONITOR_FAILED:
          fatalError("Health monitor failed!");
          break;
          
        default:
          throw new IllegalArgumentException("Unhandled state:"
                                               + lastHealthState);
        }
      }
    }
  }
  
  /**
   * Schedule a call to {@link #recheckElectability()} in the future.
   */
  private void scheduleRecheck(long whenNanos) {
    delayExecutor.schedule(
        new Runnable() {
          @Override
          public void run() {
            try {
              recheckElectability();
            } catch (Throwable t) {
              fatalError("Failed to recheck electability: " +
                  StringUtils.stringifyException(t));
            }
          }
        },
        whenNanos, TimeUnit.NANOSECONDS);
  }

  int serviceStateMismatchCount = 0;
  boolean quitElectionOnBadState = false;

  void verifyChangedServiceState(HAServiceState changedState) {
    synchronized (elector) {
      synchronized (this) {
        if (serviceState == HAServiceState.INITIALIZING) {
          if (quitElectionOnBadState) {
            LOG.debug("rechecking for electability from bad state");
            recheckElectability();
          }
          return;
        }
        if (changedState == HAServiceState.OBSERVER) {
          elector.quitElection(true);
          serviceState = HAServiceState.OBSERVER;
          return;
        }
        if (changedState == serviceState) {
          serviceStateMismatchCount = 0;
          return;
        }
        if (serviceStateMismatchCount == 0) {
          // recheck one more time. As this might be due to parallel transition.
          serviceStateMismatchCount++;
          return;
        }
        // quit the election as the expected state and reported state
        // mismatches.
        LOG.error("Local service " + localTarget
            + " has changed the serviceState to " + changedState
            + ". Expected was " + serviceState
            + ". Quitting election marking fencing necessary.");
        delayJoiningUntilNanotime = System.nanoTime()
            + TimeUnit.MILLISECONDS.toNanos(1000);
        elector.quitElection(true);
        quitElectionOnBadState = true;
        serviceStateMismatchCount = 0;
        serviceState = HAServiceState.INITIALIZING;
      }
    }
  }

  /**
   * @return the last health state passed to the FC
   * by the HealthMonitor.
   */
  protected synchronized State getLastHealthState() {
    return lastHealthState;
  }

  protected synchronized void setLastHealthState(HealthMonitor.State newState) {
    LOG.info("Local service " + localTarget +
        " entered state: " + newState);
    lastHealthState = newState;
  }
  
  @VisibleForTesting
  public ActiveStandbyElector getElectorForTests() {
    return elector;
  }
  
  @VisibleForTesting
  ZKFCRpcServer getRpcServerForTests() {
    return rpcServer;
  }

  /**
   * Callbacks from elector
   */
  class ElectorCallbacks implements ActiveStandbyElectorCallback {
    @Override
    public void becomeActive() throws ServiceFailedException {
      ZKFailoverController.this.becomeActive();
    }

    @Override
    public void becomeStandby() {
      ZKFailoverController.this.becomeStandby();
    }

    @Override
    public void enterNeutralMode() {
    }

    @Override
    public void notifyFatalError(String errorMessage) {
      fatalError(errorMessage);
    }

    @Override
    public void fenceOldActive(byte[] data) {
      ZKFailoverController.this.fenceOldActive(data);
    }
    
    @Override
    public String toString() {
      synchronized (ZKFailoverController.this) {
        return "Elector callbacks for " + localTarget;
      }
    }
  }
  
  /**
   * Callbacks from HealthMonitor
   */
  class HealthCallbacks implements HealthMonitor.Callback {
    @Override
    public void enteredState(HealthMonitor.State newState) {
      setLastHealthState(newState);
      recheckElectability();
    }
  }

  /**
   * Callbacks for HAServiceStatus
   */
  class ServiceStateCallBacks implements HealthMonitor.ServiceStateCallback {
    @Override
    public void reportServiceStatus(HAServiceStatus status) {
      verifyChangedServiceState(status.getState());
    }
  }

  private static class ActiveAttemptRecord {
    private final boolean succeeded;
    private final String status;
    private final long nanoTime;
    
    public ActiveAttemptRecord(boolean succeeded, String status) {
      this.succeeded = succeeded;
      this.status = status;
      this.nanoTime = System.nanoTime();
    }
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop ActiveStandbyElector 源码

hadoop BadFencingConfigurationException 源码

hadoop FailoverController 源码

hadoop FailoverFailedException 源码

hadoop FenceMethod 源码

hadoop HAAdmin 源码

hadoop HAServiceProtocol 源码

hadoop HAServiceProtocolHelper 源码

hadoop HAServiceStatus 源码

hadoop HAServiceTarget 源码

0  赞