hadoop ContainerImpl 源码

  • 2022-10-20
  • 浏览 (230)

hadoop ContainerImpl 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerSubState;
import org.apache.hadoop.yarn.api.records.LocalizationStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;

public class ContainerImpl implements Container {
  /**
   * Localization statistics tracked per container. Each constant corresponds
   * 1-to-1 with one of the MR {@code TaskCounter.LOCALIZED_*} counters.
   *
   * <p>The {@code localizationCounts} array is sized by {@code values().length};
   * presumably it is indexed by {@code ordinal()}, so keep the declaration
   * order stable (TODO confirm against the users of the array).
   */
  private enum LocalizationCounter {
    BYTES_MISSED,
    BYTES_CACHED,
    FILES_MISSED,
    FILES_CACHED,
    MILLIS
  }

  /**
   * State needed to re-initialize a running container with a new launch
   * context and resource set. When the previous launch context and resource
   * set are also supplied, the re-initialization can later be rolled back.
   */
  private static final class ReInitializationContext {
    // What the container is being (re-)initialized to.
    private final ContainerLaunchContext newLaunchContext;
    private final ResourceSet newResourceSet;

    // What to restore on rollback; null when no rollback is possible.
    private final ContainerLaunchContext oldLaunchContext;
    private final ResourceSet oldResourceSet;

    // True only for contexts produced by createContextForRollback().
    private boolean isRollback = false;

    private ReInitializationContext(ContainerLaunchContext newLaunchContext,
        ResourceSet newResourceSet,
        ContainerLaunchContext oldLaunchContext,
        ResourceSet oldResourceSet) {
      this.newLaunchContext = newLaunchContext;
      this.newResourceSet = newResourceSet;
      this.oldLaunchContext = oldLaunchContext;
      this.oldResourceSet = oldResourceSet;
    }

    /**
     * @return {@code true} if a previous launch context was recorded, i.e.
     *     this re-initialization can be rolled back.
     */
    private boolean canRollback() {
      return oldLaunchContext != null;
    }

    /**
     * Computes the resource set to use after this re-initialization.
     *
     * @param current the container's current resource set
     * @return the new resource set as-is for a rollback or when it is already
     *     the current set (the restart case); otherwise the merge of
     *     {@code current} and the new resource set
     */
    private ResourceSet mergedResourceSet(ResourceSet current) {
      // A rollback never merges, and when current is already the new set
      // (identity check - happens during a restart) there is nothing to merge.
      if (isRollback || current == newResourceSet) {
        return newResourceSet;
      }
      return ResourceSet.merge(current, newResourceSet);
    }

    /**
     * @return a context that re-initializes back to the recorded old launch
     *     context and resource set; the returned context itself carries no
     *     rollback target.
     */
    private ReInitializationContext createContextForRollback() {
      final ReInitializationContext rollback = new ReInitializationContext(
          oldLaunchContext, oldResourceSet, null, null);
      rollback.isRollback = true;
      return rollback;
    }
  }

  // Timestamp formatter for this container.
  // NOTE(review): SimpleDateFormat is not thread-safe; confirm every use
  // happens under writeLock (or is otherwise confined to one thread).
  private final SimpleDateFormat dateFormat =
      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  // Both halves of a single ReentrantReadWriteLock created in the constructor.
  private final Lock readLock;
  private final Lock writeLock;
  private final Dispatcher dispatcher;
  // Taken from Context#getNMStateStore() in the constructor.
  private final NMStateStoreService stateStore;
  private final Credentials credentials;
  private final NodeManagerMetrics metrics;
  // One slot per LocalizationCounter constant.
  private final long[] localizationCounts =
      new long[LocalizationCounter.values().length];

  // Non-final and volatile: these may be replaced after construction.
  private volatile ContainerLaunchContext launchContext;
  private volatile ContainerTokenIdentifier containerTokenIdentifier;
  private final ContainerId containerId;
  // Application submitter from the container token identifier.
  private final String user;
  // Initialized from the token identifier; overwritten by the recovered
  // state in the recovery constructor.
  private int version;
  // ContainerExitStatus.INVALID until an exit code is known or recovered.
  private int exitCode = ContainerExitStatus.INVALID;
  private final StringBuilder diagnostics;
  // From YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE.
  private final int diagnosticsMaxSize;
  private boolean wasLaunched;
  private boolean wasPaused;
  private long containerLocalizationStartTime;
  private long containerLaunchStartTime;
  // Created in the constructor only when
  // YarnConfiguration.NM_CONTAINER_METRICS_ENABLE is true; otherwise left null.
  private ContainerMetrics containerMetrics;
  // Static and non-final: shared by every ContainerImpl instance.
  private static Clock clock = SystemClock.getInstance();

  // Retry configuration resolved by configureRetryContext() in the
  // constructor, plus the sliding-window retry bookkeeping built from it.
  private ContainerRetryContext containerRetryContext;
  private SlidingWindowRetryPolicy.RetryContext windowRetryContext;
  private SlidingWindowRetryPolicy retryPolicy;

  private String csiVolumesRootDir;
  // workDir and logDir are restored from RecoveredContainerState by the
  // recovery constructor.
  private String workDir;
  private String logDir;
  private String host;
  private String ips;
  private String exposedPorts;
  // Re-initialization (upgrade/rollback) state; see ReInitializationContext.
  private volatile ReInitializationContext reInitContext;
  private volatile boolean isReInitializing = false;
  // NOTE(review): "Marke" is a typo for "Marked"; the name is kept as-is
  // because it may be referenced outside this view.
  private volatile boolean isMarkeForKilling = false;
  // Opaque per-container data; its type is not constrained here.
  private Object containerRuntimeData;

  /**
   * The NM-wide (daemon) configuration passed to the constructor; it is not
   * specific to this container.
   */
  private final Configuration daemonConf;
  /**
   * Container start timestamp as passed to the constructor: the current
   * SystemClock time for new containers, or the recovered start time for
   * containers restored after a restart.
   */
  private final long startTime;

  private static final Logger LOG =
       LoggerFactory.getLogger(ContainerImpl.class);

  // Container status restored by the recovery constructor from
  // RecoveredContainerState; stays REQUESTED for containers that were not
  // recovered after a restart.
  private RecoveredContainerStatus recoveredStatus =
      RecoveredContainerStatus.REQUESTED;
  // Whether the recovered state recorded this container as killed; false for
  // containers that were not recovered.
  private boolean recoveredAsKilled = false;
  // NodeManager context passed to the constructor (state store, state
  // transition listener, ...).
  private Context context;
  // Resources tracked for this container; starts as an empty ResourceSet.
  private ResourceSet resourceSet;
  // Starts empty; replaced with the recovered mappings by the recovery
  // constructor.
  private ResourceMappings resourceMappings;

  /**
   * Creates a new container stamped with the current system time.
   *
   * <p>Equivalent to calling the full constructor with
   * {@code startTs = SystemClock.getInstance().getTime()}.
   */
  public ContainerImpl(Configuration conf, Dispatcher dispatcher,
      ContainerLaunchContext launchContext, Credentials creds,
      NodeManagerMetrics metrics,
      ContainerTokenIdentifier containerTokenIdentifier, Context context) {
    this(conf, dispatcher, launchContext, creds, metrics,
        containerTokenIdentifier, context,
        SystemClock.getInstance().getTime());
  }

  /**
   * Creates a new container in the {@code NEW} state.
   *
   * @param conf NM-wide configuration; read for the diagnostics size limit,
   *     the container-metrics settings and the minimum retry interval
   * @param dispatcher event dispatcher stored for this container
   * @param launchContext launch context for the container; may carry a
   *     {@link ContainerRetryContext}
   * @param creds credentials stored for this container
   * @param metrics NodeManager metrics
   * @param containerTokenIdentifier token identifier supplying the container
   *     id, the version and the application submitter (the container user)
   * @param context NodeManager context; supplies the state store and the
   *     state-transition listener
   * @param startTs timestamp recorded as this container's start time
   */
  public ContainerImpl(Configuration conf, Dispatcher dispatcher,
      ContainerLaunchContext launchContext, Credentials creds,
      NodeManagerMetrics metrics,
      ContainerTokenIdentifier containerTokenIdentifier, Context context,
      long startTs) {
    this.startTime = startTs;
    this.daemonConf = conf;
    this.dispatcher = dispatcher;
    this.stateStore = context.getNMStateStore();
    this.version = containerTokenIdentifier.getVersion();
    this.launchContext = launchContext;

    this.diagnosticsMaxSize = conf.getInt(
        YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE,
        YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE);
    this.containerTokenIdentifier = containerTokenIdentifier;
    this.containerId = containerTokenIdentifier.getContainerID();
    this.diagnostics = new StringBuilder();
    this.credentials = creds;
    this.metrics = metrics;
    this.user = containerTokenIdentifier.getApplicationSubmitter();
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    this.readLock = readWriteLock.readLock();
    this.writeLock = readWriteLock.writeLock();
    this.context = context;
    this.resourceSet = new ResourceSet();
    this.resourceMappings = new ResourceMappings();

    boolean containerMetricsEnabled =
        conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
            YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_ENABLE);

    if (containerMetricsEnabled) {
      long flushPeriod =
          conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
              YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
      long unregisterDelay = conf.getLong(
          YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS,
          YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);
      containerMetrics = ContainerMetrics
          .forContainer(containerId, flushPeriod, unregisterDelay);
      containerMetrics.recordStartTime(clock.getTime());
    }

    // Configure the Retry Context
    this.containerRetryContext = configureRetryContext(
        conf, launchContext, this.containerId);
    this.windowRetryContext = new SlidingWindowRetryPolicy
        .RetryContext(containerRetryContext);
    this.retryPolicy = new SlidingWindowRetryPolicy(clock);

