hadoop RMStateStore 源码

  • 2022-10-20
  • 浏览 (270)

haddop RMStateStore 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.recovery;

import java.io.ByteArrayInputStream;
import java.security.KeyFactory;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import javax.crypto.SecretKey;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.SettableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;

@Private
@Unstable
/**
 * Base class to implement storage of ResourceManager state.
 * Takes care of asynchronous notifications and interfacing with YARN objects.
 * Real store implementations need to derive from it and implement blocking
 * store and load methods to actually store and load the state.
 */
public abstract class RMStateStore extends AbstractService {

  // constants for RM App state and RMDTSecretManagerState.
  @VisibleForTesting
  public static final String RM_APP_ROOT = "RMAppRoot";
  protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
  protected static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
      "RMDelegationTokensRoot";
  protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
  protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
  protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
      "RMDTSequenceNumber_";
  protected static final String AMRMTOKEN_SECRET_MANAGER_ROOT =
      "AMRMTokenSecretManagerRoot";
  protected static final String RESERVATION_SYSTEM_ROOT =
      "ReservationSystemRoot";
  protected static final String PROXY_CA_ROOT = "ProxyCARoot";
  protected static final String PROXY_CA_CERT_NODE = "caCert";
  protected static final String PROXY_CA_PRIVATE_KEY_NODE = "caPrivateKey";
  protected static final String VERSION_NODE = "RMVersionNode";
  protected static final String EPOCH_NODE = "EpochNode";
  protected long baseEpoch;
  private long epochRange;
  protected ResourceManager resourceManager;
  private final ReadLock readLock;
  private final WriteLock writeLock;

  public static final Logger LOG =
      LoggerFactory.getLogger(RMStateStore.class);

  /**
   * The enum defines state of RMStateStore.
   */
  public enum RMStateStoreState {
    ACTIVE,
    FENCED
  };

