hadoop ActiveStandbyElectorBasedElectorService 源码

  • 2022-10-20
  • 浏览 (238)

hadoop ActiveStandbyElectorBasedElectorService 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.resourcemanager;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.ActiveStandbyElector;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;

import java.io.IOException;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

/**
 * Leader election implementation that uses {@link ActiveStandbyElector}.
 *
 * <p>The service builds the elector during init, joins the election on start
 * and quits it on stop. Callbacks received from the elector are translated
 * into active/standby transition requests on the RM's admin service, and
 * fatal errors are forwarded to the RM dispatcher as {@link RMFatalEvent}s.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ActiveStandbyElectorBasedElectorService extends AbstractService
    implements EmbeddedElector,
    ActiveStandbyElector.ActiveStandbyElectorCallback {
  private static final Logger LOG = LoggerFactory.getLogger(
      ActiveStandbyElectorBasedElectorService.class);

  /** Request info sent with every transition; tagged REQUEST_BY_ZKFC. */
  private static final HAServiceProtocol.StateChangeRequestInfo req =
      new HAServiceProtocol.StateChangeRequestInfo(
          HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);

  /** The RM whose context supplies the admin service and the dispatcher. */
  private final ResourceManager rm;

  /** Serialized ActiveRMInfoProto (cluster id + RM id) used to join. */
  private byte[] localActiveNodeInfo;
  /** Created in {@link #serviceInit}; null if init failed before that. */
  private ActiveStandbyElector elector;
  /** Value of {@link YarnConfiguration#RM_ZK_TIMEOUT_MS}, in milliseconds. */
  private long zkSessionTimeout;
  /** Pending "go standby" timer; non-null only while one is armed. */
  private Timer zkDisconnectTimer;
  /** Guards every read and write of {@link #zkDisconnectTimer}. */
  @VisibleForTesting
  final Object zkDisconnectLock = new Object();

  /**
   * Creates the elector service for the given RM.
   *
   * @param rm the resource manager this elector drives
   */
  ActiveStandbyElectorBasedElectorService(ResourceManager rm) {
    super(ActiveStandbyElectorBasedElectorService.class.getName());
    this.rm = rm;
  }

  /**
   * Reads the ZooKeeper and HA settings, creates the
   * {@link ActiveStandbyElector}, ensures the election parent znode exists
   * and reports a fatal error if the znode holds data that cannot be parsed
   * or that belongs to another cluster.
   *
   * @param conf service configuration; wrapped in a
   *             {@link YarnConfiguration} when it is not one already
   * @throws YarnRuntimeException if {@link YarnConfiguration#RM_ZK_ADDRESS}
   *                              is not set
   * @throws Exception on any failure while talking to ZooKeeper
   */
  @Override
  protected void serviceInit(Configuration conf)
      throws Exception {
    conf = conf instanceof YarnConfiguration
        ? conf
        : new YarnConfiguration(conf);

    String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
    if (zkQuorum == null) {
      throw new YarnRuntimeException("Embedded automatic failover " +
          "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
          " is not set");
    }

    String rmId = HAUtil.getRMHAId(conf);
    String clusterId = YarnConfiguration.getClusterId(conf);
    localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);

    String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
    String electionZNode = zkBasePath + "/" + clusterId;

    zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);

    List<ACL> zkAcls = ZKCuratorManager.getZKAcls(conf);
    List<ZKUtil.ZKAuthInfo> zkAuths = ZKCuratorManager.getZKAuths(conf);

    // The YARN-specific retry key wins; otherwise fall back to the common
    // HA key and finally to the common default.
    int commonRetryNum = conf.getInt(
        CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
        CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
    int maxRetryNum = conf.getInt(
        YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, commonRetryNum);

    elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
        electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);

    elector.ensureParentZNode();
    if (!isParentZnodeSafe(clusterId)) {
      notifyFatalError(String.format("invalid data in znode, %s, " +
          "which may require the state store to be reformatted",
          electionZNode));
    }

    super.serviceInit(conf);
  }

  /**
   * Joins the leader election using the local RM's serialized node info.
   *
   * @throws Exception if the parent service fails to start
   */
  @Override
  protected void serviceStart() throws Exception {
    elector.joinElection(localActiveNodeInfo);
    super.serviceStart();
  }

  /**
   * Cancels any pending disconnect timer, quits the election and closes the
   * elector's ZooKeeper connection.
   *
   * @throws Exception if the parent service fails to stop
   */
  @Override
  protected void serviceStop() throws Exception {
    // A timer armed by enterNeutralMode() must not outlive the service:
    // otherwise its thread lingers and may call becomeStandby() after the
    // elector has been torn down.
    cancelDisconnectTimer();

    // When an error occurs in serviceInit(), serviceStop() can still be
    // called, so the elector may never have been created.
    if (elector != null) {
      elector.quitElection(false);
      elector.terminateConnection();
    }
    super.serviceStop();
  }

  /**
   * Elector callback: asks the RM admin service to transition to active.
   *
   * @throws ServiceFailedException wrapping any failure of the transition
   */
  @Override
  public void becomeActive() throws ServiceFailedException {
    cancelDisconnectTimer();

    try {
      rm.getRMContext().getRMAdminService().transitionToActive(req);
    } catch (Exception e) {
      throw new ServiceFailedException("RM could not transition to Active", e);
    }
  }

  /**
   * Elector callback: asks the RM admin service to transition to standby.
   * A failed transition is logged and not propagated.
   */
  @Override
  public void becomeStandby() {
    cancelDisconnectTimer();

    try {
      rm.getRMContext().getRMAdminService().transitionToStandby(req);
    } catch (Exception e) {
      LOG.error("RM could not transition to Standby", e);
    }
  }

  /**
   * Stop the disconnect timer.  Any running tasks will be allowed to complete.
   */
  private void cancelDisconnectTimer() {
    synchronized (zkDisconnectLock) {
      if (zkDisconnectTimer != null) {
        zkDisconnectTimer.cancel();
        zkDisconnectTimer = null;
      }
    }
  }

  /**
   * When the ZK client loses contact with ZK, this method will be called to
   * allow the RM to react. Because the loss of connection can be noticed
   * before the session timeout happens, it is undesirable to transition
   * immediately. Instead the method starts a timer that will wait
   * {@link YarnConfiguration#RM_ZK_TIMEOUT_MS} milliseconds before
   * initiating the transition into standby state.
   */
  @Override
  public void enterNeutralMode() {
    LOG.warn("Lost contact with Zookeeper. Transitioning to standby in {}"
        + " ms if connection is not reestablished.", zkSessionTimeout);

    // If we've just become disconnected, start a timer.  When the time's up,
    // we'll transition to standby. A timer that is already armed is left
    // untouched so repeated notifications do not extend the deadline.
    synchronized (zkDisconnectLock) {
      if (zkDisconnectTimer == null) {
        zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
        zkDisconnectTimer.schedule(new TimerTask() {
          @Override
          public void run() {
            synchronized (zkDisconnectLock) {
              // Only run if the timer hasn't been cancelled
              if (zkDisconnectTimer != null) {
                becomeStandby();
              }
            }
          }
        }, zkSessionTimeout);
      }
    }
  }

  /**
   * Forwards a fatal elector error to the RM dispatcher as an
   * {@link RMFatalEventType#EMBEDDED_ELECTOR_FAILED} event.
   *
   * @param errorMessage description of the failure
   */
  @SuppressWarnings(value = "unchecked")
  @Override
  public void notifyFatalError(String errorMessage) {
    rm.getRMContext().getDispatcher().getEventHandler().handle(
        new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,
            errorMessage));
  }

  /**
   * Elector callback: fencing is not supported by the embedded elector, so
   * the request is only logged.
   *
   * @param oldActiveData data of the previous active node; unused
   */
  @Override
  public void fenceOldActive(byte[] oldActiveData) {
    LOG.debug("Request to fence old active being ignored, " +
        "as embedded leader election doesn't support fencing");
  }

  /**
   * Serializes the cluster id and RM id into an ActiveRMInfoProto.
   *
   * @param clusterId id of the cluster this RM belongs to
   * @param rmId HA id of this RM
   * @return the protobuf-encoded bytes
   */
  private static byte[] createActiveNodeInfo(String clusterId, String rmId) {
    return YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
        .newBuilder()
        .setClusterId(clusterId)
        .setRmId(rmId)
        .build()
        .toByteArray();
  }

  /**
   * Checks whether the data currently stored for the active node, if any,
   * is a parsable ActiveRMInfoProto belonging to the given cluster.
   *
   * @param clusterId id of the local cluster
   * @return true if no active node is registered or its data matches the
   *         local cluster; false if the data is unparsable or names a
   *         different cluster
   * @throws InterruptedException if reading from ZooKeeper is interrupted
   * @throws IOException if reading the active data fails
   * @throws KeeperException on a ZooKeeper error
   */
  private boolean isParentZnodeSafe(String clusterId)
      throws InterruptedException, IOException, KeeperException {
    byte[] data;
    try {
      data = elector.getActiveData();
    } catch (ActiveStandbyElector.ActiveNotFoundException e) {
      // no active found, parent znode is safe
      return true;
    }

    YarnServerResourceManagerServiceProtos.ActiveRMInfoProto proto;
    try {
      proto = YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
          .parseFrom(data);
    } catch (InvalidProtocolBufferException e) {
      // Keep the parse failure attached so the cause is visible in the log.
      LOG.error("Invalid data in ZK: {}",
          StringUtils.byteToHexString(data), e);
      return false;
    }

    // Check if the passed proto corresponds to an RM in the same cluster
    if (!proto.getClusterId().equals(clusterId)) {
      LOG.error("Mismatched cluster! The other RM seems to be from a "
          + "different cluster. Current cluster = {}. "
          + "Other RM's cluster = {}", clusterId, proto.getClusterId());
      return false;
    }
    return true;
  }

  // EmbeddedElector methods

  /**
   * Quits the current election and immediately joins it again with the
   * same node info.
   */
  @Override
  public void rejoinElection() {
    elector.quitElection(false);
    elector.joinElection(localActiveNodeInfo);
  }

  /**
   * @return the ZooKeeper connection state as reported by the elector
   */
  @Override
  public String getZookeeperConnectionState() {
    return elector.getHAZookeeperConnectionState();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AMSProcessingChain 源码

hadoop AdminService 源码

hadoop ApplicationMasterService 源码

hadoop ClientRMService 源码

hadoop ClusterMetrics 源码

hadoop ClusterMonitor 源码

hadoop CuratorBasedElectorService 源码

hadoop DBManager 源码

hadoop DecommissioningNodesWatcher 源码

hadoop DefaultAMSProcessor 源码

0  赞