hadoop RMAppImpl 源码

  • 2022-10-20
  • 浏览 (191)

hadoop RMAppImpl 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.rmapp;

import java.net.InetAddress;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement
    .ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppStartAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.resource.Resources;

import org.apache.hadoop.classification.VisibleForTesting;

@SuppressWarnings({ "rawtypes", "unchecked" })
public class RMAppImpl implements RMApp, Recoverable {

  // Class-wide SLF4J logger.
  private static final Logger LOG =
      LoggerFactory.getLogger(RMAppImpl.class);
  // Placeholder text; createAndGetApplicationReport starts the tracking URL,
  // host and diagnostics of a report with this value.
  private static final String UNAVAILABLE = "N/A";
  // NOTE(review): UNLIMITED and UNKNOWN are not referenced in the visible part
  // of this file; presumably used when reporting application timeouts - confirm.
  private static final String UNLIMITED = "UNLIMITED";
  private static final long UNKNOWN = -1L;
  // States grouped as "completed"; note the set also contains the transient
  // FINISHING, FINAL_SAVING and KILLING states.
  private static final EnumSet<RMAppState> COMPLETED_APP_STATES =
      EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED,
          RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING);
  // String.format-style templates (arguments: %s/%d placeholders in order).
  private static final String STATE_CHANGE_MESSAGE =
      "%s State change from %s to %s on event = %s";
  private static final String RECOVERY_MESSAGE =
      "Recovering app: %s with %d attempts and final state = %s";

  // Immutable fields
  // (Most are final and assigned once in the primary constructor; a few
  // fields declared in this section are not final.)
  private final ApplicationId applicationId;
  private final RMContext rmContext;
  private final Configuration conf;
  // Short user name taken from userUgi; null when userUgi is null.
  private final String user;
  private final UserGroupInformation userUgi;
  private final String name;
  private final ApplicationSubmissionContext submissionContext;
  private final Dispatcher dispatcher;
  private final YarnScheduler scheduler;
  private final ApplicationMasterService masterService;
  private final StringBuilder diagnostics = new StringBuilder();
  // Effective attempt limit resolved in the constructor from the submission
  // context and the RM/global configuration values.
  private final int maxAppAttempts;
  // Both locks come from one ReentrantReadWriteLock created in the constructor.
  private final ReadLock readLock;
  private final WriteLock writeLock;
  // Attempts in insertion order; read under readLock by the attempt getters.
  private final Map<ApplicationAttemptId, RMAppAttempt> attempts
      = new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
  private final long submitTime;
  // Pending node updates; drained under writeLock by pullRMNodeUpdates.
  private final Map<RMNode, NodeUpdateType> updatedNodes = new HashMap<>();
  private final String applicationType;
  private final Set<String> applicationTags;
  // Filled in the constructor: AM container environment first, then the
  // submission context's scheduling properties (which overwrite equal keys).
  private Map<String, String> applicationSchedulingEnvs = new HashMap<>();

  private final long attemptFailuresValidityInterval;
  // AM node blacklisting settings, read from the configuration in the
  // constructor.
  private boolean amBlacklistingEnabled = false;
  private float blacklistDisableThreshold;

  // Set to SystemClock.getInstance() in the constructor.
  private Clock systemClock;

  private boolean isNumAttemptsBeyondThreshold = false;



  // Mutable fields
  // Supplied start time, or the clock time at construction when the supplied
  // value is <= 0.
  private long startTime;
  private long launchTime = 0;
  private long finishTime = 0;
  private long storedFinishTime = 0;
  private int firstAttemptIdInStateStore = 1;
  private int nextAttemptId = 1;
  // Set together by setCollectorData; removeCollectorData clears only
  // collectorData.
  private AppCollectorData collectorData;
  private CollectorInfo collectorInfo;
  // This field isn't protected by readlock now.
  private volatile RMAppAttempt currentAttempt;
  // Read and written by getQueue/setQueue without taking a lock.
  private String queue;
  // Event handler obtained from the dispatcher in the constructor.
  private EventHandler handler;
  // Shared transition instance; declared before stateMachineFactory, whose
  // static initializer references it.
  private static final AppFinishedTransition FINISHED_TRANSITION =
      new AppFinishedTransition();
  private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();

  // Constructed with this app's configuration and its read/write locks.
  private final RMAppLogAggregation logAggregation;
  private Map<ApplicationTimeoutType, Long> applicationTimeouts =
      new HashMap<ApplicationTimeoutType, Long>();

  // These states stored are only valid when app is at killing or final_saving.
  private RMAppState stateBeforeKilling;
  private RMAppState stateBeforeFinalSaving;
  private RMAppEvent eventCausingFinalSaving;
  private RMAppState targetedFinalState;
  private RMAppState recoveredFinalState;
  private List<ResourceRequest> amReqs;
  
  // CallerContext.getCurrent() captured at construction time.
  private CallerContext callerContext;

  private ApplicationPlacementContext placementContext;

  // NOTE(review): package-private and unused in the visible part of the file;
  // presumably scratch state shared with transitions - confirm.
  Object transitionTodo;

  // Priority copied from the submission context, or 0 when none was given.
  private Priority applicationPriority;

  // Static transition table shared by all RMAppImpl instances; the constructor
  // calls stateMachineFactory.make(this) to obtain a per-application machine.
  // The factory is created with RMAppState.NEW as the initial state. Each
  // addTransition call below names a pre-state, one post-state (or an EnumSet
  // of candidate post-states), the triggering event type (or an EnumSet of
  // them) and, optionally, the transition object to run. Registrations that
  // pass no transition object are the ones the existing comments call
  // "ignorable": the event is accepted and the state is left as is.
  private static final StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent> stateMachineFactory
                               = new StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent>(RMAppState.NEW)


     // Transitions from NEW state
    .addTransition(RMAppState.NEW, RMAppState.NEW,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())
    // Recovery may land in any of the listed states.
    .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
            RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
            RMAppState.KILLED, RMAppState.FINAL_SAVING),
        RMAppEventType.RECOVER, new RMAppRecoveredTransition())
    .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
        new AppKilledTransition())
    .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
        RMAppEventType.APP_REJECTED,
        new FinalSavingTransition(new AppRejectedTransition(),
          RMAppState.FAILED))

    // Transitions from NEW_SAVING state
    .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
    .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
        RMAppEventType.KILL,
        new FinalSavingTransition(
          new AppKilledTransition(), RMAppState.KILLED))
    .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
        RMAppEventType.APP_REJECTED,
          new FinalSavingTransition(new AppRejectedTransition(),
            RMAppState.FAILED))
      // A failed save goes straight to FAILED, bypassing FINAL_SAVING.
      .addTransition(RMAppState.NEW_SAVING, RMAppState.FAILED,
          RMAppEventType.APP_SAVE_FAILED, new AppRejectedTransition())

     // Transitions from SUBMITTED state
    .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
        RMAppEventType.APP_REJECTED,
        new FinalSavingTransition(
          new AppRejectedTransition(), RMAppState.FAILED))
    .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
        RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
    .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
        RMAppEventType.KILL,
        new FinalSavingTransition(
          new AppKilledTransition(), RMAppState.KILLED))

     // Transitions from ACCEPTED state
    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
        RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
            YarnApplicationState.RUNNING))
    .addTransition(RMAppState.ACCEPTED,
        EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
        // ACCEPTED state is possible to receive ATTEMPT_FAILED/ATTEMPT_FINISHED
        // event because RMAppRecoveredTransition is returning ACCEPTED state
        // directly and waiting for the previous AM to exit.
        RMAppEventType.ATTEMPT_FAILED,
        new AttemptFailedTransition(RMAppState.ACCEPTED))
    .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_FINISHED,
        new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
    .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
        RMAppEventType.KILL, new KillAttemptTransition())
    .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_KILLED,
        new FinalSavingTransition(new AppKilledTransition(), RMAppState.KILLED))
    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
      // Handle AppAttemptLaunch to update the launchTime and publish to ATS
      .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
        RMAppEventType.ATTEMPT_LAUNCHED,
        new AttemptLaunchedTransition())

     // Transitions from RUNNING state
    .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_UNREGISTERED,
        new FinalSavingTransition(
          new AttemptUnregisteredTransition(),
          RMAppState.FINISHING, RMAppState.FINISHED))
    .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
      // UnManagedAM directly jumps to finished
        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
    .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    .addTransition(RMAppState.RUNNING,
        EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
        RMAppEventType.ATTEMPT_FAILED,
        new AttemptFailedTransition(RMAppState.ACCEPTED))
    .addTransition(RMAppState.RUNNING, RMAppState.KILLING,
        RMAppEventType.KILL, new KillAttemptTransition())

     // Transitions from FINAL_SAVING state
    .addTransition(RMAppState.FINAL_SAVING,
      EnumSet.of(RMAppState.FINISHING, RMAppState.FAILED,
        RMAppState.KILLED, RMAppState.FINISHED), RMAppEventType.APP_UPDATE_SAVED,
        new FinalStateSavedTransition())
    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_FINISHED,
        new AttemptFinishedAtFinalSavingTransition())
    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    // ignorable transitions
    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
        EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
          RMAppEventType.APP_NEW_SAVED))

     // Transitions from FINISHING state
    .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
    .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    // ignorable transitions
    .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
      EnumSet.of(RMAppEventType.NODE_UPDATE,
        // ignore Kill/Move as we have already saved the final Finished state
        // in state store.
        RMAppEventType.KILL))

     // Transitions from KILLING state
    .addTransition(RMAppState.KILLING, RMAppState.KILLING, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_KILLED,
        new FinalSavingTransition(
          new AppKilledTransition(), RMAppState.KILLED))
    .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_UNREGISTERED,
        new FinalSavingTransition(
          new AttemptUnregisteredTransition(),
          RMAppState.FINISHING, RMAppState.FINISHED))
    .addTransition(RMAppState.KILLING, RMAppState.FINISHED,
      // UnManagedAM directly jumps to finished
        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
    .addTransition(RMAppState.KILLING,
        EnumSet.of(RMAppState.FINAL_SAVING),
        RMAppEventType.ATTEMPT_FAILED,
        new AttemptFailedTransition(RMAppState.KILLING))

    // ignorable transitions while the kill is in progress
    .addTransition(RMAppState.KILLING, RMAppState.KILLING,
        EnumSet.of(
            RMAppEventType.NODE_UPDATE,
            RMAppEventType.ATTEMPT_REGISTERED,
            RMAppEventType.APP_UPDATE_SAVED,
            RMAppEventType.KILL))

     // Transitions from FINISHED state
     // ignorable transitions
    .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
        EnumSet.of(
            RMAppEventType.NODE_UPDATE,
            RMAppEventType.ATTEMPT_UNREGISTERED,
            RMAppEventType.ATTEMPT_FINISHED,
            RMAppEventType.KILL))

     // Transitions from FAILED state
     // ignorable transitions
    .addTransition(RMAppState.FAILED, RMAppState.FAILED, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    .addTransition(RMAppState.FAILED, RMAppState.FAILED,
        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))

     // Transitions from KILLED state
     // ignorable transitions
    .addTransition(RMAppState.KILLED, RMAppState.KILLED, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    .addTransition(
        RMAppState.KILLED,
        RMAppState.KILLED,
        EnumSet.of(RMAppEventType.APP_ACCEPTED,
            RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
            RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
            RMAppEventType.NODE_UPDATE, RMAppEventType.START))

     // Finalize the table; no transitions are added after this call.
     .installTopology();

  // Per-application state machine, created in the constructor through
  // stateMachineFactory.make(this).
  private final StateMachine<RMAppState, RMAppEventType, RMAppEvent>
                                                                 stateMachine;

  // Attempt number used by createAndGetApplicationReport to build a
  // placeholder attempt id when the report has no current attempt id.
  private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1;
  // Inclusive bounds the constructor uses to validate the configured AM
  // blacklist disable threshold.
  private static final float MINIMUM_AM_BLACKLIST_THRESHOLD_VALUE = 0.0f;
  private static final float MAXIMUM_AM_BLACKLIST_THRESHOLD_VALUE = 1.0f;

  /**
   * Creates an application without a placement context and without an
   * explicit start time.
   *
   * <p>Delegates to the constructor that accepts a placement context and a
   * start time, passing {@code null} and {@code -1} respectively.
   */
  public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
      Configuration config, String name, String user, String queue,
      ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
      ApplicationMasterService masterService, long submitTime,
      String applicationType, Set<String> applicationTags,
      List<ResourceRequest> amReqs) {
    this(
        applicationId,
        rmContext,
        config,
        name,
        user,
        queue,
        submissionContext,
        scheduler,
        masterService,
        submitTime,
        applicationType,
        applicationTags,
        amReqs,
        null,
        -1L);
  }

  /**
   * Creates an application from a plain user name.
   *
   * <p>A non-null {@code user} is turned into a remote-user
   * {@link UserGroupInformation}; a {@code null} user is forwarded as a
   * {@code null} UGI. Everything else is passed through unchanged to the
   * UGI-based constructor.
   */
  public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
      Configuration config, String name, String user, String queue,
      ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
      ApplicationMasterService masterService, long submitTime,
      String applicationType, Set<String> applicationTags,
      List<ResourceRequest> amReqs, ApplicationPlacementContext
      placementContext, long startTime) {
    this(
        applicationId,
        rmContext,
        config,
        name,
        (user == null) ? null : UserGroupInformation.createRemoteUser(user),
        queue,
        submissionContext,
        scheduler,
        masterService,
        submitTime,
        applicationType,
        applicationTags,
        amReqs,
        placementContext,
        startTime);
  }

  /**
   * Primary constructor; the other constructors delegate here.
   *
   * <p>Besides storing its arguments it resolves the effective maximum number
   * of attempts from the submission context and the configuration, creates
   * the read/write lock pair, instantiates this application's state machine,
   * captures the current {@link CallerContext}, collects the scheduling
   * environment entries and reads the AM node blacklisting settings.
   *
   * @param applicationId id of the application
   * @param rmContext context that supplies the dispatcher
   * @param config configuration consulted for attempt limits and AM
   *     blacklisting
   * @param name application name
   * @param userUgi submitting user; may be {@code null}, in which case the
   *     stored user name is {@code null} as well
   * @param queue initial queue name
   * @param submissionContext submission context; dereferenced
   *     unconditionally, so it must not be {@code null}
   * @param scheduler scheduler reference kept by this application
   * @param masterService application master service kept by this application
   * @param submitTime submission time
   * @param applicationType application type
   * @param applicationTags application tags (stored as given, not copied)
   * @param amReqs AM resource requests
   * @param placementContext placement context, may be {@code null}
   * @param startTime start time; a value {@code <= 0} means the current
   *     clock time is used instead
   */
  public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
      Configuration config, String name, UserGroupInformation userUgi,
      String queue, ApplicationSubmissionContext submissionContext,
      YarnScheduler scheduler, ApplicationMasterService masterService,
      long submitTime, String applicationType, Set<String> applicationTags,
      List<ResourceRequest> amReqs, ApplicationPlacementContext
      placementContext, long startTime) {
    this.systemClock = SystemClock.getInstance();

    // Identity and collaborators.
    this.applicationId = applicationId;
    this.name = StringInterner.weakIntern(name);
    this.rmContext = rmContext;
    this.dispatcher = rmContext.getDispatcher();
    this.handler = dispatcher.getEventHandler();
    this.conf = config;
    String shortUserName = null;
    if (userUgi != null) {
      shortUserName = userUgi.getShortUserName();
    }
    this.user = StringInterner.weakIntern(shortUserName);
    this.userUgi = userUgi;
    this.queue = queue;
    this.submissionContext = submissionContext;
    this.scheduler = scheduler;
    this.masterService = masterService;
    this.submitTime = submitTime;
    // A non-positive start time means "not supplied": fall back to now.
    this.startTime = (startTime > 0) ? startTime : this.systemClock.getTime();
    this.applicationType = StringInterner.weakIntern(applicationType);
    this.applicationTags = applicationTags;
    this.amReqs = amReqs;

    // If incoming app does not have priority configured in submission
    // context, the system assumes a 0 priority app, which is considered
    // normal.
    Priority submittedPriority = submissionContext.getPriority();
    this.applicationPriority = Priority.newInstance(
        (submittedPriority == null) ? 0 : submittedPriority.getPriority());

    // Resolve the attempt limit: the app's own value when it lies within
    // [1, global max], otherwise one of the configured values.
    int rmMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
    int globalMaxAppAttempts = conf.getInt(
        YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS, rmMaxAppAttempts);
    int requestedMaxAppAttempts = submissionContext.getMaxAppAttempts();
    final int effectiveMaxAppAttempts;
    if (requestedMaxAppAttempts <= 0) {
      LOG.warn("The specific max attempts: " + requestedMaxAppAttempts
          + " for application: " + applicationId.getId()
          + " is invalid, because it is less than or equal to zero."
          + " Use the rm max attempts instead.");
      effectiveMaxAppAttempts = rmMaxAppAttempts;
    } else if (requestedMaxAppAttempts > globalMaxAppAttempts) {
      LOG.warn("The specific max attempts: " + requestedMaxAppAttempts
          + " for application: " + applicationId.getId()
          + " is invalid, because it is out of the range [1, "
          + globalMaxAppAttempts + "]. Use the global max attempts instead.");
      effectiveMaxAppAttempts = globalMaxAppAttempts;
    } else {
      effectiveMaxAppAttempts = requestedMaxAppAttempts;
    }
    this.maxAppAttempts = effectiveMaxAppAttempts;

    this.attemptFailuresValidityInterval =
        submissionContext.getAttemptFailuresValidityInterval();
    if (this.attemptFailuresValidityInterval > 0) {
      LOG.info("The attemptFailuresValidityInterval for the application: "
          + this.applicationId + " is " + this.attemptFailuresValidityInterval
          + ".");
    }

    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    this.readLock = rwLock.readLock();
    this.writeLock = rwLock.writeLock();

    this.stateMachine = stateMachineFactory.make(this);
    this.callerContext = CallerContext.getCurrent();
    this.placementContext = placementContext;

    // If applications are not explicitly specifying envs, try to pull from
    // AM container environment lists. Explicit scheduling properties are
    // added afterwards and therefore win on equal keys.
    if (submissionContext.getAMContainerSpec() != null) {
      applicationSchedulingEnvs.putAll(
          submissionContext.getAMContainerSpec().getEnvironment());
    }
    applicationSchedulingEnvs.putAll(
        submissionContext.getApplicationSchedulingPropertiesMap());

    this.logAggregation = new RMAppLogAggregation(conf, readLock, writeLock);

    // amBlacklistingEnabled can be configured globally
    // Just use the global values
    amBlacklistingEnabled = conf.getBoolean(
        YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
        YarnConfiguration.DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_ENABLED);
    if (amBlacklistingEnabled) {
      float configuredThreshold = conf.getFloat(
          YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
          YarnConfiguration.
          DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD);
      // A threshold outside [MINIMUM, MAXIMUM] is invalid and is replaced by
      // the default value.
      boolean outOfRange =
          configuredThreshold < MINIMUM_AM_BLACKLIST_THRESHOLD_VALUE
              || configuredThreshold > MAXIMUM_AM_BLACKLIST_THRESHOLD_VALUE;
      blacklistDisableThreshold = outOfRange
          ? YarnConfiguration.
              DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD
          : configuredThreshold;
    }
  }

  /**
   * Registers an application-level timeline collector for this application
   * with the RM timeline collector manager, unless one is already present.
   * Intended for use only when timeline service v.2 is enabled.
   */
  public void startTimelineCollector() {
    rmContext.getRMTimelineCollectorManager().putIfAbsent(
        applicationId, new AppLevelTimelineCollector(applicationId, user));
  }

  /**
   * Removes this application's entry from the RM timeline collector manager.
   * Intended for use only when timeline service v.2 is enabled.
   */
  public void stopTimelineCollector() {
    this.rmContext.getRMTimelineCollectorManager()
        .remove(this.applicationId);
  }

  /** Returns the application id this instance was constructed with. */
  @Override
  public ApplicationId getApplicationId() {
    return applicationId;
  }
  
  /** Returns the submission context this instance was constructed with. */
  @Override
  public ApplicationSubmissionContext getApplicationSubmissionContext() {
    return submissionContext;
  }

  /**
   * Returns the final status of the application.
   *
   * <p>The status reported by the current attempt is preferred. When there is
   * no current attempt, or it has not reported a status, the result is
   * derived from the state machine's current state as a fall-back (for
   * example when the application never unregistered itself).
   *
   * @return the attempt-reported status, or a status derived from the
   *     application state; not {@code null} for any known state
   */
  @Override
  public FinalApplicationStatus getFinalApplicationStatus() {
    // currentAttempt is volatile and not guarded by the read lock, so read it
    // (and the status it reports) exactly once. Re-reading after the null
    // checks could observe a different attempt or value and return null.
    RMAppAttempt attempt = this.currentAttempt;
    if (attempt != null) {
      FinalApplicationStatus attemptStatus =
          attempt.getFinalApplicationStatus();
      if (attemptStatus != null) {
        return attemptStatus;
      }
    }
    return createFinalApplicationStatus(this.stateMachine.getCurrentState());
  }

  /** Returns the state machine's current state, read under the read lock. */
  @Override
  public RMAppState getState() {
    final RMAppState current;
    readLock.lock();
    try {
      current = stateMachine.getCurrentState();
    } finally {
      readLock.unlock();
    }
    return current;
  }

  /**
   * Returns the short user name taken from the submitting user's UGI, or
   * {@code null} when the application was created without one.
   */
  @Override
  public String getUser() {
    return user;
  }

  /**
   * Returns the progress reported by the current attempt, or {@code 0} when
   * there is no current attempt.
   */
  @Override
  public float getProgress() {
    // Single read of the volatile field.
    final RMAppAttempt snapshot = currentAttempt;
    return (snapshot == null) ? 0f : snapshot.getProgress();
  }

  /**
   * Looks up an attempt of this application by id while holding the read
   * lock.
   *
   * @param appAttemptId id of the attempt to look up
   * @return the matching attempt, or {@code null} if none is registered
   */
  @Override
  public RMAppAttempt getRMAppAttempt(ApplicationAttemptId appAttemptId) {
    final RMAppAttempt found;
    readLock.lock();
    try {
      found = attempts.get(appAttemptId);
    } finally {
      readLock.unlock();
    }
    return found;
  }

  /** Returns the queue name last stored for this application. */
  @Override
  public String getQueue() {
    return queue;
  }
  
  /**
   * Replaces the stored queue name.
   *
   * @param queue the new queue name
   */
  @Override
  public void setQueue(String queue) {
    this.queue = queue;
  }

  /**
   * Returns the collector data last stored through
   * {@code setCollectorData}, or {@code null} if none is set or it was
   * removed.
   */
  @Override
  public AppCollectorData getCollectorData() {
    return collectorData;
  }

  /**
   * Stores the given collector data and rebuilds the collector info from its
   * address and token.
   *
   * @param incomingData new collector data; dereferenced, so a {@code null}
   *     value results in a {@link NullPointerException} after the data field
   *     has already been overwritten
   */
  public void setCollectorData(AppCollectorData incomingData) {
    collectorData = incomingData;
    collectorInfo = CollectorInfo.newInstance(
        incomingData.getCollectorAddr(),
        incomingData.getCollectorToken());
  }

  /**
   * Returns the collector info built by the most recent
   * {@code setCollectorData} call, or {@code null} if it was never called.
   */
  public CollectorInfo getCollectorInfo() {
    return collectorInfo;
  }

  /**
   * Clears the stored collector data. The collector info is left untouched.
   */
  public void removeCollectorData() {
    collectorData = null;
  }

  /** Returns the application name this instance was constructed with. */
  @Override
  public String getName() {
    return name;
  }

  /**
   * Returns the current attempt (a plain read of the volatile field), or
   * {@code null} when no attempt has been set.
   */
  @Override
  public RMAppAttempt getCurrentAppAttempt() {
    return currentAttempt;
  }

  /**
   * Returns a read-only view of all attempts of this application.
   *
   * <p>The result is an unmodifiable wrapper around the live attempts map,
   * not a copy; the read lock is held only while the wrapper is created.
   */
  @Override
  public Map<ApplicationAttemptId, RMAppAttempt> getAppAttempts() {
    final Map<ApplicationAttemptId, RMAppAttempt> readOnlyView;
    readLock.lock();
    try {
      readOnlyView = Collections.unmodifiableMap(attempts);
    } finally {
      readLock.unlock();
    }
    return readOnlyView;
  }

  /**
   * Maps an application state to the final status reported when the current
   * attempt has not supplied one.
   *
   * @param state application state to translate
   * @return {@code KILLED} for a killed app, {@code FAILED} for
   *     finishing/finished/failed apps, {@code UNDEFINED} for every other
   *     known state
   * @throws YarnRuntimeException if the state is not one of the known values
   */
  private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) {
    switch (state) {
    case KILLED:
      return FinalApplicationStatus.KILLED;
    // finished without a proper final state is the same as failed
    case FINISHING:
    case FINISHED:
    case FAILED:
      return FinalApplicationStatus.FAILED;
    // the application has not reached a terminal state yet
    case NEW:
    case NEW_SAVING:
    case SUBMITTED:
    case ACCEPTED:
    case RUNNING:
    case FINAL_SAVING:
    case KILLING:
      return FinalApplicationStatus.UNDEFINED;
    default:
      throw new YarnRuntimeException("Unknown state passed!");
    }
  }

  /**
   * Moves all pending node updates into the supplied map and clears the
   * internal buffer, all under the write lock.
   *
   * @param upNodes destination map that receives the pending updates
   * @return the number of updates that were pending before the call
   */
  @Override
  public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> upNodes) {
    writeLock.lock();
    try {
      final int pendingCount = updatedNodes.size();
      upNodes.putAll(updatedNodes);
      updatedNodes.clear();
      return pendingCount;
    } finally {
      writeLock.unlock();
    }
  }
  
  /**
   * Builds an {@link ApplicationReport} for this application while holding
   * the read lock.
   *
   * When {@code allowAccess} is false the report only carries placeholder
   * values (UNAVAILABLE strings, port -1, progress 0, no tokens) for the
   * attempt-specific fields; the application-level fields (id, user, queue,
   * name, times, state, ...) are always filled in.
   *
   * @param clientUserName user on whose behalf the report is created; used
   *          to create the client-to-AM token and to decide whether the AMRM
   *          token of an unmanaged AM may be returned
   * @param allowAccess whether attempt-specific details may be included
   * @return the newly created report
   */
  @Override
  public ApplicationReport createAndGetApplicationReport(String clientUserName,
      boolean allowAccess) {
    this.readLock.lock();

    try {
      // Defaults used when access is denied or no attempt exists yet.
      ApplicationAttemptId currentApplicationAttemptId = null;
      org.apache.hadoop.yarn.api.records.Token clientToAMToken = null;
      String trackingUrl = UNAVAILABLE;
      String host = UNAVAILABLE;
      String origTrackingUrl = UNAVAILABLE;
      LogAggregationStatus logAggregationStatus = null;
      int rpcPort = -1;
      ApplicationResourceUsageReport appUsageReport =
          RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
      FinalApplicationStatus finishState = getFinalApplicationStatus();
      String diags = UNAVAILABLE;
      float progress = 0.0f;
      org.apache.hadoop.yarn.api.records.Token amrmToken = null;
      if (allowAccess) {
        // Start with the proxy URL; replaced below by the attempt's own
        // tracking URL when a current attempt exists.
        trackingUrl = rmContext.getAppProxyUrl(conf, applicationId);
        if (this.currentAttempt != null) {
          currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
          trackingUrl = this.currentAttempt.getTrackingUrl();
          origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
          if (UserGroupInformation.isSecurityEnabled()) {
            // get a token so the client can communicate with the app attempt
            // NOTE: token may be unavailable if the attempt is not running
            Token<ClientToAMTokenIdentifier> attemptClientToAMToken =
                this.currentAttempt.createClientToken(clientUserName);
            if (attemptClientToAMToken != null) {
              clientToAMToken = BuilderUtils.newClientToAMToken(
                  attemptClientToAMToken.getIdentifier(),
                  attemptClientToAMToken.getKind().toString(),
                  attemptClientToAMToken.getPassword(),
                  attemptClientToAMToken.getService().toString());
            }
          }
          host = this.currentAttempt.getHost();
          rpcPort = this.currentAttempt.getRpcPort();
          appUsageReport = currentAttempt.getApplicationResourceUsageReport();
          progress = currentAttempt.getProgress();
          logAggregationStatus = this.getLogAggregationStatusForAppReport();
        }
        // getDiagnostics() falls back to the current attempt's diagnostics
        // when the application-level diagnostics are still empty.
        diags = getDiagnostics().toString();

        // The AMRM token is only handed out for an unmanaged AM, to the user
        // who owns the application, while the attempt is in LAUNCHED state.
        if (currentAttempt != null && 
            currentAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
          if (getApplicationSubmissionContext().getUnmanagedAM() &&
              clientUserName != null && getUser().equals(clientUserName)) {
            Token<AMRMTokenIdentifier> token = currentAttempt.getAMRMToken();
            if (token != null) {
              amrmToken = BuilderUtils.newAMRMToken(token.getIdentifier(),
                  token.getKind().toString(), token.getPassword(),
                  token.getService().toString());
            }
          }
        }

        // NOTE(review): when there is no current attempt, appUsageReport still
        // references RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT, so
        // the setters below modify that shared object - confirm this is
        // intended.
        RMAppMetrics rmAppMetrics = getRMAppMetrics();
        appUsageReport
            .setResourceSecondsMap(rmAppMetrics.getResourceSecondsMap());
        appUsageReport.setPreemptedResourceSecondsMap(
            rmAppMetrics.getPreemptedResourceSecondsMap());
      }

      // No attempt id available: report a placeholder attempt id instead.
      if (currentApplicationAttemptId == null) {
        currentApplicationAttemptId = 
            BuilderUtils.newApplicationAttemptId(this.applicationId, 
                DUMMY_APPLICATION_ATTEMPT_NUMBER);
      }

      ApplicationReport report = BuilderUtils.newApplicationReport(
          this.applicationId, currentApplicationAttemptId, this.user,
          this.queue, this.name, host, rpcPort, clientToAMToken,
          createApplicationState(), diags, trackingUrl, this.startTime,
          this.launchTime, this.finishTime, finishState, appUsageReport,
          origTrackingUrl, progress, this.applicationType, amrmToken,
          applicationTags, this.getApplicationPriority());
      report.setLogAggregationStatus(logAggregationStatus);
      report.setUnmanagedApp(submissionContext.getUnmanagedAM());
      report.setAppNodeLabelExpression(getAppNodeLabelExpression());
      report.setAmNodeLabelExpression(getAmNodeLabelExpression());
      if (HAUtil.isFederationEnabled(conf)) {
        report.setRMClusterId(YarnConfiguration.getClusterId(conf));
      }

      // Default timeout entry; its expiry and remaining time are overwritten
      // below when the application has timeouts configured.
      ApplicationTimeout timeout = ApplicationTimeout
          .newInstance(ApplicationTimeoutType.LIFETIME, UNLIMITED, UNKNOWN);
      // Currently timeout type supported is LIFETIME. When more timeout types
      // are supported in YARN-5692, the below logic need to be changed.
      if (!this.applicationTimeouts.isEmpty()) {
        long timeoutInMillis = applicationTimeouts
            .get(ApplicationTimeoutType.LIFETIME).longValue();
        timeout.setExpiryTime(Times.formatISO8601(timeoutInMillis));
        if (isAppInCompletedStates()) {
          // if application configured with timeout and finished before timeout
          // happens then remaining time should not be calculated.
          timeout.setRemainingTime(0);
        } else {
          // Remaining time is reported in seconds and never goes negative.
          timeout.setRemainingTime(
              Math.max((timeoutInMillis - systemClock.getTime()) / 1000, 0));
        }
      }
      report.setApplicationTimeouts(
          Collections.singletonMap(timeout.getTimeoutType(), timeout));
      return report;
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * Reads the application finish time under the read lock.
   *
   * @return the current value of the finish time field
   */
  @Override
  public long getFinishTime() {
    this.readLock.lock();
    try {
      final long finished = this.finishTime;
      return finished;
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * Reads the application start time under the read lock.
   *
   * @return the current value of the start time field
   */
  @Override
  public long getStartTime() {
    this.readLock.lock();
    try {
      final long started = this.startTime;
      return started;
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * Reads the application launch time under the read lock.
   *
   * @return the current value of the launch time field
   */
  @Override
  public long getLaunchTime() {
    this.readLock.lock();
    try {
      final long launched = this.launchTime;
      return launched;
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * Returns the submit time of this application; read without locking.
   *
   * @return the value of the submit time field
   */
  @Override
  public long getSubmitTime() {
    return submitTime;
  }

  /**
   * Returns the tracking URL reported by the current attempt.
   *
   * @return the current attempt's tracking URL, or {@code null} when there is
   *         no current attempt
   */
  @Override
  public String getTrackingUrl() {
    // Read the field once so the null check and the call see the same attempt.
    final RMAppAttempt attempt = this.currentAttempt;
    return (attempt == null) ? null : attempt.getTrackingUrl();
  }

  /**
   * Returns the original tracking URL reported by the current attempt.
   *
   * @return the current attempt's original tracking URL, or {@code null} when
   *         there is no current attempt
   */
  @Override
  public String getOriginalTrackingUrl() {
    // Read the field once so the null check and the call see the same attempt.
    final RMAppAttempt attempt = this.currentAttempt;
    return (attempt == null) ? null : attempt.getOriginalTrackingUrl();
  }

  /**
   * Returns the application diagnostics, read under the read lock.
   *
   * If the application-level diagnostics are still empty and a current
   * attempt exists with non-null diagnostics, a new builder holding the
   * attempt's diagnostics is returned instead.
   *
   * @return the diagnostics builder of this application, or a fresh builder
   *         wrapping the current attempt's diagnostics
   */
  @Override
  public StringBuilder getDiagnostics() {
    this.readLock.lock();
    try {
      // Application already has its own diagnostics, or nothing to fall
      // back to.
      if (diagnostics.length() != 0 || getCurrentAppAttempt() == null) {
        return this.diagnostics;
      }
      final String fromAttempt = getCurrentAppAttempt().getDiagnostics();
      return (fromAttempt != null)
          ? new StringBuilder(fromAttempt)
          : this.diagnostics;
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * Returns the maximum number of attempts for this application.
   *
   * @return the value of the max-app-attempts field
   */
  @Override
  public int getMaxAppAttempts() {
    return maxAppAttempts;
  }

  /**
   * Feeds an event into the application state machine while holding the
   * write lock, and logs the resulting state change (if any).
   *
   * @param event the event to process
   */
  @Override
  public void handle(RMAppEvent event) {
    this.writeLock.lock();
    try {
      final ApplicationId appID = event.getApplicationId();
      LOG.debug("Processing event for {} of type {}",
          appID, event.getType());

      final RMAppState stateBefore = getState();
      try {
        /* keep the master in sync with the state machine */
        this.stateMachine.doTransition(event.getType(), event);
      } catch (InvalidStateTransitionException e) {
        LOG.error("App: " + appID
            + " can't handle this event at current state", e);
        onInvalidStateTransition(event.getType(), stateBefore);
      }

      final RMAppState stateAfter = getState();
      if (stateBefore == stateAfter) {
        // Nothing changed, nothing to report.
        return;
      }

      // A RECOVER event for an app that was stored with a final state is only
      // worth a DEBUG line; every other state change is logged at INFO.
      final boolean recoveringCompletedApp = (recoveredFinalState != null)
          && (event.getType() == RMAppEventType.RECOVER);
      if (!recoveringCompletedApp) {
        LOG.info(String.format(STATE_CHANGE_MESSAGE, appID, stateBefore,
            stateAfter, event.getType()));
      } else if (LOG.isDebugEnabled()) {
        LOG.debug(String.format(STATE_CHANGE_MESSAGE, appID, stateBefore,
            stateAfter, event.getType()));
      }
    } finally {
      this.writeLock.unlock();
    }
  }

  /**
   * Restores this application from the stored RM state: final state,
   * diagnostics, times, caller context, timeouts and every stored attempt.
   *
   * @param state the RM state loaded from the state store
   */
  @Override
  public void recover(RMState state) {
    final ApplicationStateData storedApp =
        state.getApplicationState().get(getApplicationId());
    this.recoveredFinalState = storedApp.getState();

    // Apps without a stored final state are announced at INFO, the others
    // only at DEBUG.
    if (recoveredFinalState == null) {
      LOG.info(String.format(RECOVERY_MESSAGE, getApplicationId(),
          storedApp.getAttemptCount(), "NONE"));
    } else if (LOG.isDebugEnabled()) {
      LOG.debug(String.format(RECOVERY_MESSAGE, getApplicationId(),
          storedApp.getAttemptCount(), recoveredFinalState));
    }

    final String storedDiagnostics = storedApp.getDiagnostics();
    if (storedDiagnostics != null) {
      this.diagnostics.append(storedDiagnostics);
    }
    this.storedFinishTime = storedApp.getFinishTime();
    this.startTime = storedApp.getStartTime();
    this.launchTime = storedApp.getLaunchTime();
    this.callerContext = storedApp.getCallerContext();
    this.applicationTimeouts = storedApp.getApplicationTimeouts();
    // If interval > 0, some attempts might have been deleted.
    if (this.attemptFailuresValidityInterval > 0) {
      this.firstAttemptIdInStateStore = storedApp.getFirstAttemptId();
      this.nextAttemptId = firstAttemptIdInStateStore;
    }
    //TODO recover collector address.
    //this.collectorAddr = appState.getCollectorAddr();

    // send the ATS create Event during RM recovery.
    // NOTE: it could be duplicated with events sent before RM get restarted.
    sendATSCreateEvent();

    // Walk the stored attempts in sorted id order.
    final Set<ApplicationAttemptId> sortedAttemptIds =
        new TreeSet<>(storedApp.attempts.keySet());
    RMAppAttemptImpl previous = null;
    for (ApplicationAttemptId attemptId : sortedAttemptIds) {
      createNewAttempt(attemptId);
      final RMAppAttemptImpl recovered = (RMAppAttemptImpl) this.currentAttempt;
      recovered.recover(state);
      // A predecessor without a recovered final state means its final state
      // was never stored. Its real outcome is unknown, so mark it FAILED.
      if (previous != null && previous.getRecoveredFinalState() == null) {
        previous.setRecoveredFinalState(RMAppAttemptState.FAILED);
      }
      previous = recovered;
    }
    if (currentAttempt != null) {
      nextAttemptId = currentAttempt.getAppAttemptId().getAttemptId() + 1;
    }
  }

  /**
   * Creates a new attempt using the next attempt number and advances the
   * attempt counter.
   */
  private void createNewAttempt() {
    // Consume the attempt number before building the id, as a post-increment.
    final int attemptNumber = nextAttemptId++;
    createNewAttempt(
        ApplicationAttemptId.newInstance(applicationId, attemptNumber));
  }

  /**
   * Creates an attempt with the given id, registers it in the attempts map
   * and makes it the current attempt.
   *
   * @param appAttemptId id of the attempt to create
   */
  private void createNewAttempt(ApplicationAttemptId appAttemptId) {
    final BlacklistManager blacklistManager;
    if (currentAttempt != null) {
      // Transfer over the blacklist from the previous app-attempt.
      blacklistManager = currentAttempt.getAMBlacklistManager();
    } else if (amBlacklistingEnabled && !submissionContext.getUnmanagedAM()) {
      // First attempt of a managed AM with blacklisting switched on.
      blacklistManager = new SimpleBlacklistManager(
          RMServerUtils.getApplicableNodeCountForAM(rmContext, conf,
              getAMResourceRequests()),
          blacklistDisableThreshold);
    } else {
      blacklistManager = new DisabledBlacklistManager();
    }

    final RMAppAttempt newAttempt =
        new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
            submissionContext, conf, amReqs, this, blacklistManager);
    attempts.put(appAttemptId, newAttempt);
    currentAttempt = newAttempt;
  }

  /**
   * Creates the next attempt and dispatches a start event for it.
   *
   * @param transferStateFromPreviousAttempt flag forwarded in the start event
   */
  private void createAndStartNewAttempt(
      boolean transferStateFromPreviousAttempt) {
    createNewAttempt();
    final RMAppStartAttemptEvent startEvent = new RMAppStartAttemptEvent(
        currentAttempt.getAppAttemptId(), transferStateFromPreviousAttempt);
    handler.handle(startEvent);
  }

  /**
   * Records a node update so that it is returned by the next
   * {@code pullRMNodeUpdates} call.
   *
   * @param type kind of update reported for the node
   * @param node the node the update refers to
   */
  private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
    NodeState nodeState = node.getState();
    updatedNodes.put(node, RMAppNodeUpdateType.convertToNodeUpdateType(type));
    // Three arguments need three placeholders; the state placeholder was
    // missing, so the node state never made it into the log line.
    LOG.debug("Received node update event:{} for node:{} with state:{}",
        type, node, nodeState);
  }

  private static class RMAppTransition implements
      SingleArcTransition<RMAppImpl, RMAppEvent> {
    public void transition(RMAppImpl app, RMAppEvent event) {
    };
  }

  private static final class RMAppNodeUpdateTransition extends RMAppTransition {
    public void transition(RMAppImpl app, RMAppEvent event) {
      RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;
      app.processNodeUpdate(nodeUpdateEvent.getUpdateType(),
          nodeUpdateEvent.getNode());
    };
  }

  private static final class RMAppStateUpdateTransition
      extends RMAppTransition {
    private YarnApplicationState stateToATS;

    public RMAppStateUpdateTransition(YarnApplicationState state) {
      stateToATS = state;
    }

    public void transition(RMAppImpl app, RMAppEvent event) {
      app.rmContext.getSystemMetricsPublisher().appStateUpdated(
          app, stateToATS, app.systemClock.getTime());
    };
  }

  /**
   * Records the launch time of the application the first time an attempt is
   * launched: the value is written to the state store, kept on the app and
   * published to the system metrics publisher.
   */
  private static final class AttemptLaunchedTransition
      extends  RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {

      // Only the first launch sets the launch time.
      if(app.launchTime == 0) {
        // The original message lacked a separator between the attempt id and
        // the launch time, which made the two values run together.
        LOG.info("update the launch time for applicationId: "+
                app.getApplicationId()+", attemptId: "+
                app.getCurrentAppAttempt().getAppAttemptId()+
                ", launchTime: "+event.getTimestamp());
        ApplicationStateData appState = ApplicationStateData.newInstance(
            app.submitTime, app.startTime, app.submissionContext, app.user,
            app.callerContext);
        appState.setApplicationTimeouts(app.getApplicationTimeouts());
        appState.setLaunchTime(event.getTimestamp());
        app.rmContext.getStateStore().updateApplicationState(appState, false);
        app.launchTime = event.getTimestamp();
        app.rmContext.getSystemMetricsPublisher().appLaunched(
            app, app.launchTime);
      }
    }
  }

  private static final class AppRunningOnNodeTransition extends RMAppTransition {
    public void transition(RMAppImpl app, RMAppEvent event) {
      RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event;
      
      // if final state already stored, notify RMNode
      if (isAppInFinalState(app)) {
        app.handler.handle(
            new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(), nodeAddedEvent
                .getApplicationId()));
        return;
      }
      
      // otherwise, add it to ranNodes for further process
      app.ranNodes.add(nodeAddedEvent.getNodeId());

      app.logAggregation.addReportIfNecessary(
          nodeAddedEvent.getNodeId(), app.getApplicationId());
    }
  }

  /**
   * Delivers a RECOVER event directly to every attempt of this application.
   * The call is synchronous so that each attempt has processed the recover
   * event before any later external event reaches it.
   */
  private void recoverAppAttempts() {
    for (RMAppAttempt attempt : getAppAttempts().values()) {
      final RMAppAttemptEvent recoverEvent = new RMAppAttemptEvent(
          attempt.getAppAttemptId(), RMAppAttemptEventType.RECOVER);
      attempt.handle(recoverEvent);
    }
  }

  private static final class RMAppRecoveredTransition implements
      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {

    @Override
    public RMAppState transition(RMAppImpl app, RMAppEvent event) {

      RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event;
      app.recover(recoverEvent.getRMState());
      // The app has completed.
      if (app.recoveredFinalState != null) {
        app.recoverAppAttempts();
        new FinalTransition(app.recoveredFinalState).transition(app, event);
        return app.recoveredFinalState;
      }

      if (UserGroupInformation.isSecurityEnabled()) {
        // asynchronously renew delegation token on recovery.
        try {
          app.rmContext.getDelegationTokenRenewer()
              .addApplicationAsyncDuringRecovery(app.getApplicationId(),
                  BuilderUtils.parseCredentials(app.submissionContext),
                  app.submissionContext.getCancelTokensWhenComplete(),
                  app.getUser(),
                  BuilderUtils.parseTokensConf(app.submissionContext));
        } catch (Exception e) {
          String msg = "Failed to fetch user credentials from application:" + e
              .getMessage();
          app.diagnostics.append(msg);
          LOG.error(msg, e);
        }
      }

      for (Map.Entry<ApplicationTimeoutType, Long> timeout : app.applicationTimeouts
          .entrySet()) {
        app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
            timeout.getKey(), timeout.getValue());
        if (LOG.isDebugEnabled()) {
          long remainingTime = timeout.getValue() - app.systemClock.getTime();
          LOG.debug("Application " + app.applicationId
              + " is registered for timeout monitor, type=" + timeout.getKey()
              + " remaining timeout=" + (remainingTime > 0 ?
              remainingTime / 1000 :
              0) + " seconds");
        }
      }

      // No existent attempts means the attempt associated with this app was not
      // started or started but not yet saved.
      if (app.attempts.isEmpty()) {
        app.scheduler.handle(
            new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
                app.applicationPriority, app.placementContext));
        return RMAppState.SUBMITTED;
      }

      // Add application to scheduler synchronously to guarantee scheduler
      // knows applications before AM or NM re-registers.
      app.scheduler.handle(
          new AppAddedSchedulerEvent(app.user, app.submissionContext, true,
              app.applicationPriority, app.placementContext));

      // recover attempts
      app.recoverAppAttempts();

      // YARN-1507 is saving the application state after the application is
      // accepted. So after YARN-1507, an app is saved meaning it is accepted.
      // Thus we return ACCECPTED state on recovery.
      return RMAppState.ACCEPTED;
    }
  }

  /**
   * Announces the application to the scheduler through the event handler and
   * then sends the ATS create event.
   */
  private static final class AddApplicationToSchedulerTransition extends
      RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      final AppAddedSchedulerEvent appAdded = new AppAddedSchedulerEvent(
          app.user, app.submissionContext, false, app.applicationPriority,
          app.placementContext);
      app.handler.handle(appAdded);
      // send the ATS create Event
      app.sendATSCreateEvent();
    }
  }

  /** Creates and starts a new attempt without state transfer. */
  private static final class StartAppAttemptTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      final boolean transferStateFromPreviousAttempt = false;
      app.createAndStartNewAttempt(transferStateFromPreviousAttempt);
    }
  }

  /**
   * Runs once the final state has been saved: unregisters the application
   * from the lifetime monitor, executes the transition that was remembered
   * before saving, and moves to the remembered target state.
   */
  private static final class FinalStateSavedTransition implements
      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {

    @Override
    public RMAppState transition(RMAppImpl app, RMAppEvent event) {
      final Map<ApplicationTimeoutType, Long> timeouts =
          app.submissionContext.getApplicationTimeouts();
      if (timeouts != null && !timeouts.isEmpty()) {
        app.rmContext.getRMAppLifetimeMonitor()
            .unregisterApp(app.getApplicationId(), timeouts.keySet());
      }

      // Replay the remembered transition with the event that originally
      // triggered the final saving, not with the "saved" event itself.
      final Object pending = app.transitionTodo;
      final RMAppEvent cause = app.eventCausingFinalSaving;
      if (pending instanceof SingleArcTransition) {
        ((SingleArcTransition) pending).transition(app, cause);
      } else if (pending instanceof MultipleArcTransition) {
        ((MultipleArcTransition) pending).transition(app, cause);
      }
      return app.targetedFinalState;
    }
  }

  /**
   * Runs after the FAILED state caused by a failed attempt has been saved:
   * records the failure diagnostics on the application and then performs the
   * final FAILED transition.
   */
  private static class AttemptFailedFinalStateSavedTransition extends
      RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      String msg = null;
      if (event instanceof RMAppFailedAttemptEvent) {
        msg = app.getAppAttemptFailedDiagnostics(event);
      }
      // msg stays null when the event is of another type or when no failure
      // message applies. Skip it then: appending a null String to a
      // StringBuilder would add the literal text "null" to the diagnostics.
      if (msg != null) {
        LOG.info(msg);
        app.diagnostics.append(msg);
      }
      // Inform the node for app-finish
      new FinalTransition(RMAppState.FAILED).transition(app, event);
    }
  }

  /**
   * Builds the diagnostics message for an application that fails because of
   * a failed attempt.
   *
   * @param event the event, which must be an {@code RMAppFailedAttemptEvent}
   * @return the message for an unmanaged AM or for an application whose
   *         attempts went beyond the threshold; {@code null} otherwise
   */
  private String getAppAttemptFailedDiagnostics(RMAppEvent event) {
    final RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;

    if (this.submissionContext.getUnmanagedAM()) {
      // RM does not manage the AM. Do not retry
      return "Unmanaged application " + this.getApplicationId()
          + " failed due to " + failedEvent.getDiagnosticMsg()
          + ". Failing the application.";
    }
    if (!this.isNumAttemptsBeyondThreshold) {
      return null;
    }

    // The global limit falls back to the RM-wide max-attempts setting.
    final int rmMaxAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
    final int globalLimit = conf.getInt(
        YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS, rmMaxAttempts);

    final String validityNote = (attemptFailuresValidityInterval <= 0)
        ? ""
        : (" in previous " + attemptFailuresValidityInterval
            + " milliseconds");
    final String limitNote = (globalLimit == maxAppAttempts)
        ? ""
        : (" (global limit =" + globalLimit
            + "; local limit is =" + maxAppAttempts + ")");

    return String.format(
        "Application %s failed %d times%s%s due to %s. Failing the application.",
        getApplicationId(),
        maxAppAttempts,
        validityNote,
        limitNote,
        failedEvent.getDiagnosticMsg());
  }

  /**
   * Registers the application lifetime timeout (when one applies) and then
   * asks the state store to persist the new application.
   */
  private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {

      // Let the scheduler adjust the requested lifetime for the app's queue.
      final long requestedLifetime =
          app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME);
      final long lifetimeSeconds = app.scheduler
          .checkAndGetApplicationLifetime(app.queue, requestedLifetime);

      if (lifetimeSeconds > 0) {
        // Absolute expiry time, counted in milliseconds from the submit time.
        final Long expiry =
            Long.valueOf(app.submitTime + (lifetimeSeconds * 1000));
        app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
            ApplicationTimeoutType.LIFETIME, expiry);

        // update applicationTimeouts with new absolute value.
        app.applicationTimeouts.put(ApplicationTimeoutType.LIFETIME, expiry);

        LOG.info("Application " + app.applicationId
            + " is registered for timeout monitor, type="
            + ApplicationTimeoutType.LIFETIME + " value=" + lifetimeSeconds
            + " seconds");
      }

      // If recovery is enabled then store the application information in a
      // non-blocking call so make sure that RM has stored the information
      // needed to restart the AM after RM restart without further client
      // communication
      LOG.info("Storing application with id " + app.applicationId);
      app.rmContext.getStateStore().storeNewApplication(app);
    }
  }

  /**
   * Remembers what has to happen once the final state has been saved.
   *
   * @param event the event that caused the final saving
   * @param transitionToDo the transition to run after saving
   * @param targetFinalState the state to move to after saving
   */
  private void rememberTargetTransitions(RMAppEvent event,
      Object transitionToDo, RMAppState targetFinalState) {
    this.eventCausingFinalSaving = event;
    this.transitionTodo = transitionToDo;
    this.targetedFinalState = targetFinalState;
  }

  /**
   * Remembers the follow-up transition and target state, then writes the
   * application's final state to the state store.
   *
   * @param event the event that caused the final saving
   * @param transitionToDo the transition to run once the state is saved
   * @param targetFinalState the state to move to once the state is saved
   * @param stateToBeStored the state written to the state store
   */
  private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
      Object transitionToDo, RMAppState targetFinalState,
      RMAppState stateToBeStored) {
    rememberTargetTransitions(event, transitionToDo, targetFinalState);
    this.stateBeforeFinalSaving = getState();
    this.storedFinishTime = this.systemClock.getTime();

    LOG.info("Updating application " + this.applicationId
        + " with final state: " + this.targetedFinalState);

    // we lost attempt_finished diagnostics in app, because attempt_finished
    // diagnostics is sent after app final state is saved. Later on, we will
    // create GetApplicationAttemptReport specifically for getting per attempt
    // info.
    final String storedDiagnostics;
    switch (event.getType()) {
    case ATTEMPT_FAILED:
      storedDiagnostics = getAppAttemptFailedDiagnostics(event);
      break;
    case APP_REJECTED:
    case ATTEMPT_FINISHED:
    case ATTEMPT_KILLED:
      storedDiagnostics = event.getDiagnosticMsg();
      break;
    default:
      storedDiagnostics = null;
      break;
    }

    final ApplicationStateData finalAppState =
        ApplicationStateData.newInstance(this.submitTime, this.startTime,
            this.getUser(), this.getRealUser(), this.submissionContext,
            stateToBeStored, storedDiagnostics, this.launchTime,
            this.storedFinishTime, this.callerContext);
    finalAppState.setApplicationTimeouts(this.applicationTimeouts);
    this.rmContext.getStateStore().updateApplicationState(finalAppState);
  }

  /**
   * Transition into final saving: stores the application state and remembers
   * which transition and target state follow once the save has completed.
   */
  private static final class FinalSavingTransition extends RMAppTransition {
    /** Transition to run after the state has been saved. */
    Object transitionToDo;
    /** State to move to after the state has been saved. */
    RMAppState targetedFinalState;
    /** State written to the state store. */
    RMAppState stateToBeStored;

    /** Stores the same state that is targeted. */
    public FinalSavingTransition(Object transitionToDo,
        RMAppState targetedFinalState) {
      this.transitionToDo = transitionToDo;
      this.targetedFinalState = targetedFinalState;
      this.stateToBeStored = targetedFinalState;
    }

    public FinalSavingTransition(Object transitionToDo,
        RMAppState targetedFinalState, RMAppState stateToBeStored) {
      this.transitionToDo = transitionToDo;
      this.targetedFinalState = targetedFinalState;
      this.stateToBeStored = stateToBeStored;
    }

    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.rememberTargetTransitionsAndStoreState(
          event, this.transitionToDo, this.targetedFinalState,
          this.stateToBeStored);
    }
  }

  /** Copies the stored finish time into the application's finish time. */
  private static class AttemptUnregisteredTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      final long stored = app.storedFinishTime;
      app.finishTime = stored;
    }
  }

  private static class AppFinishedTransition extends FinalTransition {
    public AppFinishedTransition() {
      super(RMAppState.FINISHED);
    }

    public void transition(RMAppImpl app, RMAppEvent event) {
      app.diagnostics.append(event.getDiagnosticMsg());
      super.transition(app, event);
    };
  }

  /**
   * Handles an attempt-finished event that arrives while the final state is
   * still being saved.
   */
  private static class AttemptFinishedAtFinalSavingTransition extends
      RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      final RMAppState target = app.targetedFinalState;
      final boolean headingToFailedOrKilled =
          target.equals(RMAppState.FAILED) || target.equals(RMAppState.KILLED);
      if (headingToFailedOrKilled) {
        // The app is already on its way to FAILED or KILLED, so the
        // attempt-finished event is ignored.
        return;
      }

      // pass in the earlier attempt_unregistered event, as it is needed in
      // AppFinishedFinalStateSavedTransition later on
      final RMAppEvent attemptUnregistered = app.eventCausingFinalSaving;
      app.rememberTargetTransitions(event,
          new AppFinishedFinalStateSavedTransition(attemptUnregistered),
          RMAppState.FINISHED);
    }
  }

  /**
   * Runs after the FINISHED state has been saved: first replays the
   * attempt-unregistered handling with the remembered event, then performs
   * the finished transition.
   */
  private static class AppFinishedFinalStateSavedTransition extends
      RMAppTransition {
    /** The earlier attempt-unregistered event handed in at construction. */
    RMAppEvent attemptUnregistered;

    public AppFinishedFinalStateSavedTransition(RMAppEvent attemptUnregistered) {
      this.attemptUnregistered = attemptUnregistered;
    }

    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      final AttemptUnregisteredTransition unregistered =
          new AttemptUnregisteredTransition();
      unregistered.transition(app, this.attemptUnregistered);
      FINISHED_TRANSITION.transition(app, event);
    }
  }

  /**
   * Writes a success entry to the audit log for a kill requested by a client.
   * Events of any other type are ignored.
   *
   * @param event
   *          The {@link RMAppEvent} to be logged
   */
  static void auditLogKillEvent(RMAppEvent event) {
    if (!(event instanceof RMAppKillByClientEvent)) {
      return;
    }
    final RMAppKillByClientEvent killEvent = (RMAppKillByClientEvent) event;
    final UserGroupInformation callerUGI = killEvent.getCallerUGI();
    // The caller may be unknown; the user name is logged as null then.
    final String userName =
        (callerUGI == null) ? null : callerUGI.getShortUserName();
    final InetAddress remoteIP = killEvent.getIp();
    RMAuditLogger.logSuccess(userName, AuditConstants.KILL_APP_REQUEST,
        "RMAppImpl", event.getApplicationId(), remoteIP);
  }

  /**
   * Final transition to KILLED: appends the event's diagnostics, runs the
   * common final handling and audit-logs the kill when a client requested it.
   */
  private static class AppKilledTransition extends FinalTransition {
    public AppKilledTransition() {
      super(RMAppState.KILLED);
    }

    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      final String eventDiagnostics = event.getDiagnosticMsg();
      app.diagnostics.append(eventDiagnostics);
      super.transition(app, event);
      RMAppImpl.auditLogKillEvent(event);
    }
  }

  /**
   * Starts killing the application by sending a KILL event for the current
   * attempt, remembering the state the application was in beforehand.
   */
  private static class KillAttemptTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.stateBeforeKilling = app.getState();
      // Forward app kill diagnostics in the event to kill app attempt.
      // These diagnostics will be returned back in ATTEMPT_KILLED event sent by
      // RMAppAttemptImpl.
      final RMAppAttemptEvent killAttempt = new RMAppAttemptEvent(
          app.currentAttempt.getAppAttemptId(),
          RMAppAttemptEventType.KILL, event.getDiagnosticMsg());
      app.handler.handle(killAttempt);
      RMAppImpl.auditLogKillEvent(event);
    }
  }

  private static final class AppRejectedTransition extends FinalTransition {
    public AppRejectedTransition() {
      super(RMAppState.FAILED);
    }

    public void transition(RMAppImpl app, RMAppEvent event) {
      app.diagnostics.append(event.getDiagnosticMsg());
      super.transition(app, event);
    };
  }

  /**
   * Runs the clean-up action of the {@code AppAdminClient} created for the
   * application's type and logs the outcome. Failures are logged and never
   * propagated.
   *
   * @param app application to clean up
   */
  static void appAdminClientCleanUp(RMAppImpl app) {
    try {
      final AppAdminClient cleanupClient = AppAdminClient
          .createAppAdminClient(app.applicationType, app.conf);
      final int exitCode = cleanupClient.actionCleanUp(app.name, app.user);
      if (exitCode != 0) {
        LOG.warn("Type-specific cleanup of application " + app.applicationId
            + " of type " + app.applicationType + " did not succeed with exit"
            + " code " + exitCode);
      } else {
        LOG.info("Type-specific cleanup of application " + app.applicationId
            + " of type " + app.applicationType + " succeeded");
      }
    } catch (IllegalArgumentException e) {
      // no AppAdminClient class has been specified for the application type,
      // so this does not need to be logged
    } catch (Exception e) {
      LOG.warn("Could not run type-specific cleanup on application " +
          app.applicationId + " of type " + app.applicationType, e);
    }
  }

  /**
   * Common handling for reaching a final state: cleans the application up on
   * nodes and in the scheduler, records the finish time, publishes the finish
   * to history and metrics, and runs the type-specific clean-up.
   */
  private static class FinalTransition extends RMAppTransition {

    /** Final state reported to the scheduler, history writer and publisher. */
    private final RMAppState finalState;

    FinalTransition(RMAppState finalState) {
      this.finalState = finalState;
    }

    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      completeAndCleanupApp(app);
      handleAppFinished(app);
      app.clearUnusedFields();
      appAdminClientCleanUp(app);
    }

    /** Sends the clean-up and completion events for the application. */
    private void completeAndCleanupApp(RMAppImpl app) {
      // Ask every node the app ran on to clean it up, then forget the nodes.
      for (NodeId ranNode : app.getRanNodes()) {
        final RMNodeCleanAppEvent cleanApp =
            new RMNodeCleanAppEvent(ranNode, app.applicationId);
        app.handler.handle(cleanApp);
      }
      app.ranNodes.clear();

      // Recovered apps that are completed were not added to scheduler, so no
      // need to remove them from scheduler.
      final boolean wasAddedToScheduler = (app.recoveredFinalState == null);
      if (wasAddedToScheduler) {
        app.handler.handle(
            new AppRemovedSchedulerEvent(app.applicationId, finalState));
      }

      app.handler.handle(new RMAppManagerEvent(app.applicationId,
          RMAppManagerEventType.APP_COMPLETED));
    }

    /** Records the finish time and publishes the finish to history/metrics. */
    private void handleAppFinished(RMAppImpl app) {
      app.logAggregation
          .recordLogAggregationStartTime(app.systemClock.getTime());

      // Use the stored finish time when there is one, the current time
      // otherwise.
      final long stored = app.storedFinishTime;
      app.finishTime = (stored == 0) ? app.systemClock.getTime() : stored;

      //record finish in history and metrics
      app.rmContext.getRMApplicationHistoryWriter()
          .applicationFinished(app, finalState);
      app.rmContext.getSystemMetricsPublisher()
          .appFinished(app, finalState, app.finishTime);
    }
  }

  /**
   * Counts the attempts that count towards the max-attempt retry limit.
   * Which attempts qualify is decided by each attempt itself (AM preemption,
   * hardware failures and NM resync are not meant to be counted).
   *
   * @return number of attempts for which
   *         {@code shouldCountTowardsMaxAttemptRetry()} is true
   */
  public int getNumFailedAppAttempts() {
    int countedAttempts = 0;
    for (RMAppAttempt attempt : attempts.values()) {
      countedAttempts += attempt.shouldCountTowardsMaxAttemptRetry() ? 1 : 0;
    }
    return countedAttempts;
  }

  /**
   * Transition applied when an application attempt has failed (per the
   * class name). Based on the number of counted failed attempts, the
   * configured maximum and whether the AM is unmanaged, it either starts a
   * new attempt, stores a KILLED final state, or stores a FAILED final
   * state.
   */
  private static final class AttemptFailedTransition implements
        MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {

    // State supplied at construction. It is returned as the next state when
    // a new attempt is started, and compared with KILLING to decide whether
    // the app should be killed instead of retried.
    private final RMAppState initialState;

    /**
     * @param initialState state returned when a new attempt is launched
     */
    public AttemptFailedTransition(RMAppState initialState) {
      this.initialState = initialState;
    }

    /**
     * Chooses the next application state after an attempt failure.
     *
     * @param app the application whose attempt failed
     * @param event the triggering event; it is cast to
     *          {@link RMAppFailedAttemptEvent} on the retry path
     * @return {@code initialState} when a new attempt was started, otherwise
     *         {@link RMAppState#FINAL_SAVING}
     */
    @Override
    public RMAppState transition(RMAppImpl app, RMAppEvent event) {
      int numberOfFailure = app.getNumFailedAppAttempts();
      if (app.maxAppAttempts == 1) {
        // If the user explicitly set the attempts to 1 then there are likely
        // correctness issues if the AM restarts for any reason.
        LOG.info("Max app attempts is 1 for " + app.applicationId
            + ", preventing further attempts.");
        // Forces the "numberOfFailure < maxAppAttempts" check below to fail,
        // so no new attempt is created.
        numberOfFailure = app.maxAppAttempts;
      } else {
        LOG.info("The number of failed attempts"
            + (app.attemptFailuresValidityInterval > 0 ? " in previous "
            + app.attemptFailuresValidityInterval + " milliseconds " : " ")
            + "is " + numberOfFailure + ". The max attempts is "
            + app.maxAppAttempts);

        // With a validity interval configured, attempts that finished before
        // the interval may be dropped from the state store.
        if (app.attemptFailuresValidityInterval > 0) {
          removeExcessAttempts(app);
        }
      }

      // Retry is only considered for managed AMs that are still below the
      // maximum number of attempts.
      if (!app.submissionContext.getUnmanagedAM()
          && numberOfFailure < app.maxAppAttempts) {
        if (initialState.equals(RMAppState.KILLING)) {
          // If this is not last attempt, app should be killed instead of
          // launching a new attempt
          app.rememberTargetTransitionsAndStoreState(event,
            new AppKilledTransition(), RMAppState.KILLED, RMAppState.KILLED);
          return RMAppState.FINAL_SAVING;
        }

        boolean transferStateFromPreviousAttempt;
        RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
        transferStateFromPreviousAttempt =
            failedEvent.getTransferStateFromPreviousAttempt();

        // Capture the failed attempt before createAndStartNewAttempt is
        // called; app.currentAttempt is read again afterwards as the new
        // attempt.
        RMAppAttempt oldAttempt = app.currentAttempt;
        app.createAndStartNewAttempt(transferStateFromPreviousAttempt);
        // Transfer the state from the previous attempt to the current attempt.
        // Note that the previous failed attempt may still be collecting the
        // container events from the scheduler and update its data structures
        // before the new attempt is created. We always transferState for
        // finished containers so that they can be acked to NM,
        // but when pulling finished container we will check this flag again.
        ((RMAppAttemptImpl) app.currentAttempt)
          .transferStateFromAttempt(oldAttempt);
        return initialState;
      } else {
        // Record that the failure limit was reached; an unmanaged AM can
        // arrive here without having reached it.
        if (numberOfFailure >= app.maxAppAttempts) {
          app.isNumAttemptsBeyondThreshold = true;
        }
        app.rememberTargetTransitionsAndStoreState(event,
          new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
          RMAppState.FAILED);
        return RMAppState.FINAL_SAVING;
      }
    }

    /**
     * Removes old attempts from the state store. While the distance between
     * {@code nextAttemptId} and {@code firstAttemptIdInStateStore} exceeds
     * {@code maxAppAttempts}, the attempt with id
     * {@code firstAttemptIdInStateStore} is removed from the state store if
     * it finished earlier than the current time minus
     * {@code attemptFailuresValidityInterval}. The loop stops at the first
     * attempt that does not satisfy that condition. Only the state store and
     * {@code firstAttemptIdInStateStore} are updated here.
     *
     * @param app the application whose stored attempts are trimmed
     */
    private void removeExcessAttempts(RMAppImpl app) {
      while (app.nextAttemptId
          - app.firstAttemptIdInStateStore > app.maxAppAttempts) {
        // attempts' first element is oldest attempt because it is a
        // LinkedHashMap
        ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
            app.getApplicationId(), app.firstAttemptIdInStateStore);
        RMAppAttempt rmAppAttempt = app.getRMAppAttempt(attemptId);
        long endTime = app.systemClock.getTime();
        // NOTE(review): rmAppAttempt is dereferenced without a null check;
        // presumably every id in this range is still known to the app -
        // confirm.
        if (rmAppAttempt.getFinishTime() < (endTime
            - app.attemptFailuresValidityInterval)) {
          app.firstAttemptIdInStateStore++;
          LOG.info("Remove attempt from state store : " + attemptId);
          app.rmContext.getStateStore().removeApplicationAttempt(attemptId);
        } else {
          break;
        }
      }
    }
  }

  /**
   * @return the application type stored on this application
   */
  @Override
  public String getApplicationType() {
    return applicationType;
  }

  /**
   * @return the tag set stored on this application; the stored reference
   *         itself is returned, not a copy
   */
  @Override
  public Set<String> getApplicationTags() {
    return applicationTags;
  }

  /**
   * @return true when the current state is FINISHING, FINISHED, FAILED or
   *         KILLED; false for every other state
   */
  @Override
  public boolean isAppFinalStateStored() {
    switch (getState()) {
    case FINISHING:
    case FINISHED:
    case FAILED:
    case KILLED:
      return true;
    default:
      return false;
    }
  }

  /**
   * Maps the internal state to a {@link YarnApplicationState} through
   * {@code RMServerUtils.createApplicationState}. The transient states are
   * replaced first: FINAL_SAVING by the state recorded before final saving,
   * and then KILLING by the state recorded before killing.
   *
   * @return the application state produced by the mapping
   */
  @Override
  public YarnApplicationState createApplicationState() {
    RMAppState current = getState();
    // If App is in FINAL_SAVING state, report its previous state.
    RMAppState effective = current.equals(RMAppState.FINAL_SAVING)
        ? stateBeforeFinalSaving : current;
    // The state substituted above may itself be KILLING, so this check
    // runs on the substituted value.
    effective = effective.equals(RMAppState.KILLING)
        ? stateBeforeKilling : effective;
    return RMServerUtils.createApplicationState(effective);
  }
  
  /**
   * Tells whether the given app is FAILED, FINISHED or KILLED. The
   * recovered final state is used when it is non-null; otherwise the app's
   * current state is used.
   *
   * @param rmApp the application to inspect; it is cast to
   *          {@link RMAppImpl}
   * @return true when the inspected state is FAILED, FINISHED or KILLED
   */
  public static boolean isAppInFinalState(RMApp rmApp) {
    RMAppState recovered = ((RMAppImpl) rmApp).getRecoveredFinalState();
    RMAppState effective = (recovered != null) ? recovered : rmApp.getState();
    if (effective == RMAppState.FAILED) {
      return true;
    }
    if (effective == RMAppState.FINISHED) {
      return true;
    }
    return effective == RMAppState.KILLED;
  }

  /**
   * @return true when the current state is FINISHED, FINISHING, FAILED,
   *         KILLED, FINAL_SAVING or KILLING
   */
  @Override
  public boolean isAppInCompletedStates() {
    RMAppState current = getState();
    boolean ended = current == RMAppState.FINISHED
        || current == RMAppState.FAILED
        || current == RMAppState.KILLED;
    boolean ending = current == RMAppState.FINISHING
        || current == RMAppState.FINAL_SAVING
        || current == RMAppState.KILLING;
    return ended || ending;
  }

  /**
   * @return the placement context stored on this application
   */
  @Override
  public ApplicationPlacementContext getApplicationPlacementContext() {
    return this.placementContext;
  }

  /**
   * @return the recovered final state stored on this application;
   *         {@code isAppInFinalState} treats null as "not set"
   */
  public RMAppState getRecoveredFinalState() {
    return recoveredFinalState;
  }

  /**
   * @return the set of node ids stored in {@code ranNodes}; the stored
   *         reference itself is returned
   */
  @Override
  public Set<NodeId> getRanNodes() {
    return this.ranNodes;
  }
  
  /**
   * Builds an {@link RMAppMetrics} snapshot by summing the metrics of all
   * non-null attempts while holding the read lock: preempted resources,
   * preempted AM and non-AM container counts, resource-seconds,
   * preempted resource-seconds and total allocated containers.
   *
   * @return a new metrics object holding the accumulated values
   */
  @Override
  public RMAppMetrics getRMAppMetrics() {
    Resource preemptedTotal = Resource.newInstance(0, 0);
    int amPreemptedCount = 0;
    int nonAmPreemptedCount = 0;
    Map<String, Long> usageSeconds = new HashMap<>();
    Map<String, Long> preemptedSeconds = new HashMap<>();
    int allocatedContainers = 0;

    this.readLock.lock();
    try {
      for (RMAppAttempt appAttempt : attempts.values()) {
        if (null == appAttempt) {
          continue;
        }
        RMAppAttemptMetrics metrics = appAttempt.getRMAppAttemptMetrics();
        Resources.addTo(preemptedTotal, metrics.getResourcePreempted());
        if (metrics.getIsPreempted()) {
          amPreemptedCount++;
        }
        nonAmPreemptedCount += metrics.getNumNonAMContainersPreempted();

        // getAggregateAppResourceUsage() will calculate resource usage stats
        // for both running and finished containers.
        AggregateAppResourceUsage usage =
            appAttempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
        for (Map.Entry<String, Long> used
            : usage.getResourceUsageSecondsMap().entrySet()) {
          long soFar = RMServerUtils
              .getOrDefault(usageSeconds, used.getKey(), 0L);
          usageSeconds.put(used.getKey(), soFar + used.getValue());
        }

        for (Map.Entry<String, Long> lost
            : metrics.getPreemptedResourceSecondsMap().entrySet()) {
          long soFar = RMServerUtils
              .getOrDefault(preemptedSeconds, lost.getKey(), 0L);
          preemptedSeconds.put(lost.getKey(), soFar + lost.getValue());
        }

        allocatedContainers += metrics.getTotalAllocatedContainers();
      }
    } finally {
      this.readLock.unlock();
    }

    return new RMAppMetrics(preemptedTotal, nonAmPreemptedCount,
        amPreemptedCount, usageSeconds, preemptedSeconds,
        allocatedContainers);
  }

  /**
   * Replaces the clock used by this application; annotated as visible for
   * testing.
   *
   * @param clock the clock to store in {@code systemClock}
   */
  @Private
  @VisibleForTesting
  public void setSystemClock(Clock clock) {
    systemClock = clock;
  }

  /**
   * @return the reservation id read from the submission context
   */
  @Override
  public ReservationId getReservationId() {
    return this.submissionContext.getReservationID();
  }
  
  /**
   * @return the AM resource requests stored in {@code amReqs}; the stored
   *         reference itself is returned
   */
  @Override
  public List<ResourceRequest> getAMResourceRequests() {
    return amReqs;
  }

  /**
   * Delegates to the {@code logAggregation} helper, passing this
   * application.
   *
   * @return the per-node log aggregation reports returned by the helper
   */
  @Override
  public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
    return this.logAggregation.getLogAggregationReportsForApp(this);
  }

  /**
   * Forwards a log aggregation report to the {@code logAggregation}
   * helper together with this application.
   *
   * @param nodeId the node the report is associated with
   * @param report the report to hand over
   */
  public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
    this.logAggregation.aggregateLogReport(nodeId, report, this);
  }

  /**
   * Delegates to the {@code logAggregation} helper.
   *
   * @param nodeId the node to look up
   * @return the failure messages the helper returns for that node
   */
  public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
    return this.logAggregation.getLogAggregationFailureMessagesForNM(nodeId);
  }

  /**
   * Delegates to the {@code logAggregation} helper, passing this
   * application.
   *
   * @return the log aggregation status returned by the helper
   */
  @Override
  public LogAggregationStatus getLogAggregationStatusForAppReport() {
    return this.logAggregation.getLogAggregationStatusForAppReport(this);
  }

  /**
   * Reads the node label expression from the submission context and
   * normalizes it: a null expression becomes
   * {@code NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET}, and an expression that
   * is empty after trimming becomes
   * {@code NodeLabel.DEFAULT_NODE_LABEL_PARTITION}.
   *
   * @return the normalized node label expression
   */
  @Override
  public String getAppNodeLabelExpression() {
    String expression =
        getApplicationSubmissionContext().getNodeLabelExpression();
    if (expression == null) {
      expression = NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET;
    }
    // Checked after the null substitution, so the substituted value is
    // subject to the blank check as well.
    if (expression.trim().isEmpty()) {
      expression = NodeLabel.DEFAULT_NODE_LABEL_PARTITION;
    }
    return expression;
  }

  /**
   * Determines the node label expression of the AM. For an unmanaged AM
   * the result is null. Otherwise the expression of the first AM resource
   * request is used (null when there are no requests), with null replaced
   * by {@code NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET} and a blank value
   * replaced by {@code NodeLabel.DEFAULT_NODE_LABEL_PARTITION}.
   *
   * @return the normalized AM node label expression, or null for an
   *         unmanaged AM
   */
  @Override
  public String getAmNodeLabelExpression() {
    if (getApplicationSubmissionContext().getUnmanagedAM()) {
      return null;
    }

    List<ResourceRequest> requests = getAMResourceRequests();
    String expression = null;
    if (requests != null && !requests.isEmpty()) {
      expression = requests.get(0).getNodeLabelExpression();
    }
    if (expression == null) {
      expression = NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET;
    }
    if (expression.trim().isEmpty()) {
      expression = NodeLabel.DEFAULT_NODE_LABEL_PARTITION;
    }
    return expression;
  }

  /**
   * @return the caller context stored on this application
   */
  @Override
  public CallerContext getCallerContext() {
    return this.callerContext;
  }

  /**
   * Notifies the application history writer that this application started,
   * then tells the system metrics publisher that the app was created (with
   * its start time) and that its VIEW_APP ACL was updated (with the current
   * clock time).
   */
  private void sendATSCreateEvent() {
    rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
    rmContext.getSystemMetricsPublisher().appCreated(this, startTime);

    // The ACL value is looked up from the AM container spec before the
    // publisher is asked to record it.
    Map<ApplicationAccessType, String> acls =
        submissionContext.getAMContainerSpec().getApplicationACLs();
    String viewAcl = acls.get(ApplicationAccessType.VIEW_APP);
    rmContext.getSystemMetricsPublisher().appACLsUpdated(
        this, viewAcl, systemClock.getTime());
  }

  /**
   * @return the current value of {@code nextAttemptId}; annotated as
   *         visible for testing
   */
  @Private
  @VisibleForTesting
  public int getNextAttemptId() {
    return this.nextAttemptId;
  }

  /**
   * Looks up the timeout configured in the submission context for the
   * given timeout type.
   *
   * @param type the timeout type to look up
   * @return the configured value, or -1 when the submission context has no
   *         timeouts map or no entry for {@code type}
   */
  private long getApplicationLifetime(ApplicationTimeoutType type) {
    Map<ApplicationTimeoutType, Long> configured =
        this.submissionContext.getApplicationTimeouts();
    if (configured == null || !configured.containsKey(type)) {
      return -1;
    }
    return configured.get(type);
  }

  /**
   * Returns a copy of the application timeouts, taken while holding the
   * read lock, so callers cannot modify the map stored on this
   * application.
   *
   * @return a new {@link HashMap} holding the current timeout entries
   */
  @Override
  public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() {
    this.readLock.lock();
    try {
      // Diamond instead of the raw HashMap type avoids an unchecked
      // conversion to the declared return type.
      return new HashMap<>(this.applicationTimeouts);
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * Applies new timeouts to this application while holding the write lock.
   * Nothing happens when the current state is contained in
   * {@code COMPLETED_APP_STATES}. Otherwise the lifetime monitor is updated
   * first and the entries are then merged into the stored timeouts.
   *
   * @param updateTimeout the timeout entries to apply
   */
  public void updateApplicationTimeout(
      Map<ApplicationTimeoutType, Long> updateTimeout) {
    this.writeLock.lock();
    try {
      if (!COMPLETED_APP_STATES.contains(getState())) {
        // update monitoring service
        this.rmContext.getRMAppLifetimeMonitor()
            .updateApplicationTimeouts(getApplicationId(), updateTimeout);
        this.applicationTimeouts.putAll(updateTimeout);
      }
    } finally {
      this.writeLock.unlock();
    }
  }

  /**
   * @return the priority stored on this application
   */
  @Override
  public Priority getApplicationPriority() {
    return this.applicationPriority;
  }

  /**
   * Stores the given priority on this application. No lock is taken in
   * this method.
   *
   * @param applicationPriority the priority to store
   */
  public void setApplicationPriority(Priority applicationPriority) {
    this.applicationPriority = applicationPriority;
  }

  /**
   * Clears fields that are no longer used in order to free memory: the AM
   * container spec and the log aggregation context of the submission
   * context are both set to null.
   */
  private void clearUnusedFields() {
    submissionContext.setAMContainerSpec(null);
    submissionContext.setLogAggregationContext(null);
  }

  /**
   * @return the scheduling environment map stored on this application;
   *         the stored reference itself is returned
   */
  @Override
  public Map<String, String> getApplicationSchedulingEnvs() {
    return applicationSchedulingEnvs;
  }

  /**
   * Hook for reacting to an invalid state transition (per the original
   * note, it is meant to catch the InvalidStateTransition). The body is
   * currently empty; being protected, it can be overridden by subclasses.
   *
   * @param rmAppEventType type of the event involved in the invalid
   *          transition (presumably the event being handled; confirm
   *          against the caller)
   * @param state application state associated with the invalid
   *          transition (presumably the state before the event; confirm
   *          against the caller)
   */
  protected void onInvalidStateTransition(RMAppEventType rmAppEventType,
              RMAppState state){
      /* TODO: fail the application on the failed transition. */
  }

  /**
   * Delegates to the {@code logAggregation} helper; annotated as visible
   * for testing.
   *
   * @return the log aggregation start time reported by the helper
   */
  @VisibleForTesting
  long getLogAggregationStartTime() {
    return this.logAggregation.getLogAggregationStartTime();
  }

  /**
   * @return the clock currently stored in {@code systemClock}
   */
  Clock getSystemClock() {
    return this.systemClock;
  }

  /**
   * Reads the real user from {@code userUgi}.
   *
   * @return the short user name of the real user, or null when
   *         {@code userUgi} has no real user
   */
  @Override
  public String getRealUser() {
    UserGroupInformation realUgi = this.userUgi.getRealUser();
    if (realUgi == null) {
      return null;
    }
    return realUgi.getShortUserName();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop RMApp 源码

hadoop RMAppEvent 源码

hadoop RMAppEventType 源码

hadoop RMAppFailedAttemptEvent 源码

hadoop RMAppKillByClientEvent 源码

hadoop RMAppLogAggregation 源码

hadoop RMAppMetrics 源码

hadoop RMAppNodeUpdateEvent 源码

hadoop RMAppRecoverEvent 源码

hadoop RMAppRunningOnNodeEvent 源码

0  赞