hadoop NvidiaDockerV1CommandPlugin 源码

  • 2022-10-20
  • 浏览 (190)

haddop NvidiaDockerV1CommandPlugin 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/NvidiaDockerV1CommandPlugin.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.gpu.GpuResourceAllocator;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;

import java.io.IOException;
import java.io.Serializable;
import java.io.StringWriter;
import java.net.URL;
import java.net.URLConnection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand.VOLUME_NAME_PATTERN;

/**
 * Implementation to use nvidia-docker v1 as GPU docker command plugin.
 */
public class NvidiaDockerV1CommandPlugin implements DockerCommandPlugin {
  final static Logger LOG = LoggerFactory.
      getLogger(NvidiaDockerV1CommandPlugin.class);

  private Configuration conf;
  private Map<String, Set<String>> additionalCommands = null;
  private String volumeDriver = "local";

  // Known option
  private String DEVICE_OPTION = "--device";
  private String VOLUME_DRIVER_OPTION = "--volume-driver";
  private String MOUNT_RO_OPTION = "--volume";

  public NvidiaDockerV1CommandPlugin(Configuration conf) {
    this.conf = conf;
  }

  // Get value from key=value
  // Throw exception if '=' not found
  private String getValue(String input) throws IllegalArgumentException {
    int index = input.indexOf('=');
    if (index < 0) {
      throw new IllegalArgumentException(
          "Failed to locate '=' from input=" + input);
    }
    return input.substring(index + 1);
  }

  private void addToCommand(String key, String value) {
    if (additionalCommands == null) {
      additionalCommands = new HashMap<>();
    }
    if (!additionalCommands.containsKey(key)) {
      additionalCommands.put(key, new HashSet<>());
    }
    additionalCommands.get(key).add(value);
  }

  private void init() throws ContainerExecutionException {
    String endpoint = conf.get(
        YarnConfiguration.NVIDIA_DOCKER_PLUGIN_V1_ENDPOINT,
        YarnConfiguration.DEFAULT_NVIDIA_DOCKER_PLUGIN_V1_ENDPOINT);
    if (null == endpoint || endpoint.isEmpty()) {
      LOG.info(YarnConfiguration.NVIDIA_DOCKER_PLUGIN_V1_ENDPOINT
          + " set to empty, skip init ..");
      return;
    }
    String cliOptions;
    try {
      // Talk to plugin server and get options
      URL url = new URL(endpoint);
      URLConnection uc = url.openConnection();
      uc.setRequestProperty("X-Requested-With", "Curl");

      StringWriter writer = new StringWriter();
      IOUtils.copy(uc.getInputStream(), writer, "utf-8");
      cliOptions = writer.toString();

      LOG.info("Additional docker CLI options from plugin to run GPU "
          + "containers:" + cliOptions);

      // Parse cli options
      // Examples like:
      // --device=/dev/nvidiactl --device=/dev/nvidia-uvm --device=/dev/nvidia0
      // --volume-driver=nvidia-docker
      // --volume=nvidia_driver_352.68:/usr/local/nvidia:ro

      for (String str : cliOptions.split(" ")) {
        str = str.trim();
        if (str.startsWith(DEVICE_OPTION)) {
          addToCommand(DEVICE_OPTION, getValue(str));
        } else if (str.startsWith(VOLUME_DRIVER_OPTION)) {
          volumeDriver = getValue(str);
          LOG.debug("Found volume-driver:{}", volumeDriver);
        } else if (str.startsWith(MOUNT_RO_OPTION)) {
          String mount = getValue(str);
          if (!mount.endsWith(":ro")) {
            throw new IllegalArgumentException(
                "Should not have mount other than ro, command=" + str);
          }
          addToCommand(MOUNT_RO_OPTION,
              mount.substring(0, mount.lastIndexOf(':')));
        } else{
          throw new IllegalArgumentException("Unsupported option:" + str);
        }
      }
    } catch (RuntimeException e) {
      LOG.warn(
          "RuntimeException of " + this.getClass().getSimpleName() + " init:",
          e);
      throw new ContainerExecutionException(e);
    } catch (IOException e) {
      LOG.warn("IOException of " + this.getClass().getSimpleName() + " init:",
          e);
      throw new ContainerExecutionException(e);
    }
  }

  private int getGpuIndexFromDeviceName(String device) {
    final String NVIDIA = "nvidia";
    int idx = device.lastIndexOf(NVIDIA);
    if (idx < 0) {
      return -1;
    }
    // Get last part
    String str = device.substring(idx + NVIDIA.length());
    for (int i = 0; i < str.length(); i++) {
      if (!Character.isDigit(str.charAt(i))) {
        return -1;
      }
    }
    return Integer.parseInt(str);
  }

