hadoop LinuxContainerExecutor 源码

  • 2022-10-20
  • 浏览 (272)

hadoop LinuxContainerExecutor 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager;

import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.APPID;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.APPLICATION_LOCAL_DIRS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.CONTAINER_ID_STR;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.CONTAINER_LAUNCH_PREFIX_COMMANDS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.CONTAINER_LOCAL_DIRS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.CONTAINER_LOG_DIRS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.CONTAINER_RUN_CMDS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.CONTAINER_WORK_DIR;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.FILECACHE_DIRS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.LOCALIZED_RESOURCES;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.LOCAL_DIRS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.LOG_DIRS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.NM_PRIVATE_CONTAINER_SCRIPT_PATH;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.NM_PRIVATE_KEYSTORE_PATH;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.NM_PRIVATE_TOKENS_PATH;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.NM_PRIVATE_TRUSTSTORE_PATH;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.PID;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.PID_FILE_PATH;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.RESOURCES_OPTIONS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.RUN_AS_USER;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.SIGNAL;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.TC_COMMAND_FILE;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER_FILECACHE_DIRS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER_LOCAL_DIRS;

import org.apache.hadoop.classification.VisibleForTesting;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DefaultLinuxContainerRuntime;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DelegatingLinuxContainerRuntime;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntime;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.OCIContainerRuntime;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRmCommand;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerPrepareContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>This class provides {@link Container} execution using a native
 * {@code container-executor} binary. By using a helper written in native code,
 * this class is able to do several things that the
 * {@link DefaultContainerExecutor} cannot, such as execution of applications
 * as the applications' owners, provide localization that takes advantage of
 * mapping the application owner to a UID on the execution host, resource
 * management through Linux CGROUPS, and Docker support.</p>
 *
 * <p>If {@code hadoop.security.authentication} is set to {@code simple},
 * then the
 * {@code yarn.nodemanager.linux-container-executor.nonsecure-mode.limit-users}
 * property will determine whether the {@code LinuxContainerExecutor} runs
 * processes as the application owner or as the default user, as set in the
 * {@code yarn.nodemanager.linux-container-executor.nonsecure-mode.local-user}
 * property.</p>
 *
 * <p>The {@code LinuxContainerExecutor} will manage applications through an
 * appropriate {@link LinuxContainerRuntime} instance. This class uses a
 * {@link DelegatingLinuxContainerRuntime} instance, which will delegate calls
 * to either a {@link DefaultLinuxContainerRuntime} instance or a
 * {@link OCIContainerRuntime} instance, depending on the job's
 * configuration.</p>
 *
 * @see LinuxContainerRuntime
 * @see DelegatingLinuxContainerRuntime
 * @see DefaultLinuxContainerRuntime
 * @see DockerLinuxContainerRuntime
 * @see OCIContainerRuntime#isOCICompliantContainerRequested
 */
public class LinuxContainerExecutor extends ContainerExecutor {

  private static final Logger LOG =
       LoggerFactory.getLogger(LinuxContainerExecutor.class);

  // Local user returned by getRunAsUser() when security is disabled and
  // containerLimitUsers is true; read from the configuration in setConf().
  private String nonsecureLocalUser;
  // Pattern that user names must match when security is disabled; compiled
  // in setConf() and checked in verifyUsernamePattern().
  private Pattern nonsecureLocalUserPattern;
  // Resources handler created in setConf() and initialized in init().
  private LCEResourcesHandler resourcesHandler;
  // True only when the scheduling-priority key is explicitly configured.
  private boolean containerSchedPriorityIsSet = false;
  // Niceness value passed to "nice -n" by addSchedPriorityCommand().
  private int containerSchedPriorityAdjustment = 0;
  // When false, getRunAsUser() returns the submitting user even without
  // security enabled (setConf() logs a warning in that case).
  private boolean containerLimitUsers;
  // Obtained from ResourceHandlerModule in init(); null-checked before
  // every use in this class.
  private ResourceHandler resourceHandlerChain;
  // Either injected through the constructor or created in init().
  private LinuxContainerRuntime linuxContainerRuntime;
  // NodeManager context handed to init().
  private Context nmContext;

  /**
   * Container exit codes known to this executor. Each constant wraps the
   * numeric code it stands for.
   */
  public enum ExitCode {
    SUCCESS(0),
    INVALID_ARGUMENT_NUMBER(1),
    INVALID_COMMAND_PROVIDED(3),
    INVALID_NM_ROOT_DIRS(5),
    SETUID_OPER_FAILED(6),
    UNABLE_TO_EXECUTE_CONTAINER_SCRIPT(7),
    UNABLE_TO_SIGNAL_CONTAINER(8),
    INVALID_CONTAINER_PID(9),
    OUT_OF_MEMORY(18),
    INITIALIZE_USER_FAILED(20),
    PATH_TO_DELETE_IS_NULL(21),
    INVALID_CONTAINER_EXEC_PERMISSIONS(22),
    INVALID_CONFIG_FILE(24),
    SETSID_OPER_FAILED(25),
    WRITE_PIDFILE_FAILED(26),
    WRITE_CGROUP_FAILED(27),
    TRAFFIC_CONTROL_EXECUTION_FAILED(28),
    DOCKER_RUN_FAILED(29),
    ERROR_OPENING_DOCKER_FILE(30),
    ERROR_READING_DOCKER_FILE(31),
    FEATURE_DISABLED(32),
    COULD_NOT_CREATE_SCRIPT_COPY(33),
    COULD_NOT_CREATE_CREDENTIALS_FILE(34),
    COULD_NOT_CREATE_WORK_DIRECTORIES(35),
    COULD_NOT_CREATE_APP_LOG_DIRECTORIES(36),
    COULD_NOT_CREATE_TMP_DIRECTORIES(37),
    ERROR_CREATE_CONTAINER_DIRECTORIES_ARGUMENTS(38);

    /** Numeric value of this exit code. */
    private final int value;

    ExitCode(int exitCode) {
      value = exitCode;
    }

    /**
     * Returns the numeric exit code.
     *
     * @return the exit code as an int
     */
    public int getExitCode() {
      return value;
    }

    /**
     * Renders the numeric exit code (not the constant name) as a string.
     *
     * @return the decimal representation of the exit code
     */
    @Override
    public String toString() {
      return Integer.toString(value);
    }
  }

  /**
   * No-argument constructor so that the executor can be instantiated
   * reflectively. The container runtime is left unset here and is created
   * later by {@link #init(Context)}.
   */
  public LinuxContainerExecutor() {
    super();
  }

