hadoop ContainerLaunch 源码

  • 2022-10-20
  • 浏览 (192)

hadoop ContainerLaunch 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;

import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.TOKEN_FILE_NAME_FMT;

import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;

import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.WindowsSecureContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerPrepareContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.yarn.server.security.AMSecretKeys;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;

import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ContainerLaunch implements Callable<Integer> {

  // Class-wide SLF4J logger.
  private static final Logger LOG =
       LoggerFactory.getLogger(ContainerLaunch.class);

  // Base name of the pre-launch log files; the stderr variant is tailed into
  // the diagnostics by handleContainerExitWithFailure.
  private static final String CONTAINER_PRE_LAUNCH_PREFIX = "prelaunch";
  public static final String CONTAINER_PRE_LAUNCH_STDOUT = CONTAINER_PRE_LAUNCH_PREFIX + ".out";
  public static final String CONTAINER_PRE_LAUNCH_STDERR = CONTAINER_PRE_LAUNCH_PREFIX + ".err";

  // File name of the generated launch script; the extension is whatever
  // Shell.appendScriptExtension adds.
  public static final String CONTAINER_SCRIPT =
    Shell.appendScriptExtension("launch_container");

  // File name, relative to the container work dir, that call() exports through
  // ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME.
  public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
  // NOTE(review): not referenced in the visible part of this class; presumably
  // a directory name used by other components - confirm.
  public static final String SYSFS_DIR = "sysfs";

  // File names used for the AM keystore/truststore, both in the NM private
  // dir (see call()) and in the container work dir (see addKeystoreVars /
  // addTruststoreVars).
  public static final String KEYSTORE_FILE = "yarn_provided.keystore";
  public static final String TRUSTSTORE_FILE = "yarn_provided.truststore";

  // NOTE(review): format/suffix for the pid and exit-code file names; their
  // users are outside the visible part of this file - confirm.
  private static final String PID_FILE_NAME_FMT = "%s.pid";
  static final String EXIT_CODE_FILE_SUFFIX = ".exitcode";

  // Collaborators injected through the constructor.
  protected final Dispatcher dispatcher;
  protected final ContainerExecutor exec;
  protected final Application app;
  protected final Container container;
  private final Configuration conf;
  private final Context context;
  private final ContainerManagerImpl containerManager;

  // Flipped false -> true by prepareForLaunch via compareAndSet; if it was
  // already true the launch is abandoned ("cleanup already called").
  protected AtomicBoolean containerAlreadyLaunched = new AtomicBoolean(false);
  // NOTE(review): not read or written in the visible part of this class.
  protected AtomicBoolean shouldPauseContainer = new AtomicBoolean(false);

  // Set to true by setContainerCompletedStatus.
  protected AtomicBoolean completed = new AtomicBoolean(false);

  // Set by prepareForLaunch when the container was already marked for killing.
  private volatile boolean killedBeforeStart = false;
  // Initial value only; the constructor overwrites it with
  // YarnConfiguration.NM_PROCESS_KILL_WAIT_MS.
  private long maxKillWaitTime = 2000;

  // Assigned in call() from dirsHandler.getLocalPathForWrite and handed to
  // exec.activateContainer in prepareForLaunch.
  protected Path pidFilePath = null;

  protected final LocalDirsHandlerService dirsHandler;

  // Held around exec.launchContainer, exec.relaunchContainer and the body of
  // reapContainer.
  private final Lock launchLock = new ReentrantLock();

  /**
   * Creates a launcher bound to one container.
   *
   * @param context NodeManager context
   * @param configuration configuration; also read here for
   *     {@link YarnConfiguration#NM_PROCESS_KILL_WAIT_MS}
   * @param dispatcher dispatcher used to publish container events
   * @param exec executor that actually runs the container
   * @param app application this launcher operates on
   * @param container container this launcher operates on
   * @param dirsHandler source of local and log directories
   * @param containerManager the container manager
   */
  public ContainerLaunch(Context context, Configuration configuration,
      Dispatcher dispatcher, ContainerExecutor exec, Application app,
      Container container, LocalDirsHandlerService dirsHandler,
      ContainerManagerImpl containerManager) {
    // Node-level collaborators.
    this.context = context;
    this.containerManager = containerManager;
    this.dispatcher = dispatcher;
    this.dirsHandler = dirsHandler;
    this.exec = exec;

    // What is being launched.
    this.app = app;
    this.container = container;

    // Configuration and the values derived from it.
    this.conf = configuration;
    this.maxKillWaitTime = configuration.getLong(
        YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
        YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS);
  }

  /**
   * Expands the placeholder markers in a command or environment value.
   *
   * <p>The log-dir marker is replaced with {@code containerLogDir}, the
   * class-path separator marker with {@link File#pathSeparator}, and the
   * parameter expansion markers with the platform's variable syntax: e.g.
   * {{VAR}} becomes %VAR% on Windows and $VAR elsewhere.
   *
   * @param var the text to expand
   * @param containerLogDir log directory substituted for the log-dir marker
   * @return the expanded text
   */
  @VisibleForTesting
  public static String expandEnvironment(String var,
      Path containerLogDir) {
    final String withLogDir = var.replace(
        ApplicationConstants.LOG_DIR_EXPANSION_VAR,
        containerLogDir.toString());
    final String withPathSeparator = withLogDir.replace(
        ApplicationConstants.CLASS_PATH_SEPARATOR, File.pathSeparator);

    if (Shell.WINDOWS) {
      // Both the opening and the closing marker become '%'.
      return withPathSeparator.replaceAll("(\\{\\{)|(\\}\\})", "%");
    }
    // Opening marker becomes '$', closing marker is dropped.
    return withPathSeparator
        .replace(ApplicationConstants.PARAMETER_EXPANSION_LEFT, "$")
        .replace(ApplicationConstants.PARAMETER_EXPANSION_RIGHT, "");
  }

  /**
   * Runs {@link #expandEnvironment(String, Path)} over every value of the
   * given environment map, updating the map in place.
   *
   * @param environment environment whose values are expanded
   * @param containerLogDir log directory used for the expansion
   */
  private void expandAllEnvironmentVars(
      Map<String, String> environment, Path containerLogDir) {
    for (Entry<String, String> envVar : environment.entrySet()) {
      envVar.setValue(expandEnvironment(envVar.getValue(), containerLogDir));
    }
  }

  /**
   * Publishes the keystore location (inside the container work dir) and the
   * keystore password (taken from the container credentials) into the
   * environment.
   *
   * @param environment environment map to add the two variables to
   * @param containerWorkDir container working directory
   */
  private void addKeystoreVars(Map<String, String> environment,
      Path containerWorkDir) {
    final Path keystoreInWorkDir =
        new Path(containerWorkDir, ContainerLaunch.KEYSTORE_FILE);
    environment.put(ApplicationConstants.KEYSTORE_FILE_LOCATION_ENV_NAME,
        keystoreInWorkDir.toUri().getPath());

    final byte[] passwordBytes = container.getCredentials().getSecretKey(
        AMSecretKeys.YARN_APPLICATION_AM_KEYSTORE_PASSWORD);
    environment.put(ApplicationConstants.KEYSTORE_PASSWORD_ENV_NAME,
        new String(passwordBytes, StandardCharsets.UTF_8));
  }

  /**
   * Publishes the truststore location (inside the container work dir) and
   * the truststore password (taken from the container credentials) into the
   * environment.
   *
   * @param environment environment map to add the two variables to
   * @param containerWorkDir container working directory
   */
  private void addTruststoreVars(Map<String, String> environment,
                               Path containerWorkDir) {
    final Path truststoreInWorkDir =
        new Path(containerWorkDir, ContainerLaunch.TRUSTSTORE_FILE);
    environment.put(ApplicationConstants.TRUSTSTORE_FILE_LOCATION_ENV_NAME,
        truststoreInWorkDir.toUri().getPath());

    final byte[] passwordBytes = container.getCredentials().getSecretKey(
        AMSecretKeys.YARN_APPLICATION_AM_TRUSTSTORE_PASSWORD);
    environment.put(ApplicationConstants.TRUSTSTORE_PASSWORD_ENV_NAME,
        new String(passwordBytes, StandardCharsets.UTF_8));
  }

  /**
   * Prepares the container's launch script, token file and optional
   * keystore/truststore in the NM private space, then launches the container
   * through {@link #launchContainer(ContainerStartContext)}.
   *
   * <p>If {@link #validateContainerState()} returns false, nothing is
   * prepared and 0 is returned. Otherwise
   * {@link #setContainerCompletedStatus(int)} is always invoked with the
   * final value of {@code ret}. On an exception a
   * CONTAINER_EXITED_WITH_FAILURE event is dispatched and {@code ret} is
   * returned as-is; on normal return the exit code is routed through
   * {@link #handleContainerExitCode(int, Path)}.
   *
   * @return 0 when validation fails; otherwise the value returned by
   *     launchContainer, or -1 / ContainerExitStatus.DISKS_FAILED when the
   *     launch failed with an exception
   */
  @Override
  public Integer call() {
    if (!validateContainerState()) {
      return 0;
    }

    final ContainerLaunchContext launchContext = container.getLaunchContext();
    ContainerId containerID = container.getContainerId();
    String containerIdStr = containerID.toString();
    final List<String> command = launchContext.getCommands();
    // Reported as the exit code if anything below throws before
    // launchContainer returns.
    int ret = -1;

    // Declared outside the try because it is needed after it; every catch
    // branch returns, so it is definitely assigned when used below.
    Path containerLogDir;
    try {
      Map<Path, List<String>> localResources = getLocalizedResources();

      final String user = container.getUser();
      // /////////////////////////// Variable expansion
      // Before the container script gets written out.
      List<String> newCmds = new ArrayList<String>(command.size());
      String appIdStr = app.getAppId().toString();
      String relativeContainerLogDir = ContainerLaunch
          .getRelativeContainerLogDir(appIdStr, containerIdStr);
      containerLogDir =
          dirsHandler.getLogPathForWrite(relativeContainerLogDir, false);
      recordContainerLogDir(containerID, containerLogDir.toString());
      for (String str : command) {
        // TODO: Should we instead work via symlinks without this grammar?
        newCmds.add(expandEnvironment(str, containerLogDir));
      }
      // The launch context now carries the expanded commands.
      launchContext.setCommands(newCmds);

      // The actual expansion of environment variables happens after calling
      // sanitizeEnv.  This allows variables specified in NM_ADMIN_USER_ENV
      // to reference user or container-defined variables.
      Map<String, String> environment = launchContext.getEnvironment();
      // /////////////////////////// End of variable expansion

      // Use this to track variables that are added to the environment by nm.
      LinkedHashSet<String> nmEnvVars = new LinkedHashSet<String>();

      FileContext lfs = FileContext.getLocalFSFileContext();

      // Locations in the container's NM private dir for the launch script,
      // the token file, the keystore/truststore and the classpath jar dir.
      Path nmPrivateContainerScriptPath = dirsHandler.getLocalPathForWrite(
          getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
              + CONTAINER_SCRIPT);
      Path nmPrivateTokensPath = dirsHandler.getLocalPathForWrite(
          getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
              + String.format(TOKEN_FILE_NAME_FMT, containerIdStr));
      Path nmPrivateKeystorePath = dirsHandler.getLocalPathForWrite(
          getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
              + KEYSTORE_FILE);
      Path nmPrivateTruststorePath = dirsHandler.getLocalPathForWrite(
          getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
              + TRUSTSTORE_FILE);
      Path nmPrivateClasspathJarDir = dirsHandler.getLocalPathForWrite(
          getContainerPrivateDir(appIdStr, containerIdStr));

      // Select the working directory for the container
      Path containerWorkDir = deriveContainerWorkDir();
      recordContainerWorkDir(containerID, containerWorkDir.toString());

      // Select a root dir for all csi volumes for the container
      Path csiVolumesRoot = deriveCsiVolumesRootDir();
      recordContainerCsiVolumesRootDir(containerID, csiVolumesRoot.toString());

      String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
      // pid file should be in nm private dir so that it is not
      // accessible by users
      pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
      List<String> localDirs = dirsHandler.getLocalDirs();
      List<String> localDirsForRead = dirsHandler.getLocalDirsForRead();
      List<String> logDirs = dirsHandler.getLogDirs();
      List<String> filecacheDirs = getNMFilecacheDirs(localDirsForRead);
      List<String> userLocalDirs = getUserLocalDirs(localDirs);
      List<String> containerLocalDirs = getContainerLocalDirs(localDirs);
      List<String> containerLogDirs = getContainerLogDirs(logDirs);
      List<String> userFilecacheDirs = getUserFilecacheDirs(localDirsForRead);
      List<String> applicationLocalDirs = getApplicationLocalDirs(localDirs,
          appIdStr);

      // Abort with DISKS_FAILED as the exit code when the dirs handler
      // reports the disks as unhealthy.
      if (!dirsHandler.areDisksHealthy()) {
        ret = ContainerExitStatus.DISKS_FAILED;
        throw new IOException("Most of the disks failed. "
            + dirsHandler.getDisksHealthReport(false));
      }
      // One <localDir>/usercache/<user>/appcache/<appId> path per local dir.
      List<Path> appDirs = new ArrayList<Path>(localDirs.size());
      for (String localDir : localDirs) {
        Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
        Path userdir = new Path(usersdir, user);
        Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
        appDirs.add(new Path(appsdir, appIdStr));
      }

      // Write the keystore to the NM private space if the credentials carry
      // one; otherwise null the path so the start context receives null.
      byte[] keystore = container.getCredentials().getSecretKey(
          AMSecretKeys.YARN_APPLICATION_AM_KEYSTORE);
      if (keystore != null) {
        try (DataOutputStream keystoreOutStream =
                 lfs.create(nmPrivateKeystorePath,
                     EnumSet.of(CREATE, OVERWRITE))) {
          keystoreOutStream.write(keystore);
        }
      } else {
        nmPrivateKeystorePath = null;
      }
      // Same handling for the truststore.
      byte[] truststore = container.getCredentials().getSecretKey(
          AMSecretKeys.YARN_APPLICATION_AM_TRUSTSTORE);
      if (truststore != null) {
        try (DataOutputStream truststoreOutStream =
                 lfs.create(nmPrivateTruststorePath,
                     EnumSet.of(CREATE, OVERWRITE))) {
          truststoreOutStream.write(truststore);
        }
      } else {
        nmPrivateTruststorePath = null;
      }

      // Set the token location too.
      addToEnvMap(environment, nmEnvVars,
          ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME,
          new Path(containerWorkDir,
              FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());

      // /////////// Write out the container-script in the nmPrivate space.
      try (DataOutputStream containerScriptOutStream =
               lfs.create(nmPrivateContainerScriptPath,
                   EnumSet.of(CREATE, OVERWRITE))) {
        // Sanitize the container's environment
        sanitizeEnv(environment, containerWorkDir, appDirs, userLocalDirs,
            containerLogDirs, localResources, nmPrivateClasspathJarDir,
            nmEnvVars);

        expandAllEnvironmentVars(environment, containerLogDir);

        // Add these if needed after expanding so we don't expand key values.
        if (keystore != null) {
          addKeystoreVars(environment, containerWorkDir);
        }
        if (truststore != null) {
          addTruststoreVars(environment, containerWorkDir);
        }

        prepareContainer(localResources, containerLocalDirs);

        // Write out the environment
        exec.writeLaunchEnv(containerScriptOutStream, environment,
            localResources, launchContext.getCommands(),
            containerLogDir, user, nmEnvVars);
      }
      // /////////// End of writing out container-script

      // /////////// Write out the container-tokens in the nmPrivate space.
      try (DataOutputStream tokensOutStream =
               lfs.create(nmPrivateTokensPath, EnumSet.of(CREATE, OVERWRITE))) {
        Credentials creds = container.getCredentials();
        creds.writeTokenStorageToStream(tokensOutStream);
      }
      // /////////// End of writing out container-tokens

      // Hand everything to the executor; the value returned becomes the
      // container's exit code.
      ret = launchContainer(new ContainerStartContext.Builder()
          .setContainer(container)
          .setLocalizedResources(localResources)
          .setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath)
          .setNmPrivateTokensPath(nmPrivateTokensPath)
          .setNmPrivateKeystorePath(nmPrivateKeystorePath)
          .setNmPrivateTruststorePath(nmPrivateTruststorePath)
          .setUser(user)
          .setAppId(appIdStr)
          .setContainerWorkDir(containerWorkDir)
          .setContainerCsiVolumesRootDir(csiVolumesRoot)
          .setLocalDirs(localDirs)
          .setLogDirs(logDirs)
          .setFilecacheDirs(filecacheDirs)
          .setUserLocalDirs(userLocalDirs)
          .setContainerLocalDirs(containerLocalDirs)
          .setContainerLogDirs(containerLogDirs)
          .setUserFilecacheDirs(userFilecacheDirs)
          .setApplicationLocalDirs(applicationLocalDirs).build());
    } catch (ConfigurationException e) {
      // Configuration errors are reported both as a container failure and
      // to the node status updater.
      LOG.error("Failed to launch container due to configuration error.", e);
      dispatcher.getEventHandler().handle(new ContainerExitEvent(
          containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
          e.getMessage()));
      // Mark the node as unhealthy
      context.getNodeStatusUpdater().reportException(e);
      return ret;
    } catch (Throwable e) {
      // Any other failure only fails this container.
      LOG.warn("Failed to launch container.", e);
      dispatcher.getEventHandler().handle(new ContainerExitEvent(
          containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
          e.getMessage()));
      return ret;
    } finally {
      // Runs on success and on both failure paths above.
      setContainerCompletedStatus(ret);
    }

    // Only reached when launchContainer returned normally.
    handleContainerExitCode(ret, containerLogDir);
    return ret;
  }

  /**
   * Picks the root directory under which all CSI volumes of this container
   * are mounted:
   *   ${YARN_LOCAL_DIR}/usercache/${user}/filecache/csiVolumes/app/container
   * CSI volumes may create their mount points with different permission
   * bits. Creating them under the container work dir could disturb the
   * permission structure enforced by the linux container executor, so all
   * volume mounts are kept under one common root, which also makes them
   * easier to clean up.
   *
   * @return the writable local path chosen by the dirs handler
   * @throws IOException if the dirs handler cannot provide a path
   **/
  private Path deriveCsiVolumesRootDir() throws IOException {
    final StringBuilder relativeRoot = new StringBuilder();
    relativeRoot.append(ContainerLocalizer.USERCACHE).append(Path.SEPARATOR);
    relativeRoot.append(container.getUser()).append(Path.SEPARATOR);
    relativeRoot.append(ContainerLocalizer.FILECACHE).append(Path.SEPARATOR);
    relativeRoot.append(ContainerLocalizer.CSI_VOLIUME_MOUNTS_ROOT)
        .append(Path.SEPARATOR);
    relativeRoot.append(app.getAppId().toString()).append(Path.SEPARATOR);
    relativeRoot.append(container.getContainerId().toString());

    return dirsHandler.getLocalPathForWrite(relativeRoot.toString(),
        LocalDirAllocator.SIZE_UNKNOWN, false);
  }

  /**
   * Picks the container's working directory,
   * usercache/&lt;user&gt;/appcache/&lt;appId&gt;/&lt;containerId&gt;, under
   * one of the local dirs chosen by the dirs handler.
   *
   * @return the writable local path chosen by the dirs handler
   * @throws IOException if the dirs handler cannot provide a path
   */
  private Path deriveContainerWorkDir() throws IOException {
    final String relativeWorkDir = String.join(Path.SEPARATOR,
        ContainerLocalizer.USERCACHE,
        container.getUser(),
        ContainerLocalizer.APPCACHE,
        app.getAppId().toString(),
        container.getContainerId().toString());

    return dirsHandler.getLocalPathForWrite(relativeWorkDir,
        LocalDirAllocator.SIZE_UNKNOWN, false);
  }

  /**
   * Asks the executor to prepare the container, passing the localized
   * resources, the container's user, its local dirs and its (current)
   * launch commands.
   *
   * @param localResources localized resources of the container
   * @param containerLocalDirs local directories of the container
   * @throws IOException if the executor fails to prepare the container
   */
  private void prepareContainer(Map<Path, List<String>> localResources,
      List<String> containerLocalDirs) throws IOException {
    final ContainerPrepareContext prepareContext =
        new ContainerPrepareContext.Builder()
            .setContainer(container)
            .setLocalizedResources(localResources)
            .setUser(container.getUser())
            .setContainerLocalDirs(containerLocalDirs)
            .setCommands(container.getLaunchContext().getCommands())
            .build();
    exec.prepareContainer(prepareContext);
  }

  /**
   * Checks whether the container may still be launched.
   *
   * <p>When the container is already in the KILLING state, a
   * CONTAINER_KILLED_ON_REQUEST exit event is dispatched here so that it is
   * not missed, and the launch is refused.
   *
   * @return false if the container is at KILLING, true otherwise
   */
  protected boolean validateContainerState() {
    if (container.getContainerState() != ContainerState.KILLING) {
      return true;
    }

    final int exitCode;
    if (Shell.WINDOWS) {
      exitCode = ExitCode.FORCE_KILLED.getExitCode();
    } else {
      exitCode = ExitCode.TERMINATED.getExitCode();
    }
    dispatcher.getEventHandler().handle(
        new ContainerExitEvent(container.getContainerId(),
            ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
            exitCode,
            "Container terminated before launch."));
    return false;
  }

  /**
   * Maps every log dir to this container's log directory inside it.
   *
   * @param logDirs NodeManager log directories
   * @return one container log dir per entry of {@code logDirs}, same order
   */
  protected List<String> getContainerLogDirs(List<String> logDirs) {
    final String relativeLogDir = ContainerLaunch.getRelativeContainerLogDir(
        app.getAppId().toString(), container.getContainerId().toString());
    // Identical for every root, so build it once.
    final String suffix = Path.SEPARATOR + relativeLogDir;

    final List<String> result = new ArrayList<>(logDirs.size());
    for (String root : logDirs) {
      result.add(root + suffix);
    }
    return result;
  }

  /**
   * Maps every local dir to
   * &lt;localDir&gt;/usercache/&lt;user&gt;/appcache/&lt;appId&gt;/ (with a
   * trailing separator).
   *
   * @param localDirs NodeManager local directories
   * @return one entry per element of {@code localDirs}, same order
   */
  protected List<String> getContainerLocalDirs(List<String> localDirs) {
    // Identical for every root, so build it once.
    final String suffix = Path.SEPARATOR + ContainerLocalizer.USERCACHE
        + Path.SEPARATOR + container.getUser()
        + Path.SEPARATOR + ContainerLocalizer.APPCACHE
        + Path.SEPARATOR + app.getAppId().toString()
        + Path.SEPARATOR;

    final List<String> result = new ArrayList<>(localDirs.size());
    for (String root : localDirs) {
      result.add(root + suffix);
    }
    return result;
  }

  /**
   * Maps every local dir to &lt;localDir&gt;/usercache/&lt;user&gt;/ (with a
   * trailing separator).
   *
   * @param localDirs NodeManager local directories
   * @return one entry per element of {@code localDirs}, same order
   */
  protected List<String> getUserLocalDirs(List<String> localDirs) {
    // Identical for every root, so build it once.
    final String suffix = Path.SEPARATOR + ContainerLocalizer.USERCACHE
        + Path.SEPARATOR + container.getUser() + Path.SEPARATOR;

    final List<String> result = new ArrayList<>(localDirs.size());
    for (String root : localDirs) {
      result.add(root + suffix);
    }
    return result;
  }

  /**
   * Maps every local dir to its NodeManager-level file cache directory,
   * &lt;localDir&gt;/&lt;FILECACHE&gt;.
   *
   * @param localDirs NodeManager local directories
   * @return one entry per element of {@code localDirs}, same order
   */
  protected List<String> getNMFilecacheDirs(List<String> localDirs) {
    final String suffix = Path.SEPARATOR + ContainerLocalizer.FILECACHE;

    final List<String> result = new ArrayList<>(localDirs.size());
    for (String root : localDirs) {
      result.add(root + suffix);
    }
    return result;
  }

  /**
   * Maps every local dir to the container user's file cache directory,
   * &lt;localDir&gt;/&lt;USERCACHE&gt;/&lt;user&gt;/&lt;FILECACHE&gt;.
   *
   * @param localDirs NodeManager local directories
   * @return one entry per element of {@code localDirs}, same order
   */
  protected List<String> getUserFilecacheDirs(List<String> localDirs) {
    // Identical for every root, so build it once.
    final String suffix = Path.SEPARATOR + ContainerLocalizer.USERCACHE
        + Path.SEPARATOR + container.getUser()
        + Path.SEPARATOR + ContainerLocalizer.FILECACHE;

    final List<String> result = new ArrayList<>(localDirs.size());
    for (String root : localDirs) {
      result.add(root + suffix);
    }
    return result;
  }

  /**
   * Maps every local dir to the application's directory,
   * &lt;localDir&gt;/&lt;USERCACHE&gt;/&lt;user&gt;/&lt;APPCACHE&gt;/&lt;appId&gt;.
   *
   * @param localDirs NodeManager local directories
   * @param appIdStr application id used as the last path component
   * @return one entry per element of {@code localDirs}, same order
   */
  protected List<String> getApplicationLocalDirs(List<String> localDirs,
      String appIdStr) {
    final List<String> result = new ArrayList<>(localDirs.size());
    // Identical for every root, so build it once.
    final String suffix = Path.SEPARATOR + ContainerLocalizer.USERCACHE
        + Path.SEPARATOR + container.getUser()
        + Path.SEPARATOR + ContainerLocalizer.APPCACHE
        + Path.SEPARATOR + appIdStr;

    for (String root : localDirs) {
      result.add(root + suffix);
    }
    return result;
  }

  /**
   * Returns the container's localized resources.
   *
   * @return the map reported by the container, never null
   * @throws YarnException if the container reports no localized resources
   */
  protected Map<Path, List<String>> getLocalizedResources()
      throws YarnException {
    final Map<Path, List<String>> localized =
        container.getLocalizedResources();
    if (localized != null) {
      return localized;
    }
    throw RPCUtil.getRemoteException(
        "Unable to get local resources when Container " + container
            + " is at " + container.getContainerState());
  }

  /**
   * Runs {@link #prepareForLaunch(ContainerStartContext)} and, if that
   * returns 0, launches the container through the executor while holding
   * the launch lock.
   *
   * @param ctx start context handed to the executor
   * @return the non-zero preparation status, or the executor's return value
   * @throws IOException if preparation or the executor fails
   * @throws ConfigurationException if the executor reports a configuration
   *     problem
   */
  protected int launchContainer(ContainerStartContext ctx)
      throws IOException, ConfigurationException {
    final int prepStatus = prepareForLaunch(ctx);
    if (prepStatus != 0) {
      return prepStatus;
    }

    launchLock.lock();
    try {
      return exec.launchContainer(ctx);
    } finally {
      launchLock.unlock();
    }
  }

  /**
   * Runs {@link #prepareForLaunch(ContainerStartContext)} and, if that
   * returns 0, relaunches the container through the executor while holding
   * the launch lock.
   *
   * @param ctx start context handed to the executor
   * @return the non-zero preparation status, or the executor's return value
   * @throws IOException if preparation or the executor fails
   * @throws ConfigurationException if the executor reports a configuration
   *     problem
   */
  protected int relaunchContainer(ContainerStartContext ctx)
      throws IOException, ConfigurationException {
    final int prepStatus = prepareForLaunch(ctx);
    if (prepStatus != 0) {
      return prepStatus;
    }

    launchLock.lock();
    try {
      return exec.relaunchContainer(ctx);
    } finally {
      launchLock.unlock();
    }
  }

  /**
   * Reaps the container through the executor and then cleans up the files in
   * the container work dir, all while holding the launch lock.
   *
   * @throws IOException if the executor reports that reaping failed, or if
   *     a called helper throws
   */
  void reapContainer() throws IOException {
    launchLock.lock();
    try {
      final ContainerReapContext reapContext =
          new ContainerReapContext.Builder()
              .setContainer(container)
              .setUser(container.getUser())
              .build();
      final boolean reaped = exec.reapContainer(reapContext);
      if (!reaped) {
        throw new IOException("Reap container failed for container " +
            container.getContainerId());
      }
      // Only clean up once the executor confirmed the reap.
      cleanupContainerFiles(getContainerWorkDir());
    } finally {
      launchLock.unlock();
    }
  }

  /**
   * Performs the bookkeeping that precedes the blocking executor launch.
   *
   * <p>If the container is already marked for killing, nothing is announced
   * and the TERMINATED exit code is returned. Otherwise CONTAINER_LAUNCHED
   * is dispatched and the launch is recorded in the NM state store. The
   * container is then activated with the executor, unless
   * {@code containerAlreadyLaunched} was already set, in which case the
   * TERMINATED exit code is returned.
   *
   * @param ctx start context of the launch (not read by this method)
   * @return the SUCCESS exit code if the launch may proceed, otherwise the
   *     TERMINATED exit code
   * @throws IOException if recording the launch in the state store fails
   */
  protected int prepareForLaunch(ContainerStartContext ctx) throws IOException {
    final ContainerId containerId = container.getContainerId();

    if (container.isMarkedForKilling()) {
      LOG.info("Container " + containerId + " not launched as it has already "
          + "been marked for Killing");
      this.killedBeforeStart = true;
      return ExitCode.TERMINATED.getExitCode();
    }

    // The executor's launch call blocks, so by the time we get here the
    // container is as good as launched: announce it and persist the fact.
    dispatcher.getEventHandler().handle(new ContainerEvent(
        containerId,
        ContainerEventType.CONTAINER_LAUNCHED));
    context.getNMStateStore().storeContainerLaunched(containerId);

    // Claim the launch; losing the race means cleanup already ran.
    final boolean claimedLaunch =
        containerAlreadyLaunched.compareAndSet(false, true);
    if (!claimedLaunch) {
      LOG.info("Container " + containerId + " not launched as "
          + "cleanup already called");
      return ExitCode.TERMINATED.getExitCode();
    }

    exec.activateContainer(containerId, pidFilePath);
    return ExitCode.SUCCESS.getExitCode();
  }

  /**
   * Marks this launch as completed, deactivates the container with the
   * executor and, unless the container is going to be retried, records the
   * exit code in the NM state store.
   *
   * <p>A failure to persist the exit code is logged (with its cause) and
   * otherwise ignored, so completion handling is never aborted by a state
   * store problem.
   *
   * @param exitCode exit code to record for the container
   */
  protected void setContainerCompletedStatus(int exitCode) {
    ContainerId containerId = container.getContainerId();
    completed.set(true);
    exec.deactivateContainer(containerId);
    try {
      if (!container.shouldRetry(exitCode)) {
        context.getNMStateStore().storeContainerCompleted(containerId,
            exitCode);
      }
    } catch (IOException e) {
      // Best effort: keep going, but log the cause so the state store
      // failure can be diagnosed.
      LOG.error("Unable to set exit code for container " + containerId, e);
    }
  }

  /**
   * Translates the container's exit code into the matching container event.
   *
   * <ul>
   *   <li>FORCE_KILLED or TERMINATED: CONTAINER_KILLED_ON_REQUEST</li>
   *   <li>any other non-zero code: delegated to
   *       {@link #handleContainerExitWithFailure}</li>
   *   <li>zero: CONTAINER_EXITED_WITH_SUCCESS</li>
   * </ul>
   *
   * @param exitCode exit code returned by the launch
   * @param containerLogDir container log directory, used for diagnostics
   */
  protected void handleContainerExitCode(int exitCode, Path containerLogDir) {
    final ContainerId containerId = container.getContainerId();
    LOG.debug("Container {} completed with exit code {}", containerId,
        exitCode);

    final StringBuilder diagnosticInfo =
        new StringBuilder("Container exited with a non-zero exit code ")
            .append(exitCode)
            .append(". ");

    final boolean killed = exitCode == ExitCode.FORCE_KILLED.getExitCode()
        || exitCode == ExitCode.TERMINATED.getExitCode();
    if (killed) {
      // The process was killed: report it as killed on request and stop.
      dispatcher.getEventHandler().handle(
          new ContainerExitEvent(containerId,
              ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
              diagnosticInfo.toString()));
      return;
    }

    if (exitCode != 0) {
      handleContainerExitWithFailure(containerId, exitCode, containerLogDir,
          diagnosticInfo);
      return;
    }

    LOG.info("Container " + containerId + " succeeded ");
    dispatcher.getEventHandler().handle(
        new ContainerEvent(containerId,
            ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
  }

  /**
   * Enriches the diagnostics of a failed container with the tails of its
   * error logs and dispatches CONTAINER_EXITED_WITH_FAILURE.
   *
   * <p>Up to NM_CONTAINER_STDERR_BYTES bytes are tailed from the pre-launch
   * stderr file and from the container's error log. The error log's file
   * name is not fixed and depends on the app, so it is located with the
   * NM_CONTAINER_STDERR_PATTERN glob; if several files match, the most
   * recently modified one is tailed and all matching names are listed.
   * I/O problems while reading either file are logged and do not prevent
   * the event from being dispatched.
   *
   * @param containerID id of the failed container
   * @param ret exit code of the container
   * @param containerLogDir directory holding the container's logs
   * @param diagnosticInfo diagnostics collected so far; appended to in place
   */
  protected void handleContainerExitWithFailure(ContainerId containerID,
      int ret, Path containerLogDir, StringBuilder diagnosticInfo) {
    LOG.warn("Container launch failed : " + diagnosticInfo.toString());

    final long tailSizeInBytes =
        conf.getLong(YarnConfiguration.NM_CONTAINER_STDERR_BYTES,
            YarnConfiguration.DEFAULT_NM_CONTAINER_STDERR_BYTES);
    FileSystem localFs = null;

    // Part 1: tail of the container's pre-launch stderr.
    try {
      localFs = FileSystem.getLocal(conf).getRaw();
      final Path preLaunchErrPath = new Path(containerLogDir,
          ContainerLaunch.CONTAINER_PRE_LAUNCH_STDERR);
      final FileStatus preLaunchErr = localFs.getFileStatus(preLaunchErrPath);
      final Path preLaunchErrFile = preLaunchErr.getPath();
      final long preLaunchErrLen = preLaunchErr.getLen();

      diagnosticInfo.append("Error file: ");
      diagnosticInfo.append(ContainerLaunch.CONTAINER_PRE_LAUNCH_STDERR);
      diagnosticInfo.append(".\n");

      final byte[] preLaunchTail =
          tailFile(preLaunchErrFile, preLaunchErrLen, tailSizeInBytes);
      diagnosticInfo.append("Last ").append(tailSizeInBytes);
      diagnosticInfo.append(" bytes of ").append(preLaunchErrFile.getName());
      diagnosticInfo.append(" :\n");
      diagnosticInfo.append(new String(preLaunchTail, StandardCharsets.UTF_8));
    } catch (IOException e) {
      LOG.error("Failed to get tail of the container's prelaunch error log file", e);
    }

    // Part 2: tail of the container's own stderr, located by glob pattern.
    final String errorFileNamePattern =
        conf.get(YarnConfiguration.NM_CONTAINER_STDERR_PATTERN,
            YarnConfiguration.DEFAULT_NM_CONTAINER_STDERR_PATTERN);

    try {
      if (localFs == null) {
        localFs = FileSystem.getLocal(conf).getRaw();
      }
      final FileStatus[] matches =
          localFs.globStatus(new Path(containerLogDir, errorFileNamePattern));
      if (matches != null && matches.length != 0) {
        FileStatus newest = matches[0];

        // With several matches, tail the most recently modified file (the
        // first one wins ties) and list every matching name.
        if (matches.length > 1) {
          final String[] matchNames = new String[matches.length];
          for (int i = 0; i < matches.length; i++) {
            final FileStatus candidate = matches[i];
            matchNames[i] = candidate.getPath().getName();
            if (candidate.getModificationTime()
                > newest.getModificationTime()) {
              newest = candidate;
            }
          }
          diagnosticInfo.append("Error files: ")
              .append(StringUtils.join(", ", matchNames)).append(".\n");
        }

        final Path errorFile = newest.getPath();
        final byte[] tail =
            tailFile(errorFile, newest.getLen(), tailSizeInBytes);
        final String tailText = new String(tail, StandardCharsets.UTF_8);
        diagnosticInfo.append("Last ").append(tailSizeInBytes);
        diagnosticInfo.append(" bytes of ").append(errorFile.getName());
        diagnosticInfo.append(" :\n");
        diagnosticInfo.append(tailText).append("\n");
        diagnosticInfo.append(
            analysesErrorMsgOfContainerExitWithFailure(tailText));
      }
    } catch (IOException e) {
      LOG.error("Failed to get tail of the container's error log file", e);
    }

    this.dispatcher.getEventHandler()
        .handle(new ContainerExitEvent(containerID,
            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
            diagnosticInfo.toString()));
  }

  /**
   * Reads the trailing bytes of a file through the raw local file system.
   *
   * @param filePath file to read
   * @param fileSize length of the file in bytes, as supplied by the caller
   * @param tailSizeInBytes maximum number of trailing bytes to return
   * @return the whole file when it is shorter than {@code tailSizeInBytes},
   *         otherwise its last {@code tailSizeInBytes} bytes
   * @throws IOException if the file cannot be opened or fully read
   */
  private byte[] tailFile(Path filePath, long fileSize, long tailSizeInBytes) throws IOException {
    final FileSystem localFs = FileSystem.getLocal(conf).getRaw();
    FSDataInputStream in = null;
    try {
      // A file shorter than the requested tail is returned from offset 0.
      final boolean wholeFile = fileSize < tailSizeInBytes;
      final long offset = wholeFile ? 0 : fileSize - tailSizeInBytes;
      final byte[] tail = new byte[(int) (wholeFile ? fileSize : tailSizeInBytes)];
      in = localFs.open(filePath);
      in.readFully(offset, tail);
      return tail;
    } finally {
      // Close quietly; failures while closing are only logged.
      IOUtils.cleanupWithLogger(LOG, in);
    }
  }

  /**
   * Produces a configuration hint for a known container failure signature.
   *
   * @param errorMsg tail of the container's error output
   * @return a hint describing the {@code HADOOP_MAPRED_HOME} settings to add
   *         to mapred-site.xml when the message reports that a class under
   *         {@code org.apache.hadoop.mapreduce} could not be loaded; an empty
   *         string otherwise
   */
  private String analysesErrorMsgOfContainerExitWithFailure(String errorMsg) {
    final String missingMainClass = "Error: Could not find or load main class"
        + " org.apache.hadoop.mapreduce";
    if (!errorMsg.contains(missingMainClass)) {
      return "";
    }

    final String valueLine =
        "  <value>HADOOP_MAPRED_HOME=${full path of your hadoop "
            + "distribution directory}</value>\n";
    final String[] envProperties = {
        "yarn.app.mapreduce.am.env",
        "mapreduce.map.env",
        "mapreduce.reduce.env",
    };

    final StringBuilder hint = new StringBuilder(
        "Please check whether your <HADOOP_HOME>/etc/hadoop/mapred-site.xml "
            + "contains the below configuration:\n");
    // Every property gets the same value, so emit one stanza per name.
    for (String property : envProperties) {
      hint.append("<property>\n")
          .append("  <name>").append(property).append("</name>\n")
          .append(valueLine)
          .append("</property>\n");
    }
    return hint.toString();
  }

  /**
   * Builds the path of the container's pid file relative to a local dir.
   *
   * @param appIdStr application id as a string
   * @param containerIdStr container id as a string
   * @return the container's private directory followed by a separator and
   *         the pid file name derived from {@code containerIdStr}
   */
  protected String getPidFileSubpath(String appIdStr, String containerIdStr) {
    final String privateDir = getContainerPrivateDir(appIdStr, containerIdStr);
    final String pidFileName =
        String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr);
    return privateDir + Path.SEPARATOR + pidFileName;
  }

  /**
   * Send a signal to the container.
   *
   * The command is translated to a signal first; a command that maps to
   * {@code Signal.NULL} is ignored. The launched flag is then flipped from
   * false to true atomically; if that flip succeeds the container had not
   * been launched and no signal is sent. Otherwise the container pid is
   * looked up and the signal is delivered through the executor, and the
   * outcome is published as a diagnostics update event.
   *
   * Any exception raised while resolving the pid or signalling is logged
   * as a warning and not rethrown.
   *
   * @param command the signal command to deliver
   * @throws IOException declared for callers
   */
  public void signalContainer(SignalContainerCommand command)
      throws IOException {
    final ContainerId containerId =
        container.getContainerTokenIdentifier().getContainerID();
    final String containerIdStr = containerId.toString();
    final String user = container.getUser();
    final Signal signal = translateCommandToSignal(command);
    if (Signal.NULL.equals(signal)) {
      LOG.info("ignore signal command " + command);
      return;
    }

    LOG.info("Sending signal " + command + " to container " + containerIdStr);

    // A successful false -> true flip means the launch never happened.
    if (containerAlreadyLaunched.compareAndSet(false, true)) {
      LOG.info("Container " + containerIdStr + " not launched."
          + " Not sending the signal");
      return;
    }

    LOG.debug("Getting pid for container {} to send signal to from pid"
        + " file {}", containerIdStr, String.valueOf(pidFilePath));

    try {
      // get process id from pid file if available
      // else if shell is still active, get it from the shell
      final String pid = getContainerPid();
      if (pid == null) {
        return;
      }
      LOG.debug("Sending signal to pid {} as user {} for container {}",
          pid, user, containerIdStr);

      final ContainerSignalContext signalContext =
          new ContainerSignalContext.Builder()
              .setContainer(container)
              .setUser(user)
              .setPid(pid)
              .setSignal(signal)
              .build();
      final boolean delivered = exec.signalContainer(signalContext);

      final String diagnostics = "Sent signal " + command
          + " (" + signal + ") to pid " + pid
          + " as user " + user
          + " for container " + containerIdStr
          + ", result=" + (delivered ? "success" : "failed");
      LOG.info(diagnostics);

      dispatcher.getEventHandler().handle(
          new ContainerDiagnosticsUpdateEvent(containerId, diagnostics));
    } catch (Exception e) {
      LOG.warn("Exception when sending signal to container " + containerIdStr
          + ": " + StringUtils.stringifyException(e));
    }
  }

  /**
   * Maps a signal command onto the signal to deliver.
   *
   * @param command the requested command
   * @return {@code QUIT} for a thread dump (or {@code NULL} on Windows),
   *         {@code TERM} for a graceful shutdown, {@code KILL} for a forceful
   *         shutdown, and {@code NULL} for any other command
   */
  @VisibleForTesting
  public static Signal translateCommandToSignal(
      SignalContainerCommand command) {
    switch (command) {
      case OUTPUT_THREAD_DUMP:
        // TODO for windows support.
        return Shell.WINDOWS ? Signal.NULL : Signal.QUIT;
      case GRACEFUL_SHUTDOWN:
        return Signal.TERM;
      case FORCEFUL_SHUTDOWN:
        return Signal.KILL;
      default:
        return Signal.NULL;
    }
  }

  /**
   * Pause the container.
   * Flips the pause flag from false to true; if the flag was already set the
   * call returns without doing anything. Otherwise asks the executor to
   * pause the container, emits a CONTAINER_PAUSED event and records the
   * paused state in the NM state store. If anything in that sequence throws,
   * the container is sent a kill event with the PREEMPTED exit status.
   * @throws IOException in case of errors.
   */
  public void pauseContainer() throws IOException {
    final ContainerId containerId = container.getContainerId();
    final String containerIdStr = containerId.toString();
    LOG.info("Pausing the container " + containerIdStr);

    // The pause event is only handled if the container is in the running state
    // (the container state machine), so we don't check for
    // shouldLaunchContainer over here

    final boolean flagFlipped = shouldPauseContainer.compareAndSet(false, true);
    if (!flagFlipped) {
      LOG.info("Container " + containerId + " not paused as "
          + "resume already called");
      return;
    }

    try {
      exec.pauseContainer(container);

      // The executor call blocks, so reaching this point means the pause
      // request went through; announce it.
      final ContainerEvent pausedEvent = new ContainerEvent(
          containerId, ContainerEventType.CONTAINER_PAUSED);
      dispatcher.getEventHandler().handle(pausedEvent);

      // Failing to persist the state is not fatal: the container is paused.
      try {
        this.context.getNMStateStore().storeContainerPaused(
            container.getContainerId());
      } catch (IOException e) {
        LOG.warn("Could not store container [" + container.getContainerId()
            + "] state. The Container has been paused.", e);
      }
    } catch (Exception e) {
      LOG.info("Exception when trying to pause container " + containerIdStr
          + ": " + StringUtils.stringifyException(e));
      container.handle(new ContainerKillEvent(container.getContainerId(),
          ContainerExitStatus.PREEMPTED, "Container preempted as there was "
          + " an exception in pausing it."));
    }
  }

  /**
   * Resume the container.
   * Returns without doing anything when the pause flag was not set.
   * Otherwise asks the executor to resume the container, emits a
   * CONTAINER_RESUMED event and removes the paused marker from the NM state
   * store. If anything in that sequence throws, the container is sent a kill
   * event with the PREEMPTED exit status.
   * @throws IOException in case of error.
   */
  public void resumeContainer() throws IOException {
    ContainerId containerId = container.getContainerId();
    String containerIdStr = containerId.toString();
    LOG.info("Resuming the container " + containerIdStr);

    // The resume event is only handled if the container is in a paused state
    // so we don't check for the launched flag here.

    // paused flag will be set to true if process already paused
    boolean alreadyPaused = !shouldPauseContainer.compareAndSet(false, true);
    if (!alreadyPaused) {
      LOG.info("Container " + containerIdStr + " not paused."
          + " No resume necessary");
      return;
    }

    // If the container has already started
    try {
      exec.resumeContainer(container);
      // ResumeContainer is a blocking call. We are here almost means the
      // container is resumed, so send out the event.
      dispatcher.getEventHandler().handle(new ContainerEvent(
          containerId,
          ContainerEventType.CONTAINER_RESUMED));

      // Failing to update the state store is not fatal: the container has
      // already been resumed.
      try {
        this.context.getNMStateStore().removeContainerPaused(
            container.getContainerId());
      } catch (IOException e) {
        LOG.warn("Could not store container [" + container.getContainerId()
            + "] state. The Container has been resumed.", e);
      }
    } catch (Exception e) {
      String message =
          "Exception when trying to resume container " + containerIdStr
              + ": " + StringUtils.stringifyException(e);
      LOG.info(message);
      // The diagnostic names the operation that actually failed (resume).
      container.handle(new ContainerKillEvent(container.getContainerId(),
          ContainerExitStatus.PREEMPTED, "Container preempted as there was "
          + " an exception in resuming it."));
    }
  }

  /**
   * Polls the container's pid file until it yields a process id or the
   * configured kill wait time has been exceeded.
   * @return Process ID; null when pidFilePath is null or when no pid could
   *         be read before the wait time ran out
   * @throws Exception if reading the pid file fails or the wait is
   *         interrupted
   */
  String getContainerPid() throws Exception {
    if (pidFilePath == null) {
      return null;
    }
    final String containerIdStr = container.getContainerId().toString();
    LOG.debug("Accessing pid for container {} from pid file {}",
        containerIdStr, pidFilePath);

    final int pollIntervalMs = 100;
    // Each iteration is one read attempt; sleep between attempts until the
    // accumulated wait passes maxKillWaitTime.
    for (int polls = 0;; polls++) {
      final String pid = ProcessIdFileReader.getProcessId(pidFilePath);
      if (pid != null) {
        LOG.debug("Got pid {} for container {}", pid, containerIdStr);
        return pid;
      }
      if ((polls * pollIntervalMs) > maxKillWaitTime) {
        LOG.info("Could not get pid for " + containerIdStr
            + ". Waited for " + maxKillWaitTime + " ms.");
        return null;
      }
      Thread.sleep(pollIntervalMs);
    }
  }

  /**
   * Builds the container log directory path relative to a log root.
   *
   * @param appIdStr application id as a string
   * @param containerIdStr container id as a string
   * @return {@code appIdStr} and {@code containerIdStr} joined by the path
   *         separator
   */
  public static String getRelativeContainerLogDir(String appIdStr,
      String containerIdStr) {
    return new StringBuilder()
        .append(appIdStr)
        .append(Path.SEPARATOR)
        .append(containerIdStr)
        .toString();
  }

  /**
   * Builds the container's private directory path, with a trailing
   * separator.
   *
   * @param appIdStr application id as a string
   * @param containerIdStr container id as a string
   * @return the application's private directory, a separator, the container
   *         id and a final separator
   */
  protected String getContainerPrivateDir(String appIdStr,
      String containerIdStr) {
    final String appPrivateDir = getAppPrivateDir(appIdStr);
    return appPrivateDir + Path.SEPARATOR
        + containerIdStr + Path.SEPARATOR;
  }

  /**
   * Builds the application's private directory path.
   *
   * @param appIdStr application id as a string
   * @return the NM private directory name, a separator and {@code appIdStr}
   */
  private String getAppPrivateDir(String appIdStr) {
    final String nmPrivateDir = ResourceLocalizationService.NM_PRIVATE_DIR;
    return nmPrivateDir + Path.SEPARATOR + appIdStr;
  }

  /**
   * @return the context held by this launch
   */
  Context getContext() {
    return this.context;
  }

  /**
   * Accumulates the text of a container launch script. Subclasses supply the
   * platform-specific syntax for each operation; this base class owns the
   * script buffer and the platform-independent checks.
   */
  public static abstract class ShellScriptBuilder {
    /**
     * Creates a builder for the OS type reported by {@code Shell.osType}.
     * @return a new builder
     */
    public static ShellScriptBuilder create() {
      return create(Shell.osType);
    }

    /**
     * Creates a builder for the given OS type.
     * @param osType the target OS type
     * @return a Windows builder for {@code OS_TYPE_WIN}, a Unix builder for
     *         every other value
     */
    @VisibleForTesting
    public static ShellScriptBuilder create(Shell.OSType osType) {
      return (osType == Shell.OSType.OS_TYPE_WIN) ?
          new WindowsShellScriptBuilder() :
          new UnixShellScriptBuilder();
    }

    private static final String LINE_SEPARATOR =
        System.getProperty("line.separator");
    // Script text accumulated so far.
    private final StringBuilder sb = new StringBuilder();

    /**
     * Appends the command to be run by the script.
     * @param command the command and its arguments
     * @throws IOException if the command cannot be added
     */
    public abstract void command(List<String> command) throws IOException;

    protected static final String ENV_PRELAUNCH_STDOUT = "PRELAUNCH_OUT";
    protected static final String ENV_PRELAUNCH_STDERR = "PRELAUNCH_ERR";

    // Set once stdout()/stderr() has been called successfully.
    private boolean redirectStdOut = false;
    private boolean redirectStdErr = false;

    /**
     * Set stdout for the shell script
     * @param stdoutDir stdout must be an absolute path
     * @param stdOutFile stdout file name
     * @throws IOException thrown when stdout path is not absolute
     */
    public final void stdout(Path stdoutDir, String stdOutFile) throws IOException {
      if (!stdoutDir.isAbsolute()) {
        throw new IOException("Stdout path must be absolute");
      }
      redirectStdOut = true;
      setStdOut(new Path(stdoutDir, stdOutFile));
    }

    /**
     * Set stderr for the shell script
     * @param stderrDir stderr must be an absolute path
     * @param stdErrFile stderr file name
     * @throws IOException thrown when stderr path is not absolute
     */
    public final void stderr(Path stderrDir, String stdErrFile) throws IOException {
      if (!stderrDir.isAbsolute()) {
        throw new IOException("Stderr path must be absolute");
      }
      redirectStdErr = true;
      setStdErr(new Path(stderrDir, stdErrFile));
    }

    /**
     * Emits the platform-specific stdout redirection.
     * @param stdout full path of the stdout file
     * @throws IOException if the redirection cannot be added
     */
    protected abstract void setStdOut(Path stdout) throws IOException;

    /**
     * Emits the platform-specific stderr redirection.
     * @param stdout full path of the stderr file
     * @throws IOException if the redirection cannot be added
     */
    protected abstract void setStdErr(Path stdout) throws IOException;

    /**
     * Emits an environment variable assignment.
     * @param key variable name
     * @param value variable value
     * @throws IOException if the assignment cannot be added
     */
    public abstract void env(String key, String value) throws IOException;

    /**
     * Emits an assignment for a whitelisted environment variable.
     * @param key variable name
     * @param value variable value
     * @throws IOException if the assignment cannot be added
     */
    public abstract void whitelistedEnv(String key, String value)
        throws IOException;

    /**
     * Emits a statement that echoes the given text.
     * @param echoStr text to echo
     * @throws IOException if the statement cannot be added
     */
    public abstract void echo(String echoStr) throws IOException;

    /**
     * Emits the statements that link {@code dst} to {@code src}, creating
     * the parent directory of {@code dst} first when {@code dst} contains a
     * '/'.
     * @param src link target; must be absolute
     * @param dst link name; must be relative
     * @throws IOException if {@code src} is not absolute or {@code dst} is
     *         absolute
     */
    public final void symlink(Path src, Path dst) throws IOException {
      if (!src.isAbsolute()) {
        throw new IOException("Source must be absolute");
      }
      if (dst.isAbsolute()) {
        throw new IOException("Destination must be relative");
      }
      if (dst.toUri().getPath().indexOf('/') != -1) {
        mkdir(dst.getParent());
      }
      link(src, dst);
    }

    /**
     * Method to copy files that are useful for debugging container failures.
     * This method will be called by ContainerExecutor when setting up the
     * container launch script. The method should take care to make sure files
     * are read-able by the yarn user if the files are to undergo
     * log-aggregation.
     * @param src path to the source file
     * @param dst path to the destination file - should be absolute
     * @throws IOException
     */
    public abstract void copyDebugInformation(Path src, Path dst)
        throws IOException;

    /**
     * Method to dump debug information to a target file. This method will
     * be called by ContainerExecutor when setting up the container launch
     * script.
     * @param output the file to which debug information is to be written
     * @throws IOException
     */
    public abstract void listDebugInformation(Path output) throws IOException;

    /**
     * @return the script text accumulated so far
     */
    @Override
    public String toString() {
      return sb.toString();
    }

    /**
     * Appends the accumulated script text to the given stream.
     * @param out destination stream
     * @throws IOException declared for callers
     */
    public final void write(PrintStream out) throws IOException {
      out.append(sb);
    }

    /**
     * Appends the given fragments to the script without a line break.
     * @param command fragments to append in order
     */
    protected final void buildCommand(String... command) {
      for (String s : command) {
        sb.append(s);
      }
    }

    /**
     * Appends the platform line separator to the script.
     * @param command ignored
     */
    protected final void linebreak(String... command) {
      sb.append(LINE_SEPARATOR);
    }

    /**
     * Appends the given fragments followed by a line break.
     * @param command fragments to append in order
     */
    protected final void line(String... command) {
      buildCommand(command);
      linebreak();
    }

    /**
     * Makes the script exit on the first failing statement. The default
     * implementation does nothing.
     */
    public void setExitOnFailure() {
      // Dummy implementation
    }

    /**
     * Emits the platform-specific link statement.
     * @param src link target
     * @param dst link name
     * @throws IOException if the statement cannot be added
     */
    protected abstract void link(Path src, Path dst) throws IOException;

    /**
     * Emits the platform-specific directory creation statement.
     * @param path directory to create
     * @throws IOException if the statement cannot be added
     */
    protected abstract void mkdir(Path path) throws IOException;

    /**
     * @return true once {@link #stdout(Path, String)} has accepted a path
     */
    boolean doRedirectStdOut() {
      return redirectStdOut;
    }

    /**
     * @return true once {@link #stderr(Path, String)} has accepted a path
     */
    boolean doRedirectStdErr() {
      return redirectStdErr;
    }

    /**
     * Parse an environment value and returns all environment keys it uses.
     * The default implementation reports no dependencies.
     * @param envVal an environment variable's value
     * @return all environment variable names used in <code>envVal</code>.
     */
    public Set<String> getEnvDependencies(final String envVal) {
      return Collections.emptySet();
    }

    /**
     * Returns a dependency ordered version of <code>envs</code>. Does not alter
     * input <code>envs</code> map. When <code>envs</code> is null or has fewer
     * than two entries it is returned as is.
     * @param envs environment map
     * @return a dependency ordered version of <code>envs</code>
     */
    public final Map<String, String> orderEnvByDependencies(
        Map<String, String> envs) {
      if (envs == null || envs.size() < 2) {
        return envs;
      }
      final Map<String, String> ordered = new LinkedHashMap<String, String>();
      // One node per variable; deps are the variables its value references.
      class Env {
        private boolean resolved = false;
        private final Collection<Env> deps = new ArrayList<>();
        private final String name;
        private final String value;
        Env(String name, String value) {
          this.name = name;
          this.value = value;
        }
        void resolve() {
          // Mark before descending so a dependency cycle terminates.
          resolved = true;
          for (Env dep : deps) {
            if (!dep.resolved) {
              dep.resolve();
            }
          }
          // Dependencies have been emitted; now emit this variable.
          ordered.put(name, value);
        }
      }
      final Map<String, Env> singletons = new HashMap<>();
      for (Map.Entry<String, String> e : envs.entrySet()) {
        Env env = singletons.get(e.getKey());
        if (env == null) {
          env = new Env(e.getKey(), e.getValue());
          singletons.put(env.name, env);
        }
        for (String depStr : getEnvDependencies(env.value)) {
          // References to variables outside envs impose no ordering.
          if (!envs.containsKey(depStr)) {
            continue;
          }
          Env depEnv = singletons.get(depStr);
          if (depEnv == null) {
            depEnv = new Env(depStr, envs.get(depStr));
            singletons.put(depStr, depEnv);
          }
          env.deps.add(depEnv);
        }
      }
      for (Env env : singletons.values()) {
        if (!env.resolved) {
          env.resolve();
        }
      }
      return ordered;
    }
  }

  /**
   * Script builder that emits bash syntax.
   */
  private static final class UnixShellScriptBuilder extends ShellScriptBuilder {
    /**
     * Emits a check that exits the script with the previous statement's exit
     * code when that code is non-zero.
     */
    @SuppressWarnings("unused")
    private void errorCheck() {
      line("hadoop_shell_errorcode=$?");
      line("if [[ \"$hadoop_shell_errorcode\" -ne 0 ]]");
      line("then");
      line("  exit $hadoop_shell_errorcode");
      line("fi");
    }

    /** Starts the script with the bash shebang and a blank line. */
    public UnixShellScriptBuilder() {
      line("#!/bin/bash");
      line();
    }

    @Override
    public void command(List<String> command) {
      line("exec /bin/bash -c \"", StringUtils.join(" ", command), "\"");
    }

    @Override
    public void setStdOut(final Path stdout) throws IOException {
      line("export ", ENV_PRELAUNCH_STDOUT, "=\"", stdout.toString(), "\"");
      // Redirect the script's own stdout to the exported file.
      line("exec >\"${" + ENV_PRELAUNCH_STDOUT + "}\"");
    }

    @Override
    public void setStdErr(final Path stderr) throws IOException {
      line("export ", ENV_PRELAUNCH_STDERR, "=\"", stderr.toString(), "\"");
      // Redirect the script's own stderr to the exported file.
      line("exec 2>\"${" + ENV_PRELAUNCH_STDERR + "}\"");
    }

    @Override
    public void env(String key, String value) throws IOException {
      line("export ", key, "=\"", value, "\"");
    }

    @Override
    public void whitelistedEnv(String key, String value) throws IOException {
      // Keep a value already present in the environment; otherwise use ours.
      line("export ", key, "=${", key, ":-", "\"", value, "\"}");
    }

    @Override
    public void echo(final String echoStr) throws IOException {
      line("echo \"" + echoStr + "\"");
    }

    @Override
    protected void link(Path src, Path dst) throws IOException {
      line("ln -sf -- \"", src.toUri().getPath(), "\" \"", dst.toString(), "\"");
    }

    @Override
    protected void mkdir(Path path) throws IOException {
      // Quote the path, as link() does, so that spaces and other shell
      // metacharacters in a link name do not split or alter the argument.
      line("mkdir -p \"", path.toString(), "\"");
    }

    @Override
    public void copyDebugInformation(Path src, Path dest) throws IOException {
      line("# Creating copy of launch script");
      line("cp \"", src.toUri().getPath(), "\" \"", dest.toUri().getPath(),
          "\"");
      // set permissions to 640 because we need to be able to run
      // log aggregation in secure mode as well
      if(dest.isAbsolute()) {
        line("chmod 640 \"", dest.toUri().getPath(), "\"");
      }
    }

    @Override
    public void listDebugInformation(Path output) throws  IOException {
      line("# Determining directory contents");
      line("echo \"ls -l:\" 1>\"", output.toString(), "\"");
      line("ls -l 1>>\"", output.toString(), "\"");

      // don't run error check because if there are loops
      // find will exit with an error causing container launch to fail
      // find will follow symlinks outside the work dir if such symlinks exist
      // (like public/app local resources)
      line("echo \"find -L . -maxdepth 5 -ls:\" 1>>\"", output.toString(),
          "\"");
      line("find -L . -maxdepth 5 -ls 1>>\"", output.toString(), "\"");
      line("echo \"broken symlinks(find -L . -maxdepth 5 -type l -ls):\" 1>>\"",
          output.toString(), "\"");
      line("find -L . -maxdepth 5 -type l -ls 1>>\"", output.toString(), "\"");
    }

    @Override
    public void setExitOnFailure() {
      line("set -o pipefail -e");
    }

    /**
     * Parse <code>envVal</code> using bash-like syntax to extract env variables
     * it depends on. Handles <code>$NAME</code>, <code>${NAME...}</code> and
     * <code>${#NAME...}</code>; text inside single quotes (outside double
     * quotes) and characters following a backslash are skipped.
     * @param envVal an environment variable's value; may be null
     * @return the names referenced by <code>envVal</code>, empty when
     *         <code>envVal</code> is null or empty
     */
    @Override
    public Set<String> getEnvDependencies(final String envVal) {
      if (envVal == null || envVal.isEmpty()) {
        return Collections.emptySet();
      }
      final Set<String> deps = new HashSet<>();
      // env/whitelistedEnv dump values inside double quotes
      boolean inDoubleQuotes = true;
      char c;
      int i = 0;
      final int len = envVal.length();
      while (i < len) {
        c = envVal.charAt(i);
        if (c == '"') {
          inDoubleQuotes = !inDoubleQuotes;
        } else if (c == '\'' && !inDoubleQuotes) {
          i++;
          // eat until closing simple quote
          while (i < len) {
            c = envVal.charAt(i);
            if (c == '\\') {
              i++;
            }
            if (c == '\'') {
              break;
            }
            i++;
          }
        } else if (c == '\\') {
          // skip the escaped character
          i++;
        } else if (c == '$') {
          i++;
          if (i >= len) {
            break;
          }
          c = envVal.charAt(i);
          if (c == '{') { // for ${... bash like syntax
            i++;
            if (i >= len) {
              break;
            }
            c = envVal.charAt(i);
            if (c == '#') { // for ${#... bash array syntax
              i++;
              if (i >= len) {
                break;
              }
            }
          }
          final int start = i;
          while (i < len) {
            c = envVal.charAt(i);
            if (c != '$' && (
                (i == start && Character.isJavaIdentifierStart(c)) ||
                    (i > start && Character.isJavaIdentifierPart(c)))) {
              i++;
            } else {
              break;
            }
          }
          if (i > start) {
            deps.add(envVal.substring(start, i));
            // i already sits on the character that ended the name. Examine
            // it on the next iteration instead of stepping over it, so that
            // an adjacent reference ("$A$B"), quote or backslash is handled.
            continue;
          }
        }
        i++;
      }
      return deps;
    }
  }

  /**
   * Script builder that emits Windows cmd/bat syntax.
   */
  private static final class WindowsShellScriptBuilder
      extends ShellScriptBuilder {

    // Matches %...% references, capturing the text between the percent signs.
    private static final Pattern VARIABLE_PATTERN = Pattern.compile("%(.*?)%");
    private static final Pattern SPLIT_PATTERN = Pattern.compile(":");

    /** Emits a statement that exits the script on a non-zero errorlevel. */
    private void errorCheck() {
      line("@if %errorlevel% neq 0 exit /b %errorlevel%");
    }

    /**
     * Appends a line after checking it against the Windows command line
     * length limit.
     */
    private void lineWithLenCheck(String... commands) throws IOException {
      Shell.checkWindowsCommandLineLength(commands);
      line(commands);
    }

    /** Starts the script with {@code @setlocal} and a blank line. */
    public WindowsShellScriptBuilder() {
      line("@setlocal");
      line();
    }

    @Override
    public void command(List<String> command) throws IOException {
      final String joined = StringUtils.join(" ", command);
      lineWithLenCheck("@call ", joined);
      errorCheck();
    }

    // Redirection is not implemented for Windows scripts.
    @Override
    protected void setStdOut(final Path stdout) throws IOException {
    }

    // Redirection is not implemented for Windows scripts.
    @Override
    protected void setStdErr(final Path stderr) throws IOException {
    }

    @Override
    public void env(String key, String value) throws IOException {
      lineWithLenCheck("@set ", key, "=", value);
      errorCheck();
    }

    @Override
    public void whitelistedEnv(String key, String value) throws IOException {
      // Whitelisted variables are emitted like any other variable here.
      env(key, value);
    }

    @Override
    public void echo(final String echoStr) throws IOException {
      lineWithLenCheck("@echo \"", echoStr, "\"");
    }

    @Override
    protected void link(Path src, Path dst) throws IOException {
      final String target = new File(src.toUri().getPath()).getPath();
      final String linkName = new File(dst.toString()).getPath();
      // winutils takes the link name first, then the target.
      final String statement = String.format("@%s symlink \"%s\" \"%s\"",
          Shell.getWinUtilsPath(), linkName, target);
      lineWithLenCheck(statement);
      errorCheck();
    }

    @Override
    protected void mkdir(Path path) throws IOException {
      final String dir = path.toString();
      lineWithLenCheck(
          String.format("@if not exist \"%s\" mkdir \"%s\"", dir, dir));
      errorCheck();
    }

    @Override
    public void copyDebugInformation(Path src, Path dest)
        throws IOException {
      // no need to worry about permissions - in secure mode
      // WindowsSecureContainerExecutor will set permissions
      // to allow NM to read the file
      line("rem Creating copy of launch script");
      final String statement =
          String.format("copy \"%s\" \"%s\"", src.toString(), dest.toString());
      lineWithLenCheck(statement);
    }

    @Override
    public void listDebugInformation(Path output) throws IOException {
      final String target = output.toString();
      line("rem Determining directory contents");
      lineWithLenCheck(String.format("@echo \"dir:\" > \"%s\"", target));
      lineWithLenCheck(String.format("dir >> \"%s\"", target));
    }

    /**
     * Parse <code>envVal</code> using cmd/bat-like syntax to extract env
     * variables it depends on.
     */
    @Override
    public Set<String> getEnvDependencies(final String envVal) {
      if (envVal == null || envVal.isEmpty()) {
        return Collections.emptySet();
      }

      // Example inputs: %var%, %%, %a:b%
      final Set<String> deps = new HashSet<>();
      final Matcher references = VARIABLE_PATTERN.matcher(envVal);
      while (references.find()) {
        final String reference = references.group(1);
        if (reference.isEmpty()) {
          continue;
        }
        if (":".equals(reference)) {
          // Special case, variable name can be a single : character
          deps.add(reference);
          continue;
        }
        // Either store the variable name before the : string manipulation
        // character or the whole match. (%var% -> var, %a:b% -> a)
        final String name = SPLIT_PATTERN.split(reference, 2)[0];
        if (!name.isEmpty()) {
          deps.add(name);
        }
      }
      return deps;
    }
  }

  /**
   * Stores a variable in the environment map and records its name.
   *
   * @param envMap environment to update
   * @param envSet set that collects the names stored through this method
   * @param envName variable name
   * @param envValue variable value
   */
  private static void addToEnvMap(
      final Map<String, String> envMap, final Set<String> envSet,
      final String envName, final String envValue) {
    // Update the map first so the name is only recorded once it is stored.
    envMap.put(envName, envValue);
    envSet.add(envName);
  }

  /**
   * Populates the container environment with the variables controlled by
   * the node manager.
   *
   * When the environment sets the docker run-override-disable variable to
   * true, only {@code WORK_DIR} is removed and nothing else is changed.
   *
   * @param environment environment map to update in place
   * @param pwd container working directory
   * @param appDirs application directories, exported as LOCAL_DIRS
   * @param userLocalDirs user directories, exported as LOCAL_USER_DIRS
   * @param containerLogDirs log directories, exported as LOG_DIRS
   * @param resources localized resources and their link names; only used
   *        on Windows
   * @param nmPrivateClasspathJarDir directory for the classpath jar; only
   *        used on Windows
   * @param nmVars receives the names of the variables set by this method
   * @throws IOException if the environment cannot be prepared
   */
  public void sanitizeEnv(Map<String, String> environment, Path pwd,
      List<Path> appDirs, List<String> userLocalDirs, List<String>
      containerLogDirs, Map<Path, List<String>> resources,
      Path nmPrivateClasspathJarDir,
      Set<String> nmVars) throws IOException {
    // Based on discussion in YARN-7654, for ENTRY_POINT enabled
    // docker container, we forward user defined environment variables
    // without node manager environment variables.  This is the reason
    // that we skip sanitizeEnv method.
    final String overrideDisableKey =
        Environment.YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE.name();
    if (Boolean.parseBoolean(environment.get(overrideDisableKey))) {
      environment.remove("WORK_DIR");
      return;
    }

    // Non-modifiable environment variables

    addToEnvMap(environment, nmVars, Environment.CONTAINER_ID.name(),
        container.getContainerId().toString());

    addToEnvMap(environment, nmVars, Environment.NM_PORT.name(),
        String.valueOf(this.context.getNodeId().getPort()));

    addToEnvMap(environment, nmVars, Environment.NM_HOST.name(),
        this.context.getNodeId().getHost());

    addToEnvMap(environment, nmVars, Environment.NM_HTTP_PORT.name(),
        String.valueOf(this.context.getHttpPort()));

    addToEnvMap(environment, nmVars, Environment.LOCAL_DIRS.name(),
        StringUtils.join(",", appDirs));

    addToEnvMap(environment, nmVars, Environment.LOCAL_USER_DIRS.name(),
        StringUtils.join(",", userLocalDirs));

    addToEnvMap(environment, nmVars, Environment.LOG_DIRS.name(),
        StringUtils.join(",", containerLogDirs));

    addToEnvMap(environment, nmVars, Environment.USER.name(),
        container.getUser());

    addToEnvMap(environment, nmVars, Environment.LOGNAME.name(),
        container.getUser());

    final String homeDir = conf.get(
        YarnConfiguration.NM_USER_HOME_DIR,
        YarnConfiguration.DEFAULT_NM_USER_HOME_DIR);
    addToEnvMap(environment, nmVars, Environment.HOME.name(), homeDir);

    addToEnvMap(environment, nmVars, Environment.PWD.name(), pwd.toString());

    addToEnvMap(environment, nmVars, Environment.LOCALIZATION_COUNTERS.name(),
        container.localizationCountersAsString());

    if (!Shell.WINDOWS) {
      addToEnvMap(environment, nmVars, "JVM_PID", "$$");
    }

    // variables here will be forced in, even if the container has
    // specified them.  Note: we do not track these in nmVars, to
    // allow them to be ordered properly if they reference variables
    // defined by the user.
    String defEnvStr = conf.get(YarnConfiguration.DEFAULT_NM_ADMIN_USER_ENV);
    Apps.setEnvFromInputProperty(environment,
        YarnConfiguration.NM_ADMIN_USER_ENV, defEnvStr, conf,
        File.pathSeparator);

    if (!Shell.WINDOWS) {
      // maybe force path components
      final String forcePath = conf.get(YarnConfiguration.NM_ADMIN_FORCE_PATH,
          YarnConfiguration.DEFAULT_NM_ADMIN_FORCE_PATH);
      if (!forcePath.isEmpty()) {
        final String pathKey = Environment.PATH.name();
        final String userPath = environment.get(pathKey);
        environment.remove(pathKey);
        // The forced components always come first; they are followed by the
        // user's PATH, or by a reference to the inherited PATH when the user
        // supplied none.
        final boolean noUserPath = userPath == null || userPath.isEmpty();
        Apps.addToEnvironment(environment, pathKey,
            forcePath, File.pathSeparator);
        Apps.addToEnvironment(environment, pathKey,
            noUserPath ? "$PATH" : userPath, File.pathSeparator);
      }
    }

    // TODO: Remove Windows check and use this approach on all platforms after
    // additional testing.  See YARN-358.
    if (Shell.WINDOWS) {
      sanitizeWindowsEnv(environment, pwd,
          resources, nmPrivateClasspathJarDir);
    }

    // put AuxiliaryService data to environment
    final Map<String, ByteBuffer> auxServiceData =
        containerManager.getAuxServiceMetaData();
    for (Map.Entry<String, ByteBuffer> meta : auxServiceData.entrySet()) {
      final String serviceName = meta.getKey();
      AuxiliaryServiceHelper.setServiceDataIntoEnv(
          serviceName, meta.getValue(), environment);
      nmVars.add(AuxiliaryServiceHelper.getPrefixServiceName(serviceName));
    }
  }

  /**
   * Replaces the CLASSPATH entry of {@code environment} with a reference to
   * a generated classpath jar. Does nothing when the environment carries no
   * (or an empty) CLASSPATH.
   *
   * @param environment container environment; read for CLASSPATH and
   *     CLASSPATH_PREPEND_DISTCACHE, and updated in place with the new
   *     CLASSPATH value
   * @param pwd container working directory; localized link names are
   *     resolved against it
   * @param resources localized resources, mapping each resource path to the
   *     link names it is exposed under
   * @param nmPrivateClasspathJarDir directory that receives the classpath jar
   *     when the executor is a {@code WindowsSecureContainerExecutor}
   * @throws IOException propagated from creating or localizing the
   *     classpath jar
   */
  private void sanitizeWindowsEnv(Map<String, String> environment, Path pwd,
      Map<Path, List<String>> resources, Path nmPrivateClasspathJarDir)
      throws IOException {
    final String classPathKey = Environment.CLASSPATH.name();
    final String inputClassPath = environment.get(classPathKey);
    if (inputClassPath == null || inputClassPath.isEmpty()) {
      // No classpath was supplied, so there is nothing to rewrite.
      return;
    }

    // On non-Windows platforms the localized distcache resources are reachable
    // through the classpath as given.  On Windows they do not exist yet when
    // the classpath jar is created, so they must be listed explicitly, which
    // loses their position relative to the other classpath entries (breaking
    // e.g. mapreduce.job.user.classpath.first).  This environment variable
    // lets the submitter ask for the distcache entries to be placed first.
    final boolean distCacheFirst = Boolean.parseBoolean(
        environment.get(Environment.CLASSPATH_PREPEND_DISTCACHE.name()));

    // The launch script has not created the symlinks yet, so
    // FileUtil.createJarWithClassPath cannot expand wildcards into one
    // manifest entry per file.  Spell out an entry for every link name.
    final StringBuilder localized = new StringBuilder();
    boolean anyLocalized = false;
    for (Map.Entry<Path, List<String>> resource : resources.entrySet()) {
      final File target = new File(resource.getKey().toUri().getPath());
      final boolean pointsAtDirectory = target.isDirectory();

      for (String linkName : resource.getValue()) {
        if (anyLocalized) {
          localized.append(File.pathSeparator);
        }
        anyLocalized = true;
        localized.append(pwd.toString())
            .append(Path.SEPARATOR)
            .append(linkName);

        // File.toURI only adds the trailing '/' for directories that already
        // exist, and the link does not exist yet.  Add the '/' by hand so the
        // manifest entry is treated as a directory rather than a file.
        if (pointsAtDirectory) {
          localized.append(Path.SEPARATOR);
        }
      }
    }

    // Combine the original classpath with the localized entries in the
    // requested order; the separator is only needed when both parts exist.
    final String newClassPath;
    if (!anyLocalized) {
      newClassPath = inputClassPath;
    } else if (distCacheFirst) {
      newClassPath =
          localized.toString() + File.pathSeparator + inputClassPath;
    } else {
      newClassPath =
          inputClassPath + File.pathSeparator + localized.toString();
    }

    // A launched container sees the parent process environment overlaid with
    // the launch-context entries.  Mirror that here so variable substitution
    // inside the classpath jar manifest resolves the same way.
    final Map<String, String> mergedEnv = new HashMap<>(System.getenv());
    mergedEnv.putAll(environment);

    // Hacky and temporary: keep the secure Windows behavior, but let
    // non-secure Windows build the jar in the working directory so that
    // job.jar/lib/xyz and friends resolve (see YARN-2803).
    final Path jarDir = (exec instanceof WindowsSecureContainerExecutor)
        ? nmPrivateClasspathJarDir
        : pwd;

    final String[] jarCp = FileUtil.createJarWithClassPath(
        newClassPath, jarDir, pwd, mergedEnv);
    // In a secure cluster the classpath jar must be localized to grant access.
    final Path localizedClassPathJar = exec.localizeClasspathJar(
        new Path(jarCp[0]), pwd, container.getUser());
    environment.put(classPathKey,
        localizedClassPathJar.toString() + jarCp[1]);
  }

  /**
   * Derives the exit-code file name that belongs to a PID file.
   *
   * @param pidFile PID file name to derive from
   * @return {@code pidFile} with {@code EXIT_CODE_FILE_SUFFIX} appended
   */
  public static String getExitCodeFile(String pidFile) {
    final String exitCodeFile = pidFile + EXIT_CODE_FILE_SUFFIX;
    return exitCodeFile;
  }

  /**
   * Sets the log directory on the container and, when the container has a
   * retry context set, also stores it in the NM state store.
   *
   * @param containerId id under which the log directory is stored
   * @param logDir log directory to record
   * @throws IOException propagated from the underlying calls
   */
  private void recordContainerLogDir(ContainerId containerId,
      String logDir) throws IOException{
    container.setLogDir(logDir);
    if (!container.isRetryContextSet()) {
      // Without a retry context only the in-memory value is updated.
      return;
    }
    context.getNMStateStore().storeContainerLogDir(containerId, logDir);
  }

  /**
   * Sets the work directory on the container and, when the container has a
   * retry context set, also stores it in the NM state store.
   *
   * @param containerId id under which the work directory is stored
   * @param workDir work directory to record
   * @throws IOException propagated from the underlying calls
   */
  private void recordContainerWorkDir(ContainerId containerId,
      String workDir) throws IOException{
    container.setWorkDir(workDir);
    if (!container.isRetryContextSet()) {
      // Without a retry context only the in-memory value is updated.
      return;
    }
    context.getNMStateStore().storeContainerWorkDir(containerId, workDir);
  }

  /**
   * Sets the CSI volumes root directory on the container.
   *
   * @param containerId id of the container; currently not used by this method
   * @param volumesRoot CSI volumes root directory to record
   * @throws IOException declared by the signature
   */
  private void recordContainerCsiVolumesRootDir(ContainerId containerId,
      String volumesRoot) throws IOException {
    // TODO: also persist the volumes root dir to the NM state store.
    container.setCsiVolumesRootDir(volumesRoot);
  }

  /**
   * Returns the container's recorded work directory as a {@code Path}.
   *
   * @return the work directory of the container
   * @throws IOException if no work directory is recorded, or if
   *     {@code dirsHandler} does not report it as a good local dir
   */
  protected Path getContainerWorkDir() throws IOException {
    final String containerWorkDir = container.getWorkDir();
    // The null check comes first so dirsHandler is never asked about null.
    final boolean usable = containerWorkDir != null
        && dirsHandler.isGoodLocalDir(containerWorkDir);
    if (!usable) {
      throw new IOException(
          "Could not find a good work dir " + containerWorkDir
              + " for container " + container);
    }
    return new Path(containerWorkDir);
  }

  /**
   * Removes the per-launch files of a container from its work directory so
   * the container can be relaunched or cleaned up. Failures are logged and
   * not propagated.
   *
   * @param containerWorkDir work directory holding the files to delete
   */
  protected void cleanupContainerFiles(Path containerWorkDir) {
    LOG.debug("cleanup container {} files", containerWorkDir);

    // Container launch script.
    final Path launchScript = new Path(containerWorkDir, CONTAINER_SCRIPT);
    deleteAsUser(launchScript);

    // Container tokens file.
    final Path tokensFile =
        new Path(containerWorkDir, FINAL_CONTAINER_TOKENS_FILE);
    deleteAsUser(tokensFile);

    // sysfs directory.
    final Path sysfsDir = new Path(containerWorkDir, SYSFS_DIR);
    deleteAsUser(sysfsDir);

    // The launch script recreates the symlinks, so let the executor clear
    // the old ones before a relaunch.
    try {
      exec.cleanupBeforeRelaunch(container);
    } catch (IOException | InterruptedException e) {
      LOG.warn("{} exec failed to cleanup", container.getContainerId(), e);
    }
  }

  /**
   * Asks the container executor to delete {@code path} as the container's
   * user. Any exception is logged as a warning and not rethrown.
   *
   * @param path path to delete
   */
  private void deleteAsUser(Path path) {
    try {
      final DeletionAsUserContext deletion =
          new DeletionAsUserContext.Builder()
              .setUser(container.getUser())
              .setSubDir(path)
              .build();
      exec.deleteAsUser(deletion);
    } catch (Exception e) {
      // Best effort: a failed delete must not abort the caller.
      LOG.warn("Failed to delete " + path, e);
    }
  }

  /**
   * Gets the path of the PID file held by this launch.
   *
   * @return the current value of {@code pidFilePath}
   */
  Path getPidFilePath() {
    return this.pidFilePath;
  }

  /**
   * Marks the container as launched, but only if it has not been marked
   * before. The check and the update happen in a single
   * {@code compareAndSet} call on {@code containerAlreadyLaunched}.
   *
   * @return true if this call changed the flag from false to true; false if
   *     the flag was already set
   */
  boolean markLaunched() {
    final boolean firstToMark =
        containerAlreadyLaunched.compareAndSet(false, true);
    return firstToMark;
  }

  /**
   * Reports whether the launch has completed.
   *
   * @return the current value of the {@code completed} flag
   */
  boolean isLaunchCompleted() {
    final boolean done = completed.get();
    return done;
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractContainersLauncher 源码

hadoop ContainerCleanup 源码

hadoop ContainerRelaunch 源码

hadoop ContainersLauncher 源码

hadoop ContainersLauncherEvent 源码

hadoop ContainersLauncherEventType 源码

hadoop RecoverPausedContainerLaunch 源码

hadoop RecoveredContainerLaunch 源码

hadoop SignalContainersLauncherEvent 源码

hadoop package-info 源码

0  赞