Spark SparkSubmitCommandBuilder source code

  • 2022-10-20

Spark SparkSubmitCommandBuilder code

File path: /launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.launcher;

import java.io.File;
import java.io.IOException;
import java.util.*;

import static org.apache.spark.launcher.CommandBuilderUtils.*;

/**
 * Special command builder for handling a CLI invocation of SparkSubmit.
 * <p>
 * This builder adds command line parsing compatible with SparkSubmit. It handles setting
 * driver-side options and the special parsing behavior needed for special-casing certain internal
 * Spark applications.
 * <p>
 * This class also has some special features to aid in launching shells (pyspark and sparkR) and
 * examples.
 */
class SparkSubmitCommandBuilder extends AbstractCommandBuilder {

  /**
   * Name of the app resource used to identify the PySpark shell. The command line parser expects
   * the resource name to be the very first argument to spark-submit in this case.
   *
   * NOTE: this cannot be "pyspark-shell" since that identifies the PySpark shell to SparkSubmit
 * (see java_gateway.py), and can cause this code to enter an infinite loop.
   */
  static final String PYSPARK_SHELL = "pyspark-shell-main";

  /**
   * This is the actual resource name that identifies the PySpark shell to SparkSubmit.
   */
  static final String PYSPARK_SHELL_RESOURCE = "pyspark-shell";

  /**
   * Name of the app resource used to identify the SparkR shell. The command line parser expects
   * the resource name to be the very first argument to spark-submit in this case.
   *
   * NOTE: this cannot be "sparkr-shell" since that identifies the SparkR shell to SparkSubmit
 * (see sparkR.R), and can cause this code to enter an infinite loop.
   */
  static final String SPARKR_SHELL = "sparkr-shell-main";

  /**
   * This is the actual resource name that identifies the SparkR shell to SparkSubmit.
   */
  static final String SPARKR_SHELL_RESOURCE = "sparkr-shell";

  /**
   * Name of app resource used to identify examples. When running examples, args[0] should be
   * this name. The app resource will identify the example class to run.
   */
  static final String RUN_EXAMPLE = "run-example";

  /**
   * Prefix for example class names.
   */
  static final String EXAMPLE_CLASS_PREFIX = "org.apache.spark.examples.";

  /**
   * This map must match the class names for available special classes, since this modifies the way
   * command line parsing works. This maps the class name to the resource to use when calling
   * spark-submit.
   */
  private static final Map<String, String> specialClasses = new HashMap<>();
  static {
    specialClasses.put("org.apache.spark.repl.Main", "spark-shell");
    specialClasses.put("org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver",
      SparkLauncher.NO_RESOURCE);
    specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
      SparkLauncher.NO_RESOURCE);
  }

  final List<String> userArgs;
  private final List<String> parsedArgs;
  // Special command means no appResource and no mainClass required
  private final boolean isSpecialCommand;
  private final boolean isExample;

  /**
   * Controls whether mixing spark-submit arguments with app arguments is allowed. This is needed
   * to parse the command lines for things like bin/spark-shell, which allows users to mix and
   * match arguments (e.g. "bin/spark-shell SparkShellArg --master foo").
   */
  private boolean allowsMixedArguments;

  /**
   * This constructor is used when creating a user-configurable launcher. It allows the
   * spark-submit argument list to be modified after creation.
   */
  SparkSubmitCommandBuilder() {
    this.isSpecialCommand = false;
    this.isExample = false;
    this.parsedArgs = new ArrayList<>();
    this.userArgs = new ArrayList<>();
  }

  /**
   * This constructor is used when invoking spark-submit; it parses and validates arguments
   * provided by the user on the command line.
   */
  SparkSubmitCommandBuilder(List<String> args) {
    this.allowsMixedArguments = false;
    this.parsedArgs = new ArrayList<>();
    boolean isExample = false;
    List<String> submitArgs = args;
    this.userArgs = Collections.emptyList();

    if (args.size() > 0) {
      switch (args.get(0)) {
        case PYSPARK_SHELL:
          this.allowsMixedArguments = true;
          appResource = PYSPARK_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        case SPARKR_SHELL:
          this.allowsMixedArguments = true;
          appResource = SPARKR_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        case RUN_EXAMPLE:
          isExample = true;
          appResource = findExamplesAppJar();
          submitArgs = args.subList(1, args.size());
      }

      this.isExample = isExample;
      OptionParser parser = new OptionParser(true);
      parser.parse(submitArgs);
      this.isSpecialCommand = parser.isSpecialCommand;
    } else {
      this.isExample = isExample;
      this.isSpecialCommand = true;
    }
  }

  @Override
  public List<String> buildCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException {
    if (PYSPARK_SHELL.equals(appResource) && !isSpecialCommand) {
      return buildPySparkShellCommand(env);
    } else if (SPARKR_SHELL.equals(appResource) && !isSpecialCommand) {
      return buildSparkRCommand(env);
    } else {
      return buildSparkSubmitCommand(env);
    }
  }

  List<String> buildSparkSubmitArgs() {
    List<String> args = new ArrayList<>();
    OptionParser parser = new OptionParser(false);
    final boolean isSpecialCommand;

    // If the user args array is not empty, we need to parse it to detect exactly what
    // the user is trying to run, so that checks below are correct.
    if (!userArgs.isEmpty()) {
      parser.parse(userArgs);
      isSpecialCommand = parser.isSpecialCommand;
    } else {
      isSpecialCommand = this.isSpecialCommand;
    }

    if (!allowsMixedArguments && !isSpecialCommand) {
      checkArgument(appResource != null, "Missing application resource.");
    }

    if (verbose) {
      args.add(parser.VERBOSE);
    }

    if (master != null) {
      args.add(parser.MASTER);
      args.add(master);
    }

    if (deployMode != null) {
      args.add(parser.DEPLOY_MODE);
      args.add(deployMode);
    }

    if (appName != null) {
      args.add(parser.NAME);
      args.add(appName);
    }

    for (Map.Entry<String, String> e : conf.entrySet()) {
      args.add(parser.CONF);
      args.add(String.format("%s=%s", e.getKey(), e.getValue()));
    }

    if (propertiesFile != null) {
      args.add(parser.PROPERTIES_FILE);
      args.add(propertiesFile);
    }

    if (isExample) {
      jars.addAll(findExamplesJars());
    }

    if (!jars.isEmpty()) {
      args.add(parser.JARS);
      args.add(join(",", jars));
    }

    if (!files.isEmpty()) {
      args.add(parser.FILES);
      args.add(join(",", files));
    }

    if (!pyFiles.isEmpty()) {
      args.add(parser.PY_FILES);
      args.add(join(",", pyFiles));
    }

    if (isExample && !isSpecialCommand) {
      checkArgument(mainClass != null, "Missing example class name.");
    }

    if (mainClass != null) {
      args.add(parser.CLASS);
      args.add(mainClass);
    }

    args.addAll(parsedArgs);

    if (appResource != null) {
      args.add(appResource);
    }

    args.addAll(appArgs);

    return args;
  }

  private List<String> buildSparkSubmitCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException {
    // Load the properties file and check whether spark-submit will be running the app's driver
    // or just launching a cluster app. When running the driver, the JVM arguments will be
    // modified to cover the driver's configuration.
    Map<String, String> config = getEffectiveConfig();
    boolean isClientMode = isClientMode(config);
    String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;

    List<String> cmd = buildJavaCommand(extraClassPath);
    // Take Thrift Server as daemon
    if (isThriftServer(mainClass)) {
      addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
    }
    addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));

    // We don't want the client to specify Xmx. These have to be set by their corresponding
    // memory flag --driver-memory or configuration entry spark.driver.memory
    String driverDefaultJavaOptions = config.get(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS);
    checkJavaOptions(driverDefaultJavaOptions);
    String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
    checkJavaOptions(driverExtraJavaOptions);

    if (isClientMode) {
      // Figuring out where the memory value comes from is a little tricky due to precedence.
      // Precedence is observed in the following order:
      // - explicit configuration (setConf()), which also covers --driver-memory cli argument.
      // - properties file.
      // - SPARK_DRIVER_MEMORY env variable
      // - SPARK_MEM env variable
      // - default value (1g)
      // Take Thrift Server as daemon
      String tsMemory =
        isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
      String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
        System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
      cmd.add("-Xmx" + memory);
      addOptionString(cmd, driverDefaultJavaOptions);
      addOptionString(cmd, driverExtraJavaOptions);
      mergeEnvPathList(env, getLibPathEnvName(),
        config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
    }

    // SPARK-36796: Always add default `--add-opens` to submit command
    addOptionString(cmd, JavaModuleOptions.defaultModuleOptions());
    cmd.add("org.apache.spark.deploy.SparkSubmit");
    cmd.addAll(buildSparkSubmitArgs());
    return cmd;
  }

  private void checkJavaOptions(String javaOptions) {
    if (!isEmpty(javaOptions) && javaOptions.contains("Xmx")) {
      String msg = String.format("Not allowed to specify max heap(Xmx) memory settings through " +
        "java options (was %s). Use the corresponding --driver-memory or " +
        "spark.driver.memory configuration instead.", javaOptions);
      throw new IllegalArgumentException(msg);
    }
  }

  private List<String> buildPySparkShellCommand(Map<String, String> env) throws IOException {
    // For backwards compatibility, if a script is specified in
    // the pyspark command line, then run it using spark-submit.
    if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".py")) {
      System.err.println(
        "Running python applications through 'pyspark' is not supported as of Spark 2.0.\n" +
        "Use ./bin/spark-submit <python file>");
      System.exit(-1);
    }

    checkArgument(appArgs.isEmpty(), "pyspark does not support any application options.");

    // When launching the pyspark shell, the spark-submit arguments should be stored in the
    // PYSPARK_SUBMIT_ARGS env variable.
    appResource = PYSPARK_SHELL_RESOURCE;
    constructEnvVarArgs(env, "PYSPARK_SUBMIT_ARGS");

    // Will pick up the binary executable in the following order
    // 1. conf spark.pyspark.driver.python
    // 2. conf spark.pyspark.python
    // 3. environment variable PYSPARK_DRIVER_PYTHON
    // 4. environment variable PYSPARK_PYTHON
    // 5. python3
    List<String> pyargs = new ArrayList<>();
    pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
      conf.get(SparkLauncher.PYSPARK_PYTHON),
      System.getenv("PYSPARK_DRIVER_PYTHON"),
      System.getenv("PYSPARK_PYTHON"),
      "python3"));
    String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
    if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
      // pass conf spark.pyspark.python to python by environment variable.
      env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
    }
    if (!isEmpty(pyOpts)) {
      pyargs.addAll(parseOptionString(pyOpts));
    }

    return pyargs;
  }

  private List<String> buildSparkRCommand(Map<String, String> env) throws IOException {
    if (!appArgs.isEmpty() && (appArgs.get(0).endsWith(".R") || appArgs.get(0).endsWith(".r"))) {
      System.err.println(
        "Running R applications through 'sparkR' is not supported as of Spark 2.0.\n" +
        "Use ./bin/spark-submit <R file>");
      System.exit(-1);
    }
    // When launching the SparkR shell, store the spark-submit arguments in the SPARKR_SUBMIT_ARGS
    // env variable.
    appResource = SPARKR_SHELL_RESOURCE;
    constructEnvVarArgs(env, "SPARKR_SUBMIT_ARGS");

    // Set shell.R as R_PROFILE_USER to load the SparkR package when the shell comes up.
    String sparkHome = System.getenv("SPARK_HOME");
    env.put("R_PROFILE_USER",
            join(File.separator, sparkHome, "R", "lib", "SparkR", "profile", "shell.R"));

    List<String> args = new ArrayList<>();
    args.add(firstNonEmpty(conf.get(SparkLauncher.SPARKR_R_SHELL),
      System.getenv("SPARKR_DRIVER_R"), "R"));
    return args;
  }

  private void constructEnvVarArgs(
      Map<String, String> env,
      String submitArgsEnvVariable) throws IOException {
    mergeEnvPathList(env, getLibPathEnvName(),
      getEffectiveConfig().get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));

    StringBuilder submitArgs = new StringBuilder();
    for (String arg : buildSparkSubmitArgs()) {
      if (submitArgs.length() > 0) {
        submitArgs.append(" ");
      }
      submitArgs.append(quoteForCommandString(arg));
    }
    env.put(submitArgsEnvVariable, submitArgs.toString());
  }

  boolean isClientMode(Map<String, String> userProps) {
    String userMaster = firstNonEmpty(master, userProps.get(SparkLauncher.SPARK_MASTER));
    String userDeployMode = firstNonEmpty(deployMode, userProps.get(SparkLauncher.DEPLOY_MODE));
    // Default master is "local[*]", so assume client mode in that case
    return userMaster == null || userDeployMode == null || "client".equals(userDeployMode);
  }

  /**
   * Return whether the given main class represents a thrift server.
   */
  private boolean isThriftServer(String mainClass) {
    return (mainClass != null &&
      mainClass.equals("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"));
  }

  private String findExamplesAppJar() {
    boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
    if (isTesting) {
      return SparkLauncher.NO_RESOURCE;
    } else {
      for (String exampleJar : findExamplesJars()) {
        if (new File(exampleJar).getName().startsWith("spark-examples")) {
          return exampleJar;
        }
      }
      throw new IllegalStateException("Failed to find examples' main app jar.");
    }
  }

  private List<String> findExamplesJars() {
    boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
    List<String> examplesJars = new ArrayList<>();
    String sparkHome = getSparkHome();

    File jarsDir;
    if (new File(sparkHome, "RELEASE").isFile()) {
      jarsDir = new File(sparkHome, "examples/jars");
    } else {
      jarsDir = new File(sparkHome,
        String.format("examples/target/scala-%s/jars", getScalaVersion()));
    }

    boolean foundDir = jarsDir.isDirectory();
    checkState(isTesting || foundDir, "Examples jars directory '%s' does not exist.",
        jarsDir.getAbsolutePath());

    if (foundDir) {
      for (File f: jarsDir.listFiles()) {
        examplesJars.add(f.getAbsolutePath());
      }
    }
    return examplesJars;
  }

  private class OptionParser extends SparkSubmitOptionParser {

    boolean isSpecialCommand = false;
    private final boolean errorOnUnknownArgs;

    OptionParser(boolean errorOnUnknownArgs) {
      this.errorOnUnknownArgs = errorOnUnknownArgs;
    }

    @Override
    protected boolean handle(String opt, String value) {
      switch (opt) {
        case MASTER:
          master = value;
          break;
        case DEPLOY_MODE:
          deployMode = value;
          break;
        case PROPERTIES_FILE:
          propertiesFile = value;
          break;
        case DRIVER_MEMORY:
          conf.put(SparkLauncher.DRIVER_MEMORY, value);
          break;
        case DRIVER_JAVA_OPTIONS:
          conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
          break;
        case DRIVER_LIBRARY_PATH:
          conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
          break;
        case DRIVER_CLASS_PATH:
          conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
          break;
        case CONF:
          checkArgument(value != null, "Missing argument to %s", CONF);
          String[] setConf = value.split("=", 2);
          checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
          conf.put(setConf[0], setConf[1]);
          break;
        case CLASS:
          // The special classes require some special command line handling, since they allow
          // mixing spark-submit arguments with arguments that should be propagated to the shell
          // itself. Note that for this to work, the "--class" argument must come before any
          // non-spark-submit arguments.
          mainClass = value;
          if (specialClasses.containsKey(value)) {
            allowsMixedArguments = true;
            appResource = specialClasses.get(value);
          }
          break;
        case KILL_SUBMISSION:
        case STATUS:
          isSpecialCommand = true;
          parsedArgs.add(opt);
          parsedArgs.add(value);
          break;
        case HELP:
        case USAGE_ERROR:
        case VERSION:
          isSpecialCommand = true;
          parsedArgs.add(opt);
          break;
        default:
          parsedArgs.add(opt);
          if (value != null) {
            parsedArgs.add(value);
          }
          break;
      }
      return true;
    }

    @Override
    protected boolean handleUnknown(String opt) {
      // When mixing arguments, add unrecognized parameters directly to the user arguments list. In
      // normal mode, any unrecognized parameter triggers the end of command line parsing, and the
      // parameter itself will be interpreted by SparkSubmit as the application resource. The
      // remaining params will be appended to the list of SparkSubmit arguments.
      if (allowsMixedArguments) {
        appArgs.add(opt);
        return true;
      } else if (isExample) {
        String className = opt;
        if (!className.startsWith(EXAMPLE_CLASS_PREFIX)) {
          className = EXAMPLE_CLASS_PREFIX + className;
        }
        mainClass = className;
        appResource = findExamplesAppJar();
        return false;
      } else if (errorOnUnknownArgs) {
        checkArgument(!opt.startsWith("-"), "Unrecognized option: %s", opt);
        checkState(appResource == null, "Found unrecognized argument but resource is already set.");
        appResource = opt;
        return false;
      }
      return true;
    }

    @Override
    protected void handleExtraArgs(List<String> extra) {
      appArgs.addAll(extra);
    }

  }

}
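
Usage example

SparkSubmitCommandBuilder is package-private, so applications normally reach it through the public SparkLauncher API: SparkLauncher collects the configuration and, under the hood, this builder turns it into the final spark-submit command via buildSparkSubmitArgs(). The sketch below is illustrative only; the Spark home, jar path, main class, and app argument are hypothetical placeholders, not values taken from the source above.

import java.io.IOException;

import org.apache.spark.launcher.SparkLauncher;

public class LauncherExample {
  public static void main(String[] args) throws IOException, InterruptedException {
    // Each setter feeds a field that buildSparkSubmitArgs() reads back out:
    // setMaster() becomes --master, setConf() becomes --conf key=value, etc.
    Process spark = new SparkLauncher()
        .setSparkHome("/opt/spark")                  // hypothetical Spark install
        .setAppResource("/opt/jobs/my-app.jar")      // hypothetical app jar (appResource)
        .setMainClass("com.example.MyApp")           // hypothetical main class (--class)
        .setMaster("local[*]")                       // --master
        .setConf(SparkLauncher.DRIVER_MEMORY, "2g")  // spark.driver.memory; drives -Xmx in client mode
        .addAppArgs("input.txt")                     // appended after the app resource
        .launch();
    int exitCode = spark.waitFor();
    System.out.println("spark-submit exited with " + exitCode);
  }
}

For a client-mode run like this, buildSparkSubmitCommand() produces a JVM invocation of roughly the form: java ... -Xmx2g <driver java options> <default --add-opens module options> org.apache.spark.deploy.SparkSubmit --master local[*] --conf spark.driver.memory=2g --class com.example.MyApp /opt/jobs/my-app.jar input.txt. The exact contents depend on the effective configuration and the environment variables handled in the code above.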