  /**
   * Builds an executor around a caller-supplied
   * {@link LinuxContainerRuntime}; mainly useful in tests. When a runtime
   * is supplied this way, {@link #init(Context)} does not create one.
   *
   * @param linuxContainerRuntime the runtime to use
   */
  public LinuxContainerExecutor(LinuxContainerRuntime linuxContainerRuntime) {
    super();
    this.linuxContainerRuntime = linuxContainerRuntime;
  }

  /**
   * Stores the configuration and caches the settings this executor reads
   * later: the resources handler, the optional scheduling-priority
   * adjustment, and the non-secure-mode user settings.
   *
   * @param conf the configuration to apply
   */
  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    resourcesHandler = getResourcesHandler(conf);

    // The adjustment is only read when the key is explicitly present.
    containerSchedPriorityIsSet = conf.get(
        YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) != null;
    if (containerSchedPriorityIsSet) {
      containerSchedPriorityAdjustment = conf.getInt(
          YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
          YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
    }

    nonsecureLocalUser = conf.get(
        YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY,
        YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER);

    String userPattern = conf.get(
        YarnConfiguration.NM_NONSECURE_MODE_USER_PATTERN_KEY,
        YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_USER_PATTERN);
    nonsecureLocalUserPattern = Pattern.compile(userPattern);

    containerLimitUsers = conf.getBoolean(
        YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS,
        YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LIMIT_USERS);
    if (!containerLimitUsers) {
      // Containers will run as the submitting user even without security.
      LOG.warn("{}: impersonation without authentication enabled",
          YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS);
    }
  }

  /**
   * Instantiates the configured {@link LCEResourcesHandler}. If the
   * configured handler turns out to be a {@link CgroupsLCEResourcesHandler}
   * it is replaced by a {@link DefaultLCEResourcesHandler}.
   *
   * @param conf the configuration naming the handler class
   * @return the handler to use, with {@code conf} applied
   */
  private LCEResourcesHandler getResourcesHandler(Configuration conf) {
    Class<? extends LCEResourcesHandler> handlerClass = conf.getClass(
        YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
        DefaultLCEResourcesHandler.class, LCEResourcesHandler.class);
    LCEResourcesHandler configured =
        ReflectionUtils.newInstance(handlerClass, conf);

    // CgroupsLCEResourcesHandler is no longer used directly: the resource
    // handler chain takes over, and ResourceHandlerModule creates the cgroup
    // cpu module when CgroupsLCEResourcesHandler is configured.
    LCEResourcesHandler effective = configured;
    if (configured instanceof CgroupsLCEResourcesHandler) {
      effective =
          ReflectionUtils.newInstance(DefaultLCEResourcesHandler.class, conf);
    }

    effective.setConf(conf);
    return effective;
  }

  /**
   * Checks the user name against the configured non-secure-mode pattern.
   * Nothing is checked when security is enabled.
   *
   * @param user the user name to validate
   * @throws IllegalArgumentException if security is disabled and the name
   *     does not match the pattern
   */
  void verifyUsernamePattern(String user) {
    if (UserGroupInformation.isSecurityEnabled()) {
      return;
    }
    if (nonsecureLocalUserPattern.matcher(user).matches()) {
      return;
    }
    throw new IllegalArgumentException("Invalid user name '" + user
        + "', it must match '" + nonsecureLocalUserPattern.pattern() + "'");
  }

  /**
   * Chooses the OS user that processes are run as. The configured
   * non-secure local user is used only when security is disabled and user
   * limiting is switched on; otherwise the given user is returned as is.
   *
   * @param user the requesting user
   * @return the user to run as
   */
  String getRunAsUser(String user) {
    boolean mapToLocalUser =
        !UserGroupInformation.isSecurityEnabled() && containerLimitUsers;
    return mapToLocalUser ? nonsecureLocalUser : user;
  }