  private Set<GpuDevice> getAssignedGpus(Container container) {
    ResourceMappings resourceMappings = container.getResourceMappings();

    // Copy of assigned Resources
    Set<GpuDevice> assignedResources = null;
    if (resourceMappings != null) {
      assignedResources = new HashSet<>();
      for (Serializable s : resourceMappings.getAssignedResources(
          ResourceInformation.GPU_URI)) {
        assignedResources.add((GpuDevice) s);
      }
    }

    if (assignedResources == null || assignedResources.isEmpty()) {
      // When no GPU resource assigned, don't need to update docker command.
      return Collections.emptySet();
    }

    return assignedResources;
  }

  @VisibleForTesting
  protected boolean requestsGpu(Container container) {
    return GpuResourceAllocator.getRequestedGpus(container.getResource()) > 0;
  }

  /**
   * Do initialize when GPU requested
   * @param container nmContainer
   * @return if #GPU-requested > 0
   * @throws ContainerExecutionException when any issue happens
   */
  private boolean initializeWhenGpuRequested(Container container)
      throws ContainerExecutionException {
    if (!requestsGpu(container)) {
      return false;
    }

    // Do lazy initialization of gpu-docker plugin
    if (additionalCommands == null) {
      init();
    }

    return true;
  }

  @Override
  public synchronized void updateDockerRunCommand(
      DockerRunCommand dockerRunCommand, Container container)
      throws ContainerExecutionException {
    if (!initializeWhenGpuRequested(container)) {
      return;
    }

    Set<GpuDevice> assignedResources = getAssignedGpus(container);
    if (assignedResources == null || assignedResources.isEmpty()) {
      return;
    }

    // Write to dockerRunCommand
    for (Map.Entry<String, Set<String>> option : additionalCommands
        .entrySet()) {
      String key = option.getKey();
      Set<String> values = option.getValue();
      if (key.equals(DEVICE_OPTION)) {
        int foundGpuDevices = 0;
        for (String deviceName : values) {
          // When specified is a GPU card (device name like /dev/nvidia[n]
          // Get index of the GPU (which is [n]).
          Integer gpuIdx = getGpuIndexFromDeviceName(deviceName);
          if (gpuIdx >= 0) {
            // Use assignedResources to filter --device given by
            // nvidia-docker-plugin.
            for (GpuDevice gpuDevice : assignedResources) {
              if (gpuDevice.getIndex() == gpuIdx) {
                foundGpuDevices++;
                dockerRunCommand.addDevice(deviceName, deviceName);
              }
            }
          } else{
            // When gpuIdx < 0, it is a controller device (such as
            // /dev/nvidiactl). In this case, add device directly.
            dockerRunCommand.addDevice(deviceName, deviceName);
          }
        }

        // Cannot get all assigned Gpu devices from docker plugin output
        if (foundGpuDevices < assignedResources.size()) {
          throw new ContainerExecutionException(
              "Cannot get all assigned Gpu devices from docker plugin output");
        }
      } else if (key.equals(MOUNT_RO_OPTION)) {
        for (String value : values) {
          int idx = value.indexOf(':');
          String source = value.substring(0, idx);
          String target = value.substring(idx + 1);
          dockerRunCommand.addReadOnlyMountLocation(source, target, true);
        }
      } else{
        throw new ContainerExecutionException("Unsupported option:" + key);
      }
    }
  }

  @Override
  public DockerVolumeCommand getCreateDockerVolumeCommand(Container container)
      throws ContainerExecutionException {
    if (!initializeWhenGpuRequested(container)) {
      return null;
    }

    String newVolumeName = null;

    // Get volume name
    Set<String> mounts = additionalCommands.get(MOUNT_RO_OPTION);
    for (String mount : mounts) {
      int idx = mount.indexOf(':');
      if (idx >= 0) {
        String mountSource = mount.substring(0, idx);
        if (VOLUME_NAME_PATTERN.matcher(mountSource).matches()) {
          // This is a valid named volume
          newVolumeName = mountSource;
          LOG.debug("Found volume name for GPU:{}", newVolumeName);
          break;
        } else{
          LOG.debug("Failed to match {} to named-volume regex pattern",
              mountSource);
        }
      }
    }

    if (newVolumeName != null) {
      DockerVolumeCommand command = new DockerVolumeCommand(
          DockerVolumeCommand.VOLUME_CREATE_SUB_COMMAND);
      command.setDriverName(volumeDriver);
      command.setVolumeName(newVolumeName);
      return command;
    }

    return null;
  }

  @Override
  public DockerVolumeCommand getCleanupDockerVolumesCommand(Container container)
      throws ContainerExecutionException {
    // No cleanup needed.
    return null;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AssignedGpuDevice 源码

hadoop GpuDevice 源码

hadoop GpuDeviceSpecificationException 源码

hadoop GpuDiscoverer 源码

hadoop GpuDockerCommandPluginFactory 源码

hadoop GpuNodeResourceUpdateHandler 源码

hadoop GpuResourcePlugin 源码

hadoop NvidiaBinaryHelper 源码

hadoop NvidiaDockerV2CommandPlugin 源码

hadoop package-info 源码

0  赞