  private static final StateMachineFactory<RMStateStore,
                                           RMStateStoreState,
                                           RMStateStoreEventType, 
                                           RMStateStoreEvent>
      stateMachineFactory = new StateMachineFactory<RMStateStore,
                                                    RMStateStoreState,
                                                    RMStateStoreEventType,
                                                    RMStateStoreEvent>(
      RMStateStoreState.ACTIVE)
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.STORE_APP, new StoreAppTransition())
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.STORE_APP_ATTEMPT,
          new StoreAppAttemptTransition())
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.UPDATE_APP_ATTEMPT,
          new UpdateAppAttemptTransition())
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.REMOVE_APP_ATTEMPT,
          new RemoveAppAttemptTransition())
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.STORE_MASTERKEY,
          new StoreRMDTMasterKeyTransition())
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.REMOVE_MASTERKEY,
          new RemoveRMDTMasterKeyTransition())
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.STORE_DELEGATION_TOKEN,
          new StoreRMDTTransition())
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.REMOVE_DELEGATION_TOKEN,
          new RemoveRMDTTransition())
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
          new UpdateRMDTTransition())
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.UPDATE_AMRM_TOKEN,
          new StoreOrUpdateAMRMTokenTransition())
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.STORE_RESERVATION,
          new StoreReservationAllocationTransition())
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.REMOVE_RESERVATION,
          new RemoveReservationAllocationTransition())
      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED,
          RMStateStoreEventType.FENCED)
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.STORE_PROXY_CA_CERT,
          new StoreProxyCACertTransition())
      .addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED,
          EnumSet.of(
          RMStateStoreEventType.STORE_APP,
          RMStateStoreEventType.UPDATE_APP,
          RMStateStoreEventType.REMOVE_APP,
          RMStateStoreEventType.STORE_APP_ATTEMPT,
          RMStateStoreEventType.UPDATE_APP_ATTEMPT,
          RMStateStoreEventType.FENCED,
          RMStateStoreEventType.STORE_MASTERKEY,
          RMStateStoreEventType.REMOVE_MASTERKEY,
          RMStateStoreEventType.STORE_DELEGATION_TOKEN,
          RMStateStoreEventType.REMOVE_DELEGATION_TOKEN,
          RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
          RMStateStoreEventType.UPDATE_AMRM_TOKEN,
          RMStateStoreEventType.STORE_RESERVATION,
          RMStateStoreEventType.REMOVE_RESERVATION,
          RMStateStoreEventType.STORE_PROXY_CA_CERT));

  private final StateMachine<RMStateStoreState,
                             RMStateStoreEventType,
                             RMStateStoreEvent> stateMachine;

  private static class StoreAppTransition
      implements MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreAppEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      ApplicationStateData appState =
          ((RMStateStoreAppEvent) event).getAppState();
      ApplicationId appId =
          appState.getApplicationSubmissionContext().getApplicationId();
      LOG.info("Storing info for app: " + appId);
      try {
        store.storeApplicationStateInternal(appId, appState);
        store.notifyApplication(
            new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED));
      } catch (Exception e) {
        LOG.error("Error storing app: " + appId, e);
        if (e instanceof StoreLimitException) {
          store.notifyApplication(
              new RMAppEvent(appId, RMAppEventType.APP_SAVE_FAILED,
                  e.getMessage()));
        } else {
          isFenced = store.notifyStoreOperationFailedInternal(e);
        }
      }
      return finalState(isFenced);
    };

  }

  private static class UpdateAppTransition implements
      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateUpdateAppEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      ApplicationStateData appState =
          ((RMStateUpdateAppEvent) event).getAppState();
      SettableFuture<Object> result =
          ((RMStateUpdateAppEvent) event).getResult();
      ApplicationId appId =
          appState.getApplicationSubmissionContext().getApplicationId();
      LOG.info("Updating info for app: " + appId);
      try {
        if (isAppStateFinal(appState)) {
          pruneAppState(appState);
        }
        store.updateApplicationStateInternal(appId, appState);
        if (((RMStateUpdateAppEvent) event).isNotifyApplication()) {
          store.notifyApplication(new RMAppEvent(appId,
              RMAppEventType.APP_UPDATE_SAVED));
        }

        if (result != null) {
          result.set(null);
        }

      } catch (Exception e) {
        String msg = "Error updating app: " + appId;
        LOG.error(msg, e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
        if (result != null) {
          result.setException(new YarnException(msg, e));
        }
      }
      return finalState(isFenced);
    }

    private boolean isAppStateFinal(ApplicationStateData appState) {
      RMAppState state = appState.getState();
      return state == RMAppState.FINISHED || state == RMAppState.FAILED ||
          state == RMAppState.KILLED;
    }

    private void pruneAppState(ApplicationStateData appState) {
      ApplicationSubmissionContext srcCtx =
          appState.getApplicationSubmissionContext();
      ApplicationSubmissionContextPBImpl context =
          new ApplicationSubmissionContextPBImpl();
      // most fields in the ApplicationSubmissionContext are not needed,
      // but the following few need to be present for recovery to succeed
      context.setApplicationId(srcCtx.getApplicationId());
      context.setResource(srcCtx.getResource());
      context.setQueue(srcCtx.getQueue());
      context.setAMContainerResourceRequests(
          srcCtx.getAMContainerResourceRequests());
      context.setApplicationName(srcCtx.getApplicationName());
      context.setPriority(srcCtx.getPriority());
      context.setApplicationTags(srcCtx.getApplicationTags());
      context.setApplicationType(srcCtx.getApplicationType());
      context.setUnmanagedAM(srcCtx.getUnmanagedAM());
      context.setNodeLabelExpression(srcCtx.getNodeLabelExpression());
      ContainerLaunchContextPBImpl amContainerSpec =
              new ContainerLaunchContextPBImpl();
      amContainerSpec.setApplicationACLs(
              srcCtx.getAMContainerSpec().getApplicationACLs());
      context.setAMContainerSpec(amContainerSpec);
      appState.setApplicationSubmissionContext(context);
    }
  }

  private static class RemoveAppTransition implements
      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreRemoveAppEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      ApplicationStateData appState =
          ((RMStateStoreRemoveAppEvent) event).getAppState();
      ApplicationId appId =
          appState.getApplicationSubmissionContext().getApplicationId();
      LOG.info("Removing info for app: " + appId);
      try {
        store.removeApplicationStateInternal(appState);
      } catch (Exception e) {
        LOG.error("Error removing app: " + appId, e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
      return finalState(isFenced);
    };
  }

  private static class StoreAppAttemptTransition implements
      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreAppAttemptEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      ApplicationAttemptStateData attemptState =
          ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
      try {
        LOG.debug("Storing info for attempt: {}", attemptState.getAttemptId());
        store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
            attemptState);
        store.notifyApplicationAttempt(new RMAppAttemptEvent
               (attemptState.getAttemptId(),
               RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
      } catch (Exception e) {
        LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
      return finalState(isFenced);
    };
  }

  private static class UpdateAppAttemptTransition implements
      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateUpdateAppAttemptEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      ApplicationAttemptStateData attemptState =
          ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
      try {
        LOG.debug("Updating info for attempt: {}",
            attemptState.getAttemptId());
        store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
            attemptState);
        store.notifyApplicationAttempt(new RMAppAttemptEvent
               (attemptState.getAttemptId(),
               RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
      } catch (Exception e) {
        LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
      return finalState(isFenced);
    };
  }

  private static class StoreRMDTTransition implements
      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreRMDTEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
      try {
        LOG.info("Storing RMDelegationToken and SequenceNumber");
        store.storeRMDelegationTokenState(
            dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate());
      } catch (Exception e) {
        LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
            e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
      return finalState(isFenced);
    }
  }

  private static class RemoveRMDTTransition implements
      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreRMDTEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
      try {
        LOG.info("Removing RMDelegationToken and SequenceNumber");
        store.removeRMDelegationTokenState(dtEvent.getRmDTIdentifier());
      } catch (Exception e) {
        LOG.error("Error While Removing RMDelegationToken and SequenceNumber ",
            e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
      return finalState(isFenced);
    }
  }

  private static class UpdateRMDTTransition implements
      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreRMDTEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
      try {
        LOG.info("Updating RMDelegationToken and SequenceNumber");
        store.updateRMDelegationTokenState(
            dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate());
      } catch (Exception e) {
        LOG.error("Error While Updating RMDelegationToken and SequenceNumber ",
            e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
      return finalState(isFenced);
    }
  }

  private static class StoreRMDTMasterKeyTransition implements
      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      RMStateStoreRMDTMasterKeyEvent dtEvent =
          (RMStateStoreRMDTMasterKeyEvent) event;
      try {
        LOG.info("Storing RMDTMasterKey.");
        store.storeRMDTMasterKeyState(dtEvent.getDelegationKey());
      } catch (Exception e) {
        LOG.error("Error While Storing RMDTMasterKey.", e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
      return finalState(isFenced);
    }
  }

  private static class RemoveRMDTMasterKeyTransition implements
      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      RMStateStoreRMDTMasterKeyEvent dtEvent =
          (RMStateStoreRMDTMasterKeyEvent) event;
      try {
        LOG.info("Removing RMDTMasterKey.");
        store.removeRMDTMasterKeyState(dtEvent.getDelegationKey());
      } catch (Exception e) {
        LOG.error("Error While Removing RMDTMasterKey.", e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
      return finalState(isFenced);
    }
  }

  private static class StoreOrUpdateAMRMTokenTransition implements
      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreAMRMTokenEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      RMStateStoreAMRMTokenEvent amrmEvent = (RMStateStoreAMRMTokenEvent) event;
      boolean isFenced = false;
      try {
        LOG.info("Updating AMRMToken");
        store.storeOrUpdateAMRMTokenSecretManagerState(
            amrmEvent.getAmrmTokenSecretManagerState(), amrmEvent.isUpdate());
      } catch (Exception e) {
        LOG.error("Error storing info for AMRMTokenSecretManager", e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
      return finalState(isFenced);
    }
  }

  private static class StoreReservationAllocationTransition implements
      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreStoreReservationEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      RMStateStoreStoreReservationEvent reservationEvent =
          (RMStateStoreStoreReservationEvent) event;
      try {
        LOG.info("Storing reservation allocation." + reservationEvent
                .getReservationIdName());
        store.storeReservationState(
            reservationEvent.getReservationAllocation(),
            reservationEvent.getPlanName(),
            reservationEvent.getReservationIdName());
      } catch (Exception e) {
        LOG.error("Error while storing reservation allocation.", e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
      return finalState(isFenced);
    }
  }

  private static class RemoveReservationAllocationTransition implements
      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreStoreReservationEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      RMStateStoreStoreReservationEvent reservationEvent =
          (RMStateStoreStoreReservationEvent) event;
      try {
        LOG.info("Removing reservation allocation." + reservationEvent
                .getReservationIdName());
        store.removeReservationState(
            reservationEvent.getPlanName(),
            reservationEvent.getReservationIdName());
      } catch (Exception e) {
        LOG.error("Error while removing reservation allocation.", e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
      return finalState(isFenced);
    }
  }

  private static class StoreProxyCACertTransition implements
      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreProxyCAEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      RMStateStoreProxyCAEvent caEvent = (RMStateStoreProxyCAEvent) event;
      try {
        LOG.info("Storing CA Certificate and Private Key");
        store.storeProxyCACertState(
            caEvent.getCaCert(), caEvent.getCaPrivateKey());
      } catch (Exception e) {
        LOG.error("Error While Storing CA Certificate and Private Key", e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
      return finalState(isFenced);
    }
  }

  private static RMStateStoreState finalState(boolean isFenced) {
    return isFenced ? RMStateStoreState.FENCED : RMStateStoreState.ACTIVE;
  }

  private static class RemoveAppAttemptTransition implements
      MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreRemoveAppAttemptEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      ApplicationAttemptId attemptId =
          ((RMStateStoreRemoveAppAttemptEvent) event).getApplicationAttemptId();
      ApplicationId appId = attemptId.getApplicationId();
      LOG.info("Removing attempt " + attemptId + " from app: " + appId);
      try {
        store.removeApplicationAttemptInternal(attemptId);
      } catch (Exception e) {
        LOG.error("Error removing attempt: " + attemptId, e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
      return finalState(isFenced);
    }
  }

  public RMStateStore() {
    super(RMStateStore.class.getName());
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.readLock = lock.readLock();
    this.writeLock = lock.writeLock();
    stateMachine = stateMachineFactory.make(this);
  }

  public static class RMDTSecretManagerState {
    // DTIdentifier -> renewDate
    Map<RMDelegationTokenIdentifier, Long> delegationTokenState =
        new HashMap<RMDelegationTokenIdentifier, Long>();

    Set<DelegationKey> masterKeyState =
        new HashSet<DelegationKey>();

    int dtSequenceNumber = 0;

    public Map<RMDelegationTokenIdentifier, Long> getTokenState() {
      return delegationTokenState;
    }

    public Set<DelegationKey> getMasterKeyState() {
      return masterKeyState;
    }

    public int getDTSequenceNumber() {
      return dtSequenceNumber;
    }
  }

  public static class ProxyCAState {
    private X509Certificate caCert;
    private PrivateKey caPrivateKey;

    public X509Certificate getCaCert() {
      return caCert;
    }

    public PrivateKey getCaPrivateKey() {
      return caPrivateKey;
    }

    public void setCaCert(X509Certificate caCert) {
      this.caCert = caCert;
    }

    public void setCaPrivateKey(PrivateKey caPrivateKey) {
      this.caPrivateKey = caPrivateKey;
    }

    public void setCaCert(byte[] caCertData) throws CertificateException {
      ByteArrayInputStream bais = new ByteArrayInputStream(caCertData);
      caCert = (X509Certificate)
          CertificateFactory.getInstance("X.509").generateCertificate(bais);
    }

    public void setCaPrivateKey(byte[] caPrivateKeyData)
        throws NoSuchAlgorithmException, InvalidKeySpecException {
      caPrivateKey = KeyFactory.getInstance("RSA").generatePrivate(
          new PKCS8EncodedKeySpec(caPrivateKeyData));
    }
  }

  /**
   * State of the ResourceManager
   */
  public static class RMState {
    Map<ApplicationId, ApplicationStateData> appState =
        new TreeMap<ApplicationId, ApplicationStateData>();

    RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();

    AMRMTokenSecretManagerState amrmTokenSecretManagerState = null;

    private Map<String, Map<ReservationId, ReservationAllocationStateProto>>
        reservationState = new TreeMap<>();

    ProxyCAState proxyCAState = new ProxyCAState();

    public Map<ApplicationId, ApplicationStateData> getApplicationState() {
      return appState;
    }

    public RMDTSecretManagerState getRMDTSecretManagerState() {
      return rmSecretManagerState;
    }

    public AMRMTokenSecretManagerState getAMRMTokenSecretManagerState() {
      return amrmTokenSecretManagerState;
    }

    public Map<String, Map<ReservationId, ReservationAllocationStateProto>>
        getReservationState() {
      return reservationState;
    }

    public ProxyCAState getProxyCAState() {
      return proxyCAState;
    }
  }
    
  private Dispatcher rmDispatcher;

  /**
   * Dispatcher used to send state operation completion events to 
   * ResourceManager services
   */
  public void setRMDispatcher(Dispatcher dispatcher) {
    this.rmDispatcher = dispatcher;
  }
  
  AsyncDispatcher dispatcher;
  @SuppressWarnings("rawtypes")
  @VisibleForTesting
  protected EventHandler rmStateStoreEventHandler;

  @Override
  protected void serviceInit(Configuration conf) throws Exception{
    // create async handler
    dispatcher = new AsyncDispatcher("RM StateStore dispatcher");
    dispatcher.init(conf);
    rmStateStoreEventHandler = new ForwardingEventHandler();
    dispatcher.register(RMStateStoreEventType.class, 
                        rmStateStoreEventHandler);
    dispatcher.setDrainEventsOnStop();
    // read the base epoch value from conf
    baseEpoch = conf.getLong(YarnConfiguration.RM_EPOCH,
        YarnConfiguration.DEFAULT_RM_EPOCH);
    epochRange = conf.getLong(YarnConfiguration.RM_EPOCH_RANGE,
        YarnConfiguration.DEFAULT_RM_EPOCH_RANGE);
    initInternal(conf);
  }

  @Override
  protected void serviceStart() throws Exception {
    dispatcher.start();
    startInternal();
  }

  /**
   * Derived classes initialize themselves using this method.
   */
  protected abstract void initInternal(Configuration conf) throws Exception;

  /**
   * Derived classes start themselves using this method.
   * The base class is started and the event dispatcher is ready to use at
   * this point
   */
  protected abstract void startInternal() throws Exception;

  @Override
  protected void serviceStop() throws Exception {
    dispatcher.stop();
    closeInternal();
  }

  /**
   * Derived classes close themselves using this method.
   * The base class will be closed and the event dispatcher will be shutdown 
   * after this
   */
  protected abstract void closeInternal() throws Exception;

  /**
   * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
   * 2) Any incompatible change of state-store is a major upgrade, and any
   *    compatible change of state-store is a minor upgrade.
   * 3) If theres's no version, treat it as CURRENT_VERSION_INFO.
   * 4) Within a minor upgrade, say 1.1 to 1.2:
   *    overwrite the version info and proceed as normal.
   * 5) Within a major upgrade, say 1.2 to 2.0:
   *    throw exception and indicate user to use a separate upgrade tool to
   *    upgrade RM state.
   */
  public void checkVersion() throws Exception {
    Version loadedVersion = loadVersion();
    LOG.info("Loaded RM state version info " + loadedVersion);
    if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
      return;
    }
    // if there is no version info, treat it as CURRENT_VERSION_INFO;
    if (loadedVersion == null) {
      loadedVersion = getCurrentVersion();
    }
    if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
      LOG.info("Storing RM state version info " + getCurrentVersion());
      storeVersion();
    } else {
      throw new RMStateVersionIncompatibleException(
        "Expecting RM state version " + getCurrentVersion()
            + ", but loading version " + loadedVersion);
    }
  }

  /**
   * Derived class use this method to load the version information from state
   * store.
   */
  protected abstract Version loadVersion() throws Exception;

  /**
   * Derived class use this method to store the version information.
   */
  protected abstract void storeVersion() throws Exception;

  /**
   * Get the current version of the underlying state store.
   */
  protected abstract Version getCurrentVersion();


  /**
   * Get the current epoch of RM and increment the value.
   */
  public abstract long getAndIncrementEpoch() throws Exception;

  /**
   * Compute the next epoch value by incrementing by one.
   * Wraps around if the epoch range is exceeded so that
   * when federation is enabled epoch collisions can be avoided.
   */
  protected long nextEpoch(long epoch){
    long epochVal = epoch - baseEpoch + 1;
    if (epochRange > 0) {
      epochVal %= epochRange;
    }
    return  epochVal + baseEpoch;
  }

  /**
   * Blocking API
   * The derived class must recover state from the store and return a new 
   * RMState object populated with that state
   * This must not be called on the dispatcher thread
   */
  public abstract RMState loadState() throws Exception;
  
  /**
   * Non-Blocking API
   * ResourceManager services use this to store the application's state
   * This does not block the dispatcher threads
   * RMAppStoredEvent will be sent on completion to notify the RMApp
   */
  @SuppressWarnings("unchecked")
  public void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationStateData appState =
        ApplicationStateData.newInstance(app.getSubmitTime(),
            app.getStartTime(), context, app.getUser(), app.getRealUser(),
            app.getCallerContext());
    appState.setApplicationTimeouts(app.getApplicationTimeouts());
    getRMStateStoreEventHandler().handle(new RMStateStoreAppEvent(appState));
  }

  @SuppressWarnings("unchecked")
  public void updateApplicationState(ApplicationStateData appState) {
    getRMStateStoreEventHandler().handle(new RMStateUpdateAppEvent(appState));
  }

  @SuppressWarnings("unchecked")
  public void updateApplicationState(ApplicationStateData appState,
      boolean notifyApp) {
    getRMStateStoreEventHandler().handle(new RMStateUpdateAppEvent(appState,
        notifyApp));
  }

  public void updateApplicationStateSynchronously(ApplicationStateData appState,
      boolean notifyApp, SettableFuture<Object> resultFuture) {
    handleStoreEvent(
        new RMStateUpdateAppEvent(appState, notifyApp, resultFuture));
  }

  public void updateFencedState() {
    handleStoreEvent(new RMStateStoreEvent(RMStateStoreEventType.FENCED));
  }

  /**
   * Blocking API
   * Derived classes must implement this method to store the state of an 
   * application.
   */
  protected abstract void storeApplicationStateInternal(ApplicationId appId,
      ApplicationStateData appStateData) throws Exception;

  protected abstract void updateApplicationStateInternal(ApplicationId appId,
      ApplicationStateData appStateData) throws Exception;
  
  @SuppressWarnings("unchecked")
  /**
   * Non-blocking API
   * ResourceManager services call this to store state on an application attempt
   * This does not block the dispatcher threads
   * RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt
   */
  public void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
    Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
    RMAppAttemptMetrics attempMetrics = appAttempt.getRMAppAttemptMetrics();
    AggregateAppResourceUsage resUsage =
        attempMetrics.getAggregateAppResourceUsage();
    ApplicationAttemptStateData attemptState =
        ApplicationAttemptStateData.newInstance(
            appAttempt.getAppAttemptId(),
            appAttempt.getMasterContainer(),
            credentials, appAttempt.getStartTime(),
            resUsage.getResourceUsageSecondsMap(),
            attempMetrics.getPreemptedResourceSecondsMap(),
            attempMetrics.getTotalAllocatedContainers());

    getRMStateStoreEventHandler().handle(
      new RMStateStoreAppAttemptEvent(attemptState));
  }

  @SuppressWarnings("unchecked")
  public void updateApplicationAttemptState(
      ApplicationAttemptStateData attemptState) {
    getRMStateStoreEventHandler().handle(
      new RMStateUpdateAppAttemptEvent(attemptState));
  }

  /**
   * Blocking API
   * Derived classes must implement this method to store the state of an 
   * application attempt
   */
  protected abstract void storeApplicationAttemptStateInternal(
      ApplicationAttemptId attemptId,
      ApplicationAttemptStateData attemptStateData) throws Exception;

  protected abstract void updateApplicationAttemptStateInternal(
      ApplicationAttemptId attemptId,
      ApplicationAttemptStateData attemptStateData) throws Exception;

  /**
   * RMDTSecretManager call this to store the state of a delegation token
   * and sequence number
   */
  public void storeRMDelegationToken(
      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) {
    handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
        RMStateStoreEventType.STORE_DELEGATION_TOKEN));
  }

  /**
   * Blocking API
   * Derived classes must implement this method to store the state of
   * RMDelegationToken and sequence number
   */
  protected abstract void storeRMDelegationTokenState(
      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
      throws Exception;

  /**
   * RMDTSecretManager call this to remove the state of a delegation token
   */
  public void removeRMDelegationToken(
      RMDelegationTokenIdentifier rmDTIdentifier) {
    handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, null,
        RMStateStoreEventType.REMOVE_DELEGATION_TOKEN));
  }

  /**
   * Blocking API
   * Derived classes must implement this method to remove the state of RMDelegationToken
   */
  protected abstract void removeRMDelegationTokenState(
      RMDelegationTokenIdentifier rmDTIdentifier) throws Exception;

  /**
   * RMDTSecretManager call this to update the state of a delegation token
   * and sequence number
   */
  public void updateRMDelegationToken(
      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) {
    handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
        RMStateStoreEventType.UPDATE_DELEGATION_TOKEN));
  }

  /**
   * Blocking API
   * Derived classes must implement this method to update the state of
   * RMDelegationToken and sequence number
   */
  protected abstract void updateRMDelegationTokenState(
      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
      throws Exception;

  /**
   * RMDTSecretManager call this to store the state of a master key
   */
  public void storeRMDTMasterKey(DelegationKey delegationKey) {
    handleStoreEvent(new RMStateStoreRMDTMasterKeyEvent(delegationKey,
        RMStateStoreEventType.STORE_MASTERKEY));
  }

  /**
   * Blocking API
   * Derived classes must implement this method to store the state of
   * DelegationToken Master Key
   */
  protected abstract void storeRMDTMasterKeyState(DelegationKey delegationKey)
      throws Exception;

  /**
   * RMDTSecretManager call this to remove the state of a master key
   */
  public void removeRMDTMasterKey(DelegationKey delegationKey) {
    handleStoreEvent(new RMStateStoreRMDTMasterKeyEvent(delegationKey,
        RMStateStoreEventType.REMOVE_MASTERKEY));
  }

  /**
   * Blocking Apis to maintain reservation state.
   */
  public void storeNewReservation(
      ReservationAllocationStateProto reservationAllocation, String planName,
      String reservationIdName) {
    handleStoreEvent(new RMStateStoreStoreReservationEvent(
        reservationAllocation, RMStateStoreEventType.STORE_RESERVATION,
        planName, reservationIdName));
  }

  public void removeReservation(String planName, String reservationIdName) {
    handleStoreEvent(new RMStateStoreStoreReservationEvent(
            null, RMStateStoreEventType.REMOVE_RESERVATION,
            planName, reservationIdName));
  }

  /**
   * Blocking API
   * Derived classes must implement this method to store the state of
   * a reservation allocation.
   */
  protected abstract void storeReservationState(
      ReservationAllocationStateProto reservationAllocation, String planName,
      String reservationIdName) throws Exception;

  /**
   * Blocking API
   * Derived classes must implement this method to remove the state of
   * a reservation allocation.
   */
  protected abstract void removeReservationState(String planName,
      String reservationIdName) throws Exception;

  /**
   * Blocking API
   * Derived classes must implement this method to remove the state of
   * DelegationToken Master Key
   */
  protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey)
      throws Exception;

  /**
   * Blocking API Derived classes must implement this method to store or update
   * the state of AMRMToken Master Key
   */
  protected abstract void storeOrUpdateAMRMTokenSecretManagerState(
      AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
      throws Exception;

  /**
   * Store or Update state of AMRMToken Master Key
   */
  public void storeOrUpdateAMRMTokenSecretManager(
      AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate) {
    handleStoreEvent(new RMStateStoreAMRMTokenEvent(
        amrmTokenSecretManagerState, isUpdate,
        RMStateStoreEventType.UPDATE_AMRM_TOKEN));
  }

  /**
   * Non-blocking API
   * ResourceManager services call this to remove an application from the state
   * store
   * This does not block the dispatcher threads
   * There is no notification of completion for this operation.
   */
  @SuppressWarnings("unchecked")
  public void removeApplication(RMApp app) {
    ApplicationStateData appState =
        ApplicationStateData.newInstance(app.getSubmitTime(),
            app.getStartTime(), app.getApplicationSubmissionContext(),
            app.getUser(), app.getRealUser(), app.getCallerContext());
    for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
      appState.attempts.put(appAttempt.getAppAttemptId(), null);
    }
    
    getRMStateStoreEventHandler().handle(
        new RMStateStoreRemoveAppEvent(appState));
  }

  /**
   * Blocking API
   * Derived classes must implement this method to remove the state of an 
   * application and its attempts
   */
  protected abstract void removeApplicationStateInternal(
      ApplicationStateData appState) throws Exception;

  /**
   * Non-blocking API
   * ResourceManager services call this to remove an attempt from the state
   * store
   * This does not block the dispatcher threads
   * There is no notification of completion for this operation.
   */
  @SuppressWarnings("unchecked")
  public synchronized void removeApplicationAttempt(
      ApplicationAttemptId applicationAttemptId) {
    getRMStateStoreEventHandler().handle(
        new RMStateStoreRemoveAppAttemptEvent(applicationAttemptId));
  }

  /**
   * Blocking API
   * Derived classes must implement this method to remove the state of specified
   * attempt.
   */
  protected abstract void removeApplicationAttemptInternal(
      ApplicationAttemptId attemptId) throws Exception;


  // TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
  // YARN-1779
  public static final Text AM_RM_TOKEN_SERVICE = new Text(
    "AM_RM_TOKEN_SERVICE");

  public static final Text AM_CLIENT_TOKEN_MASTER_KEY_NAME =
      new Text("YARN_CLIENT_TOKEN_MASTER_KEY");
  
  public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
    Credentials credentials = new Credentials();

    SecretKey clientTokenMasterKey =
        appAttempt.getClientTokenMasterKey();
    if(clientTokenMasterKey != null){
      credentials.addSecretKey(AM_CLIENT_TOKEN_MASTER_KEY_NAME,
          clientTokenMasterKey.getEncoded());
    }
    return credentials;
  }
  
  @VisibleForTesting
  protected boolean isFencedState() {
    return (RMStateStoreState.FENCED == getRMStateStoreState());
  }

  // Dispatcher related code
  protected void handleStoreEvent(RMStateStoreEvent event) {
    this.writeLock.lock();
    try {

      LOG.debug("Processing event of type {}", event.getType());

      final RMStateStoreState oldState = getRMStateStoreState();

      this.stateMachine.doTransition(event.getType(), event);

      if (oldState != getRMStateStoreState()) {
        LOG.info("RMStateStore state change from " + oldState + " to "
            + getRMStateStoreState());
      }

    } catch (InvalidStateTransitionException e) {
      LOG.error("Can't handle this event at current state", e);
    } finally {
      this.writeLock.unlock();
    }
  }

  /**
   * This method is called to notify the ResourceManager that the store
   * operation has failed.
   * @param failureCause the exception due to which the operation failed
   */
  protected void notifyStoreOperationFailed(Exception failureCause) {
    if (isFencedState()) {
      return;
    }
    if (notifyStoreOperationFailedInternal(failureCause)) {
      updateFencedState();
    }
  }

  @SuppressWarnings("unchecked")
  private boolean notifyStoreOperationFailedInternal(
      Exception failureCause) {
    boolean isFenced = false;
    LOG.error("State store operation failed ", failureCause);

    if (HAUtil.isHAEnabled(getConfig())) {
      rmDispatcher.getEventHandler().handle(
          new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED,
              failureCause));
      isFenced = true;
    } else {
      rmDispatcher.getEventHandler().handle(
          new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED,
              failureCause));
    }

    return isFenced;
  }
 
  @SuppressWarnings("unchecked")
  /**
   * This method is called to notify the application that
   * new application is stored or updated in state store
   * @param event App event containing the app id and event type
   */
  private void notifyApplication(RMAppEvent event) {
    rmDispatcher.getEventHandler().handle(event);
  }
  
  @SuppressWarnings("unchecked")
  /**
   * This method is called to notify the application attempt
   * that new attempt is stored or updated in state store
   * @param event App attempt event containing the app attempt
   * id and event type
   */
  private void notifyApplicationAttempt(RMAppAttemptEvent event) {
    rmDispatcher.getEventHandler().handle(event);
  }
  
  /**
   * EventHandler implementation which forward events to the FSRMStateStore
   * This hides the EventHandle methods of the store from its public interface 
   */
  private final class ForwardingEventHandler 
                                  implements EventHandler<RMStateStoreEvent> {
    
    @Override
    public void handle(RMStateStoreEvent event) {
      handleStoreEvent(event);
    }
  }

  /**
   * Derived classes must implement this method to delete the state store
   * @throws Exception
   */
  public abstract void deleteStore() throws Exception;

  /**
   * Derived classes must implement this method to remove application from the
   * state store
   * 
   * @throws Exception
   */
  public abstract void removeApplication(ApplicationId removeAppId)
      throws Exception;

  public void setResourceManager(ResourceManager rm) {
    this.resourceManager = rm;
  }

  public RMStateStoreState getRMStateStoreState() {
    this.readLock.lock();
    try {
      return this.stateMachine.getCurrentState();
    } finally {
      this.readLock.unlock();
    }
  }

  @SuppressWarnings("rawtypes")
  protected EventHandler getRMStateStoreEventHandler() {
    return dispatcher.getEventHandler();
  }

  /**
   * ProxyCAManager calls this to store the CA Certificate and Private Key.
   */
  public void storeProxyCACert(X509Certificate caCert,
      PrivateKey caPrivateKey) {
    handleStoreEvent(new RMStateStoreProxyCAEvent(caCert, caPrivateKey,
        RMStateStoreEventType.STORE_PROXY_CA_CERT));
  }

  /**
   * Blocking API
   * Derived classes must implement this method to store the CA Certificate
   * and Private Key
   */
  protected abstract void storeProxyCACertState(
      X509Certificate caCert, PrivateKey caPrivateKey) throws Exception;
}

相关信息

hadoop 源码目录

相关文章

hadoop FileSystemRMStateStore 源码

hadoop LeveldbRMStateStore 源码

hadoop MemoryRMStateStore 源码

hadoop NullRMStateStore 源码

hadoop RMStateStoreAMRMTokenEvent 源码

hadoop RMStateStoreAppAttemptEvent 源码

hadoop RMStateStoreAppEvent 源码

hadoop RMStateStoreEvent 源码

hadoop RMStateStoreEventType 源码

hadoop RMStateStoreFactory 源码

0  赞