hadoop ContainerCleanup 源码

  • 2022-10-20
  • 浏览 (180)

hadoop ContainerCleanup 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerCleanup.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;

import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DockerContainerDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import static org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.EXIT_CODE_FILE_SUFFIX;

/**
 * Cleanup the container.
 * Cancels the launch if launch has not started yet or signals
 * the executor to not execute the process if not already done so.
 * Also, sends a SIGTERM followed by a SIGKILL to the process if
 * the process id is available.
 *
 * <p>{@link #run()} performs, in order:
 * <ol>
 *   <li>marks the container as killed in the NM state store (failures are
 *       logged);</li>
 *   <li>returns early when the launch is judged not started, i.e.
 *       {@code markLaunched()} succeeded and the launch is not
 *       completed;</li>
 *   <li>deactivates the container in the executor, then either signals the
 *       container process or, when no pid is known and the launch is not
 *       completed, dispatches a {@code CONTAINER_KILLED_ON_REQUEST}
 *       event;</li>
 *   <li>schedules deletion of the docker container when docker was
 *       requested for this container;</li>
 *   <li>deletes the pid file and its exit code file, then reaps the
 *       container.</li>
 * </ol>
 */
public class ContainerCleanup implements Runnable {

  private static final Logger LOG =
      LoggerFactory.getLogger(ContainerCleanup.class);

  private final Context context;
  private final Configuration conf;
  private final Dispatcher dispatcher;
  private final ContainerExecutor exec;
  private final Container container;
  private final ContainerLaunch launch;
  /** Delay between the TERM and the follow-up KILL; {@code <= 0} means KILL immediately. */
  private final long sleepDelayBeforeSigKill;


  /**
   * Creates a cleanup task for a single container.
   *
   * @param context NodeManager context (state store, deletion service)
   * @param configuration configuration used to read
   *     {@code NM_SLEEP_DELAY_BEFORE_SIGKILL_MS} and the docker settings
   * @param dispatcher dispatcher used to emit container events
   * @param exec executor used to deactivate and signal the container
   * @param container the container being cleaned up
   * @param containerLaunch the launch whose process is being cleaned up
   * Every argument is checked with {@code Preconditions.checkNotNull}.
   */
  public ContainerCleanup(Context context, Configuration configuration,
      Dispatcher dispatcher, ContainerExecutor exec,
      Container container,
      ContainerLaunch containerLaunch) {

    this.context = Preconditions.checkNotNull(context, "context");
    this.conf = Preconditions.checkNotNull(configuration, "config");
    this.dispatcher = Preconditions.checkNotNull(dispatcher, "dispatcher");
    this.exec = Preconditions.checkNotNull(exec, "exec");
    this.container = Preconditions.checkNotNull(container, "container");
    this.launch = Preconditions.checkNotNull(containerLaunch, "launch");
    this.sleepDelayBeforeSigKill = conf.getLong(
        YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
        YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS);
  }

  @Override
  public void run() {
    ContainerId containerId = container.getContainerId();
    String containerIdStr = containerId.toString();
    LOG.info("Cleaning up container {}", containerIdStr);

    try {
      context.getNMStateStore().storeContainerKilled(containerId);
    } catch (IOException e) {
      LOG.error("Unable to mark container {} killed in store", containerId, e);
    }

    // launch flag will be set to true if process already launched,
    // in process of launching, or failed to launch.
    boolean alreadyLaunched = !launch.markLaunched() ||
        launch.isLaunchCompleted();
    if (!alreadyLaunched) {
      LOG.info("Container {} not launched. No cleanup needed to be done",
          containerIdStr);
      return;
    }
    LOG.debug("Marking container {} as inactive", containerIdStr);
    // this should ensure that if the container process has not launched
    // by this time, it will never be launched
    exec.deactivateContainer(containerId);
    Path pidFilePath = launch.getPidFilePath();
    // SLF4J renders a null argument as "null", so no substitution is needed.
    LOG.debug("Getting pid for container {} to kill from pid file {}",
        containerIdStr, pidFilePath);
    // however the container process may have already started
    try {
      // get process id from pid file if available
      // else if shell is still active, get it from the shell
      String processId = launch.getContainerPid();

      // kill process
      String user = container.getUser();
      if (processId != null) {
        signalProcess(processId, user, containerIdStr);
      } else if (!launch.isLaunchCompleted()) {
        // Normally this means that the process was notified about
        // deactivateContainer above and did not start.
        // Since we already set the state to RUNNING or REINITIALIZING
        // we have to send a killed event to continue.
        LOG.warn("Container clean up before pid file created {}",
            containerIdStr);
        dispatcher.getEventHandler().handle(
            new ContainerExitEvent(containerId,
                ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
                Shell.WINDOWS ?
                    ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode() :
                    ContainerExecutor.ExitCode.TERMINATED.getExitCode(),
                "Container terminated before pid file created."));
        // There is a possibility that the launch grabbed the file name before
        // the deactivateContainer above but it was slow enough to avoid
        // getContainerPid.
        // Increasing YarnConfiguration.NM_PROCESS_KILL_WAIT_MS
        // reduces the likelihood of this race condition and process leak.
      }

      // rm container in docker
      if (DockerLinuxContainerRuntime.isDockerContainerRequested(conf,
          container.getLaunchContext().getEnvironment())) {
        rmDockerContainerDelayed();
      }
    } catch (Exception e) {
      // Cleanup is best effort: report the failure as a diagnostic instead
      // of propagating it out of the runnable.
      String message =
          "Exception when trying to cleanup container " + containerIdStr
              + ": " + StringUtils.stringifyException(e);
      LOG.warn(message);
      dispatcher.getEventHandler().handle(
          new ContainerDiagnosticsUpdateEvent(containerId, message));
    } finally {
      // cleanup pid file if present
      deletePidFiles(containerId, pidFilePath);
    }

    try {
      // Reap the container
      launch.reapContainer();
    } catch (IOException ioe) {
      LOG.warn("{} exception trying to reap container. Ignoring.", containerId,
          ioe);
    }
  }