  /**
   * Resolves the location of the {@code container-executor} binary. The
   * default is {@code bin/container-executor} under the directory named by
   * the {@code HADOOP_YARN_HOME} environment variable, made absolute; a
   * non-null configuration may override it through
   * {@link YarnConfiguration#NM_LINUX_CONTAINER_EXECUTOR_PATH}.
   *
   * @param conf the {@link Configuration}, may be {@code null}
   * @return the path to the {@code container-executor} binary
   */
  protected String getContainerExecutorExecutablePath(Configuration conf) {
    String yarnHome =
        System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
    File executable =
        new File(new File(yarnHome, "bin"), "container-executor");
    String fallback = executable.getAbsolutePath();

    if (conf == null) {
      return fallback;
    }
    return conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH,
        fallback);
  }

  /**
   * Appends {@code nice -n <adjustment>} to the given command list when the
   * {@link YarnConfiguration#NM_CONTAINER_EXECUTOR_SCHED_PRIORITY} property
   * was explicitly configured; otherwise the list is left untouched.
   *
   * @param command the command to which to add the niceness setting.
   */
  protected void addSchedPriorityCommand(List<String> command) {
    if (!containerSchedPriorityIsSet) {
      return;
    }
    String niceness = Integer.toString(containerSchedPriorityAdjustment);
    command.addAll(Arrays.asList("nice", "-n", niceness));
  }

  /**
   * Looks up the {@link PrivilegedOperationExecutor} for the current
   * configuration.
   *
   * @return the executor used to run privileged operations
   */
  protected PrivilegedOperationExecutor getPrivilegedOperationExecutor() {
    Configuration currentConf = getConf();
    return PrivilegedOperationExecutor.getInstance(currentConf);
  }

  /**
   * Initializes the executor: runs a setup check through the privileged
   * operation executor, bootstraps the configured resource handler chain
   * (if any), creates and initializes a
   * {@link DelegatingLinuxContainerRuntime} unless a runtime was supplied
   * through the constructor, and finally initializes the resources handler.
   *
   * @param context the NodeManager context
   * @throws IOException if the setup check fails, the resource handler
   *     chain cannot be bootstrapped, or the container runtime cannot be
   *     initialized; the underlying exception is attached as the cause
   */
  @Override
  public void init(Context context) throws IOException {
    Configuration conf = super.getConf();
    this.nmContext = context;

    // Send command to executor which will just start up,
    // verify configuration/permissions and exit
    try {
      PrivilegedOperation checkSetupOp = new PrivilegedOperation(
          PrivilegedOperation.OperationType.CHECK_SETUP);
      PrivilegedOperationExecutor privilegedOperationExecutor =
          getPrivilegedOperationExecutor();

      privilegedOperationExecutor.executePrivilegedOperation(checkSetupOp,
          false);
    } catch (PrivilegedOperationException e) {
      int exitCode = e.getExitCode();
      LOG.warn("Exit code from container executor initialization is : {}",
          exitCode, e);

      throw new IOException("Linux container executor not configured properly"
          + " (error=" + exitCode + ")", e);
    }

    try {
      resourceHandlerChain = ResourceHandlerModule
          .getConfiguredResourceHandlerChain(conf, nmContext);
      LOG.debug("Resource handler chain enabled = {}",
          (resourceHandlerChain != null));
      if (resourceHandlerChain != null) {
        LOG.debug("Bootstrapping resource handler chain: {}",
            resourceHandlerChain);
        resourceHandlerChain.bootstrap(conf);
      }
    } catch (ResourceHandlerException e) {
      LOG.error("Failed to bootstrap configured resource subsystems! ", e);
      // Keep the original failure as the cause so callers can inspect it.
      throw new IOException(
          "Failed to bootstrap configured resource subsystems!", e);
    }

    try {
      // A runtime injected through the constructor is used as is.
      if (linuxContainerRuntime == null) {
        LinuxContainerRuntime runtime = new DelegatingLinuxContainerRuntime();

        runtime.initialize(conf, nmContext);
        this.linuxContainerRuntime = runtime;
      }
    } catch (ContainerExecutionException e) {
      LOG.error("Failed to initialize linux container runtime(s)!", e);
      // Keep the original failure as the cause so callers can inspect it.
      throw new IOException(
          "Failed to initialize linux container runtime(s)!", e);
    }

    resourcesHandler.init(this);
  }

  /**
   * Starts the executor, then the underlying container runtime.
   */
  @Override
  public void start() {
    super.start();
    this.linuxContainerRuntime.start();
  }

  /**
   * Stops the executor, then the underlying container runtime.
   */
  @Override
  public void stop() {
    super.stop();
    this.linuxContainerRuntime.stop();
  }

  /**
   * Runs the container localizer as a privileged
   * {@code INITIALIZE_CONTAINER} operation. The operation arguments carry
   * the run-as user, the requesting user, the application and localizer
   * ids, the token file path, the local and log directories, and a Java
   * command line (same JVM, classpath and library path as this process)
   * followed by the localizer's main arguments.
   *
   * @param ctx the localizer start context
   * @throws IOException if the privileged operation fails; the message
   *     includes the exit code and the operation's output
   * @throws InterruptedException declared by the overridden method
   * @throws IllegalArgumentException if security is disabled and the user
   *     name does not match the configured pattern
   */
  @Override
  public void startLocalizer(LocalizerStartContext ctx)
      throws IOException, InterruptedException {
    Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens();
    InetSocketAddress nmAddr = ctx.getNmAddr();
    String user = ctx.getUser();
    String appId = ctx.getAppId();
    String locId = ctx.getLocId();
    LocalDirsHandlerService dirsHandler = ctx.getDirsHandler();
    List<String> localDirs = dirsHandler.getLocalDirs();
    List<String> logDirs = dirsHandler.getLogDirs();

    verifyUsernamePattern(user);
    String runAsUser = getRunAsUser(user);
    PrivilegedOperation initializeContainerOp = new PrivilegedOperation(
        PrivilegedOperation.OperationType.INITIALIZE_CONTAINER);
    List<String> prefixCommands = new ArrayList<>();

    addSchedPriorityCommand(prefixCommands);
    initializeContainerOp.appendArgs(
        runAsUser,
        user,
        Integer.toString(
            PrivilegedOperation.RunAsUserCommand.INITIALIZE_CONTAINER
                .getValue()),
        appId,
        locId,
        nmPrivateContainerTokensPath.toUri().getPath().toString(),
        StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
            localDirs),
        StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
            logDirs));

    File jvm =                                  // use same jvm as parent
        new File(new File(System.getProperty("java.home"), "bin"), "java");
    initializeContainerOp.appendArgs(jvm.toString());
    initializeContainerOp.appendArgs("-classpath");
    initializeContainerOp.appendArgs(System.getProperty("java.class.path"));
    String javaLibPath = System.getProperty("java.library.path");
    if (javaLibPath != null) {
      initializeContainerOp.appendArgs("-Djava.library.path=" + javaLibPath);
    }

    initializeContainerOp.appendArgs(ContainerLocalizer.getJavaOpts(getConf()));

    List<String> localizerArgs = new ArrayList<>();

    buildMainArgs(localizerArgs, user, appId, locId, nmAddr,
        nmPrivateContainerTokensPath.getName(), localDirs);
    // Expand the log-dir placeholder to this localizer's log directory.
    Path containerLogDir = getContainerLogDir(dirsHandler, appId, locId);
    localizerArgs = replaceWithContainerLogDir(localizerArgs, containerLogDir);

    initializeContainerOp.appendArgs(localizerArgs);

    try {
      PrivilegedOperationExecutor privilegedOperationExecutor =
          getPrivilegedOperationExecutor();

      privilegedOperationExecutor.executePrivilegedOperation(prefixCommands,
          initializeContainerOp, null, null, false, true);

    } catch (PrivilegedOperationException e) {
      int exitCode = e.getExitCode();
      LOG.warn("Exit code from container {} startLocalizer is : {}",
          locId, exitCode, e);

      throw new IOException("Application " + appId + " initialization failed" +
          " (exitCode=" + exitCode + ") with output: " + e.getOutput(), e);
    }
  }

  /**
   * Returns a copy of the given commands in which every occurrence of
   * {@link ApplicationConstants#LOG_DIR_EXPANSION_VAR} has been replaced
   * with the container log directory. The input list is not modified.
   *
   * @param commands the commands to expand
   * @param containerLogDir the directory substituted for the placeholder
   * @return a new list holding the expanded commands
   */
  private List<String> replaceWithContainerLogDir(List<String> commands,
      Path containerLogDir) {
    List<String> expanded = new ArrayList<>(commands.size());
    commands.forEach(cmd -> expanded.add(
        cmd.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR,
            containerLogDir.toString())));
    return expanded;
  }

  /**
   * Asks the directory handler for a writable log path for the container's
   * relative log directory.
   *
   * @param dirsHandler the local directories handler
   * @param appId the application id
   * @param containerId the container (or localizer) id
   * @return the log path returned by the directory handler
   * @throws IOException if the directory handler cannot provide a path
   */
  private Path getContainerLogDir(LocalDirsHandlerService dirsHandler,
      String appId, String containerId) throws IOException {
    return dirsHandler.getLogPathForWrite(
        ContainerLaunch.getRelativeContainerLogDir(appId, containerId),
        false);
  }

  /**
   * Fills in the main arguments for the {@link ContainerLocalizer} by
   * delegating to {@link ContainerLocalizer#buildMainArgs} with this
   * executor's configuration.
   *
   * @param command the list that receives the localizer arguments
   * @param user localization user
   * @param appId localized app id
   * @param locId localizer id
   * @param nmAddr nodemanager address
   * @param tokenFileName name of the token file handed to the localizer
   * @param localDirs list of local dirs
   * @see ContainerLocalizer#buildMainArgs
   */
  @VisibleForTesting
  public void buildMainArgs(List<String> command, String user, String appId,
      String locId, InetSocketAddress nmAddr, String tokenFileName,
      List<String> localDirs) {
    Configuration localizerConf = super.getConf();
    ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr,
        tokenFileName, localDirs, localizerConf);
  }

  /**
   * Hands the container to the runtime for preparation, passing along the
   * localized resources, user, container local dirs, run commands and
   * container id as execution attributes.
   *
   * @param ctx the container prepare context
   * @throws IOException if the runtime fails to prepare the container
   */
  @Override
  public void prepareContainer(ContainerPrepareContext ctx) throws IOException {
    ContainerRuntimeContext.Builder attrs =
        new ContainerRuntimeContext.Builder(ctx.getContainer());

    attrs
        .setExecutionAttribute(LOCALIZED_RESOURCES,
            ctx.getLocalizedResources())
        .setExecutionAttribute(USER, ctx.getUser())
        .setExecutionAttribute(CONTAINER_LOCAL_DIRS,
            ctx.getContainerLocalDirs())
        .setExecutionAttribute(CONTAINER_RUN_CMDS, ctx.getCommands())
        .setExecutionAttribute(CONTAINER_ID_STR,
            ctx.getContainer().getContainerId().toString());

    try {
      ContainerRuntimeContext runtimeContext = attrs.build();
      linuxContainerRuntime.prepareContainer(runtimeContext);
    } catch (ContainerExecutionException cause) {
      throw new IOException("Unable to prepare container: ", cause);
    }
  }

  /**
   * Launches the container using the {@code LAUNCH} launch type.
   *
   * @param ctx the container start context
   * @return the exit code produced by the launch handling
   * @throws IOException if the launch handling fails
   * @throws ConfigurationException if the launch handling reports a
   *     configuration problem
   */
  @Override
  public int launchContainer(ContainerStartContext ctx)
      throws IOException, ConfigurationException {
    final ApplicationConstants.ContainerLaunchType launchType =
        ApplicationConstants.ContainerLaunchType.LAUNCH;
    return handleLaunchForLaunchType(ctx, launchType);
  }

  /**
   * Relaunches the container using the {@code RELAUNCH} launch type.
   *
   * @param ctx the container start context
   * @return the exit code produced by the launch handling
   * @throws IOException if the launch handling fails
   * @throws ConfigurationException if the launch handling reports a
   *     configuration problem
   */
  @Override
  public int relaunchContainer(ContainerStartContext ctx)
      throws IOException, ConfigurationException {
    final ApplicationConstants.ContainerLaunchType launchType =
        ApplicationConstants.ContainerLaunchType.RELAUNCH;
    return handleLaunchForLaunchType(ctx, launchType);
  }

  /**
   * Shared implementation of launch and relaunch. Collects resource options
   * from the resources handler and the resource handler chain, builds the
   * runtime context, and asks the container runtime to launch or relaunch
   * the container depending on {@code type}.
   *
   * @param ctx the container start context
   * @param type whether this is a launch or a relaunch
   * @return 0 when the runtime call completes without exception, the
   *     {@code TERMINATED} exit code when no pid file path is available,
   *     or the exit code derived from a {@link ContainerExecutionException}
   * @throws IOException if the resource handler chain's pre-start step fails
   * @throws ConfigurationException propagated from the exit code handling
   */
  private int handleLaunchForLaunchType(ContainerStartContext ctx,
      ApplicationConstants.ContainerLaunchType type) throws IOException,
      ConfigurationException {
    Container container = ctx.getContainer();
    String user = ctx.getUser();

    verifyUsernamePattern(user);

    ContainerId containerId = container.getContainerId();

    resourcesHandler.preExecute(containerId,
            container.getResource());
    String resourcesOptions = resourcesHandler.getResourcesOption(containerId);
    String tcCommandFile = null;
    List<String> numaArgs = null;

    // NOTE(review): if this block throws, the method exits before reaching
    // the try/finally below, so resourcesHandler.postExecute() is not called
    // for the preExecute() above - confirm this is intended.
    try {
      if (resourceHandlerChain != null) {
        List<PrivilegedOperation> ops = resourceHandlerChain
            .preStart(container);

        if (ops != null) {
          List<PrivilegedOperation> resourceOps = new ArrayList<>();

          // Seed the list with the options obtained from resourcesHandler so
          // they are combined with any cgroup operations from the chain.
          resourceOps.add(new PrivilegedOperation(
              PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
                  resourcesOptions));

          // Sort the chain's operations by type: cgroup operations are
          // collected, the other supported types only contribute arguments.
          for (PrivilegedOperation op : ops) {
            switch (op.getOperationType()) {
            case ADD_PID_TO_CGROUP:
              resourceOps.add(op);
              break;
            case TC_MODIFY_STATE:
              tcCommandFile = op.getArguments().get(0);
              break;
            case ADD_NUMA_PARAMS:
              numaArgs = op.getArguments();
              break;
            default:
              LOG.warn("PrivilegedOperation type unsupported in launch: {}",
                  op.getOperationType());
            }
          }

          // Size 1 means only the seed entry is present, so there is nothing
          // to combine and resourcesOptions stays as it was.
          if (resourceOps.size() > 1) {
            //squash resource operations
            try {
              PrivilegedOperation operation = PrivilegedOperationExecutor
                  .squashCGroupOperations(resourceOps);
              resourcesOptions = operation.getArguments().get(0);
            } catch (PrivilegedOperationException e) {
              LOG.error("Failed to squash cgroup operations!", e);
              // The cause is logged above but not attached to the new
              // exception; the outer catch wraps it in an IOException.
              throw new ResourceHandlerException(
                  "Failed to squash cgroup operations!");
            }
          }
        }
      }
    } catch (ResourceHandlerException e) {
      LOG.error("ResourceHandlerChain.preStart() failed!", e);
      throw new IOException("ResourceHandlerChain.preStart() failed!", e);
    }

    try {
      Path pidFilePath = getPidFilePath(containerId);
      if (pidFilePath != null) {

        ContainerRuntimeContext runtimeContext = buildContainerRuntimeContext(
            ctx, pidFilePath, resourcesOptions, tcCommandFile, numaArgs);

        if (type.equals(ApplicationConstants.ContainerLaunchType.RELAUNCH)) {
          linuxContainerRuntime.relaunchContainer(runtimeContext);
        } else {
          linuxContainerRuntime.launchContainer(runtimeContext);
        }

      } else {
        // No pid file path: treated as an inactive container.
        LOG.info(
            "Container was marked as inactive. Returning terminated error");
        return ContainerExecutor.ExitCode.TERMINATED.getExitCode();
      }
    } catch (ContainerExecutionException e) {
      return handleExitCode(e, container, containerId);
    } finally {
      // Runs on every path out of this try, including the early return.
      resourcesHandler.postExecute(containerId);
      postComplete(containerId);
    }

    return 0;
  }

  /**
   * Converts a failed launch into an exit code and records diagnostics on
   * the container. For the {@code FORCE_KILLED} and {@code TERMINATED} exit
   * codes a short "killed on request" message is recorded; otherwise the
   * exception message and any shell output are logged and recorded.
   *
   * @param e the exception raised by the container runtime
   * @param container the container that receives the diagnostics event
   * @param containerId the id of that container
   * @return the exit code carried by the exception
   * @throws ConfigurationException if the exit code is
   *     {@code INVALID_CONTAINER_EXEC_PERMISSIONS} or
   *     {@code INVALID_CONFIG_FILE}
   */
  private int handleExitCode(ContainerExecutionException e, Container container,
      ContainerId containerId) throws ConfigurationException {
    final int exitCode = e.getExitCode();
    LOG.warn("Exit code from container {} is : {}", containerId, exitCode);

    // 143 (SIGTERM) and 137 (SIGKILL) exit codes mean the container was
    // terminated/killed forcefully; no output is logged for those.
    boolean killedOnRequest =
        exitCode == ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode()
        || exitCode == ContainerExecutor.ExitCode.TERMINATED.getExitCode();
    if (killedOnRequest) {
      container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
          "Container killed on request. Exit code is " + exitCode));
      return exitCode;
    }

    LOG.warn("Exception from container-launch with container ID: {} "
        + "and exit code: {}", containerId, exitCode, e);

    StringBuilder report = new StringBuilder();
    report.append("Exception from container-launch.\n");
    report.append("Container id: ").append(containerId).append("\n");
    report.append("Exit code: ").append(exitCode).append("\n");
    report.append("Exception message: ").append(e.getMessage()).append("\n");

    String errorOutput = e.getErrorOutput();
    if (errorOutput != null && !errorOutput.isEmpty()) {
      report.append("Shell error output: ").append(errorOutput).append("\n");
    }

    // The stack trace is deliberately left out of the diagnostics.
    String shellOutput = e.getOutput();
    if (shellOutput != null && !shellOutput.isEmpty()) {
      report.append("Shell output: ").append(shellOutput).append("\n");
    }

    String diagnostics = report.toString();
    logOutput(diagnostics);
    container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
        diagnostics));

    boolean unrecoverable =
        exitCode == ExitCode.INVALID_CONTAINER_EXEC_PERMISSIONS.getExitCode()
        || exitCode == ExitCode.INVALID_CONFIG_FILE.getExitCode();
    if (unrecoverable) {
      throw new ConfigurationException(
          "Linux Container Executor reached unrecoverable exception", e);
    }
    return exitCode;
  }

  /**
   * Assembles the {@link ContainerRuntimeContext} passed to the runtime for
   * a launch or relaunch. Prefix commands (niceness, NUMA arguments) and
   * the traffic-control command file are only set when present.
   *
   * @param ctx the container start context supplying most attributes
   * @param pidFilePath the pid file path for the container
   * @param resourcesOptions the resources options string
   * @param tcCommandFile traffic-control command file, or {@code null}
   * @param numaArgs NUMA prefix arguments, or {@code null}
   * @return the populated runtime context
   */
  private ContainerRuntimeContext buildContainerRuntimeContext(
      ContainerStartContext ctx, Path pidFilePath, String resourcesOptions,
      String tcCommandFile, List<String> numaArgs) {

    List<String> launchPrefix = new ArrayList<>();
    addSchedPriorityCommand(launchPrefix);
    addNumaArgsToCommand(launchPrefix, numaArgs);

    Container container = ctx.getContainer();
    ContainerRuntimeContext.Builder attrs =
        new ContainerRuntimeContext.Builder(container);

    if (!launchPrefix.isEmpty()) {
      attrs.setExecutionAttribute(CONTAINER_LAUNCH_PREFIX_COMMANDS,
          launchPrefix);
    }

    attrs
        .setExecutionAttribute(LOCALIZED_RESOURCES,
            ctx.getLocalizedResources())
        .setExecutionAttribute(RUN_AS_USER, getRunAsUser(ctx.getUser()))
        .setExecutionAttribute(USER, ctx.getUser())
        .setExecutionAttribute(APPID, ctx.getAppId())
        .setExecutionAttribute(CONTAINER_ID_STR,
            container.getContainerId().toString())
        .setExecutionAttribute(CONTAINER_WORK_DIR, ctx.getContainerWorkDir())
        .setExecutionAttribute(NM_PRIVATE_CONTAINER_SCRIPT_PATH,
            ctx.getNmPrivateContainerScriptPath())
        .setExecutionAttribute(NM_PRIVATE_TOKENS_PATH,
            ctx.getNmPrivateTokensPath())
        .setExecutionAttribute(NM_PRIVATE_KEYSTORE_PATH,
            ctx.getNmPrivateKeystorePath())
        .setExecutionAttribute(NM_PRIVATE_TRUSTSTORE_PATH,
            ctx.getNmPrivateTruststorePath())
        .setExecutionAttribute(PID_FILE_PATH, pidFilePath)
        .setExecutionAttribute(LOCAL_DIRS, ctx.getLocalDirs())
        .setExecutionAttribute(LOG_DIRS, ctx.getLogDirs())
        .setExecutionAttribute(FILECACHE_DIRS, ctx.getFilecacheDirs())
        .setExecutionAttribute(USER_LOCAL_DIRS, ctx.getUserLocalDirs())
        .setExecutionAttribute(CONTAINER_LOCAL_DIRS,
            ctx.getContainerLocalDirs())
        .setExecutionAttribute(USER_FILECACHE_DIRS,
            ctx.getUserFilecacheDirs())
        .setExecutionAttribute(APPLICATION_LOCAL_DIRS,
            ctx.getApplicationLocalDirs())
        .setExecutionAttribute(CONTAINER_LOG_DIRS, ctx.getContainerLogDirs())
        .setExecutionAttribute(RESOURCES_OPTIONS, resourcesOptions);

    if (tcCommandFile != null) {
      attrs.setExecutionAttribute(TC_COMMAND_FILE, tcCommandFile);
    }

    ContainerRuntimeContext runtimeContext = attrs.build();
    return runtimeContext;
  }

  /**
   * Delegates to the container runtime to obtain the IP and host
   * information for the container.
   *
   * @param container the container to look up
   * @return the array returned by the runtime
   * @throws ContainerExecutionException if the runtime lookup fails
   */
  @Override
  public String[] getIpAndHost(Container container)
      throws ContainerExecutionException {
    String[] ipAndHost = linuxContainerRuntime.getIpAndHost(container);
    return ipAndHost;
  }

  /**
   * Appends the NUMA arguments to the prefix commands; a {@code null}
   * argument list leaves the prefix commands untouched.
   *
   * @param prefixCommands the list that receives the arguments
   * @param numaArgs the NUMA arguments, or {@code null}
   */
  private void addNumaArgsToCommand(List<String> prefixCommands,
      List<String> numaArgs) {
    if (numaArgs == null) {
      return;
    }
    prefixCommands.addAll(numaArgs);
  }

  /**
   * Reacquires a container: the resource handler chain (if configured) is
   * asked to reacquire its state first, then the superclass implementation
   * runs. A failure in the chain is logged and does not abort the
   * reacquisition. Post-execution cleanup always runs afterwards.
   *
   * @param ctx the reacquisition context
   * @return the value returned by the superclass implementation
   * @throws IOException propagated from the superclass implementation
   * @throws InterruptedException propagated from the superclass
   *     implementation
   */
  @Override
  public int reacquireContainer(ContainerReacquisitionContext ctx)
      throws IOException, InterruptedException {
    final ContainerId containerId = ctx.getContainerId();

    try {
      // The resource handler chain has container state of its own to
      // reacquire.
      if (resourceHandlerChain != null) {
        try {
          resourceHandlerChain.reacquireContainer(containerId);
        } catch (ResourceHandlerException chainFailure) {
          LOG.warn("ResourceHandlerChain.reacquireContainer failed for " +
              "containerId: {} Exception: ", containerId, chainFailure);
        }
      }

      int exitCode = super.reacquireContainer(ctx);
      return exitCode;
    } finally {
      resourcesHandler.postExecute(containerId);
      postComplete(containerId);
    }
  }

  /**
   * Sends the signal described by the context to a container process through
   * the Linux container runtime.
   *
   * @param ctx supplies the container, user, pid and signal
   * @return {@code true} if the runtime call completed without throwing;
   * {@code false} if it failed with the {@code INVALID_CONTAINER_PID}
   * result code
   * @throws IOException if the runtime call fails with any other exit code
   */
  @Override
  public boolean signalContainer(ContainerSignalContext ctx)
      throws IOException {
    final Container target = ctx.getContainer();
    final String user = ctx.getUser();
    final String pid = ctx.getPid();
    final Signal signal = ctx.getSignal();

    // The user name is validated before it is used to build the request.
    verifyUsernamePattern(user);
    final String runAsUser = getRunAsUser(user);

    final ContainerRuntimeContext signalContext =
        new ContainerRuntimeContext.Builder(target)
            .setExecutionAttribute(RUN_AS_USER, runAsUser)
            .setExecutionAttribute(USER, user)
            .setExecutionAttribute(PID, pid)
            .setExecutionAttribute(SIGNAL, signal)
            .build();

    try {
      linuxContainerRuntime.signalContainer(signalContext);
      return true;
    } catch (ContainerExecutionException e) {
      final int exitCode = e.getExitCode();
      final int invalidPidCode =
          PrivilegedOperation.ResultCode.INVALID_CONTAINER_PID.getValue();
      if (exitCode == invalidPidCode) {
        // Reported to the caller as "not signalled" instead of as an error.
        return false;
      }
      LOG.warn("Error in signalling container {} with {}; exit = {}",
          pid, signal, exitCode, e);
      logOutput(e.getOutput());
      final String message = "Problem signalling container " + pid + " with "
          + signal + "; output: " + e.getOutput() + " and exitCode: "
          + exitCode;
      throw new IOException(message, e);
    }
  }

  /**
   * Performs the tasks necessary to reap the container by delegating to the
   * Linux container runtime. {@link #postComplete(ContainerId)} is always
   * invoked for the container before this method exits.
   *
   * @param ctx Encapsulates information necessary for reaping containers.
   * @return true if the runtime call completed without throwing; false if it
   * failed with a non-zero exit code.
   * @throws IOException if the runtime call fails and reports exit code 0.
   */
  @Override
  public boolean reapContainer(ContainerReapContext ctx) throws IOException {
    final Container container = ctx.getContainer();
    final String user = ctx.getUser();
    final String runAsUser = getRunAsUser(user);

    final ContainerRuntimeContext reapContext =
        new ContainerRuntimeContext.Builder(container)
            .setExecutionAttribute(RUN_AS_USER, runAsUser)
            .setExecutionAttribute(USER, user)
            .build();

    try {
      linuxContainerRuntime.reapContainer(reapContext);
      return true;
    } catch (ContainerExecutionException e) {
      final int exitCode = e.getExitCode();
      // NOTE(review): a non-zero exit code is reported quietly as false and
      // only an exit code of 0 reaches the log-and-throw path below.
      // Confirm that this is the intended split.
      if (exitCode != 0) {
        return false;
      }
      LOG.warn("Error in reaping container {} exit = {}",
          container.getContainerId(), exitCode, e);
      logOutput(e.getOutput());
      final String message = "Error in reaping container "
          + container.getContainerId().toString() + " exit = " + exitCode;
      throw new IOException(message, e);
    } finally {
      postComplete(container.getContainerId());
    }
  }

  /**
   * Performs container exec by delegating to the Linux container runtime.
   *
   * @param ctx Encapsulates information necessary for exec container.
   * @return stdin and stdout of container exec; a pair holding two
   * {@code null} streams if the runtime fails with a non-zero exit code.
   * @throws ContainerExecutionException if the runtime fails and reports
   * exit code 0.
   */
  @Override
  public IOStreamPair execContainer(ContainerExecContext ctx)
      throws ContainerExecutionException {
    IOStreamPair res;
    try {
      res = linuxContainerRuntime.execContainer(ctx);
    } catch (ContainerExecutionException e) {
      int retCode = e.getExitCode();
      // NOTE(review): a non-zero exit code yields an empty stream pair and
      // only an exit code of 0 reaches the log-and-throw path below.
      // Confirm that this is the intended split.
      if (retCode != 0) {
        return new IOStreamPair(null, null);
      }
      LOG.warn("Error in executing container interactive shell {} exit = {}",
          ctx, retCode, e);
      logOutput(e.getOutput());
      // The message previously read "...interactive shel<containerId>":
      // spell "shell" in full and separate it from the container id.
      throw new ContainerExecutionException(
          "Error in executing container interactive shell "
          + ctx.getContainer().getContainerId().toString()
          + " exit = " + retCode);
    }
    return res;
  }

  /**
   * Deletes a path on behalf of a user by running a
   * {@code DELETE_AS_USER} privileged operation.
   *
   * <p>The operation receives the run-as user, the user, the numeric
   * {@code DELETE_AS_USER} command, the sub-directory path (empty when the
   * context has no sub-directory) and then the path of every base directory
   * in the context. A {@link PrivilegedOperationException} is logged and not
   * propagated to the caller.
   *
   * @param ctx supplies the user, the sub-directory and the base directories
   */
  @Override
  public void deleteAsUser(DeletionAsUserContext ctx) {
    String user = ctx.getUser();
    Path dir = ctx.getSubDir();
    List<Path> baseDirs = ctx.getBasedirs();

    verifyUsernamePattern(user);

    String runAsUser = getRunAsUser(user);
    String dirString = dir == null ? "" : dir.toUri().getPath();

    PrivilegedOperation deleteAsUserOp = new PrivilegedOperation(
        PrivilegedOperation.OperationType.DELETE_AS_USER, (String) null);

    deleteAsUserOp.appendArgs(
        runAsUser,
        user,
        Integer.toString(PrivilegedOperation.
            RunAsUserCommand.DELETE_AS_USER.getValue()),
        dirString);

    // Collected only so that a failure can report what was being deleted.
    List<String> pathsToDelete = new ArrayList<>();
    if (baseDirs == null || baseDirs.isEmpty()) {
      LOG.info("Deleting absolute path : {}", dir);
      pathsToDelete.add(dirString);
    } else {
      for (Path baseDir : baseDirs) {
        Path del = dir == null ? baseDir : new Path(baseDir, dir);
        LOG.info("Deleting path : {}", del);
        pathsToDelete.add(del.toString());
        deleteAsUserOp.appendArgs(baseDir.toUri().getPath());
      }
    }

    try {
      PrivilegedOperationExecutor privilegedOperationExecutor =
          getPrivilegedOperationExecutor();

      privilegedOperationExecutor.executePrivilegedOperation(deleteAsUserOp,
          false);
    } catch (PrivilegedOperationException e) {
      int exitCode = e.getExitCode();
      LOG.error("DeleteAsUser for {} returned with exit code: {}",
          StringUtils.join(" ", pathsToDelete), exitCode, e);
    }
  }

  /**
   * Lists a directory on behalf of a user by running a {@code LIST_AS_USER}
   * privileged operation and turning each line of its output into a
   * {@link File} below the directory.
   *
   * <p>A {@link PrivilegedOperationException} is logged and not propagated;
   * the entries gathered up to that point (none, if the operation itself
   * failed) are returned.
   *
   * @param user the user the listing is performed for
   * @param dir the directory to list; {@code null} is passed to the
   * operation as an empty path
   * @return one {@link File} per output line that is not container-executor
   * log output
   */
  @Override
  protected File[] readDirAsUser(String user, Path dir) {
    final PrivilegedOperation listOp = new PrivilegedOperation(
        PrivilegedOperation.OperationType.LIST_AS_USER, (String) null);
    final String runAsUser = getRunAsUser(user);
    final String dirString = (dir == null) ? "" : dir.toUri().getPath();
    final String listCommand = Integer.toString(
        PrivilegedOperation.RunAsUserCommand.LIST_AS_USER.getValue());

    listOp.appendArgs(runAsUser, user, listCommand, dirString);

    final List<File> entries = new ArrayList<>();
    try {
      final String output = getPrivilegedOperationExecutor()
          .executePrivilegedOperation(listOp, true);
      final File parent = new File(dirString);

      for (String line : output.split("\n")) {
        // The container-executor always dumps its log output to stdout,
        // which includes 3 lines that start with "main : "
        if (line.startsWith("main :")) {
          continue;
        }
        entries.add(new File(parent, line));
      }
    } catch (PrivilegedOperationException e) {
      LOG.error("ListAsUser for {} returned with exit code: {}",
          dir, e.getExitCode(), e);
    }

    return entries.toArray(new File[0]);
  }

  /**
   * Empty implementation: neither argument is read and nothing is created.
   *
   * @param target not used by this implementation
   * @param symlink not used by this implementation
   */
  @Override
  public void symLink(String target, String symlink) {

  }

  /**
   * Checks whether a container process is alive by sending it
   * {@code Signal.NULL} through {@link #signalContainer}.
   *
   * @param ctx supplies the container, user and pid to probe
   * @return the result of {@link #signalContainer} for the probe
   * @throws IOException if {@link #signalContainer} throws it
   */
  @Override
  public boolean isContainerAlive(ContainerLivenessContext ctx)
      throws IOException {
    // Send a test signal to the process as the user to see if it's alive
    final ContainerSignalContext probe = new ContainerSignalContext.Builder()
        .setContainer(ctx.getContainer())
        .setUser(ctx.getUser())
        .setPid(ctx.getPid())
        .setSignal(Signal.NULL)
        .build();
    return signalContainer(probe);
  }

  /**
   * Mount a CGROUPS controller at the requested mount point and create
   * a hierarchy for the NodeManager to manage. The work is done by a
   * {@code MOUNT_CGROUPS} privileged operation.
   *
   * @param cgroupKVs a key-value pair of the form
   * {@code controller=mount-path}
   * @param hierarchy the top directory of the hierarchy for the NodeManager
   * @throws IOException if there is a problem mounting the CGROUPS; the
   * message carries the exit code and output of the failed operation
   */
  public void mountCgroups(List<String> cgroupKVs, String hierarchy)
      throws IOException {
    try {
      PrivilegedOperation mountCGroupsOp = new PrivilegedOperation(
          PrivilegedOperation.OperationType.MOUNT_CGROUPS, hierarchy);

      mountCGroupsOp.appendArgs(cgroupKVs);
      PrivilegedOperationExecutor privilegedOperationExecutor =
          getPrivilegedOperationExecutor();

      privilegedOperationExecutor.executePrivilegedOperation(mountCGroupsOp,
          false);
    } catch (PrivilegedOperationException e) {
      int exitCode = e.getExitCode();
      LOG.warn("Exception in LinuxContainerExecutor mountCgroups ", e);

      throw new IOException("Problem mounting cgroups " + cgroupKVs +
          "; exit code = " + exitCode + " and output: " + e.getOutput(),
          e);
    }
  }

  /**
   * Returns the resource handler chain held by this executor.
   *
   * @return the current {@code resourceHandlerChain}; other methods of this
   * class null-check it before use, so callers should expect {@code null}
   */
  @VisibleForTesting
  public ResourceHandler getResourceHandler() {
    return resourceHandlerChain;
  }

  /**
   * Remove the docker container with the given id, provided that
   * {@code DockerCommandExecutor.isRemovable} accepts its current status.
   * A {@link ContainerExecutionException} raised while querying or removing
   * the container is logged and not propagated.
   *
   * @param containerId the containerId for the container.
   */
  public void removeDockerContainer(String containerId) {
    try {
      PrivilegedOperationExecutor privOpExecutor =
          PrivilegedOperationExecutor.getInstance(super.getConf());
      if (DockerCommandExecutor.isRemovable(
          DockerCommandExecutor.getContainerStatus(containerId, privOpExecutor,
              nmContext))) {
        LOG.info("Removing Docker container : {}", containerId);
        DockerRmCommand dockerRmCommand = new DockerRmCommand(containerId,
            ResourceHandlerModule.getCgroupsRelativeRoot());
        DockerCommandExecutor.executeDockerCommand(dockerRmCommand, containerId,
            null, privOpExecutor, false, nmContext);
      }
    } catch (ContainerExecutionException e) {
      // Pass the exception to the logger so that the reason for the failed
      // removal is not lost.
      LOG.warn("Unable to remove docker container: {}", containerId, e);
    }
  }

  /**
   * Notifies the resource handler chain, when one is set, that the container
   * has completed. A {@link ResourceHandlerException} from the chain is
   * logged and not propagated.
   *
   * @param containerId the id of the completed container
   */
  @VisibleForTesting
  void postComplete(final ContainerId containerId) {
    if (resourceHandlerChain == null) {
      return;
    }
    try {
      LOG.debug("{} post complete", containerId);
      resourceHandlerChain.postComplete(containerId);
    } catch (ResourceHandlerException chainFailure) {
      LOG.warn("ResourceHandlerChain.postComplete failed for "
          + "containerId: {}. Exception: ", containerId, chainFailure);
    }
  }

  /**
   * Writes the given spec to {@code nmPrivate/<appId>/sysfs/app.json} under a
   * local directory chosen by the {@code LocalDirsHandlerService}, then runs
   * a {@code SYNC_YARN_SYSFS} privileged operation for the application.
   *
   * <p>An existing file is deleted first. If the file cannot be created
   * afresh, the spec is not written and a warning is logged; the privileged
   * operation is still executed.
   *
   * @param ctx not read by this implementation; the executor's own
   * {@code nmContext} is used instead
   * @param user the user passed to the privileged operation
   * @param appId the application id, used in the file path and passed to
   * the privileged operation
   * @param spec the content written to the file, encoded as UTF-8
   * @throws IOException if the file cannot be written or the privileged
   * operation fails
   */
  @Override
  public synchronized void updateYarnSysFS(Context ctx, String user,
      String appId, String spec) throws IOException {
    LocalDirsHandlerService dirsHandler = nmContext.getLocalDirsHandler();
    Path sysFSPath = dirsHandler.getLocalPathForWrite(
        "nmPrivate/" + appId + "/sysfs/app.json");
    File file = new File(sysFSPath.toString());
    List<String> localDirs = dirsHandler.getLocalDirs();
    if (file.exists() && !file.delete()) {
      LOG.warn("Unable to delete {}", sysFSPath);
    }
    if (file.createNewFile()) {
      // try-with-resources: a failure in close() is attached as a suppressed
      // exception and can no longer mask a failure in write().
      try (FileOutputStream output = new FileOutputStream(file)) {
        output.write(spec.getBytes(java.nio.charset.StandardCharsets.UTF_8));
      }
    } else {
      // Without this the stale file would be synced with no trace of why.
      LOG.warn("Unable to create {}, spec was not written", sysFSPath);
    }
    PrivilegedOperation privOp = new PrivilegedOperation(
        PrivilegedOperation.OperationType.SYNC_YARN_SYSFS);
    String runAsUser = getRunAsUser(user);
    privOp.appendArgs(runAsUser,
        user,
        Integer.toString(PrivilegedOperation.RunAsUserCommand
        .SYNC_YARN_SYSFS.getValue()),
        appId, StringUtils.join(PrivilegedOperation
            .LINUX_FILE_PATH_SEPARATOR, localDirs));
    privOp.disableFailureLogging();
    PrivilegedOperationExecutor privilegedOperationExecutor =
        PrivilegedOperationExecutor.getInstance(nmContext.getConf());
    try {
      privilegedOperationExecutor.executePrivilegedOperation(null,
            privOp, null, null, false, false);
    } catch (PrivilegedOperationException e) {
      throw new IOException(e);
    }
  }

  /**
   * Delegates to the {@code getExposedPorts} method of the Linux container
   * runtime held by this executor.
   *
   * @param container the container passed through to the runtime
   * @return the string returned by the runtime for {@code container}
   * @throws ContainerExecutionException if the runtime call throws it
   */
  @Override
  public String getExposedPorts(Container container)
      throws ContainerExecutionException {
    final String exposedPorts =
        linuxContainerRuntime.getExposedPorts(container);
    return exposedPorts;
  }

  /**
   * Delegates to the {@code getLocalResources} method of the Linux container
   * runtime held by this executor.
   *
   * @param container the container passed through to the runtime
   * @return the map returned by the runtime for {@code container}
   * @throws IOException if the runtime call throws it
   */
  @Override
  public Map<String, LocalResource> getLocalResources(Container container)
      throws IOException {
    final Map<String, LocalResource> resources =
        linuxContainerRuntime.getLocalResources(container);
    return resources;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop CMgrCompletedAppsEvent 源码

hadoop CMgrCompletedContainersEvent 源码

hadoop CMgrSignalContainersEvent 源码

hadoop CMgrUpdateContainersEvent 源码

hadoop ContainerExecutor 源码

hadoop ContainerManagerEvent 源码

hadoop ContainerManagerEventType 源码

hadoop ContainerStateTransitionListener 源码

hadoop Context 源码

hadoop DefaultContainerExecutor 源码

0  赞