    // Build the state machine last: make(this, ...) hands a reference to this
    // object (and, via the listener, to the context), so every field set by
    // this constructor should already be initialized at that point.
    stateMachine = stateMachineFactory.make(this, ContainerState.NEW,
        context.getContainerStateTransitionListener());
  }

  /**
   * Resolves the retry context for a container and enforces the NM-wide
   * minimum retry interval.
   *
   * <p>Uses the launch context's own retry context when one is present, and
   * {@link ContainerRetryContext#NEVER_RETRY_CONTEXT} otherwise. For any
   * policy other than {@code NEVER_RETRY}, a retry interval below
   * {@code NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS} is raised to that minimum.
   * The raise mutates the returned object in place.
   *
   * @param conf NM configuration holding the minimum retry interval
   * @param launchContext launch context of the container; may be null
   * @param containerId container id, used only for logging
   * @return the (possibly adjusted) retry context
   */
  private static ContainerRetryContext configureRetryContext(
      Configuration conf, ContainerLaunchContext launchContext,
      ContainerId containerId) {
    final boolean hasExplicitRetryContext = launchContext != null
        && launchContext.getContainerRetryContext() != null;
    final ContainerRetryContext retryContext = hasExplicitRetryContext
        ? launchContext.getContainerRetryContext()
        : ContainerRetryContext.NEVER_RETRY_CONTEXT;

    final int minIntervalMs = conf.getInt(
        YarnConfiguration.NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS,
        YarnConfiguration.DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS);

    // Nothing to adjust when retries are disabled or the interval is already
    // at or above the configured minimum.
    if (retryContext.getRetryPolicy() == ContainerRetryPolicy.NEVER_RETRY
        || retryContext.getRetryInterval() >= minIntervalMs) {
      return retryContext;
    }

    LOG.info("Set restart interval to minimum value " + minIntervalMs
        + "ms for container " + containerId);
    retryContext.setRetryInterval(minIntervalMs);
    return retryContext;
  }

  /**
   * Creates a container restored from persisted NodeManager state.
   *
   * <p>Builds the container through the full constructor, using the recovered
   * start time, and then overlays the recovered status, exit code, kill flag,
   * diagnostics, version, retry bookkeeping, directories and resource
   * mappings.
   *
   * @param rcs state recovered for this container
   */
  public ContainerImpl(Configuration conf, Dispatcher dispatcher,
      ContainerLaunchContext launchContext, Credentials creds,
      NodeManagerMetrics metrics,
      ContainerTokenIdentifier containerTokenIdentifier, Context context,
      RecoveredContainerState rcs) {
    this(conf, dispatcher, launchContext, creds, metrics,
        containerTokenIdentifier, context, rcs.getStartTime());

    // Recovered lifecycle state.
    recoveredStatus = rcs.getStatus();
    exitCode = rcs.getExitCode();
    recoveredAsKilled = rcs.getKilled();
    diagnostics.append(rcs.getDiagnostics());
    version = rcs.getVersion();

    // Recovered retry bookkeeping (remaining attempts and restart times).
    windowRetryContext.setRemainingRetries(rcs.getRemainingRetryAttempts());
    windowRetryContext.setRestartTimes(rcs.getRestartTimes());

    // Recovered directories and resource mappings.
    workDir = rcs.getWorkDir();
    logDir = rcs.getLogDir();
    resourceMappings = rcs.getResourceMappings();
  }

  /**
   * Single {@link ContainerDiagnosticsUpdateTransition} instance reused by
   * every {@code UPDATE_DIAGNOSTICS_MSG} arc of the container state machine.
   * It is declared before {@code stateMachineFactory} because static
   * initializers run in textual order, and the factory references it.
   */
  private static final ContainerDiagnosticsUpdateTransition
      UPDATE_DIAGNOSTICS_TRANSITION =
          new ContainerDiagnosticsUpdateTransition();

  // State Machine for each container.
  private static StateMachineFactory
           <ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>
        stateMachineFactory =
      new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
    // From NEW State
    .addTransition(ContainerState.NEW,
        EnumSet.of(ContainerState.LOCALIZING,
            ContainerState.SCHEDULED,
            ContainerState.LOCALIZATION_FAILED,
            ContainerState.DONE),
        ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
    .addTransition(ContainerState.NEW, ContainerState.NEW,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.NEW, ContainerState.DONE,
        ContainerEventType.KILL_CONTAINER, new KillOnNewTransition())
    .addTransition(ContainerState.NEW, ContainerState.NEW,
        ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())

    // From LOCALIZING State
    .addTransition(ContainerState.LOCALIZING,
        EnumSet.of(ContainerState.LOCALIZING, ContainerState.SCHEDULED),
        ContainerEventType.RESOURCE_LOCALIZED, new LocalizedTransition())
    .addTransition(ContainerState.LOCALIZING,
        ContainerState.LOCALIZATION_FAILED,
        ContainerEventType.RESOURCE_FAILED,
        new ResourceFailedTransition())
    .addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER,
        new KillBeforeRunningTransition())
    .addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING,
        ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())


    // From LOCALIZATION_FAILED State
    .addTransition(ContainerState.LOCALIZATION_FAILED,
        ContainerState.DONE,
        ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
        new LocalizationFailedToDoneTransition())
    .addTransition(ContainerState.LOCALIZATION_FAILED,
        ContainerState.LOCALIZATION_FAILED,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    // container not launched so kill is a no-op
    .addTransition(ContainerState.LOCALIZATION_FAILED,
        ContainerState.LOCALIZATION_FAILED,
        EnumSet.of(ContainerEventType.KILL_CONTAINER,
            ContainerEventType.PAUSE_CONTAINER))
    // container cleanup triggers a release of all resources
    // regardless of whether they were localized or not
    // LocalizedResource handles release event in all states
    .addTransition(ContainerState.LOCALIZATION_FAILED,
        ContainerState.LOCALIZATION_FAILED,
        ContainerEventType.RESOURCE_LOCALIZED)
    .addTransition(ContainerState.LOCALIZATION_FAILED,
        ContainerState.LOCALIZATION_FAILED,
        ContainerEventType.RESOURCE_FAILED)
    .addTransition(ContainerState.LOCALIZATION_FAILED,
        ContainerState.LOCALIZATION_FAILED,
        ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())

    // From SCHEDULED State
    .addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING,
        ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
    .addTransition(ContainerState.SCHEDULED, ContainerState.PAUSED,
        ContainerEventType.RECOVER_PAUSED_CONTAINER,
        new RecoveredContainerTransition())
    .addTransition(ContainerState.SCHEDULED, ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(true))
    .addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED,
       ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
       UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.SCHEDULED, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER,
        new KillTransition())
    .addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())

    // From RUNNING State
    .addTransition(ContainerState.RUNNING,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
        new ExitedWithSuccessTransition(true))
    .addTransition(ContainerState.RUNNING,
        EnumSet.of(ContainerState.RELAUNCHING,
            ContainerState.SCHEDULED,
            ContainerState.EXITED_WITH_FAILURE),
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new RetryFailureTransition())
    .addTransition(ContainerState.RUNNING,
        EnumSet.of(ContainerState.RUNNING,
            ContainerState.REINITIALIZING,
            ContainerState.REINITIALIZING_AWAITING_KILL),
        ContainerEventType.REINITIALIZE_CONTAINER,
        new ReInitializeContainerTransition())
    .addTransition(ContainerState.RUNNING,
        EnumSet.of(ContainerState.RUNNING,
            ContainerState.REINITIALIZING,
            ContainerState.REINITIALIZING_AWAITING_KILL),
        ContainerEventType.ROLLBACK_REINIT,
        new RollbackContainerTransition())
    .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
        ContainerEventType.RESOURCE_LOCALIZED,
        new ResourceLocalizedWhileRunningTransition())
    .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
        ContainerEventType.RESOURCE_FAILED,
        new ResourceLocalizationFailedWhileRunningTransition())
    .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
       ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
       UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.RUNNING, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER, new KillTransition())
    .addTransition(ContainerState.RUNNING,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        new KilledExternallyTransition())
    .addTransition(ContainerState.RUNNING, ContainerState.PAUSING,
        ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
    .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())


    // From PAUSING State
    .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
        ContainerEventType.RESOURCE_LOCALIZED,
        new ResourceLocalizedWhileRunningTransition())
    .addTransition(ContainerState.PAUSING, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER, new KillTransition())
    .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.PAUSING, ContainerState.PAUSED,
        ContainerEventType.CONTAINER_PAUSED, new PausedContainerTransition())
    // In case something goes wrong then container will exit from the
    // PAUSING state
    .addTransition(ContainerState.PAUSING,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)
    .addTransition(ContainerState.PAUSING,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(true))
    .addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        new KilledExternallyTransition())
    .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
        ContainerEventType.RESOURCE_LOCALIZED,
        new ResourceLocalizedWhileRunningTransition())
    .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())

    // From PAUSED State
    .addTransition(ContainerState.PAUSED, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER, new KillTransition())
    .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
        ContainerEventType.PAUSE_CONTAINER)
    // This can happen during re-initialization.
    .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
        ContainerEventType.RESOURCE_LOCALIZED,
        new ResourceLocalizedWhileRunningTransition())
    .addTransition(ContainerState.PAUSED, ContainerState.RESUMING,
        ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition())
    // In case something goes wrong then container will exit from the
    // PAUSED state
    .addTransition(ContainerState.PAUSED,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(true))
    .addTransition(ContainerState.PAUSED, ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        new KilledExternallyTransition())
    .addTransition(ContainerState.PAUSED,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
        new ExitedWithSuccessTransition(true))
    .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())

    // From RESUMING State
    .addTransition(ContainerState.RESUMING, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER, new KillTransition())
    .addTransition(ContainerState.RESUMING, ContainerState.RUNNING,
        ContainerEventType.CONTAINER_RESUMED)
    .addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    // This can happen during re-initialization
    .addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
        ContainerEventType.RESOURCE_LOCALIZED,
        new ResourceLocalizedWhileRunningTransition())
    // In case something goes wrong then container will exit from the
    // RESUMING state
    .addTransition(ContainerState.RESUMING,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(true))
    .addTransition(ContainerState.RESUMING,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        new KilledExternallyTransition())
    .addTransition(ContainerState.RESUMING,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
        new ExitedWithSuccessTransition(true))
    .addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())
    // NOTE - We cannot get a PAUSE_CONTAINER while in RESUMING state.

    // From REINITIALIZING State
    .addTransition(ContainerState.REINITIALIZING,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
        new ExitedWithSuccessTransition(true))
    .addTransition(ContainerState.REINITIALIZING,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(true))
    .addTransition(ContainerState.REINITIALIZING,
        EnumSet.of(ContainerState.REINITIALIZING,
            ContainerState.REINITIALIZING_AWAITING_KILL),
        ContainerEventType.RESOURCE_LOCALIZED,
        new ResourceLocalizedWhileReInitTransition())
    .addTransition(ContainerState.REINITIALIZING, ContainerState.RUNNING,
        ContainerEventType.RESOURCE_FAILED,
        new ResourceLocalizationFailedWhileReInitTransition())
    .addTransition(ContainerState.REINITIALIZING,
        ContainerState.REINITIALIZING,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER, new KillTransition())
    .addTransition(ContainerState.REINITIALIZING, ContainerState.PAUSING,
        ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
    .addTransition(ContainerState.REINITIALIZING,
        ContainerState.REINITIALIZING,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())

    // from REINITIALIZING_AWAITING_KILL
    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
        new ExitedWithSuccessTransition(true))
    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(true))
    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER, new KillTransition())
    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerState.SCHEDULED, ContainerEventType.PAUSE_CONTAINER)
    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerState.SCHEDULED,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        new KilledForReInitializationTransition())
    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())

    // From RELAUNCHING State
    .addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING,
        ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
    .addTransition(ContainerState.RELAUNCHING,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(true))
    .addTransition(ContainerState.RELAUNCHING, ContainerState.RELAUNCHING,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER, new KillTransition())
    .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
        ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
    .addTransition(ContainerState.RELAUNCHING, ContainerState.RELAUNCHING,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())


    // From CONTAINER_EXITED_WITH_SUCCESS State
    .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
        ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
        new ExitedWithSuccessToDoneTransition())
    .addTransition(ContainerState.EXITED_WITH_SUCCESS,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.EXITED_WITH_SUCCESS,
        ContainerState.EXITED_WITH_SUCCESS,
        EnumSet.of(ContainerEventType.KILL_CONTAINER,
            ContainerEventType.PAUSE_CONTAINER))
    // No transition - assuming container is on its way to completion
    .addTransition(ContainerState.EXITED_WITH_SUCCESS,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.UPDATE_CONTAINER_TOKEN)
    .addTransition(ContainerState.EXITED_WITH_SUCCESS,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST)

    // From EXITED_WITH_FAILURE State
    .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
            ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
            new ExitedWithFailureToDoneTransition())
    .addTransition(ContainerState.EXITED_WITH_FAILURE,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.EXITED_WITH_FAILURE,
                   ContainerState.EXITED_WITH_FAILURE,
        EnumSet.of(ContainerEventType.KILL_CONTAINER,
            ContainerEventType.PAUSE_CONTAINER))
    // No transition - assuming container is on its way to completion
    .addTransition(ContainerState.EXITED_WITH_FAILURE,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.UPDATE_CONTAINER_TOKEN)
    .addTransition(ContainerState.EXITED_WITH_FAILURE,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST)

    // From KILLING State.
    .addTransition(ContainerState.KILLING,
        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        new ContainerKilledTransition())
    .addTransition(ContainerState.KILLING,
        ContainerState.KILLING,
        ContainerEventType.RESOURCE_LOCALIZED,
        new LocalizedResourceDuringKillTransition())
    .addTransition(ContainerState.KILLING, 
        ContainerState.KILLING, 
        ContainerEventType.RESOURCE_FAILED)
    .addTransition(ContainerState.KILLING, ContainerState.KILLING,
       ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
       UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.KILLING, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER)
    .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
        new ExitedWithSuccessTransition(false))
    .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(false))
    .addTransition(ContainerState.KILLING,
            ContainerState.DONE,
            ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
            new KillingToDoneTransition())
    // Handle a launched container during killing stage is a no-op
    // as cleanup container is always handled after launch container event
    // in the container launcher
    .addTransition(ContainerState.KILLING,
        ContainerState.KILLING,
        EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED,
            ContainerEventType.PAUSE_CONTAINER))
    // No transition - assuming container is on its way to completion
    .addTransition(ContainerState.KILLING, ContainerState.KILLING,
        ContainerEventType.UPDATE_CONTAINER_TOKEN)

    // From CONTAINER_CLEANEDUP_AFTER_KILL State.
    .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
            ContainerState.DONE,
            ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
            new ContainerCleanedupAfterKillToDoneTransition())
    .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        EnumSet.of(ContainerEventType.KILL_CONTAINER,
            ContainerEventType.RESOURCE_FAILED,
            ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
            ContainerEventType.PAUSE_CONTAINER))
    // No transition - assuming container is on its way to completion
    .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerEventType.UPDATE_CONTAINER_TOKEN)
    .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST)

    // From DONE
    .addTransition(ContainerState.DONE, ContainerState.DONE,
        EnumSet.of(ContainerEventType.KILL_CONTAINER,
            ContainerEventType.PAUSE_CONTAINER))
    .addTransition(ContainerState.DONE, ContainerState.DONE,
        ContainerEventType.INIT_CONTAINER)
    .addTransition(ContainerState.DONE, ContainerState.DONE,
       ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
       UPDATE_DIAGNOSTICS_TRANSITION)
    // This transition may result when
    // we notify container of failed localization if localizer thread (for
    // that container) fails for some reason
    .addTransition(ContainerState.DONE, ContainerState.DONE,
        EnumSet.of(ContainerEventType.RESOURCE_FAILED,
            ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
    // No transition - assuming container is on its way to completion
    .addTransition(ContainerState.DONE, ContainerState.DONE,
        ContainerEventType.UPDATE_CONTAINER_TOKEN)
    .addTransition(ContainerState.DONE, ContainerState.DONE,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST)

    // create the topology tables
    .installTopology();

  // Per-container state machine instance. Its current state is read by
  // getContainerState(), getCurrentState() and getContainerSubState().
  private final StateMachine<ContainerState, ContainerEventType, ContainerEvent>
    stateMachine;

  /**
   * Maps the NodeManager-internal container state onto the coarse public
   * {@code org.apache.hadoop.yarn.api.records.ContainerState}.
   * {@code DONE}, and any state not listed explicitly, is reported as
   * COMPLETE. Every other state listed below is reported as RUNNING.
   *
   * @return the externally visible container state
   */
  public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
    final ContainerState internalState = stateMachine.getCurrentState();
    final org.apache.hadoop.yarn.api.records.ContainerState externalState;
    switch (internalState) {
    case NEW:
    case LOCALIZING:
    case LOCALIZATION_FAILED:
    case SCHEDULED:
    case PAUSED:
    case RESUMING:
    case RUNNING:
    case RELAUNCHING:
    case REINITIALIZING:
    case REINITIALIZING_AWAITING_KILL:
    case EXITED_WITH_SUCCESS:
    case EXITED_WITH_FAILURE:
    case KILLING:
    case CONTAINER_CLEANEDUP_AFTER_KILL:
    case CONTAINER_RESOURCES_CLEANINGUP:
    case PAUSING:
      externalState =
          org.apache.hadoop.yarn.api.records.ContainerState.RUNNING;
      break;
    case DONE:
    default:
      externalState =
          org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE;
      break;
    }
    return externalState;
  }

  // NOTE: Please update the doc in the ContainerSubState class as
  //       well as the yarn_protos.proto file if this mapping is ever modified.
  /**
   * Derives the fine-grained public sub-state that accompanies the coarse
   * state from {@link #getCurrentState()} in container status reports.
   *
   * @return SCHEDULED, RUNNING, PAUSED, COMPLETING or DONE, according to the
   *         current internal state (DONE for any unlisted state)
   */
  private ContainerSubState getContainerSubState() {
    final ContainerSubState subState;
    switch (stateMachine.getCurrentState()) {
    case NEW:
    case LOCALIZING:
    case SCHEDULED:
    case REINITIALIZING_AWAITING_KILL:
    case RELAUNCHING:
      subState = ContainerSubState.SCHEDULED;
      break;
    case REINITIALIZING:
    case PAUSING:
    case KILLING:
    case RUNNING:
      subState = ContainerSubState.RUNNING;
      break;
    case PAUSED:
    case RESUMING:
      subState = ContainerSubState.PAUSED;
      break;
    case LOCALIZATION_FAILED:
    case EXITED_WITH_SUCCESS:
    case EXITED_WITH_FAILURE:
    case CONTAINER_CLEANEDUP_AFTER_KILL:
    case CONTAINER_RESOURCES_CLEANINGUP:
      subState = ContainerSubState.COMPLETING;
      break;
    case DONE:
    default:
      subState = ContainerSubState.DONE;
      break;
    }
    return subState;
  }

  /**
   * @return the NodeManager timeline publisher obtained from this container's
   *         NM context
   */
  public NMTimelinePublisher getNMTimelinePublisher() {
    return this.context.getNMTimelinePublisher();
  }

  /**
   * @return the user this container belongs to, read under the read lock
   */
  @Override
  public String getUser() {
    readLock.lock();
    try {
      return user;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Returns the localized resources of this container, but only while the
   * container is SCHEDULED or RELAUNCHING.
   *
   * @return the localized resource map, or {@code null} in any other state
   */
  @Override
  public Map<Path, List<String>> getLocalizedResources() {
    readLock.lock();
    try {
      // The state cannot change while the read lock is held, so one read
      // is enough for both comparisons.
      final ContainerState state = getContainerState();
      final boolean awaitingLaunch = state == ContainerState.SCHEDULED
          || state == ContainerState.RELAUNCHING;
      return awaitingLaunch ? resourceSet.getLocalizedResources() : null;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @return the credentials attached to this container, read under the read
   *         lock
   */
  @Override
  public Credentials getCredentials() {
    readLock.lock();
    try {
      return this.credentials;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @return the current internal state-machine state, read under the read
   *         lock
   */
  @Override
  public ContainerState getContainerState() {
    readLock.lock();
    try {
      return this.stateMachine.getCurrentState();
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @return the launch context currently associated with this container,
   *         read under the read lock
   */
  @Override
  public ContainerLaunchContext getLaunchContext() {
    readLock.lock();
    try {
      return this.launchContext;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Builds a fresh {@link ContainerStatus} snapshot of this container under
   * the read lock. The snapshot contains the public state, diagnostics, exit
   * code, resource, execution type, IPs (comma-separated {@code ips} split
   * into a list, or {@code null} when empty), host, sub-state and exposed
   * ports.
   *
   * @return a newly created status object
   */
  @Override
  public ContainerStatus cloneAndGetContainerStatus() {
    readLock.lock();
    try {
      final ContainerStatus snapshot = BuilderUtils.newContainerStatus(
          containerId, getCurrentState(), diagnostics.toString(), exitCode,
          getResource(), containerTokenIdentifier.getExecutionType());
      final List<String> ipList =
          StringUtils.isEmpty(ips) ? null : Arrays.asList(ips.split(","));
      snapshot.setIPs(ipList);
      snapshot.setHost(host);
      snapshot.setContainerSubState(getContainerSubState());
      snapshot.setExposedPorts(exposedPorts);
      return snapshot;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Builds the {@link NMContainerStatus} reported by the NodeManager for this
   * container. Most fields are taken from the current container token.
   *
   * @return a newly created NM container status, built under the read lock
   */
  @Override
  public NMContainerStatus getNMContainerStatus() {
    readLock.lock();
    try {
      // Snapshot the token once; it cannot be replaced while we hold the
      // read lock.
      final ContainerTokenIdentifier token = this.containerTokenIdentifier;
      final NMContainerStatus nmStatus = NMContainerStatus.newInstance(
          containerId, version, getCurrentState(), getResource(),
          diagnostics.toString(), exitCode, token.getPriority(),
          token.getCreationTime(), token.getNodeLabelExpression(),
          token.getExecutionType(), token.getAllocationRequestId());
      nmStatus.setAllocationTags(token.getAllcationTags());
      return nmStatus;
    } finally {
      readLock.unlock();
    }
  }

  /** @return the id of this container */
  @Override
  public ContainerId getContainerId() {
    return containerId;
  }

  /** @return the value of the {@code startTime} field */
  @Override
  public long getContainerStartTime() {
    return startTime;
  }

  /**
   * @return the clock time recorded when the launch (or recover) event was
   *         last sent by {@code sendLaunchEvent}
   */
  @Override
  public long getContainerLaunchTime() {
    return containerLaunchStartTime;
  }

  /**
   * @return a defensive copy of the resource carried by the current
   *         container token
   */
  @Override
  public Resource getResource() {
    final Resource tokenResource = this.containerTokenIdentifier.getResource();
    return Resources.clone(tokenResource);
  }

  /** @return the current container token, read under the read lock */
  @Override
  public ContainerTokenIdentifier getContainerTokenIdentifier() {
    readLock.lock();
    try {
      return containerTokenIdentifier;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Replaces the container token under the write lock.
   *
   * @param token the new container token
   */
  @Override
  public void setContainerTokenIdentifier(ContainerTokenIdentifier token) {
    writeLock.lock();
    try {
      containerTokenIdentifier = token;
    } finally {
      writeLock.unlock();
    }
  }

  /** @return the container work directory (may be unset) */
  @Override
  public String getWorkDir() {
    return this.workDir;
  }

  /** @param workDir the container work directory to record */
  @Override
  public void setWorkDir(String workDir) {
    this.workDir = workDir;
  }

  /** @return the root directory recorded for CSI volumes */
  @Override
  public String getCsiVolumesRootDir() {
    return this.csiVolumesRootDir;
  }

  /** @param volumesRootDir the root directory to record for CSI volumes */
  @Override
  public void setCsiVolumesRootDir(String volumesRootDir) {
    csiVolumesRootDir = volumesRootDir;
  }

  /**
   * Logs and resets the recorded IP list and host name. No lock is taken
   * here.
   */
  private void clearIpAndHost() {
    LOG.info("{} clearing ip and host", containerId);
    host = null;
    ips = null;
  }

  /**
   * Records the container IP list and host name under the write lock.
   *
   * @param ipAndHost element 0 is the (comma-separated) IP string, element 1
   *                  the host name
   */
  @Override
  public void setIpAndHost(String[] ipAndHost) {
    writeLock.lock();
    try {
      ips = ipAndHost[0];
      host = ipAndHost[1];
    } finally {
      writeLock.unlock();
    }
  }

  /** @return the container log directory (may be unset) */
  @Override
  public String getLogDir() {
    return this.logDir;
  }

  /** @param logDir the container log directory to record */
  @Override
  public void setLogDir(String logDir) {
    this.logDir = logDir;
  }

  /** @return the resource set tracking this container's local resources */
  @Override
  public ResourceSet getResourceSet() {
    return resourceSet;
  }

  /**
   * Broadcasts that this container has finished, in this order: to the
   * application, to the container scheduler (CONTAINER_COMPLETED), to the
   * resource monitor (stop monitoring), and to the log handler.
   */
  @SuppressWarnings("unchecked")
  private void sendFinishedEvents() {
    @SuppressWarnings("rawtypes")
    final EventHandler handler = dispatcher.getEventHandler();

    // 1. Application: hand over a status snapshot plus the start time.
    final ContainerStatus finalStatus = cloneAndGetContainerStatus();
    handler.handle(
        new ApplicationContainerFinishedEvent(finalStatus, startTime));

    // 2. Scheduler: the container no longer occupies its allocation.
    handler.handle(new ContainerSchedulerEvent(this,
        ContainerSchedulerEventType.CONTAINER_COMPLETED));

    // 3. Resource monitor: stop tracking this container.
    handler.handle(new ContainerStopMonitoringEvent(containerId));

    // 4. Log handler: include container type and exit code.
    handler.handle(new LogHandlerContainerFinishedEvent(
        containerId, containerTokenIdentifier.getContainerType(), exitCode));
  }

  /**
   * Starts (or resumes) the container process.
   *
   * A PAUSED container is sent a resume event. Otherwise a launcher event is
   * dispatched: RECOVER_CONTAINER or RECOVER_PAUSED_CONTAINER for containers
   * recovered as LAUNCHED / PAUSED, and LAUNCH_CONTAINER for everything else.
   * The launch start time is recorded just before dispatching.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  @Override
  public void sendLaunchEvent() {
    if (ContainerState.PAUSED == getContainerState()) {
      dispatcher.getEventHandler().handle(
          new ContainerResumeEvent(containerId,
              "Container Resumed as some resources freed up"));
      return;
    }

    final ContainersLauncherEventType launcherEvent;
    if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
      // try to recover a container that was previously launched
      launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
    } else if (recoveredStatus == RecoveredContainerStatus.PAUSED) {
      launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER;
    } else {
      launcherEvent = ContainersLauncherEventType.LAUNCH_CONTAINER;
    }

    containerLaunchStartTime = clock.getTime();
    dispatcher.getEventHandler().handle(
        new ContainersLauncherEvent(this, launcherEvent));
  }

  /**
   * Asks the container scheduler to schedule this container. A container
   * recovered as PAUSED instead goes directly to the launcher with
   * RECOVER_PAUSED_CONTAINER.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  private void sendScheduleEvent() {
    if (recoveredStatus != RecoveredContainerStatus.PAUSED) {
      dispatcher.getEventHandler().handle(new ContainerSchedulerEvent(this,
          ContainerSchedulerEventType.SCHEDULE_CONTAINER));
      return;
    }
    dispatcher.getEventHandler().handle(new ContainersLauncherEvent(this,
        ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER));
  }

  /**
   * Marks the container for killing and dispatches a kill event.
   *
   * @param exitStatus  exit status to attach to the kill event
   * @param description human-readable reason for the kill
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  @Override
  public void sendKillEvent(int exitStatus, String description) {
    isMarkeForKilling = true;
    final ContainerKillEvent killEvent =
        new ContainerKillEvent(containerId, exitStatus, description);
    dispatcher.getEventHandler().handle(killEvent);
  }

  /**
   * Dispatches a pause event for this container.
   *
   * @param description human-readable reason for the pause
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  @Override
  public void sendPauseEvent(String description) {
    final ContainerPauseEvent pauseEvent =
        new ContainerPauseEvent(containerId, description);
    dispatcher.getEventHandler().handle(pauseEvent);
  }

  /** Asks the containers launcher to relaunch this container. */
  @SuppressWarnings("unchecked") // dispatcher not typed
  private void sendRelaunchEvent() {
    dispatcher.getEventHandler().handle(new ContainersLauncherEvent(this,
        ContainersLauncherEventType.RELAUNCH_CONTAINER));
  }

  /**
   * Informs the ContainersMonitor that it should start monitoring this
   * container's resource usage.
   *
   * Records the launch duration metric, then sends physical / virtual memory
   * limits (vmem = pmem * configured vmem-pmem ratio), vcores, launch
   * duration and localization duration.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  private void sendContainerMonitorStartEvent() {
    final long launchDuration = clock.getTime() - containerLaunchStartTime;
    metrics.addContainerLaunchDuration(launchDuration);

    final Resource resource = getResource();
    // Memory size is in MB; convert to bytes.
    final long pmemBytes = resource.getMemorySize() * 1024 * 1024L;
    final float vmemToPmemRatio = daemonConf.getFloat(
        YarnConfiguration.NM_VMEM_PMEM_RATIO,
        YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
    final long vmemBytes = (long) (vmemToPmemRatio * pmemBytes);
    final int cpuVcores = resource.getVirtualCores();
    final long localizationDuration =
        containerLaunchStartTime - containerLocalizationStartTime;

    dispatcher.getEventHandler().handle(
        new ContainerStartMonitoringEvent(containerId, vmemBytes, pmemBytes,
            cpuVcores, launchDuration, localizationDuration));
  }

  /**
   * Appends timestamped diagnostic messages and trims the oldest characters
   * so the buffer never exceeds {@code diagnosticsMaxSize}. The result is
   * persisted to the state store; a persistence failure is only logged.
   *
   * @param diags messages to append, each prefixed with "[timestamp]"
   */
  private void addDiagnostics(String... diags) {
    for (final String diag : diags) {
      diagnostics.append('[')
          .append(dateFormat.format(new Date()))
          .append(']')
          .append(diag);
    }
    if (diagnostics.length() > diagnosticsMaxSize) {
      // Drop from the front so the most recent messages survive.
      diagnostics.delete(0, diagnostics.length() - diagnosticsMaxSize);
    }
    try {
      stateStore.storeContainerDiagnostics(containerId, diagnostics);
    } catch (IOException e) {
      LOG.warn("Unable to update diagnostics in state store for "
          + containerId, e);
    }
  }

  /**
   * Requests localization cleanup for every resource this container holds,
   * grouped by visibility.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  public void cleanup() {
    final Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
        resourcesByVisibility = resourceSet.getAllResourcesByVisibility();
    final ContainerLocalizationCleanupEvent cleanupEvent =
        new ContainerLocalizationCleanupEvent(this, resourcesByVisibility);
    dispatcher.getEventHandler().handle(cleanupEvent);
  }

  /**
   * Base single-arc transition that does no work: the event is consumed and
   * the state machine moves to the arc's target state. Subclasses override
   * {@link #transition} to add behaviour.
   */
  static class ContainerTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {

    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      // Intentionally empty: consuming the event is all that is required.
    }
  }

  /**
   * Applies an updated container token carried by an
   * {@code UpdateContainerTokenEvent}, then persists it to the NM state
   * store. A persistence failure is logged and otherwise ignored.
   */
  static class UpdateTransition extends ContainerTransition {
    @Override
    public void transition(
        ContainerImpl container, ContainerEvent event) {
      final ContainerTokenIdentifier updatedToken =
          ((UpdateContainerTokenEvent) event).getUpdatedToken();
      container.setContainerTokenIdentifier(updatedToken);

      try {
        container.context.getNMStateStore().storeContainerUpdateToken(
            container.containerId, container.getContainerTokenIdentifier());
      } catch (IOException e) {
        LOG.warn("Could not store container [" + container.containerId
            + "] update..", e);
      }
    }
  }

  /**
   * Same as {@link UpdateTransition}, but afterwards also notifies the
   * container scheduler. The notification carries both the token that was in
   * place before the update and the update event itself.
   */
  static class NotifyContainerSchedulerOfUpdateTransition extends
      UpdateTransition {
    @Override
    public void transition(
        ContainerImpl container, ContainerEvent event) {
      final UpdateContainerTokenEvent tokenEvent =
          (UpdateContainerTokenEvent) event;
      // Capture the pre-update token before the superclass replaces it.
      final ContainerTokenIdentifier previousToken =
          container.containerTokenIdentifier;
      super.transition(container, tokenEvent);
      final UpdateContainerSchedulerEvent schedulerEvent =
          new UpdateContainerSchedulerEvent(container, previousToken,
              tokenEvent);
      container.dispatcher.getEventHandler().handle(schedulerEvent);
    }
  }

  /**
   * State transition when a NEW container receives the INIT_CONTAINER
   * message.
   *
   * Recovered containers are short-circuited first:
   * <ul>
   *   <li>recovered as COMPLETED: finished events are sent and the container
   *   moves to DONE;</li>
   *   <li>recovered as killed while REQUESTED or QUEUED (i.e. never
   *   launched): kill metrics and an audit entry are recorded, the
   *   container's resources are released from the metrics, finished events
   *   are sent and the container moves to DONE;</li>
   *   <li>recovered as QUEUED (not killed): moves straight to SCHEDULED.</li>
   * </ul>
   *
   * Otherwise the AuxServices are notified (one CONTAINER_INIT event, plus
   * one APPLICATION_INIT event per service-data entry), localization timing
   * starts, and the container executor is asked for the local resources:
   * <ul>
   *   <li>If there are resources to localize, a
   *   ContainerLocalizationRequestEvent is dispatched and the container
   *   enters LOCALIZING.</li>
   *   <li>If there are none, sendScheduleEvent() is called (a scheduler
   *   SCHEDULE_CONTAINER event, or RECOVER_PAUSED_CONTAINER for a container
   *   recovered as PAUSED) and the container enters SCHEDULED directly.</li>
   *   <li>If obtaining or registering the resources throws
   *   URISyntaxException or IOException (e.g. a malformed resource), the
   *   container is cleaned up and enters LOCALIZATION_FAILED.</li>
   * </ul>
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  static class RequestResourcesTransition implements
      MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
    @Override
    public ContainerState transition(ContainerImpl container,
        ContainerEvent event) {
      if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
        // Recovered as already completed: only publish the finished events.
        container.sendFinishedEvents();
        return ContainerState.DONE;
      } else if (isContainerRecoveredAsKilled(container)) {
        // container was killed but never launched
        container.metrics.killedContainer();
        NMAuditLogger.logSuccess(container.user,
            AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
            container.containerId.getApplicationAttemptId().getApplicationId(),
            container.containerId);
        container.metrics.releaseContainer(
            container.containerTokenIdentifier.getResource());
        container.sendFinishedEvents();
        return ContainerState.DONE;
      } else if (container.recoveredStatus == RecoveredContainerStatus.QUEUED) {
        // Recovered while queued: skip re-initialization and go back to
        // SCHEDULED. No schedule event is sent from here.
        return ContainerState.SCHEDULED;
      }

      final ContainerLaunchContext ctxt = container.launchContext;
      container.metrics.initingContainer();

      container.dispatcher.getEventHandler().handle(new AuxServicesEvent
          (AuxServicesEventType.CONTAINER_INIT, container));

      // Inform the AuxServices about the opaque serviceData
      Map<String,ByteBuffer> csd = ctxt.getServiceData();
      if (csd != null) {
        // This can happen more than once per Application as each container may
        // have distinct service data
        for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
          container.dispatcher.getEventHandler().handle(
              new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
                  container.user, container.containerId
                      .getApplicationAttemptId().getApplicationId(),
                  service.getKey().toString(), service.getValue()));
        }
      }

      container.containerLocalizationStartTime = clock.getTime();
      // duration = end - start;
      // record in RequestResourcesTransition: -start
      // add in LocalizedTransition: +end
      //
      container.localizationCounts[LocalizationCounter.MILLIS.ordinal()]
          = -Time.monotonicNow();

      // Send requests for public, private resources
      Map<String, LocalResource> cntrRsrc;
      try {
        cntrRsrc = container.context
            .getContainerExecutor().getLocalResources(container);
        // NOTE(review): emptiness is checked on the executor's view
        // (cntrRsrc), but the resources registered below come from
        // ctxt.getLocalResources(). Presumably these agree -- confirm for
        // executors that override getLocalResources().
        if (!cntrRsrc.isEmpty()) {
          Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
              container.resourceSet.addResources(ctxt.getLocalResources());
          container.dispatcher.getEventHandler().handle(
              new ContainerLocalizationRequestEvent(container, req));
          // Get list of resources for logging
          List<String> resourcePaths = new ArrayList<>();
          for (Collection<LocalResourceRequest> rsrcReqList : req.values()) {
            for (LocalResourceRequest rsrc : rsrcReqList) {
              resourcePaths.add(rsrc.getPath().toString());
            }
          }
          LOG.info("Container " + container.getContainerId()
              + " is localizing: " + resourcePaths);
          return ContainerState.LOCALIZING;
        } else {
          // Nothing to localize: go straight to scheduling.
          container.sendScheduleEvent();
          container.metrics.endInitingContainer();
          return ContainerState.SCHEDULED;
        }
      } catch (URISyntaxException | IOException e) {
        // malformed resource; abort container launch
        LOG.warn("Failed to parse resource-request", e);
        container.cleanup();
        container.metrics.endInitingContainer();
        return ContainerState.LOCALIZATION_FAILED;
      }
    }

    /**
     * Returns true when the container was recovered as killed and its
     * recovered status shows it was never launched (REQUESTED or QUEUED).
     */
    static boolean isContainerRecoveredAsKilled(ContainerImpl container) {
      if (!container.recoveredAsKilled) {
        return false;
      }
      // container was killed but never launched
      RecoveredContainerStatus containerStatus = container.recoveredStatus;
      return containerStatus == RecoveredContainerStatus.REQUESTED
          || containerStatus == RecoveredContainerStatus.QUEUED;
    }
  }

  /**
   * Transition when one of the requested resources for this container
   * has been successfully localized.
   *
   * The resource is recorded in the container's resource set and the
   * localization counters and metrics are updated. The container stays in
   * LOCALIZING while any resources are still pending. Once none are pending:
   * <ul>
   *   <li>the total localization time is recorded;</li>
   *   <li>CONTAINER_RESOURCES_LOCALIZED is dispatched;</li>
   *   <li>a schedule event is sent;</li>
   *   <li>for containers not recovered as LAUNCHED or COMPLETED, shared-cache
   *   uploads are kicked off.</li>
   * </ul>
   * The container then moves to SCHEDULED.
   */
  static class LocalizedTransition implements
      MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
    @SuppressWarnings("unchecked")
    @Override
    public ContainerState transition(ContainerImpl container,
        ContainerEvent event) {
      ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
      LocalResourceRequest resourceRequest = rsrcEvent.getResource();
      Path location = rsrcEvent.getLocation();
      Set<String> syms =
          container.resourceSet.resourceLocalized(resourceRequest, location);
      // A null result from resourceLocalized() means there is nothing to
      // account for here: stay in LOCALIZING without touching the counters.
      // NOTE(review): presumably this happens when the resource was not
      // pending for this container -- confirm against ResourceSet.
      if (null == syms) {
        LOG.info("Localized resource " + resourceRequest +
            " for container " + container.containerId);
        return ContainerState.LOCALIZING;
      }

      // Positive size: downloaded (cache miss). Negative size: served from
      // cache. Zero updates neither counter pair.
      final long localizedSize = rsrcEvent.getSize();
      if (localizedSize > 0) {
        container.localizationCounts
        [LocalizationCounter.BYTES_MISSED.ordinal()] += localizedSize;
        container.localizationCounts
        [LocalizationCounter.FILES_MISSED.ordinal()]++;
      } else if (localizedSize < 0) {
        // cached: recorded negative, restore the sign
        container.localizationCounts
        [LocalizationCounter.BYTES_CACHED.ordinal()] -= localizedSize;
        container.localizationCounts
        [LocalizationCounter.FILES_CACHED.ordinal()]++;
      }
      container.metrics.localizationCacheHitMiss(localizedSize);

      // check to see if this resource should be uploaded to the shared cache
      // as well
      if (shouldBeUploadedToSharedCache(container, resourceRequest)) {
        container.resourceSet.getResourcesToBeUploaded()
            .put(resourceRequest, location);
      }
      // Wait for the remaining resources before scheduling.
      if (!container.resourceSet.getPendingResources().isEmpty()) {
        return ContainerState.LOCALIZING;
      }

      // duration = end - start;
      // record in RequestResourcesTransition: -start
      // add in LocalizedTransition: +end
      //
      container.localizationCounts[LocalizationCounter.MILLIS.ordinal()]
          += Time.monotonicNow();
      container.metrics.localizationComplete(
          container.localizationCounts[LocalizationCounter.MILLIS.ordinal()]);
      container.dispatcher.getEventHandler().handle(
          new ContainerLocalizationEvent(LocalizationEventType.
              CONTAINER_RESOURCES_LOCALIZED, container));

      container.sendScheduleEvent();
      container.metrics.endInitingContainer();

      // If this is a recovered container that has already launched, skip
      // uploading resources to the shared cache. We do this to avoid uploading
      // the same resources multiple times. The tradeoff is that in the case of
      // a recovered container, there is a chance that resources don't get
      // uploaded into the shared cache. This is OK because resources are not
      // acknowledged by the SCM until they have been uploaded by the node
      // manager.
      if (container.recoveredStatus != RecoveredContainerStatus.LAUNCHED
          && container.recoveredStatus != RecoveredContainerStatus.COMPLETED) {
        // kick off uploads to the shared cache
        container.dispatcher.getEventHandler().handle(
            new SharedCacheUploadEvent(
                container.resourceSet.getResourcesToBeUploaded(), container
                .getLaunchContext(), container.getUser(),
                SharedCacheUploadEventType.UPLOAD));
      }

      return ContainerState.SCHEDULED;
    }
  }

  /**
   * Transition that starts the re-initialization (restart / upgrade) process.
   *
   * A {@code ReInitializationContext} is built via
   * {@link #createReInitContext}. Then:
   * <ul>
   *   <li>If the new resource set still has pending resources, a
   *   localization request for it is dispatched and the container moves to
   *   REINITIALIZING.</li>
   *   <li>Otherwise the running process is cleaned up immediately
   *   (CLEANUP_CONTAINER_FOR_REINIT) and the container moves to
   *   REINITIALIZING_AWAITING_KILL.</li>
   *   <li>If anything inside the try block throws, the error is logged, a
   *   diagnostic is added and the container stays RUNNING.</li>
   * </ul>
   */
  static class ReInitializeContainerTransition implements
      MultipleArcTransition<ContainerImpl, ContainerEvent, ContainerState> {

    @SuppressWarnings("unchecked")
    @Override
    public ContainerState transition(
        ContainerImpl container, ContainerEvent event) {
      container.reInitContext = createReInitContext(container, event);
      boolean resourcesPresent = false;
      try {
        // 'reInitContext.newResourceSet' can be
        // a) current container resourceSet (In case of Restart)
        // b) previous resourceSet (In case of RollBack)
        // c) An actual NEW resourceSet (In case of Upgrade/ReInit)
        //
        // In cases a) and b) Container can immediately be cleaned up since
        // we are sure the resources are already available (we check the
        // pendingResources to verify that nothing more is needed). So we can
        // kill the container immediately
        ResourceSet newResourceSet = container.reInitContext.newResourceSet;
        if (!newResourceSet.getPendingResources().isEmpty()) {
          container.dispatcher.getEventHandler().handle(
              new ContainerLocalizationRequestEvent(
                  container, newResourceSet.getAllResourcesByVisibility()));
        } else {
          // We are not waiting on any resources, so...
          // Kill the current container.
          container.dispatcher.getEventHandler().handle(
              new ContainersLauncherEvent(container,
                  ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
          resourcesPresent = true;
        }
        container.metrics.reInitingContainer();
        NMAuditLogger.logSuccess(container.user,
            AuditConstants.START_CONTAINER_REINIT, "ContainerImpl",
            container.containerId.getApplicationAttemptId().getApplicationId(),
            container.containerId);
      } catch (Exception e) {
        LOG.error("Container [" + container.getContainerId() + "]" +
            " re-initialization failure..", e);
        // Keep a space between the prefix and the bracketed cause; this
        // previously rendered as "due to[...]".
        container.addDiagnostics("Error re-initializing due to " +
            "[" + e.getMessage() + "]");
        return ContainerState.RUNNING;
      }
      return resourcesPresent ?
          ContainerState.REINITIALIZING_AWAITING_KILL :
          ContainerState.REINITIALIZING;
    }

    /**
     * Builds the re-initialization context from a {@code ContainerReInitEvent}.
     *
     * When the event has no launch context, this is a restart. The current
     * launch context and resource set are reused, and any existing rollback
     * state is carried over when rollback is possible. Otherwise the event's
     * launch context and resource set are used, and the current ones are
     * kept for rollback unless auto-commit is requested.
     *
     * @param container the container being re-initialized
     * @param event     expected to be a {@code ContainerReInitEvent}
     * @return the new re-initialization context
     */
    protected ReInitializationContext createReInitContext(
        ContainerImpl container, ContainerEvent event) {
      ContainerReInitEvent reInitEvent = (ContainerReInitEvent)event;
      if (reInitEvent.getReInitLaunchContext() == null) {
        // This is a Restart...
        // We also need to make sure that if Rollback is possible, the
        // rollback state should be retained in the
        // oldLaunchContext and oldResourceSet
        container.addDiagnostics("Container will be Restarted.\n");
        return new ReInitializationContext(
            container.launchContext, container.resourceSet,
            container.canRollback() ?
                container.reInitContext.oldLaunchContext : null,
            container.canRollback() ?
                container.reInitContext.oldResourceSet : null);
      } else {
        container.addDiagnostics("Container will be Re-initialized.\n");
        return new ReInitializationContext(
            reInitEvent.getReInitLaunchContext(),
            reInitEvent.getResourceSet(),
            // If AutoCommit is turned on, then no rollback can happen...
            // So don't need to store the previous context.
            (reInitEvent.isAutoCommit() ? null : container.launchContext),
            (reInitEvent.isAutoCommit() ? null : container.resourceSet));
      }
    }
  }

  /**
   * Starts an explicit rollback of a previous upgrade. This reuses the
   * re-initialization flow, but the context comes from the existing
   * re-init context's rollback data.
   */
  static class RollbackContainerTransition extends
      ReInitializeContainerTransition {

    @Override
    protected ReInitializationContext createReInitContext(ContainerImpl
        container, ContainerEvent event) {
      container.addDiagnostics("Container upgrade will be Rolled-back.\n");
      final String warning = "Container [" + container.getContainerId()
          + "]" + " about to be explicitly Rolledback !!";
      LOG.warn(warning);
      return container.reInitContext.createContextForRollback();
    }
  }

  /**
   * Handles a resource localized for container re-initialization. The
   * resource is recorded in the new resource set. Once nothing is pending,
   * the current process is cleaned up (CLEANUP_CONTAINER_FOR_REINIT) so the
   * container can restart with the new bits.
   */
  static class ResourceLocalizedWhileReInitTransition
      implements MultipleArcTransition
      <ContainerImpl, ContainerEvent, ContainerState> {

    @SuppressWarnings("unchecked")
    @Override
    public ContainerState transition(
        ContainerImpl container, ContainerEvent event) {
      final ContainerResourceLocalizedEvent localizedEvent =
          (ContainerResourceLocalizedEvent) event;
      final ResourceSet newResources = container.reInitContext.newResourceSet;
      newResources.resourceLocalized(
          localizedEvent.getResource(), localizedEvent.getLocation());

      if (!newResources.getPendingResources().isEmpty()) {
        // Still waiting on more resources for the new version.
        return ContainerState.REINITIALIZING;
      }

      // Everything is localized: kill the current container.
      container.dispatcher.getEventHandler().handle(
          new ContainersLauncherEvent(container,
              ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
      return ContainerState.REINITIALIZING_AWAITING_KILL;
    }
  }

  /**
   * Handles a resource that finished localizing while the container is
   * running. The resource is recorded in the container's resource set, and
   * a symlink is created for each returned link name inside the container
   * work directory.
   */
  static class ResourceLocalizedWhileRunningTransition
      extends ContainerTransition {

    @SuppressWarnings("unchecked")
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      final ContainerResourceLocalizedEvent localizedEvent =
          (ContainerResourceLocalizedEvent) event;
      final Set<String> linkNames = container.resourceSet.resourceLocalized(
          localizedEvent.getResource(), localizedEvent.getLocation());
      if (linkNames != null) {
        final Path target = localizedEvent.getLocation();
        for (String linkName : linkNames) {
          linkIntoWorkDir(container, target, linkName);
        }
      }
    }

    /**
     * Creates {@code workDir/linkName -> target}, unless that file already
     * exists. An IOException from the executor is logged rather than
     * propagated, so the remaining links are still attempted.
     */
    private static void linkIntoWorkDir(ContainerImpl container, Path target,
        String linkName) {
      try {
        final String linkFile =
            new Path(container.workDir, linkName).toString();
        if (new File(linkFile).exists()) {
          LOG.info("Symlink file already exists: " + linkFile);
          return;
        }
        container.context.getContainerExecutor()
            .symLink(target.toString(), linkFile);
        LOG.info("Created symlink: " + linkFile + " -> " + target);
      } catch (IOException e) {
        final String message = String.format(
            "Error when creating symlink %s -> %s", linkName, target);
        LOG.error(message, e);
      }
    }
  }

  /**
   * Records a localization failure reported while the container is running.
   * The failed request is marked in the container's resource set and the
   * diagnostic message is appended to the container diagnostics. This
   * transition itself does not kill or clean up the container.
   */
  static class ResourceLocalizationFailedWhileRunningTransition
      extends ContainerTransition {

    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      final ContainerResourceFailedEvent failure =
          (ContainerResourceFailedEvent) event;
      final String diagnostics = failure.getDiagnosticMessage();
      container.resourceSet.resourceLocalizationFailed(
          failure.getResource(), diagnostics);
      container.addDiagnostics(diagnostics);
    }
  }

  /**
   * Resource localization failed while the container is reinitializing.
   *
   * The re-initialization is abandoned:
   * <ul>
   *   <li>the failure is recorded on the container's resource set;</li>
   *   <li>a diagnostic and an error log are emitted;</li>
   *   <li>the re-init context is discarded;</li>
   *   <li>the re-initializing flag is cleared.</li>
   * </ul>
   * Clearing the flag matches every other transition that leaves
   * re-initialization (LaunchTransition, KillTransition, the exit
   * transitions). Through {@code setIsReInitializing(false)} it also ends the
   * re-initializing metric.
   */
  static class ResourceLocalizationFailedWhileReInitTransition
      extends ContainerTransition {

    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      ContainerResourceFailedEvent failedEvent =
          (ContainerResourceFailedEvent) event;
      container.resourceSet.resourceLocalizationFailed(
          failedEvent.getResource(), failedEvent.getDiagnosticMessage());
      container.addDiagnostics("Container aborting re-initialization.. "
          + failedEvent.getDiagnosticMessage());
      LOG.error("Container [" + container.getContainerId() + "] Re-init" +
          " failed !! Resource [" + failedEvent.getResource() + "] could" +
          " not be localized !!");
      container.reInitContext = null;
      // The re-init is abandoned, so the container must no longer report
      // itself as re-initializing. setIsReInitializing is a no-op if the flag
      // was never set; otherwise it also ends the re-initializing metric.
      container.setIsReInitializing(false);
    }
  }

  /**
   * Handles CONTAINER_LAUNCHED (SCHEDULED -> RUNNING).
   *
   * Starts resource monitoring, bumps the running-container metric and marks
   * the container as launched. It also finishes any pending
   * re-initialization, auditing it if one was in progress. If that re-init
   * cannot be rolled back (autocommit), its context is dropped so that a
   * later failure does not trigger a rollback. A container that was
   * recovered as killed is sent for cleanup right away.
   */
  static class LaunchTransition extends ContainerTransition {
    @SuppressWarnings("unchecked")
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.sendContainerMonitorStartEvent();
      container.metrics.runningContainer();
      container.wasLaunched = true;

      // Must be sampled before the flag is reset just below.
      final boolean finishingReInit = container.isReInitializing();
      if (finishingReInit) {
        NMAuditLogger.logSuccess(container.user,
            AuditConstants.FINISH_CONTAINER_REINIT, "ContainerImpl",
            container.containerId.getApplicationAttemptId().getApplicationId(),
            container.containerId);
      }
      container.setIsReInitializing(false);

      // Autocommit: the previous launch context is no longer wanted, so wipe
      // the re-init context and make sure no later failure rolls back.
      final boolean autoCommitted = container.reInitContext != null
          && !container.reInitContext.canRollback();
      if (autoCommitted) {
        container.reInitContext = null;
      }

      if (!container.recoveredAsKilled) {
        return;
      }
      LOG.info("Killing " + container.containerId
          + " due to recovered as killed");
      container.addDiagnostics("Container recovered as killed.\n");
      container.dispatcher.getEventHandler().handle(
          new ContainersLauncherEvent(container,
              ContainersLauncherEventType.CLEANUP_CONTAINER));
    }
  }

  /**
   * Transition from SCHEDULED state to PAUSED state on recovery.
   *
   * The recovered process is treated as already launched. Monitoring is
   * started, the container is flagged as launched, and then it is flagged as
   * paused.
   */
  static class RecoveredContainerTransition extends ContainerTransition {
    @SuppressWarnings("unchecked")
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      // Resume monitoring of the recovered process first.
      container.sendContainerMonitorStartEvent();
      container.wasLaunched = true;
      container.setIsPaused(true);
    }
  }

  /**
   * Handles a successful exit of the container process. This is the
   * transition from RUNNING or KILLING state to EXITED_WITH_SUCCESS upon the
   * EXITED_WITH_SUCCESS message.
   *
   * Clears the re-initializing and paused flags and records exit code 0. If
   * configured, it asks the launcher to clean up the container, and then it
   * starts resource cleanup.
   */
  @SuppressWarnings("unchecked")  // dispatcher not typed
  static class ExitedWithSuccessTransition extends ContainerTransition {

    /** Whether a CLEANUP_CONTAINER event must be sent to the launcher. */
    boolean clCleanupRequired;

    public ExitedWithSuccessTransition(boolean clCleanupRequired) {
      this.clCleanupRequired = clCleanupRequired;
    }

    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.setIsReInitializing(false);
      container.setIsPaused(false);
      container.exitCode = 0; // a successful exit always reports 0

      // TODO: Add containerWorkDir to the deletion service.

      if (clCleanupRequired) {
        container.dispatcher.getEventHandler().handle(
            new ContainersLauncherEvent(container,
                ContainersLauncherEventType.CLEANUP_CONTAINER));
      }
      container.cleanup();
    }
  }

  /**
   * Transition to EXITED_WITH_FAILURE state upon
   * CONTAINER_EXITED_WITH_FAILURE.
   *
   * The transition:
   * <ul>
   *   <li>clears the paused and re-initializing flags;</li>
   *   <li>records the exit code and any diagnostics carried by the
   *       {@link ContainerExitEvent};</li>
   *   <li>optionally asks the launcher to clean up;</li>
   *   <li>starts resource cleanup.</li>
   * </ul>
   * Also invoked directly by RetryFailureTransition and extended by
   * KilledExternallyTransition.
   **/
  @SuppressWarnings("unchecked")  // dispatcher not typed
  static class ExitedWithFailureTransition extends ContainerTransition {

    /** Whether a CLEANUP_CONTAINER event must be sent to the launcher. */
    boolean clCleanupRequired;

    public ExitedWithFailureTransition(boolean clCleanupRequired) {
      this.clCleanupRequired = clCleanupRequired;
    }

    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.setIsPaused(false);
      container.setIsReInitializing(false);

      final ContainerExitEvent exit = (ContainerExitEvent) event;
      container.exitCode = exit.getExitCode();
      final String info = exit.getDiagnosticInfo();
      if (info != null) {
        container.addDiagnostics(info + "\n");
      }

      // TODO: Add containerWorkDir to the deletion service.
      // TODO: Add containerOuputDir to the deletion service.

      if (clCleanupRequired) {
        container.dispatcher.getEventHandler().handle(
            new ContainersLauncherEvent(container,
                ContainersLauncherEventType.CLEANUP_CONTAINER));
      }
      container.cleanup();
    }
  }

  /**
   * Handles CONTAINER_EXITED_WITH_FAILURE for a running container and picks
   * one of three outcomes:
   * <ul>
   *   <li>RELAUNCHING - the retry policy allows another attempt;</li>
   *   <li>SCHEDULED - a re-initialization can still be rolled back, so the
   *       rollback context is installed and the container is re-scheduled
   *       through {@link KilledForReInitializationTransition};</li>
   *   <li>EXITED_WITH_FAILURE - otherwise.</li>
   * </ul>
   **/
  @SuppressWarnings("unchecked")  // dispatcher not typed
  static class RetryFailureTransition implements
      MultipleArcTransition<ContainerImpl, ContainerEvent, ContainerState> {

    @Override
    public ContainerState transition(final ContainerImpl container,
        ContainerEvent event) {
      ContainerExitEvent exitEvent = (ContainerExitEvent) event;
      container.exitCode = exitEvent.getExitCode();
      if (exitEvent.getDiagnosticInfo() != null) {
        if (container.containerRetryContext.getRetryPolicy()
            != ContainerRetryPolicy.NEVER_RETRY) {
          container.addDiagnostics("Diagnostic message from attempt : \n");
        }
        container.addDiagnostics(exitEvent.getDiagnosticInfo() + "\n");
      }
      if (container.shouldRetry(container.exitCode)) {
        // Updates to the retry context should be protected from concurrent
        // writes. It should only be called from this transition.
        container.retryPolicy.updateRetryContext(container.windowRetryContext);
        container.storeRetryContext();
        doRelaunch(container,
            container.windowRetryContext.getRemainingRetries(),
            container.containerRetryContext.getRetryInterval());
        return ContainerState.RELAUNCHING;
      } else if (container.canRollback()) {
        // Rollback is possible only if the previous launch context is
        // available.
        container.addDiagnostics("Container Re-init Auto Rolled-Back.");
        LOG.info("Rolling back Container reInitialization for [" +
            container.getContainerId() + "] !!");
        container.reInitContext =
            container.reInitContext.createContextForRollback();
        container.metrics.rollbackContainerOnFailure();
        container.metrics.reInitingContainer();
        // The re-initializing metric is only ended by
        // setIsReInitializing(true -> false). Without setting the flag here,
        // the increment above would never be matched, and LaunchTransition
        // would not audit FINISH_CONTAINER_REINIT for the rolled-back launch.
        container.setIsReInitializing(true);
        NMAuditLogger.logSuccess(container.user,
            AuditConstants.START_CONTAINER_REINIT, "ContainerImpl",
            container.containerId.getApplicationAttemptId().getApplicationId(),
            container.containerId);
        new KilledForReInitializationTransition().transition(container, event);
        return ContainerState.SCHEDULED;
      } else {
        new ExitedWithFailureTransition(true).transition(container, event);
        return ContainerState.EXITED_WITH_FAILURE;
      }
    }

    /**
     * Marks the container as no longer running and requests a relaunch.
     * The relaunch happens immediately, or after {@code retryInterval} ms on
     * a short-lived background thread so the dispatcher is not blocked.
     *
     * @param container the container to relaunch
     * @param remainingRetryAttempts retries left after this relaunch, or
     *        {@code ContainerRetryContext.RETRY_FOREVER}
     * @param retryInterval delay in milliseconds before relaunching
     */
    private void doRelaunch(final ContainerImpl container,
        int remainingRetryAttempts, final int retryInterval) {
      if (remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER) {
        LOG.info("Relaunching Container {}. " +
                "retry interval {} ms", container.getContainerId(),
            retryInterval);
      } else {
        LOG.info("Relaunching Container {}. " +
                "remaining retry attempts(after relaunch) {}, " +
                "retry interval {} ms", container.getContainerId(),
            remainingRetryAttempts, retryInterval);
      }

      container.wasLaunched  = false;
      container.metrics.endRunningContainer();
      if (retryInterval == 0) {
        container.sendRelaunchEvent();
        return;
      }
      // Wait for the retry interval off the dispatcher thread, then send the
      // relaunch event. The thread is named for diagnosability, and it is a
      // daemon so a pending relaunch never holds the JVM open on shutdown.
      Thread relauncher = new Thread(
          "ContainerRelaunch-" + container.getContainerId()) {
        @Override
        public void run() {
          try {
            Thread.sleep(retryInterval);
          } catch (InterruptedException e) {
            // Interrupted while waiting: skip the relaunch and keep the
            // interrupt status visible.
            Thread.currentThread().interrupt();
            return;
          }
          container.sendRelaunchEvent();
        }
      };
      relauncher.setDaemon(true);
      relauncher.start();
    }
  }

  /**
   * @return true when the container's current retry context specifies a
   *         retry policy other than {@code NEVER_RETRY}
   */
  @Override
  public boolean isRetryContextSet() {
    final ContainerRetryPolicy policy =
        containerRetryContext.getRetryPolicy();
    return policy != ContainerRetryPolicy.NEVER_RETRY;
  }

  /**
   * Decides whether a container that exited with {@code errorCode} should be
   * relaunched. The SUCCESS, FORCE_KILLED and TERMINATED exit codes never
   * trigger a retry. Any other code is delegated to the sliding-window retry
   * policy.
   *
   * @param errorCode exit code of the attempt that just finished
   * @return true if the container should be relaunched
   */
  @Override
  public boolean shouldRetry(int errorCode) {
    final boolean neverRetried = errorCode == ExitCode.SUCCESS.getExitCode()
        || errorCode == ExitCode.FORCE_KILLED.getExitCode()
        || errorCode == ExitCode.TERMINATED.getExitCode();
    return !neverRetried
        && retryPolicy.shouldRetry(windowRetryContext, errorCode);
  }

  /**
   * Transition to EXITED_WITH_FAILURE for a process killed by an external
   * signal. It runs the regular failure handling, including launcher
   * cleanup, and then notes the external kill in the diagnostics.
   */
  static class KilledExternallyTransition extends ExitedWithFailureTransition {
    KilledExternallyTransition() {
      // Launcher cleanup is always required for an externally killed process.
      super(true);
    }

    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      super.transition(container, event);
      // Appended after whatever exit diagnostics the superclass recorded.
      container.addDiagnostics("Killed by external signal\n");
    }
  }

  /**
   * Transition to SCHEDULED and wait for RE-LAUNCH.
   *
   * Prepares the container to be launched again for a re-initialization.
   * It is also invoked directly by RetryFailureTransition for an automatic
   * rollback. The steps are:
   * <ul>
   *   <li>the launch context from {@code reInitContext} replaces the current
   *       one;</li>
   *   <li>the retry configuration is rebuilt from that launch context;</li>
   *   <li>the resource sets are merged;</li>
   *   <li>the scheduler is told the old run completed;</li>
   *   <li>a fresh schedule request is sent.</li>
   * </ul>
   */
  static class KilledForReInitializationTransition extends ContainerTransition {

    @Override
    public void transition(ContainerImpl container,
        ContainerEvent event) {
      LOG.info("Relaunching Container [" + container.getContainerId()
          + "] for re-initialization !!");
      // The old process is gone; the container will be launched again.
      container.wasLaunched  = false;
      container.metrics.endRunningContainer();
      container.clearIpAndHost();
      // Remove the container from the resource-monitor. When container
      // is launched again, it is added back to monitoring service.
      container.dispatcher.getEventHandler().handle(
          new ContainerStopMonitoringEvent(container.containerId, true));
      // NOTE(review): reInitContext is dereferenced without a null check;
      // callers are assumed to have populated it before this runs.
      container.launchContext = container.reInitContext.newLaunchContext;

      // Re configure the Retry Context
      // The retry settings come from the NEW launch context, and the sliding
      // window and policy are recreated so earlier attempts do not count.
      container.containerRetryContext =
          configureRetryContext(container.context.getConf(),
              container.launchContext, container.containerId);
      container.windowRetryContext = new SlidingWindowRetryPolicy
          .RetryContext(container.containerRetryContext);
      container.retryPolicy = new SlidingWindowRetryPolicy(clock);

      // Combine the current resources with those of the re-init context.
      container.resourceSet =
          container.reInitContext.mergedResourceSet(container.resourceSet);
      container.isMarkeForKilling = false;
      // Ensure Resources are decremented.
      container.dispatcher.getEventHandler().handle(
          new ContainerSchedulerEvent(container,
          ContainerSchedulerEventType.CONTAINER_COMPLETED));
      container.sendScheduleEvent();
    }
  }

  /**
   * Transition from LOCALIZING to LOCALIZATION_FAILED upon receiving a
   * RESOURCE_FAILED event. It records the failure diagnostics, asks the
   * localizer to release and clean up the container's resources, and ends
   * the initing-container metric.
   */
  static class ResourceFailedTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      final String diagnostics =
          ((ContainerResourceFailedEvent) event).getDiagnosticMessage();
      container.addDiagnostics(diagnostics + "\n");

      // Have the localizer decrement reference counts and clean up resources.
      container.cleanup();
      container.metrics.endInitingContainer();
    }
  }

  /**
   * Transition from LOCALIZING to KILLING upon receiving KILL_CONTAINER,
   * before the container was ever launched. Localized resources are
   * released, the initing metric is ended, and the kill's exit status and
   * diagnostics are recorded.
   */
  static class KillBeforeRunningTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      // Have the localizer decrement reference counts and clean up resources.
      container.cleanup();
      container.metrics.endInitingContainer();

      final ContainerKillEvent kill = (ContainerKillEvent) event;
      container.exitCode = kill.getContainerExitStatus();
      container.addDiagnostics(kill.getDiagnostic() + "\n");
      container.addDiagnostics("Container is killed before being launched.\n");
    }
  }

  /**
   * Remains in KILLING when a RESOURCE_LOCALIZED event arrives mid-kill.
   * The localized resource is still recorded in the resource set, but the
   * returned link names are ignored, so no symlinks are created here.
   */
  static class LocalizedResourceDuringKillTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      final ContainerResourceLocalizedEvent localized =
          (ContainerResourceLocalizedEvent) event;
      container.resourceSet.resourceLocalized(
          localized.getResource(), localized.getLocation());
    }
  }

  /**
   * Transitions upon receiving KILL_CONTAINER:
   * SCHEDULED, RUNNING or REINITIALIZING -> KILLING.
   *
   * Clears the re-initializing and paused flags and asks the launcher to
   * kill the process or process group. It then records the kill diagnostics
   * and exit status.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  static class KillTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {

    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.setIsReInitializing(false);
      container.setIsPaused(false);
      // Kill the process/process-grp.
      container.dispatcher.getEventHandler().handle(
          new ContainersLauncherEvent(container,
              ContainersLauncherEventType.CLEANUP_CONTAINER));

      final ContainerKillEvent kill = (ContainerKillEvent) event;
      container.addDiagnostics(kill.getDiagnostic() + "\n");
      container.exitCode = kill.getContainerExitStatus();
    }
  }

  /**
   * Handles PAUSE_CONTAINER by killing the container instead of pausing it.
   * The earlier doc listed LOCALIZED -> KILLING and REINITIALIZING -> KILLING;
   * confirm the current source states against the state-machine table.
   *
   * Only the re-initializing flag is cleared and a cleanup request is sent to
   * the launcher. No exit status or diagnostics are recorded here.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  static class KillOnPauseTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {

    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.setIsReInitializing(false);
      // Kill the process/process-grp rather than pausing it.
      container.dispatcher.getEventHandler().handle(
          new ContainersLauncherEvent(container,
              ContainersLauncherEventType.CLEANUP_CONTAINER));
    }
  }

  /**
   * Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL upon receiving
   * CONTAINER_KILLED_ON_REQUEST.
   *
   * The event's exit code is used only when no exit status was recorded
   * earlier (KillTransition, for example, already sets one). Exit
   * diagnostics are appended and resource cleanup is started.
   */
  static class ContainerKilledTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      final ContainerExitEvent exit = (ContainerExitEvent) event;
      if (container.hasDefaultExitCode()) {
        container.exitCode = exit.getExitCode();
      }

      final String info = exit.getDiagnosticInfo();
      if (info != null) {
        container.addDiagnostics(info + "\n");
      }

      // Process/process-group is gone: decrement reference counts and clean
      // up resources.
      container.cleanup();
    }
  }

  /**
   * Final transition to DONE upon CONTAINER_RESOURCES_CLEANEDUP, from any of
   * LOCALIZATION_FAILED, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING or
   * CONTAINER_CLEANEDUP_AFTER_KILL.
   *
   * The transition:
   * <ul>
   *   <li>releases the container's resource in the NM metrics;</li>
   *   <li>records finish time and exit code in the per-container metrics,
   *       when those exist;</li>
   *   <li>sends the finished events;</li>
   *   <li>notifies aux services with CONTAINER_STOP, unless the container is
   *       still NEW;</li>
   *   <li>triggers an out-of-band heartbeat.</li>
   * </ul>
   */
  static class ContainerDoneTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    @SuppressWarnings("unchecked")
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.metrics.releaseContainer(
          container.containerTokenIdentifier.getResource());
      if (container.containerMetrics != null) {
        container.containerMetrics
            .recordFinishTimeAndExitCode(clock.getTime(), container.exitCode);
        container.containerMetrics.finished(false);
      }
      container.sendFinishedEvents();

      // A container still in NEW never had CONTAINER_INIT sent, so there is
      // nothing for the aux services to stop.
      final boolean initSent = container.getCurrentState()
          != org.apache.hadoop.yarn.api.records.ContainerState.NEW;
      if (initSent) {
        container.dispatcher.getEventHandler().handle(
            new AuxServicesEvent(AuxServicesEventType.CONTAINER_STOP,
                container));
      }
      container.context.getNodeStatusUpdater().sendOutofBandHeartBeat();
    }
  }

  /**
   * Handles KILL_CONTAINER while still in NEW (NEW -> DONE).
   *
   * A container recovered with status COMPLETED only sends its finished
   * events. In every other case the transition:
   * <ul>
   *   <li>records the kill's exit status and diagnostics;</li>
   *   <li>updates the killed metric and the audit log;</li>
   *   <li>runs the common {@link ContainerDoneTransition} bookkeeping.</li>
   * </ul>
   */
  static class KillOnNewTransition extends ContainerDoneTransition {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
        container.sendFinishedEvents();
        return;
      }
      final ContainerKillEvent kill = (ContainerKillEvent) event;
      container.exitCode = kill.getContainerExitStatus();
      container.addDiagnostics(kill.getDiagnostic() + "\n");
      container.addDiagnostics("Container is killed before being launched.\n");
      container.metrics.killedContainer();
      NMAuditLogger.logSuccess(container.user,
          AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
          container.containerId.getApplicationAttemptId().getApplicationId(),
          container.containerId);
      super.transition(container, event);
    }
  }

  /**
   * LOCALIZATION_FAILED -> DONE upon CONTAINER_RESOURCES_CLEANEDUP. Counts
   * the container as failed, writes a failure audit entry, and then runs the
   * common {@link ContainerDoneTransition} bookkeeping.
   */
  static class LocalizationFailedToDoneTransition extends
      ContainerDoneTransition {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.metrics.failedContainer();
      final String reason =
          "Container failed with state: " + container.getContainerState();
      NMAuditLogger.logFailure(container.user,
          AuditConstants.FINISH_FAILED_CONTAINER, "ContainerImpl", reason,
          container.containerId.getApplicationAttemptId().getApplicationId(),
          container.containerId);
      super.transition(container, event);
    }
  }

  /**
   * Handle the following transition:
   * - EXITED_WITH_SUCCESS -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
   *
   * Ends the running metric if the process was actually launched; otherwise
   * it logs a warning. It then counts the container as completed, writes a
   * success audit entry and runs the common {@link ContainerDoneTransition}
   * bookkeeping.
   */
  static class ExitedWithSuccessToDoneTransition extends
      ContainerDoneTransition {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      if (container.wasLaunched) {
        container.metrics.endRunningContainer();
      } else {
        // The two literals previously ran together as "notactually".
        LOG.warn("Container exited with success despite being killed and not"
            + " actually running");
      }
      container.metrics.completedContainer();
      NMAuditLogger.logSuccess(container.user,
          AuditConstants.FINISH_SUCCESS_CONTAINER, "ContainerImpl",
          container.containerId.getApplicationAttemptId().getApplicationId(),
          container.containerId);
      super.transition(container, event);
    }
  }

  /**
   * EXITED_WITH_FAILURE -> DONE upon CONTAINER_RESOURCES_CLEANEDUP. Ends the
   * running metric if the process was launched, counts the container as
   * failed, writes a failure audit entry and runs the common
   * {@link ContainerDoneTransition} bookkeeping.
   */
  static class ExitedWithFailureToDoneTransition extends
      ContainerDoneTransition {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      if (container.wasLaunched) {
        container.metrics.endRunningContainer();
      }
      container.metrics.failedContainer();
      final String reason =
          "Container failed with state: " + container.getContainerState();
      NMAuditLogger.logFailure(container.user,
          AuditConstants.FINISH_FAILED_CONTAINER, "ContainerImpl", reason,
          container.containerId.getApplicationAttemptId().getApplicationId(),
          container.containerId);
      super.transition(container, event);
    }
  }

  /**
   * KILLING -> DONE upon CONTAINER_RESOURCES_CLEANEDUP. Counts the container
   * as killed, writes a killed audit entry and runs the common
   * {@link ContainerDoneTransition} bookkeeping. The running metric is not
   * touched here.
   */
  static class KillingToDoneTransition extends
      ContainerDoneTransition {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.metrics.killedContainer();
      final ApplicationId appId =
          container.containerId.getApplicationAttemptId().getApplicationId();
      NMAuditLogger.logSuccess(container.user,
          AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
          appId, container.containerId);
      super.transition(container, event);
    }
  }

  /**
   * CONTAINER_CLEANEDUP_AFTER_KILL -> DONE upon CONTAINER_RESOURCES_CLEANEDUP.
   * Ends the running metric if the process was launched, counts the container
   * as killed, writes a killed audit entry and runs the common
   * {@link ContainerDoneTransition} bookkeeping.
   */
  static class ContainerCleanedupAfterKillToDoneTransition extends
      ContainerDoneTransition {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      if (container.wasLaunched) {
        container.metrics.endRunningContainer();
      }
      container.metrics.killedContainer();
      NMAuditLogger.logSuccess(container.user,
          AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
          container.containerId.getApplicationAttemptId().getApplicationId(),
          container.containerId);
      super.transition(container, event);
    }
  }

  /**
   * Appends the diagnostics carried by a {@link ContainerDiagnosticsUpdateEvent}
   * (newline-terminated) without changing state.
   */
  static class ContainerDiagnosticsUpdateTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      final String update =
          ((ContainerDiagnosticsUpdateEvent) event).getDiagnosticsUpdate();
      container.addDiagnostics(update + "\n");
    }
  }

  /**
   * Transitions upon receiving PAUSE_CONTAINER (RUNNING -> PAUSING). Asks the
   * launcher to pause the process or process group and records the pause
   * diagnostics. The paused flag itself is set later by
   * {@link PausedContainerTransition}.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  static class PauseContainerTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      // Pause the process/process-grp if the container supports it.
      container.dispatcher.getEventHandler().handle(
          new ContainersLauncherEvent(container,
              ContainersLauncherEventType.PAUSE_CONTAINER));
      final ContainerPauseEvent pause = (ContainerPauseEvent) event;
      container.addDiagnostics(pause.getDiagnostic() + "\n");
    }
  }

  /**
   * Handles PAUSED_CONTAINER. Flags the container as paused, bumps the
   * paused-container metric and notifies the container scheduler with
   * CONTAINER_PAUSED.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  static class PausedContainerTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.setIsPaused(true);
      container.metrics.pausedContainer();
      // Let the scheduler know the container is now paused.
      final ContainerSchedulerEvent pausedNotice = new ContainerSchedulerEvent(
          container, ContainerSchedulerEventType.CONTAINER_PAUSED);
      container.dispatcher.getEventHandler().handle(pausedNotice);
    }
  }

  /**
   * Transitions upon receiving RESUME_CONTAINER (PAUSED -> RUNNING).
   *
   * Clears the paused flag, which ends the paused metric if the container
   * was paused. It then asks the launcher to resume the process or process
   * group and records the resume diagnostics.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  static class ResumeContainerTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.setIsPaused(false);
      // Resume the process/process-grp if the container supports it.
      container.dispatcher.getEventHandler().handle(
          new ContainersLauncherEvent(container,
              ContainersLauncherEventType.RESUME_CONTAINER));
      final ContainerResumeEvent resume = (ContainerResumeEvent) event;
      container.addDiagnostics(resume.getDiagnostic() + "\n");
    }
  }

  /**
   * Feeds {@code event} into the container state machine while holding the
   * write lock. An invalid transition is logged at ERROR and otherwise
   * ignored. An actual change of state is logged at INFO.
   */
  @Override
  public void handle(ContainerEvent event) {
    this.writeLock.lock();
    try {
      final ContainerId containerID = event.getContainerID();
      LOG.debug("Processing {} of type {}", containerID, event.getType());
      final ContainerState before = stateMachine.getCurrentState();
      ContainerState after = null;
      try {
        after = stateMachine.doTransition(event.getType(), event);
      } catch (InvalidStateTransitionException e) {
        LOG.error("Can't handle this event at current state: Current: ["
            + before + "], eventType: [" + event.getType() + "]," +
            " container: [" + containerID + "]", e);
      }
      final boolean changed = after != null && after != before;
      if (changed) {
        LOG.info("Container " + containerID + " transitioned from "
            + before + " to " + after);
      }
    } finally {
      this.writeLock.unlock();
    }
  }

  /** @return the container id rendered as a string, read under the read lock */
  @Override
  public String toString() {
    this.readLock.lock();
    try {
      final String rendered = this.containerId.toString();
      return rendered;
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * @return true if the exit code still equals
   *         {@link ContainerExitStatus#INVALID}, presumably meaning no exit
   *         status has been recorded yet
   */
  private boolean hasDefaultExitCode() {
    return ContainerExitStatus.INVALID == this.exitCode;
  }

  /**
   * Returns whether the specific resource should be uploaded to the shared
   * cache.
   *
   * @param container the container owning the resource set
   * @param resource the localized resource request to look up
   * @return the upload policy recorded for {@code resource}, or
   *         {@code false} when the resource set has no policy for it
   */
  private static boolean shouldBeUploadedToSharedCache(ContainerImpl container,
      LocalResourceRequest resource) {
    // Compare against Boolean.TRUE instead of auto-unboxing, so a resource
    // with no recorded policy yields false rather than a NullPointerException.
    return Boolean.TRUE.equals(
        container.resourceSet.getResourcesUploadPolicies().get(resource));
  }

  /**
   * Updates the paused flag. Leaving the paused state (true -> false) also
   * ends the paused-container metric.
   */
  private void setIsPaused(boolean paused) {
    final boolean leavingPause = this.wasPaused && !paused;
    if (leavingPause) {
      this.metrics.endPausedContainer();
    }
    this.wasPaused = paused;
  }

  /** @return the retry context currently configured for this container */
  @VisibleForTesting
  ContainerRetryContext getContainerRetryContext() {
    return this.containerRetryContext;
  }

  /** @return the priority carried by the container token identifier */
  @Override
  public Priority getPriority() {
    return this.containerTokenIdentifier.getPriority();
  }

  /** @return true if the container state machine is currently in RUNNING */
  @Override
  public boolean isRunning() {
    return ContainerState.RUNNING == getContainerState();
  }

  /**
   * Updates the re-initializing flag. Clearing it (true -> false) also ends
   * the re-initializing-container metric.
   */
  @Override
  public void setIsReInitializing(boolean isReInitializing) {
    final boolean leavingReInit = this.isReInitializing && !isReInitializing;
    if (leavingReInit) {
      metrics.endReInitingContainer();
    }
    this.isReInitializing = isReInitializing;
  }

  /** @return the current value of the re-initializing flag */
  @Override
  public boolean isReInitializing() {
    return isReInitializing;
  }

  /** @return whether the container has been marked for killing */
  @Override
  public boolean isMarkedForKilling() {
    return isMarkeForKilling;
  }

  /**
   * @return true only when a re-init context exists and that context reports
   *         it can be rolled back
   */
  @Override
  public boolean canRollback() {
    if (this.reInitContext == null) {
      return false;
    }
    return this.reInitContext.canRollback();
  }

  /**
   * Commits the current upgrade by discarding the re-init context, after
   * which {@link #canRollback()} returns false.
   */
  @Override
  public void commitUpgrade() {
    reInitContext = null;
  }

  /**
   * @return true while a container that was not freshly REQUESTED (i.e. one
   *         being restored from the state store) is still in NEW
   */
  @Override
  public boolean isRecovering() {
    return recoveredStatus != RecoveredContainerStatus.REQUESTED
        && getContainerState() == ContainerState.NEW;
  }

  /**
   * Get assigned resource mappings to the container.
   *
   * @return the resource mappings assigned to this container
   */
  @Override
  public ResourceMappings getResourceMappings() {
    return this.resourceMappings;
  }

  /**
   * Persists the sliding-window retry state to the NM state store, presumably
   * so it survives an NM restart. Restart times are written only when there
   * are any; the remaining-retry count is always written. Store failures are
   * logged as warnings and otherwise ignored.
   */
  private void storeRetryContext() {
    final boolean haveRestartTimes = windowRetryContext.getRestartTimes() != null
        && !windowRetryContext.getRestartTimes().isEmpty();
    if (haveRestartTimes) {
      try {
        stateStore.storeContainerRestartTimes(containerId,
            windowRetryContext.getRestartTimes());
      } catch (IOException ioe) {
        LOG.warn("Unable to update finishTimeForRetryAttempts in state store for "
            + containerId, ioe);
      }
    }

    try {
      stateStore.storeContainerRemainingRetryAttempts(containerId,
          windowRetryContext.getRemainingRetries());
    } catch (IOException ioe) {
      LOG.warn("Unable to update remainingRetryAttempts in state store for "
          + containerId, ioe);
    }
  }

  /** @return the sliding-window retry policy currently in use */
  @VisibleForTesting
  SlidingWindowRetryPolicy getRetryPolicy() {
    return this.retryPolicy;
  }

  /**
   * @return true if the container is in one of the states from which it only
   *         moves towards DONE: KILLING, DONE, LOCALIZATION_FAILED,
   *         CONTAINER_RESOURCES_CLEANINGUP, CONTAINER_CLEANEDUP_AFTER_KILL,
   *         EXITED_WITH_FAILURE or EXITED_WITH_SUCCESS
   */
  @Override
  public boolean isContainerInFinalStates() {
    return EnumSet.of(ContainerState.KILLING, ContainerState.DONE,
        ContainerState.LOCALIZATION_FAILED,
        ContainerState.CONTAINER_RESOURCES_CLEANINGUP,
        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerState.EXITED_WITH_SUCCESS).contains(getContainerState());
  }

  /** Stores the exposed-ports string reported for this container. */
  @Override
  public void setExposedPorts(String ports) {
    exposedPorts = ports;
  }

  /**
   * @return the localization statuses from the current resource set, read
   *         under the read lock
   */
  @Override
  public List<LocalizationStatus> getLocalizationStatuses() {
    this.readLock.lock();
    try {
      final List<LocalizationStatus> statuses =
          resourceSet.getLocalizationStatuses();
      return statuses;
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * Attaches opaque runtime-specific data; retrieve it with
   * {@link #getContainerRuntimeData(Class)}.
   */
  public void setContainerRuntimeData(Object containerRuntimeData) {
    this.containerRuntimeData = containerRuntimeData;
  }

  /**
   * Returns the runtime-specific data attached to this container, cast to
   * the type the caller expects.
   *
   * @param runtimeClass the expected class of the runtime data
   * @param <T> the expected runtime data type
   * @return the runtime data as an instance of {@code runtimeClass}
   * @throws ContainerExecutionException if no runtime data has been set or
   *         it is not an instance of {@code runtimeClass}
   */
  public <T> T getContainerRuntimeData(Class<T> runtimeClass)
      throws ContainerExecutionException {
    if (!runtimeClass.isInstance(containerRuntimeData)) {
      // The data may never have been set. Describe it without dereferencing
      // null, so callers get the declared exception rather than an NPE.
      String actualClass = (containerRuntimeData == null) ? "null"
          : containerRuntimeData.getClass().getCanonicalName();
      throw new ContainerExecutionException(
          "Runtime class " + actualClass
          + " is invalid. Expected class " + runtimeClass.getCanonicalName());
    }
    return runtimeClass.cast(containerRuntimeData);
  }

  /**
   * Renders the localization counters as a comma-separated list, in array
   * order. Assumes the counter array has at least one element.
   */
  @Override
  public String localizationCountersAsString() {
    final StringBuilder joined = new StringBuilder();
    joined.append(localizationCounts[0]);
    for (int idx = 1; idx < localizationCounts.length; ++idx) {
      joined.append(',').append(localizationCounts[idx]);
    }
    return joined.toString();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop Container 源码

hadoop ContainerDiagnosticsUpdateEvent 源码

hadoop ContainerEvent 源码

hadoop ContainerEventType 源码

hadoop ContainerExitEvent 源码

hadoop ContainerInitEvent 源码

hadoop ContainerKillEvent 源码

hadoop ContainerPauseEvent 源码

hadoop ContainerReInitEvent 源码

hadoop ContainerResourceEvent 源码

0  赞