hadoop ContainersLauncher source code

  • 2022-10-20

hadoop ContainersLauncher code

File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;

import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * The launcher for the containers. This service should be started only after
 * the {@link ResourceLocalizationService} is started as it depends on creation
 * of system directories on the local file-system.
 * 
 */
public class ContainersLauncher extends AbstractService
    implements AbstractContainersLauncher {

  private static final Logger LOG =
       LoggerFactory.getLogger(ContainersLauncher.class);

  private Context context;
  private ContainerExecutor exec;
  private Dispatcher dispatcher;
  private ContainerManagerImpl containerManager;

  private LocalDirsHandlerService dirsHandler;
  @VisibleForTesting
  public ExecutorService containerLauncher =
      HadoopExecutors.newCachedThreadPool(
        new ThreadFactoryBuilder()
          .setNameFormat("ContainersLauncher #%d")
          .build());
  @VisibleForTesting
  public final Map<ContainerId, ContainerLaunch> running =
    Collections.synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());

  public ContainersLauncher() {
    super("containers-launcher");
  }

  @VisibleForTesting
  public ContainersLauncher(Context context, Dispatcher dispatcher,
      ContainerExecutor exec, LocalDirsHandlerService dirsHandler,
      ContainerManagerImpl containerManager) {
    this();
    init(context, dispatcher, exec, dirsHandler, containerManager);
  }

  @Override
  public void init(Context nmContext, Dispatcher nmDispatcher,
      ContainerExecutor containerExec, LocalDirsHandlerService nmDirsHandler,
      ContainerManagerImpl nmContainerManager) {
    this.exec = containerExec;
    this.context = nmContext;
    this.dispatcher = nmDispatcher;
    this.dirsHandler = nmDirsHandler;
    this.containerManager = nmContainerManager;
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    try {
      //TODO Is this required?
      FileContext.getLocalFSFileContext(conf);
    } catch (UnsupportedFileSystemException e) {
      throw new YarnRuntimeException("Failed to start ContainersLauncher", e);
    }
    super.serviceInit(conf);
  }

  @Override
  protected void serviceStop() throws Exception {
    containerLauncher.shutdownNow();
    super.serviceStop();
  }

  @Override
  public void handle(ContainersLauncherEvent event) {
    // TODO: ContainersLauncher launches containers one by one!!
    Container container = event.getContainer();
    ContainerId containerId = container.getContainerId();
    switch (event.getType()) {
      case LAUNCH_CONTAINER:
        Application app =
          context.getApplications().get(
              containerId.getApplicationAttemptId().getApplicationId());

        ContainerLaunch launch =
            new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
              event.getContainer(), dirsHandler, containerManager);
        containerLauncher.submit(launch);
        running.put(containerId, launch);
        break;
      case RELAUNCH_CONTAINER:
        app = context.getApplications().get(
                containerId.getApplicationAttemptId().getApplicationId());

        ContainerRelaunch relaunch =
            new ContainerRelaunch(context, getConfig(), dispatcher, exec, app,
                event.getContainer(), dirsHandler, containerManager);
        containerLauncher.submit(relaunch);
        running.put(containerId, relaunch);
        break;
      case RECOVER_CONTAINER:
        app = context.getApplications().get(
            containerId.getApplicationAttemptId().getApplicationId());
        launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher,
            exec, app, event.getContainer(), dirsHandler, containerManager);
        containerLauncher.submit(launch);
        running.put(containerId, launch);
        break;
      case RECOVER_PAUSED_CONTAINER:
        app = context.getApplications().get(
            containerId.getApplicationAttemptId().getApplicationId());
        launch = new RecoverPausedContainerLaunch(context, getConfig(),
            dispatcher, exec, app, event.getContainer(), dirsHandler,
            containerManager);
        containerLauncher.submit(launch);
        break;
      case CLEANUP_CONTAINER:
        cleanup(event, containerId, true);
        break;
      case CLEANUP_CONTAINER_FOR_REINIT:
        cleanup(event, containerId, false);
        break;
      case SIGNAL_CONTAINER:
        SignalContainersLauncherEvent signalEvent =
            (SignalContainersLauncherEvent) event;
        ContainerLaunch runningContainer = running.get(containerId);
        if (runningContainer == null) {
          // Container not launched. So nothing needs to be done.
          LOG.info("Container " + containerId + " not running, nothing to signal.");
          return;
        }

        try {
          runningContainer.signalContainer(signalEvent.getCommand());
        } catch (IOException e) {
          LOG.warn("Got exception while signaling container " + containerId
              + " with command " + signalEvent.getCommand());
        }
        break;
      case PAUSE_CONTAINER:
        ContainerLaunch launchedContainer = running.get(containerId);
        if (launchedContainer == null) {
          // Container not launched. So nothing needs to be done.
          return;
        }

        // Pause the container
        try {
          launchedContainer.pauseContainer();
        } catch (Exception e) {
          LOG.info("Got exception while pausing container: " +
            StringUtils.stringifyException(e));
        }
        break;
      case RESUME_CONTAINER:
        ContainerLaunch launchCont = running.get(containerId);
        if (launchCont == null) {
          // Container not launched. So nothing needs to be done.
          return;
        }

        // Resume the container.
        try {
          launchCont.resumeContainer();
        } catch (Exception e) {
          LOG.info("Got exception while resuming container: " +
            StringUtils.stringifyException(e));
        }
        break;
    }
  }

  @VisibleForTesting
  void cleanup(ContainersLauncherEvent event, ContainerId containerId,
      boolean async) {
    ContainerLaunch existingLaunch = running.remove(containerId);
    if (existingLaunch == null) {
      // Container not launched.
      // triggering KILLING to CONTAINER_CLEANEDUP_AFTER_KILL transition.
      dispatcher.getEventHandler().handle(
          new ContainerExitEvent(containerId,
              ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
              Shell.WINDOWS ?
                  ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode() :
                  ContainerExecutor.ExitCode.TERMINATED.getExitCode(),
              "Container terminated before launch."));
      return;
    }

    // Cleanup a container whether it is running/killed/completed, so that
    // no sub-processes are alive.
    ContainerCleanup cleanup = new ContainerCleanup(context, getConfig(),
        dispatcher, exec, event.getContainer(), existingLaunch);
    if (async) {
      containerLauncher.submit(cleanup);
    } else {
      cleanup.run();
    }
  }
}
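
To put the listing in context: ContainersLauncher is an AbstractService that reacts to ContainersLauncherEvent objects, submits a ContainerLaunch (or its relaunch/recovery variants) to a cached thread pool, and tracks it in the running map until cleanup. The sketch below is not part of the Hadoop sources; the class name ContainersLauncherUsageSketch and both helper methods are made up for illustration, and it leans on the @VisibleForTesting constructor shown above while assuming the collaborators (Context, Dispatcher, ContainerExecutor, and so on) have already been built elsewhere in the NodeManager.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;

public class ContainersLauncherUsageSketch {

  // Build, init and start the launcher; the collaborators are assumed to be
  // the ones the NodeManager already owns (hypothetical wiring, for illustration).
  static ContainersLauncher startLauncher(Context context, Dispatcher dispatcher,
      ContainerExecutor exec, LocalDirsHandlerService dirsHandler,
      ContainerManagerImpl containerManager, Configuration conf) {
    ContainersLauncher launcher =
        new ContainersLauncher(context, dispatcher, exec, dirsHandler, containerManager);
    launcher.init(conf);   // AbstractService.init -> serviceInit (local FS sanity check)
    launcher.start();      // AbstractService.start
    return launcher;
  }

  // Ask the launcher to launch one container. In the real NodeManager this
  // event is delivered through the dispatcher rather than by a direct call.
  static void launchContainer(ContainersLauncher launcher, Container container) {
    launcher.handle(new ContainersLauncherEvent(
        container, ContainersLauncherEventType.LAUNCH_CONTAINER));
  }
}

Once a container is tracked in running, later CLEANUP_CONTAINER events go through cleanup(), which either submits a ContainerCleanup task to the same pool (async) or runs it inline for CLEANUP_CONTAINER_FOR_REINIT.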

Related information

hadoop source code directory

Related articles

hadoop AbstractContainersLauncher source code

hadoop ContainerCleanup source code

hadoop ContainerLaunch source code

hadoop ContainerRelaunch source code

hadoop ContainersLauncherEvent source code

hadoop ContainersLauncherEventType source code

hadoop RecoverPausedContainerLaunch source code

hadoop RecoveredContainerLaunch source code

hadoop SignalContainersLauncherEvent source code

hadoop package-info source code
