hadoop RuncContainerRuntime 源码

  • 2022-10-20
  • 浏览 (192)

haddop RuncContainerRuntime 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/RuncContainerRuntime.java

/*
 *
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 * /
 */

package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.runc.ImageManifest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.runc.RuncContainerExecutorConfig;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.runc.RuncContainerExecutorConfig.OCILayer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.runc.RuncContainerExecutorConfig.OCIRuntimeConfig;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.runc.RuncContainerExecutorConfig.OCIRuntimeConfig.OCILinuxConfig;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.runc.RuncContainerExecutorConfig.OCIRuntimeConfig.OCIMount;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.runc.RuncContainerExecutorConfig.OCIRuntimeConfig.OCIProcessConfig;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.runc.RuncImageTagToManifestPlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.runc.RuncManifestToResourcesPlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.volume.csi.ContainerVolumePublisher;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_RUNC_IMAGE_TAG_TO_MANIFEST_PLUGIN;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_RUNC_LAYER_MOUNTS_TO_KEEP;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REAP_RUNC_LAYER_MOUNTS_INTERVAL;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_RUNC_MANIFEST_TO_RESOURCES_PLUGIN;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_RUNC_IMAGE_TAG_TO_MANIFEST_PLUGIN;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_RUNC_LAYER_MOUNTS_TO_KEEP;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_RUNC_MANIFEST_TO_RESOURCES_PLUGIN;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REAP_RUNC_LAYER_MOUNTS_INTERVAL;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.*;
/**
 * <p>This class is an extension of {@link OCIContainerRuntime} that uses the
 * native {@code container-executor} binary via a
 * {@link PrivilegedOperationExecutor} instance to launch processes inside
 * Runc containers.</p>
 *
 * <p>The following environment variables are used to configure the Runc
 * engine:</p>
 *
 * <ul>
 *   <li>
 *     {@code YARN_CONTAINER_RUNTIME_TYPE} ultimately determines whether a
 *     runC container will be used. If the value is {@code runc}, a runC
 *     container will be used. Otherwise a regular process tree container will
 *     be used. This environment variable is checked by the
 *     {@link #isRuncContainerRequested} method, which is called by the
 *     {@link DelegatingLinuxContainerRuntime}.
 *   </li>
 *   <li>
 *     {@code YARN_CONTAINER_RUNTIME_RUNC_IMAGE} names which image
 *     will be used to launch the Runc container.
 *   </li>
 *   <li>
 *     {@code YARN_CONTAINER_RUNTIME_RUNC_MOUNTS} allows users to specify
 *     additional volume mounts for the runC container. The value of the
 *     environment variable should be a comma-separated list of mounts.
 *     All such mounts must be given as {@code source:dest[:mode]} and the mode
 *     must be "ro" (read-only) or "rw" (read-write) to specify the type of
 *     access being requested. If neither is specified, read-write will be
 *     assumed. The requested mounts will be validated by
 *     container-executor based on the values set in container-executor.cfg for
 *     {@code runc.allowed.ro-mounts} and {@code runc.allowed.rw-mounts}.
 *   </li>
 *   <li>
 *     {@code YARN_CONTAINER_RUNTIME_RUNC_CONTAINER_HOSTNAME} sets the
 *     hostname to be used by the Runc container. If not specified, a
 *     hostname will be derived from the container ID and set as default
 *     hostname for networks other than 'host'.
 *   </li>
 * </ul>
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RuncContainerRuntime extends OCIContainerRuntime {

  private static final Log LOG = LogFactory.getLog(
      RuncContainerRuntime.class);

  @InterfaceAudience.Private
  private static final String RUNTIME_TYPE = "RUNC";

  @InterfaceAudience.Private
  public static final String ENV_RUNC_CONTAINER_IMAGE =
      "YARN_CONTAINER_RUNTIME_RUNC_IMAGE";
  @InterfaceAudience.Private
  public static final String ENV_RUNC_CONTAINER_MOUNTS =
      "YARN_CONTAINER_RUNTIME_RUNC_MOUNTS";
  @InterfaceAudience.Private
  public static final String ENV_RUNC_CONTAINER_HOSTNAME =
      "YARN_CONTAINER_RUNTIME_RUNC_CONTAINER_HOSTNAME";

  @InterfaceAudience.Private
  public final static String ENV_RUNC_CONTAINER_PID_NAMESPACE =
      formatOciEnvKey(RUNTIME_TYPE, CONTAINER_PID_NAMESPACE_SUFFIX);
  @InterfaceAudience.Private
  public final static String ENV_RUNC_CONTAINER_RUN_PRIVILEGED_CONTAINER =
      formatOciEnvKey(RUNTIME_TYPE, RUN_PRIVILEGED_CONTAINER_SUFFIX);

  private Configuration conf;
  private Context nmContext;
  private PrivilegedOperationExecutor privilegedOperationExecutor;
  private CGroupsHandler cGroupsHandler;
  private RuncImageTagToManifestPlugin imageTagToManifestPlugin;
  private RuncManifestToResourcesPlugin manifestToResourcesPlugin;
  private ObjectMapper mapper;
  private String seccomp;
  private int layersToKeep;
  private String defaultRuncImage;
  private ScheduledExecutorService exec;
  private String seccompProfile;
  private Set<String> defaultROMounts = new HashSet<>();
  private Set<String> defaultRWMounts = new HashSet<>();
  private Set<String> allowedNetworks = new HashSet<>();
  private Set<String> allowedRuntimes = new HashSet<>();
  private AccessControlList privilegedContainersAcl;

  public RuncContainerRuntime(PrivilegedOperationExecutor
      privilegedOperationExecutor) {
    this(privilegedOperationExecutor, ResourceHandlerModule
        .getCGroupsHandler());
  }

  //A constructor with an injected cGroupsHandler primarily used for testing.
  @VisibleForTesting
  public RuncContainerRuntime(PrivilegedOperationExecutor
      privilegedOperationExecutor, CGroupsHandler cGroupsHandler) {
    super(privilegedOperationExecutor, cGroupsHandler);
    this.privilegedOperationExecutor = privilegedOperationExecutor;

    if (cGroupsHandler == null) {
      LOG.info("cGroupsHandler is null - cgroups not in use.");
    } else {
      this.cGroupsHandler = cGroupsHandler;
    }
  }

  @Override
  public void initialize(Configuration configuration, Context nmCtx)
      throws ContainerExecutionException {
    super.initialize(configuration, nmCtx);
    this.conf = configuration;
    this.nmContext = nmCtx;
    imageTagToManifestPlugin = chooseImageTagToManifestPlugin();
    imageTagToManifestPlugin.init(conf);
    manifestToResourcesPlugin = chooseManifestToResourcesPlugin();
    manifestToResourcesPlugin.init(conf);
    mapper = new ObjectMapper();
    defaultRuncImage = conf.get(YarnConfiguration.NM_RUNC_IMAGE_NAME);

    allowedNetworks.clear();
    allowedRuntimes.clear();

    allowedNetworks.addAll(Arrays.asList(
        conf.getTrimmedStrings(
        YarnConfiguration.NM_RUNC_ALLOWED_CONTAINER_NETWORKS,
        YarnConfiguration.DEFAULT_NM_RUNC_ALLOWED_CONTAINER_NETWORKS)));

    allowedRuntimes.addAll(Arrays.asList(
        conf.getTrimmedStrings(
        YarnConfiguration.NM_RUNC_ALLOWED_CONTAINER_RUNTIMES,
        YarnConfiguration.DEFAULT_NM_RUNC_ALLOWED_CONTAINER_RUNTIMES)));

    privilegedContainersAcl = new AccessControlList(conf.getTrimmed(
        YarnConfiguration.NM_RUNC_PRIVILEGED_CONTAINERS_ACL,
        YarnConfiguration.DEFAULT_NM_RUNC_PRIVILEGED_CONTAINERS_ACL));

    seccompProfile = conf.get(YarnConfiguration.NM_RUNC_SECCOMP_PROFILE);

    defaultROMounts.addAll(Arrays.asList(
        conf.getTrimmedStrings(
        YarnConfiguration.NM_RUNC_DEFAULT_RO_MOUNTS)));

    defaultRWMounts.addAll(Arrays.asList(
        conf.getTrimmedStrings(
        YarnConfiguration.NM_RUNC_DEFAULT_RW_MOUNTS)));

    try {
      //TODO Remove whitespace in seccomp that gets output to config.json
      if (seccompProfile != null) {
        seccomp = new String(Files.readAllBytes(Paths.get(seccompProfile)),
            StandardCharsets.UTF_8);
      }
    } catch (IOException ioe) {
      throw new ContainerExecutionException(ioe);
    }

    layersToKeep = conf.getInt(NM_RUNC_LAYER_MOUNTS_TO_KEEP,
        DEFAULT_NM_RUNC_LAYER_MOUNTS_TO_KEEP);

  }

  @Override
  public void start() {
    int reapRuncLayerMountsInterval =
        conf.getInt(NM_REAP_RUNC_LAYER_MOUNTS_INTERVAL,
        DEFAULT_NM_REAP_RUNC_LAYER_MOUNTS_INTERVAL);
    exec = HadoopExecutors.newScheduledThreadPool(1);
    exec.scheduleAtFixedRate(
        new Runnable() {
          @Override
          public void run() {
            try {
              PrivilegedOperation launchOp = new PrivilegedOperation(
                  PrivilegedOperation.OperationType.REAP_RUNC_LAYER_MOUNTS);
              launchOp.appendArgs(Integer.toString(layersToKeep));
              try {
                String stdout = privilegedOperationExecutor
                    .executePrivilegedOperation(null,
                    launchOp, null, null, false, false);
                if(stdout != null) {
                  LOG.info("Reap layer mounts thread: " + stdout);
                }
              } catch (PrivilegedOperationException e) {
                LOG.warn("Failed to reap old runc layer mounts", e);
              }
            } catch (Exception e) {
              LOG.warn("Reap layer mount thread caught an exception: ", e);
            }
          }
        }, 0, reapRuncLayerMountsInterval, TimeUnit.SECONDS);
    imageTagToManifestPlugin.start();
    manifestToResourcesPlugin.start();
  }

  @Override
  public void stop() {
    exec.shutdownNow();
    imageTagToManifestPlugin.stop();
    manifestToResourcesPlugin.stop();
  }

  @Override
  public void launchContainer(ContainerRuntimeContext ctx)
      throws ContainerExecutionException {
    List<String> env = new ArrayList<>();
    Container container = ctx.getContainer();
    String runAsUser = ctx.getExecutionAttribute(RUN_AS_USER);
    String user = ctx.getExecutionAttribute(USER);
    ContainerId containerId = container.getContainerId();
    ApplicationId appId = containerId.getApplicationAttemptId()
        .getApplicationId();

    Map<String, String> environment = container.getLaunchContext()
        .getEnvironment();
    ArrayList<OCIMount> mounts = new ArrayList<>();
    ArrayList<OCILayer> layers = new ArrayList<>();
    String hostname = environment.get(ENV_RUNC_CONTAINER_HOSTNAME);

    validateHostname(hostname);

    String containerIdStr = containerId.toString();
    String applicationId = appId.toString();
    Path containerWorkDir = ctx.getExecutionAttribute(CONTAINER_WORK_DIR);

    RuncRuntimeObject runcRuntimeObject =
        container.getContainerRuntimeData(RuncRuntimeObject.class);
    List<LocalResource> layerResources = runcRuntimeObject.getOCILayers();

    ResourceLocalizationService localizationService =
        nmContext.getContainerManager().getResourceLocalizationService();

    List<String> args = new ArrayList<>();

    try {
      try {
        LocalResource rsrc = runcRuntimeObject.getConfig();
        LocalResourceRequest req = new LocalResourceRequest(rsrc);
        LocalizedResource localRsrc = localizationService
            .getLocalizedResource(req, user, appId);
        if (localRsrc == null) {
          throw new ContainerExecutionException("Could not successfully " +
              "localize layers. rsrc: " + rsrc.getResource().getFile());
        }

        File file = new File(localRsrc.getLocalPath().toString());
        List<String> imageEnv = extractImageEnv(file);
        if (imageEnv != null && !imageEnv.isEmpty()) {
          env.addAll(imageEnv);
        }
        List<String> entrypoint = extractImageEntrypoint(file);
        if (entrypoint != null && !entrypoint.isEmpty()) {
          args.addAll(entrypoint);
        }
      } catch (IOException ioe) {
        throw new ContainerExecutionException(ioe);
      }

      for (LocalResource rsrc : layerResources) {
        LocalResourceRequest req = new LocalResourceRequest(rsrc);
        LocalizedResource localRsrc = localizationService
            .getLocalizedResource(req, user, appId);

        OCILayer layer = new OCILayer("application/vnd.squashfs",
            localRsrc.getLocalPath().toString());
        layers.add(layer);
      }
    } catch (URISyntaxException e) {
      throw new ContainerExecutionException(e);
    }

    setContainerMounts(mounts, ctx, containerWorkDir, environment);

    String resourcesOpts = ctx.getExecutionAttribute(RESOURCES_OPTIONS);

    Path nmPrivateContainerScriptPath = ctx.getExecutionAttribute(
        NM_PRIVATE_CONTAINER_SCRIPT_PATH);

    Path nmPrivateTokensPath =
        ctx.getExecutionAttribute(NM_PRIVATE_TOKENS_PATH);

    int cpuShares = container.getResource().getVirtualCores();

    // Zero sets to default of 1024.  2 is the minimum value otherwise
    if (cpuShares < 2) {
      cpuShares = 2;
    }

    Path launchDst =
        new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);

    args.add("bash");
    args.add(launchDst.toUri().getPath());

    String cgroupPath = getCgroupPath(resourcesOpts, "runc-" + containerIdStr);

    String pidFile = ctx.getExecutionAttribute(PID_FILE_PATH).toString();

    @SuppressWarnings("unchecked")
    List<String> localDirs = ctx.getExecutionAttribute(LOCAL_DIRS);
    @SuppressWarnings("unchecked")
    List<String> logDirs = ctx.getExecutionAttribute(LOG_DIRS);

    Path keystorePath = ctx.getExecutionAttribute(NM_PRIVATE_KEYSTORE_PATH);
    Path truststorePath = ctx.getExecutionAttribute(NM_PRIVATE_TRUSTSTORE_PATH);

    int https = 0;
    String keystore = null;
    String truststore = null;

    if (keystorePath != null && truststorePath != null) {
      https = 1;
      keystore = keystorePath.toUri().getPath();
      truststore = truststorePath.toUri().getPath();
    }

    OCIProcessConfig processConfig = createOCIProcessConfig(
        containerWorkDir.toString(), env, args);
    OCILinuxConfig linuxConfig = createOCILinuxConfig(cpuShares,
        cgroupPath, seccomp);

    OCIRuntimeConfig ociRuntimeConfig = new OCIRuntimeConfig(null, mounts,
        processConfig, hostname, null, null, linuxConfig);

    RuncContainerExecutorConfig runcContainerExecutorConfig =
        createRuncContainerExecutorConfig(runAsUser, user, containerIdStr,
        applicationId, pidFile, nmPrivateContainerScriptPath.toString(),
        nmPrivateTokensPath.toString(), https, keystore, truststore,
        localDirs, logDirs, layers,
        ociRuntimeConfig);

    String commandFile = writeCommandToFile(
        runcContainerExecutorConfig, container);
    PrivilegedOperation launchOp = new PrivilegedOperation(
        PrivilegedOperation.OperationType.RUN_RUNC_CONTAINER);

    launchOp.appendArgs(commandFile);

    try {
      privilegedOperationExecutor.executePrivilegedOperation(null,
          launchOp, null, null, false, false);
    } catch (PrivilegedOperationException e) {
      LOG.info("Launch container failed: ", e);
      try {
        LOG.debug("config.json used: " +
            mapper.writeValueAsString(runcContainerExecutorConfig));
      } catch (IOException ioe) {
        LOG.info("Json Generation Exception", ioe);
      }

      throw new ContainerExecutionException("Launch container failed", e
          .getExitCode(), e.getOutput(), e.getErrorOutput());
    }
  }

  private String getCgroupPath(String resourcesOptions, String containerIdStr) {
    if (cGroupsHandler == null) {
      LOG.debug("cGroupsHandler is null. cgroups are not in use. nothing to"
          + " do.");
      return null;
    }

    if (resourcesOptions.equals(
        (PrivilegedOperation.CGROUP_ARG_PREFIX + PrivilegedOperation
        .CGROUP_ARG_NO_TASKS))) {
      LOG.debug("no resource restrictions specified. not using runc's "
          + "cgroup options");
    } else {
      LOG.debug("using runc's cgroups options");

      String cGroupPath = "/" + cGroupsHandler.getRelativePathForCGroup(
          containerIdStr);

      LOG.debug("using cgroup parent: " + cGroupPath);

      return cGroupPath;
    }
    return null;
  }

  private void addUserMounts(List<OCIMount> mounts,
      Map<String, String> environment,
      Map<Path, List<String>> localizedResources)
      throws ContainerExecutionException {
    if (environment.containsKey(ENV_RUNC_CONTAINER_MOUNTS)) {
      Matcher parsedMounts = USER_MOUNT_PATTERN.matcher(
          environment.get(ENV_RUNC_CONTAINER_MOUNTS));
      if (!parsedMounts.find()) {
        throw new ContainerExecutionException(
            "Unable to parse user supplied mount list: "
            + environment.get(ENV_RUNC_CONTAINER_MOUNTS));
      }
      parsedMounts.reset();
      long mountCount = 0;
      while (parsedMounts.find()) {
        mountCount++;
        String src = parsedMounts.group(1);
        java.nio.file.Path srcPath = java.nio.file.Paths.get(src);
        if (!srcPath.isAbsolute()) {
          src = mountReadOnlyPath(src, localizedResources);
        }
        String dst = parsedMounts.group(2);
        String mode = parsedMounts.group(4);
        boolean isReadWrite;
        if (mode == null) {
          isReadWrite = true;
        } else if (mode.equals("rw")) {
          isReadWrite = true;
        } else if (mode.equals("ro")) {
          isReadWrite = false;
        } else {
          throw new ContainerExecutionException(
              "Unable to parse mode of some mounts in user supplied "
              + "mount list: "
              + environment.get(ENV_RUNC_CONTAINER_MOUNTS));
        }
        addRuncMountLocation(mounts, src, dst, false, isReadWrite);
      }
      long commaCount = environment.get(ENV_RUNC_CONTAINER_MOUNTS).chars()
          .filter(c -> c == ',').count();
      if (mountCount != commaCount + 1) {
        // this means the matcher skipped an improperly formatted mount
        throw new ContainerExecutionException(
            "Unable to parse some mounts in user supplied mount list: "
            + environment.get(ENV_RUNC_CONTAINER_MOUNTS));
      }
    }
  }

  private void addDefaultMountLocation(List<OCIMount> mounts,
      Set<String> defaultMounts, boolean createSource, boolean isReadWrite)
      throws ContainerExecutionException {
    if(defaultMounts != null && !defaultMounts.isEmpty()) {
      for (String mount : defaultMounts) {
        String[] dir = StringUtils.split(mount, ':');
        if (dir.length != 2) {
          throw new ContainerExecutionException("Invalid mount : " +
              mount);
        }
        String src = dir[0];
        String dst = dir[1];
        addRuncMountLocation(mounts, src, dst, createSource, isReadWrite);
      }
    }
  }

  private void addRuncMountLocation(List<OCIMount> mounts, String srcPath,
      String dstPath, boolean createSource, boolean isReadWrite) {
    if (!createSource) {
      boolean sourceExists = new File(srcPath).exists();
      if (!sourceExists) {
        return;
      }
    }

    ArrayList<String> options = new ArrayList<>();
    if (isReadWrite) {
      options.add("rw");
    } else {
      options.add("ro");
    }
    options.add("rbind");
    options.add("rprivate");
    mounts.add(new OCIMount(dstPath, "bind", srcPath, options));
  }

  private void addAllRuncMountLocations(List<OCIMount> mounts,
      List<String> paths, boolean createSource, boolean isReadWrite) {
    for (String dir: paths) {
      this.addRuncMountLocation(mounts, dir, dir, createSource, isReadWrite);
    }
  }

  public Map<String, LocalResource> getLocalResources(
      Container container) throws IOException {
    Map<String, LocalResource> containerLocalRsrc =
        container.getLaunchContext().getLocalResources();
    long layerCount = 0;
    Map<String, String> environment =
        container.getLaunchContext().getEnvironment();
    String imageName = environment.get(ENV_RUNC_CONTAINER_IMAGE);
    if (imageName == null || imageName.isEmpty()) {
      environment.put(ENV_RUNC_CONTAINER_IMAGE,
          defaultRuncImage);
      imageName = defaultRuncImage;
    }

    ImageManifest manifest =
        imageTagToManifestPlugin.getManifestFromImageTag(imageName);
    LocalResource config =
        manifestToResourcesPlugin.getConfigResource(manifest);
    List<LocalResource> layers =
        manifestToResourcesPlugin.getLayerResources(manifest);

    RuncRuntimeObject runcRuntimeObject =
        new RuncRuntimeObject(config, layers);
    container.setContainerRuntimeData(runcRuntimeObject);

    for (LocalResource localRsrc : layers) {
      while(containerLocalRsrc.putIfAbsent("runc-layer" +
          Long.toString(layerCount), localRsrc) != null) {
        layerCount++;
      }
    }

    while(containerLocalRsrc.putIfAbsent("runc-config" +
      Long.toString(layerCount), config) != null) {
      layerCount++;
    }

    return containerLocalRsrc;
  }

  protected RuncImageTagToManifestPlugin chooseImageTagToManifestPlugin()
      throws ContainerExecutionException {
    String pluginName =
        conf.get(NM_RUNC_IMAGE_TAG_TO_MANIFEST_PLUGIN,
        DEFAULT_NM_RUNC_IMAGE_TAG_TO_MANIFEST_PLUGIN);
    RuncImageTagToManifestPlugin runcImageTagToManifestPlugin;
    try {
      Class<?> clazz = Class.forName(pluginName);
      runcImageTagToManifestPlugin =
          (RuncImageTagToManifestPlugin) clazz.newInstance();
    } catch (Exception e) {
      throw new ContainerExecutionException(e);
    }
    return runcImageTagToManifestPlugin;
  }

  protected RuncManifestToResourcesPlugin chooseManifestToResourcesPlugin()
      throws ContainerExecutionException {
    String pluginName =
        conf.get(NM_RUNC_MANIFEST_TO_RESOURCES_PLUGIN,
        DEFAULT_NM_RUNC_MANIFEST_TO_RESOURCES_PLUGIN);
    LOG.info("pluginName = " + pluginName);
    RuncManifestToResourcesPlugin runcManifestToResourcesPlugin;
    try {
      Class<?> clazz = Class.forName(pluginName);
      runcManifestToResourcesPlugin =
          (RuncManifestToResourcesPlugin) clazz.newInstance();
    } catch (Exception e) {
      throw new ContainerExecutionException(e);
    }
    return runcManifestToResourcesPlugin;
  }

  @SuppressWarnings("unchecked")
  protected List<String> extractImageEnv(File config) throws IOException {
    JsonNode node = mapper.readTree(config);
    JsonNode envNode = node.path("config").path("Env");
    if (envNode.isMissingNode()) {
      return null;
    }
    return mapper.readValue(envNode.traverse(), List.class);
  }

  @SuppressWarnings("unchecked")
  protected List<String> extractImageEntrypoint(File config)
      throws IOException {
    JsonNode node = mapper.readTree(config);
    JsonNode entrypointNode = node.path("config").path("Entrypoint");
    if (entrypointNode.isMissingNode()) {
      return null;
    }
    return mapper.readValue(entrypointNode.traverse(), List.class);
  }

  private RuncContainerExecutorConfig createRuncContainerExecutorConfig(
      String runAsUser, String username, String containerId,
      String applicationId, String pidFile,
      String containerScriptPath, String containerCredentialsPath,
      int https, String keystorePath, String truststorePath,
      List<String> localDirs, List<String> logDirs,
      List<OCILayer> layers, OCIRuntimeConfig ociRuntimeConfig) {

    return new RuncContainerExecutorConfig(runAsUser, username, containerId,
        applicationId, pidFile, containerScriptPath, containerCredentialsPath,
        https, keystorePath, truststorePath,
        localDirs, logDirs, layers, layersToKeep, ociRuntimeConfig);
  }

  private OCIProcessConfig createOCIProcessConfig(String cwd,
      List<String> env, List<String> args) {
    return new OCIProcessConfig(false, null, cwd, env,
        args, null, null, null, false, 0, null, null);
  }

  private OCILinuxConfig createOCILinuxConfig(long cpuShares,
      String cgroupsPath, String seccompProf) {
    OCILinuxConfig.Resources.CPU cgroupCPU =
        new OCILinuxConfig.Resources.CPU(cpuShares, 0, 0, 0, 0,
        null, null);
    OCILinuxConfig.Resources cgroupResources =
        new OCILinuxConfig.Resources(null, null, cgroupCPU, null, null, null,
        null, null);

    return new OCILinuxConfig(null, null, null, null,
        cgroupsPath, cgroupResources, null, null, seccompProf, null, null,
        null, null);
  }

  private void setContainerMounts(ArrayList<OCIMount> mounts,
      ContainerRuntimeContext ctx, Path containerWorkDir,
      Map<String, String> environment)
      throws ContainerExecutionException {
    @SuppressWarnings("unchecked")
    List<String> filecacheDirs = ctx.getExecutionAttribute(FILECACHE_DIRS);
    @SuppressWarnings("unchecked")
    List<String> containerLogDirs = ctx.getExecutionAttribute(
        CONTAINER_LOG_DIRS);
    @SuppressWarnings("unchecked")
    List<String> userFilecacheDirs =
        ctx.getExecutionAttribute(USER_FILECACHE_DIRS);
    @SuppressWarnings("unchecked")
    List<String> applicationLocalDirs =
        ctx.getExecutionAttribute(APPLICATION_LOCAL_DIRS);
    @SuppressWarnings("unchecked")
    Map<Path, List<String>> localizedResources = ctx.getExecutionAttribute(
        LOCALIZED_RESOURCES);

    addRuncMountLocation(mounts, containerWorkDir.toString() +
        "/private_slash_tmp", "/tmp", true, true);
    addRuncMountLocation(mounts, containerWorkDir.toString() +
        "/private_var_slash_tmp", "/var/tmp", true, true);

    addAllRuncMountLocations(mounts, containerLogDirs, true, true);
    addAllRuncMountLocations(mounts, applicationLocalDirs, true, true);
    addAllRuncMountLocations(mounts, filecacheDirs, false, false);
    addAllRuncMountLocations(mounts, userFilecacheDirs, false, false);
    addDefaultMountLocation(mounts, defaultROMounts, false, false);
    addDefaultMountLocation(mounts, defaultRWMounts, false, true);
    addUserMounts(mounts, environment, localizedResources);
  }

  public String writeCommandToFile(
      RuncContainerExecutorConfig runcContainerExecutorConfig,
      Container container)
      throws ContainerExecutionException {
    ContainerId containerId = container.getContainerId();
    String filePrefix = containerId.toString();
    ApplicationId appId = containerId.getApplicationAttemptId()
        .getApplicationId();
    File commandFile;
    try {
      File cmdDir = null;

      if(nmContext != null && nmContext.getLocalDirsHandler() != null) {
        String cmdDirStr = nmContext.getLocalDirsHandler().getLocalPathForWrite(
            ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR +
            appId + Path.SEPARATOR + filePrefix + Path.SEPARATOR).toString();
        cmdDir = new File(cmdDirStr);
        if (!cmdDir.mkdirs() && !cmdDir.exists()) {
          throw new IOException("Cannot create container private directory "
              + cmdDir);
        }
      }
      commandFile = new File(cmdDir + "/runc-config.json");

      try {
        mapper.writeValue(commandFile, runcContainerExecutorConfig);
      } catch (IOException ioe) {
        throw new ContainerExecutionException(ioe);
      }

      return commandFile.getAbsolutePath();
    } catch (IOException e) {
      LOG.warn("Unable to write runc config.json to temporary file!");
      throw new ContainerExecutionException(e);
    }
  }

  public String getExposedPorts(Container container) {
    return null;
  }

  public String[] getIpAndHost(Container container) {
    return null;
  }

  public IOStreamPair execContainer(ContainerExecContext ctx)
      throws ContainerExecutionException {
    return null;
  }

  public void reapContainer(ContainerRuntimeContext ctx)
      throws ContainerExecutionException {
  }

  public void relaunchContainer(ContainerRuntimeContext ctx)
      throws ContainerExecutionException {
  }

  /**
   * Return whether the given environment variables indicate that the operation
   * is requesting a Runc container.  If the environment contains a key
   * called {@code YARN_CONTAINER_RUNTIME_TYPE} whose value is {@code runc},
   * this method will return true.  Otherwise it will return false.
   *
   * @param daemonConf the NodeManager daemon configuration
   * @param env the environment variable settings for the operation
   * @return whether a Runc container is requested
   */
  public static boolean isRuncContainerRequested(Configuration daemonConf,
      Map<String, String> env) {
    String type = (env == null)
        ? null : env.get(ContainerRuntimeConstants.ENV_CONTAINER_TYPE);
    if (type == null) {
      type = daemonConf.get(YarnConfiguration.LINUX_CONTAINER_RUNTIME_TYPE);
    }
    return type != null && type.equals(
        ContainerRuntimeConstants.CONTAINER_RUNTIME_RUNC);
  }


  @Override
  public boolean isRuntimeRequested(Map<String, String> env) {
    return isRuncContainerRequested(conf, env);
  }

  @Override
  public void signalContainer(ContainerRuntimeContext ctx)
      throws ContainerExecutionException {
    ContainerExecutor.Signal signal = ctx.getExecutionAttribute(SIGNAL);
    Container container = ctx.getContainer();

    if (signal == ContainerExecutor.Signal.KILL ||
        signal == ContainerExecutor.Signal.TERM) {
      ContainerVolumePublisher publisher = new ContainerVolumePublisher(
          container, container.getCsiVolumesRootDir(), this);
      try {
        publisher.unpublishVolumes();
      } catch (YarnException | IOException e) {
        throw new ContainerExecutionException(e);
      }
    }

    PrivilegedOperation signalOp = new PrivilegedOperation(
        PrivilegedOperation.OperationType.SIGNAL_CONTAINER);

    signalOp.appendArgs(ctx.getExecutionAttribute(RUN_AS_USER),
        ctx.getExecutionAttribute(USER),
        Integer.toString(PrivilegedOperation.RunAsUserCommand
        .SIGNAL_CONTAINER.getValue()),
        ctx.getExecutionAttribute(PID),
        Integer.toString(signal.getValue()));

    //Some failures here are acceptable. Let the calling executor decide.
    signalOp.disableFailureLogging();

    try {
      PrivilegedOperationExecutor executor = PrivilegedOperationExecutor
          .getInstance(conf);

      executor.executePrivilegedOperation(null,
          signalOp, null, null, false, false);
    } catch (PrivilegedOperationException e) {
      //Don't log the failure here. Some kinds of signaling failures are
      // acceptable. Let the calling executor decide what to do.
      throw new ContainerExecutionException("Signal container failed", e
          .getExitCode(), e.getOutput(), e.getErrorOutput());
    }
  }

  @InterfaceStability.Unstable
  static class RuncRuntimeObject {
    private final List<LocalResource> layers;
    private final LocalResource config;

    RuncRuntimeObject(LocalResource config,
        List<LocalResource> layers) {
      this.config = config;
      this.layers = layers;
    }

    public LocalResource getConfig() {
      return this.config;
    }

    public List<LocalResource> getOCILayers() {
      return this.layers;
    }
  }

  boolean getHostPidNamespaceEnabled() {
    return conf.getBoolean(
        YarnConfiguration.NM_RUNC_ALLOW_HOST_PID_NAMESPACE,
        YarnConfiguration.DEFAULT_NM_RUNC_ALLOW_HOST_PID_NAMESPACE);
  }

  boolean getPrivilegedContainersEnabledOnCluster() {
    return conf.getBoolean(
        YarnConfiguration.NM_RUNC_ALLOW_PRIVILEGED_CONTAINERS,
        YarnConfiguration.DEFAULT_NM_RUNC_ALLOW_PRIVILEGED_CONTAINERS);
  }

  Set<String> getAllowedNetworks() {
    return allowedNetworks;
  }

  Set<String> getAllowedRuntimes() {
    return allowedRuntimes;
  }

  AccessControlList getPrivilegedContainersAcl() {
    return privilegedContainersAcl;
  }

  String getEnvOciContainerPidNamespace() {
    return ENV_RUNC_CONTAINER_PID_NAMESPACE;
  }

  String getEnvOciContainerRunPrivilegedContainer() {
    return ENV_RUNC_CONTAINER_RUN_PRIVILEGED_CONTAINER;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop DefaultLinuxContainerRuntime 源码

hadoop DelegatingLinuxContainerRuntime 源码

hadoop DockerLinuxContainerRuntime 源码

hadoop JavaSandboxLinuxContainerRuntime 源码

hadoop LinuxContainerRuntime 源码

hadoop LinuxContainerRuntimeConstants 源码

hadoop OCIContainerRuntime 源码

0  赞