  /**
   * Deletes the pid file and its companion exit code file (pid file path plus
   * {@code EXIT_CODE_FILE_SUFFIX}) from the local file system.
   * Does nothing when {@code pidFilePath} is null. Each file is deleted
   * independently, so a failure on the pid file does not leave the exit code
   * file behind. Failures are logged and ignored.
   *
   * @param containerId container id used in log messages
   * @param pidFilePath pid file path, may be null
   */
  private static void deletePidFiles(ContainerId containerId,
      Path pidFilePath) {
    if (pidFilePath == null) {
      return;
    }
    FileContext lfs;
    try {
      lfs = FileContext.getLocalFSFileContext();
    } catch (IOException ioe) {
      LOG.warn("{} exception trying to delete pid file {}. Ignoring.",
          containerId, pidFilePath, ioe);
      return;
    }
    deleteLocalFileQuietly(lfs, containerId, pidFilePath, "pid file");
    deleteLocalFileQuietly(lfs, containerId,
        pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), "exit code file");
  }

  /**
   * Deletes a single non-directory path, logging and ignoring any IOException.
   *
   * @param lfs local file context
   * @param containerId container id used in the log message
   * @param file path to delete
   * @param description human readable kind of file, used in the log message
   */
  private static void deleteLocalFileQuietly(FileContext lfs,
      ContainerId containerId, Path file, String description) {
    try {
      lfs.delete(file, false);
    } catch (IOException ioe) {
      LOG.warn("{} exception trying to delete {} {}. Ignoring.",
          containerId, description, file, ioe);
    }
  }

  /**
   * Hands a {@link DockerContainerDeletionTask} for this container to the
   * NodeManager's deletion service.
   */
  private void rmDockerContainerDelayed() {
    DeletionService deletionService = context.getDeletionService();
    DockerContainerDeletionTask deletionTask =
        new DockerContainerDeletionTask(deletionService, container.getUser(),
            container.getContainerId().toString());
    deletionService.delete(deletionTask);
  }

  /**
   * Signals the container process.
   *
   * <p>When {@code sleepDelayBeforeSigKill > 0}, sends TERM and starts a
   * {@code DelayedProcessKiller} that sends KILL after that delay. Otherwise
   * sends KILL directly.
   *
   * @param processId pid of the container process
   * @param user user the signal is sent as
   * @param containerIdStr container id, for logging
   * @throws IOException if the executor fails to signal the container
   */
  private void signalProcess(String processId, String user,
      String containerIdStr) throws IOException {
    LOG.debug("Sending signal to pid {} as user {} for container {}",
        processId, user, containerIdStr);
    final ContainerExecutor.Signal signal =
        sleepDelayBeforeSigKill > 0 ? ContainerExecutor.Signal.TERM :
            ContainerExecutor.Signal.KILL;

    boolean result = sendSignal(user, processId, signal);
    LOG.debug("Sent signal {} to pid {} as user {} for container {},"
        + " result={}", signal, processId, user, containerIdStr,
        (result ? "success" : "failed"));

    if (sleepDelayBeforeSigKill > 0) {
      new ContainerExecutor.DelayedProcessKiller(container, user, processId,
          sleepDelayBeforeSigKill, ContainerExecutor.Signal.KILL, exec).start();
    }
  }

  /**
   * Sends {@code signal} to the container process through the executor.
   *
   * @return the executor's result for {@code signalContainer}
   * @throws IOException if the executor fails to signal the container
   */
  private boolean sendSignal(String user, String processId,
      ContainerExecutor.Signal signal)
      throws IOException {
    return exec.signalContainer(
        new ContainerSignalContext.Builder().setContainer(container)
            .setUser(user).setPid(processId).setSignal(signal).build());
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractContainersLauncher 源码

hadoop ContainerLaunch 源码

hadoop ContainerRelaunch 源码

hadoop ContainersLauncher 源码

hadoop ContainersLauncherEvent 源码

hadoop ContainersLauncherEventType 源码

hadoop RecoverPausedContainerLaunch 源码

hadoop RecoveredContainerLaunch 源码

hadoop SignalContainersLauncherEvent 源码

hadoop package-info 源码

0  赞