hadoop ApplicationMaster 源码

  • 2022-10-20
  • 浏览 (183)

haddop ApplicationMaster 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.applications.distributedshell;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.Base64;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.BoundedAppender;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.LogManager;

import org.apache.hadoop.classification.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An ApplicationMaster for executing shell commands on a set of launched
 * containers using the YARN framework.
 * 
 * <p>
 * This class is meant to act as an example on how to write yarn-based
 * application masters.
 * </p>
 * 
 * <p>
 * The ApplicationMaster is started on a container by the
 * <code>ResourceManager</code>'s launcher. The first thing that the
 * <code>ApplicationMaster</code> needs to do is to connect and register itself
 * with the <code>ResourceManager</code>. The registration sets up information
 * within the <code>ResourceManager</code> regarding what host:port the
 * ApplicationMaster is listening on to provide any form of functionality to a
 * client as well as a tracking url that a client can use to keep track of
 * status/job history if needed. However, in the distributedshell, trackingurl
 * and appMasterHost:appMasterRpcPort are not supported.
 * </p>
 * 
 * <p>
 * The <code>ApplicationMaster</code> needs to send a heartbeat to the
 * <code>ResourceManager</code> at regular intervals to inform the
 * <code>ResourceManager</code> that it is up and alive. The
 * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the
 * <code>ApplicationMaster</code> acts as a heartbeat.
 * 
 * <p>
 * For the actual handling of the job, the <code>ApplicationMaster</code> has to
 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
 * required no. of containers using {@link ResourceRequest} with the necessary
 * resource specifications such as node location, computational
 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
 * responds with an {@link AllocateResponse} that informs the
 * <code>ApplicationMaster</code> of the set of newly allocated containers,
 * completed containers as well as current state of available resources.
 * </p>
 * 
 * <p>
 * For each allocated container, the <code>ApplicationMaster</code> can then set
 * up the necessary launch context via {@link ContainerLaunchContext} to specify
 * the allocated container id, local resources required by the executable, the
 * environment to be setup for the executable, commands to execute, etc. and
 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
 * launch and execute the defined commands on the given allocated container.
 * </p>
 * 
 * <p>
 * The <code>ApplicationMaster</code> can monitor the launched container by
 * either querying the <code>ResourceManager</code> using
 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
 * the {@link ContainerManagementProtocol} by querying for the status of the allocated
 * container's {@link ContainerId}.
 *
 * <p>
 * After the job has been completed, the <code>ApplicationMaster</code> has to
 * send a {@link FinishApplicationMasterRequest} to the
 * <code>ResourceManager</code> to inform it that the
 * <code>ApplicationMaster</code> has been completed.
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ApplicationMaster {

  private static final Logger LOG = LoggerFactory
      .getLogger(ApplicationMaster.class);

  @VisibleForTesting
  @Private
  public enum DSEvent {
    DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
  }
  
  @VisibleForTesting
  @Private
  public enum DSEntity {
    DS_APP_ATTEMPT, DS_CONTAINER
  }

  private static final String YARN_SHELL_ID = "YARN_SHELL_ID";

  // Configuration
  private Configuration conf;

  // Handle to communicate with the Resource Manager
  @SuppressWarnings("rawtypes")
  private AMRMClientAsync amRMClient;

  // In both secure and non-secure modes, this points to the job-submitter.
  @VisibleForTesting
  UserGroupInformation appSubmitterUgi;

  private Path homeDirectory;

  // Handle to communicate with the Node Manager
  private NMClientAsync nmClientAsync;
  // Listen to process the response from the Node Manager
  private NMCallbackHandler containerListener;

  // Application Attempt Id ( combination of attemptId and fail count )
  @VisibleForTesting
  protected ApplicationAttemptId appAttemptID;

  private ApplicationId appId;
  private String appName;

  // TODO
  // For status update for clients - yet to be implemented
  // Hostname of the container
  private String appMasterHostname = "";
  // Port on which the app master listens for status updates from clients
  private int appMasterRpcPort = -1;
  // Tracking url to which app master publishes info for clients to monitor
  private String appMasterTrackingUrl = "";

  private boolean timelineServiceV2Enabled = false;

  private boolean timelineServiceV1Enabled = false;

  // App Master configuration
  // No. of containers to run shell command on
  @VisibleForTesting
  protected int numTotalContainers = 1;
  // Memory to request for the container on which the shell command will run
  private static final long DEFAULT_CONTAINER_MEMORY = 10;
  private long containerMemory = DEFAULT_CONTAINER_MEMORY;
  // VirtualCores to request for the container on which the shell command will run
  private static final int DEFAULT_CONTAINER_VCORES = 1;
  private int containerVirtualCores = DEFAULT_CONTAINER_VCORES;
  // All other resources to request for the container
  // on which the shell command will run
  private Map<String, Long> containerResources = new HashMap<>();
  // Priority of the request
  private int requestPriority;
  // Execution type of the containers.
  // Default GUARANTEED.
  private ExecutionType containerType = ExecutionType.GUARANTEED;
  // Whether to automatically promote opportunistic containers.
  private boolean autoPromoteContainers = false;
  // Whether to enforce execution type of the containers.
  private boolean enforceExecType = false;

  // Resource profile for the container
  private String containerResourceProfile = "";
  Map<String, Resource> resourceProfiles;

  private boolean keepContainersAcrossAttempts = false;

  // Counter for completed containers ( complete denotes successful or failed )
  private AtomicInteger numCompletedContainers = new AtomicInteger();
  // Allocated container count so that we know how many containers has the RM
  // allocated to us
  @VisibleForTesting
  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
  // Count of failed containers
  private AtomicInteger numFailedContainers = new AtomicInteger();
  // Count of containers already requested from the RM
  // Needed as once requested, we should not request for containers again.
  // Only request for more if the original requirement changes.
  @VisibleForTesting
  protected AtomicInteger numRequestedContainers = new AtomicInteger();

  protected AtomicInteger numIgnore = new AtomicInteger();

  protected AtomicInteger totalRetries = new AtomicInteger(10);

  // Shell command to be executed
  private String shellCommand = "";
  // Args to be passed to the shell command
  private String shellArgs = "";
  // Env variables to be setup for the shell command
  private Map<String, String> shellEnv = new HashMap<String, String>();

  // Location of shell script ( obtained from info set in env )
  // Shell script path in fs
  private String scriptPath = "";
  // Timestamp needed for creating a local resource
  private long shellScriptPathTimestamp = 0;
  // File length needed for local resource
  private long shellScriptPathLen = 0;

  // Placement Specifications
  private Map<String, PlacementSpec> placementSpecs = null;

  // Container retry options
  private ContainerRetryPolicy containerRetryPolicy =
      ContainerRetryPolicy.NEVER_RETRY;
  private Set<Integer> containerRetryErrorCodes = null;
  private int containerMaxRetries = 0;
  private int containrRetryInterval = 0;
  private long containerFailuresValidityInterval = -1;

  private List<String> localizableFiles = new ArrayList<>();

  // Timeline domain ID
  private String domainId = null;

  // Hardcoded path to shell script in launch container's local env
  private static final String EXEC_SHELL_STRING_PATH = Client.SCRIPT_PATH
      + ".sh";
  private static final String EXEC_BAT_SCRIPT_STRING_PATH = Client.SCRIPT_PATH
      + ".bat";

  // Hardcoded path to custom log_properties
  private static final String log4jPath = "log4j.properties";

  private static final String shellCommandPath = "shellCommands";
  private static final String shellArgsPath = "shellArgs";

  private volatile boolean done;

  private ByteBuffer allTokens;

  // Launch threads
  private List<Thread> launchThreads = new ArrayList<Thread>();

  // Timeline Client
  @VisibleForTesting
  TimelineClient timelineClient;

  // Timeline v2 Client
  @VisibleForTesting
  TimelineV2Client timelineV2Client;

  static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
  static final String APPID_TIMELINE_FILTER_NAME = "appId";
  static final String USER_TIMELINE_FILTER_NAME = "user";
  static final String DIAGNOSTICS = "Diagnostics";

  private final String linux_bash_command = "bash";
  private final String windows_command = "cmd /c";

  private int yarnShellIdCounter = 1;
  private final AtomicLong allocIdCounter = new AtomicLong(1);

  @VisibleForTesting
  protected final Set<ContainerId> launchedContainers =
      Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());

  private BoundedAppender diagnostics = new BoundedAppender(64 * 1024);

  /**
   * Container start times used to set id prefix while publishing entity
   * to ATSv2.
   */
  private final ConcurrentMap<ContainerId, Long> containerStartTimes =
      new ConcurrentHashMap<ContainerId, Long>();

  private ConcurrentMap<ContainerId, Long> getContainerStartTimes() {
    return containerStartTimes;
  }

  /**
   * @param args Command line args
   */
  public static void main(String[] args) {
    boolean result = false;
    ApplicationMaster appMaster = null;
    try {
      appMaster = new ApplicationMaster();
      LOG.info("Initializing ApplicationMaster");
      boolean doRun = appMaster.init(args);
      if (!doRun) {
        System.exit(0);
      }
      appMaster.run();
      result = appMaster.finish();
    } catch (Throwable t) {
      LOG.error("Error running ApplicationMaster", t);
      LogManager.shutdown();
      ExitUtil.terminate(1, t);
    } finally {
      if (appMaster != null) {
        appMaster.cleanup();
      }
    }
    if (result) {
      LOG.info("Application Master completed successfully. exiting");
      System.exit(0);
    } else {
      LOG.error("Application Master failed. exiting");
      System.exit(2);
    }
  }

  /**
   * Dump out contents of $CWD and the environment to stdout for debugging
   */
  private void dumpOutDebugInfo() {

    LOG.info("Dump debug output");
    Map<String, String> envs = System.getenv();
    for (Map.Entry<String, String> env : envs.entrySet()) {
      LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
      System.out.println("System env: key=" + env.getKey() + ", val="
          + env.getValue());
    }

    BufferedReader buf = null;
    try {
      String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
        Shell.execCommand("ls", "-al");
      buf = new BufferedReader(new StringReader(lines));
      String line = "";
      while ((line = buf.readLine()) != null) {
        LOG.info("System CWD content: " + line);
        System.out.println("System CWD content: " + line);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      IOUtils.cleanupWithLogger(LOG, buf);
    }
  }

  public ApplicationMaster() {
    // Set up the configuration
    conf = new YarnConfiguration();
  }

  /**
   * Parse command line options
   *
   * @param args Command line args
   * @return Whether init successful and run should be invoked
   * @throws ParseException
   * @throws IOException
   */
  public boolean init(String[] args) throws ParseException, IOException {
    Options opts = new Options();
    opts.addOption("appname", true,
        "Application Name. Default value - DistributedShell");
    opts.addOption("app_attempt_id", true,
        "App Attempt ID. Not to be used unless for testing purposes");
    opts.addOption("shell_env", true,
        "Environment for shell script. Specified as env_key=env_val pairs");
    opts.addOption("container_type", true,
        "Container execution type, GUARANTEED or OPPORTUNISTIC");
    opts.addOption("promote_opportunistic_after_start", false,
        "Flag to indicate whether to automatically promote opportunistic"
            + " containers to guaranteed.");
    opts.addOption("enforce_execution_type", false,
        "Flag to indicate whether to enforce execution type of containers");
    opts.addOption("container_memory", true,
        "Amount of memory in MB to be requested to run the shell command");
    opts.addOption("container_vcores", true,
        "Amount of virtual cores to be requested to run the shell command");
    opts.addOption("container_resources", true,
        "Amount of resources to be requested to run the shell command. " +
        "Specified as resource type=value pairs separated by commas. " +
        "E.g. -container_resources memory-mb=512,vcores=1");
    opts.addOption("container_resource_profile", true,
        "Resource profile to be requested to run the shell command");
    opts.addOption("num_containers", true,
        "No. of containers on which the shell command needs to be executed");
    opts.addOption("priority", true, "Application Priority. Default 0");
    opts.addOption("container_retry_policy", true,
        "Retry policy when container fails to run, "
            + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, "
            + "2: RETRY_ON_SPECIFIC_ERROR_CODES");
    opts.addOption("container_retry_error_codes", true,
        "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error "
            + "codes is specified with this option, "
            + "e.g. --container_retry_error_codes 1,2,3");
    opts.addOption("container_max_retries", true,
        "If container could retry, it specifies max retires");
    opts.addOption("container_retry_interval", true,
        "Interval between each retry, unit is milliseconds");
    opts.addOption("container_failures_validity_interval", true,
        "Failures which are out of the time window will not be added to"
            + " the number of container retry attempts");
    opts.addOption("placement_spec", true, "Placement specification");
    opts.addOption("debug", false, "Dump out debug information");
    opts.addOption("keep_containers_across_application_attempts", false,
        "Flag to indicate whether to keep containers across application "
            + "attempts."
            + " If the flag is true, running containers will not be killed when"
            + " application attempt fails and these containers will be "
            + "retrieved by"
            + " the new application attempt ");
    opts.addOption("localized_files", true, "List of localized files");
    opts.addOption("homedir", true, "Home Directory of Job Owner");

    opts.addOption("help", false, "Print usage");
    CommandLine cliParser = new GnuParser().parse(opts, args);

    if (args.length == 0) {
      printUsage(opts);
      throw new IllegalArgumentException(
          "No args specified for application master to initialize");
    }

    //Check whether customer log4j.properties file exists
    if (fileExist(log4jPath)) {
      try {
        Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
            log4jPath);
      } catch (Exception e) {
        LOG.warn("Can not set up custom log4j properties. " + e);
      }
    }

    appName = cliParser.getOptionValue("appname", "DistributedShell");

    if (cliParser.hasOption("help")) {
      printUsage(opts);
      return false;
    }

    if (cliParser.hasOption("debug")) {
      dumpOutDebugInfo();
    }

    homeDirectory = cliParser.hasOption("homedir") ?
        new Path(cliParser.getOptionValue("homedir")) :
        new Path("/user/" + System.getenv(ApplicationConstants.
        Environment.USER.name()));

    if (cliParser.hasOption("placement_spec")) {
      String placementSpec = cliParser.getOptionValue("placement_spec");
      String decodedSpec = getDecodedPlacementSpec(placementSpec);
      LOG.info("Placement Spec received [{}]", decodedSpec);

      this.numTotalContainers = 0;
      int globalNumOfContainers = Integer
          .parseInt(cliParser.getOptionValue("num_containers", "0"));
      parsePlacementSpecs(decodedSpec, globalNumOfContainers);
      LOG.info("Total num containers requested [{}]", numTotalContainers);

      if (numTotalContainers == 0) {
        throw new IllegalArgumentException(
            "Cannot run distributed shell with no containers");
      }
    }

    Map<String, String> envs = System.getenv();

    if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
      if (cliParser.hasOption("app_attempt_id")) {
        String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
        appAttemptID = ApplicationAttemptId.fromString(appIdStr);
      } else {
        throw new IllegalArgumentException(
            "Application Attempt Id not set in the environment");
      }
    } else {
      ContainerId containerId = ContainerId.fromString(envs
          .get(Environment.CONTAINER_ID.name()));
      appAttemptID = containerId.getApplicationAttemptId();
      appId = appAttemptID.getApplicationId();
    }

    if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
      throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
          + " not set in the environment");
    }
    if (!envs.containsKey(Environment.NM_HOST.name())) {
      throw new RuntimeException(Environment.NM_HOST.name()
          + " not set in the environment");
    }
    if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
      throw new RuntimeException(Environment.NM_HTTP_PORT
          + " not set in the environment");
    }
    if (!envs.containsKey(Environment.NM_PORT.name())) {
      throw new RuntimeException(Environment.NM_PORT.name()
          + " not set in the environment");
    }

    LOG.info("Application master for app" + ", appId="
        + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
        + appAttemptID.getApplicationId().getClusterTimestamp()
        + ", attemptId=" + appAttemptID.getAttemptId());

    if (!fileExist(shellCommandPath)
        && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
      throw new IllegalArgumentException(
          "No shell command or shell script specified to be executed by application master");
    }

    if (fileExist(shellCommandPath)) {
      shellCommand = readContent(shellCommandPath);
    }

    if (fileExist(shellArgsPath)) {
      shellArgs = readContent(shellArgsPath);
    }

    if (cliParser.hasOption("shell_env")) {
      String shellEnvs[] = cliParser.getOptionValues("shell_env");
      for (String env : shellEnvs) {
        env = env.trim();
        int index = env.indexOf('=');
        if (index == -1) {
          shellEnv.put(env, "");
          continue;
        }
        String key = env.substring(0, index);
        String val = "";
        if (index < (env.length() - 1)) {
          val = env.substring(index + 1);
        }
        shellEnv.put(key, val);
      }
    }

    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
      scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);

      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
        shellScriptPathTimestamp = Long.parseLong(envs
            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
      }
      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
        shellScriptPathLen = Long.parseLong(envs
            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
      }
      if (!scriptPath.isEmpty()
          && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
        LOG.error("Illegal values in env for shell script path" + ", path="
            + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
            + shellScriptPathTimestamp);
        throw new IllegalArgumentException(
            "Illegal values in env for shell script path");
      }
    }

    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
      domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
    }

    if (cliParser.hasOption("container_type")) {
      String containerTypeStr = cliParser.getOptionValue("container_type");
      if (Arrays.stream(ExecutionType.values()).noneMatch(
          executionType -> executionType.toString()
              .equals(containerTypeStr))) {
        throw new IllegalArgumentException("Invalid container_type: "
            + containerTypeStr);
      }
      containerType = ExecutionType.valueOf(containerTypeStr);
    }
    if (cliParser.hasOption("promote_opportunistic_after_start")) {
      autoPromoteContainers = true;
    }
    if (cliParser.hasOption("enforce_execution_type")) {
      enforceExecType = true;
    }
    containerMemory = Integer.parseInt(cliParser.getOptionValue(
        "container_memory", "-1"));
    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
        "container_vcores", "-1"));
    containerResources = new HashMap<>();
    if (cliParser.hasOption("container_resources")) {
      Map<String, Long> resources = Client.parseResourcesString(
          cliParser.getOptionValue("container_resources"));
      for (Map.Entry<String, Long> entry : resources.entrySet()) {
        containerResources.put(entry.getKey(), entry.getValue());
      }
    }
    containerResourceProfile =
        cliParser.getOptionValue("container_resource_profile", "");

    keepContainersAcrossAttempts = cliParser.hasOption(
        "keep_containers_across_application_attempts");

    if (this.placementSpecs == null) {
      numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
          "num_containers", "1"));
    }
    if (numTotalContainers == 0) {
      throw new IllegalArgumentException(
          "Cannot run distributed shell with no containers");
    }
    requestPriority = Integer.parseInt(cliParser
        .getOptionValue("priority", "0"));

    containerRetryPolicy = ContainerRetryPolicy.values()[
        Integer.parseInt(cliParser.getOptionValue(
            "container_retry_policy", "0"))];
    if (cliParser.hasOption("container_retry_error_codes")) {
      containerRetryErrorCodes = new HashSet<>();
      for (String errorCode :
          cliParser.getOptionValue("container_retry_error_codes").split(",")) {
        containerRetryErrorCodes.add(Integer.parseInt(errorCode));
      }
    }
    containerMaxRetries = Integer.parseInt(
        cliParser.getOptionValue("container_max_retries", "0"));
    containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
        "container_retry_interval", "0"));
    containerFailuresValidityInterval = Long.parseLong(
        cliParser.getOptionValue("container_failures_validity_interval", "-1"));
    if (!YarnConfiguration.timelineServiceEnabled(conf)) {
      timelineClient = null;
      timelineV2Client = null;
      LOG.warn("Timeline service is not enabled");
    }

    if (cliParser.hasOption("localized_files")) {
      String localizedFilesArg = cliParser.getOptionValue("localized_files");
      if (localizedFilesArg.contains(",")) {
        String[] files = localizedFilesArg.split(",");
        localizableFiles = Arrays.asList(files);
      } else {
        localizableFiles.add(localizedFilesArg);
      }
    }

    return true;
  }

  private void parsePlacementSpecs(String decodedSpec,
      int globalNumOfContainers) {
    Map<String, PlacementSpec> pSpecs =
        PlacementSpec.parse(decodedSpec);
    this.placementSpecs = new HashMap<>();
    for (PlacementSpec pSpec : pSpecs.values()) {
      // Use global num of containers when the spec doesn't specify
      // source tags. This is allowed when using node-attribute constraints.
      if (Strings.isNullOrEmpty(pSpec.sourceTag)
          && pSpec.getNumContainers() == 0
          && globalNumOfContainers > 0) {
        pSpec.setNumContainers(globalNumOfContainers);
      }
      this.numTotalContainers += pSpec.getNumContainers();
      this.placementSpecs.put(pSpec.sourceTag, pSpec);
    }
  }

  private String getDecodedPlacementSpec(String placementSpecifications) {
    Base64.Decoder decoder = Base64.getDecoder();
    byte[] decodedBytes = decoder.decode(
        placementSpecifications.getBytes(StandardCharsets.UTF_8));
    String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8);
    LOG.info("Decode placement spec: " + decodedSpec);
    return decodedSpec;
  }

  /**
   * Helper function to print usage
   *
   * @param opts Parsed command line options
   */
  private void printUsage(Options opts) {
    new HelpFormatter().printHelp("ApplicationMaster", opts);
  }

  protected void cleanup() {
    try {
      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws IOException {
          FileSystem fs = FileSystem.get(conf);
          Path dst = new Path(homeDirectory,
              getRelativePath(appName, appId.toString(), ""));
          fs.delete(dst, true);
          return null;
        }
      });
    } catch(Exception e) {
      LOG.warn("Failed to remove application staging directory", e);
    }
  }

  /**
   * Main run function for the application master
   *
   * @throws YarnException
   * @throws IOException
   */
  @SuppressWarnings({ "unchecked" })
  public void run() throws YarnException, IOException, InterruptedException {
    LOG.info("Starting ApplicationMaster");

    // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
    // are marked as LimitedPrivate
    Credentials credentials =
        UserGroupInformation.getCurrentUser().getCredentials();
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
    // Now remove the AM->RM token so that containers cannot access it.
    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
    LOG.info("Executing with tokens:");
    while (iter.hasNext()) {
      Token<?> token = iter.next();
      LOG.info(token.toString());
      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
        iter.remove();
      }
    }
    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    // Create appSubmitterUgi and add original tokens to it
    String appSubmitterUserName =
        System.getenv(ApplicationConstants.Environment.USER.name());
    appSubmitterUgi =
        UserGroupInformation.createRemoteUser(appSubmitterUserName);
    appSubmitterUgi.addCredentials(credentials);

    AMRMClientAsync.AbstractCallbackHandler allocListener =
        new RMCallbackHandler();
    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
    amRMClient.init(conf);
    amRMClient.start();

    containerListener = createNMCallbackHandler();
    nmClientAsync = new NMClientAsyncImpl(containerListener);
    nmClientAsync.init(conf);
    nmClientAsync.start();

    startTimelineClient(conf);
    if (timelineServiceV2Enabled) {
      // need to bind timelineClient
      amRMClient.registerTimelineV2Client(timelineV2Client);
      publishApplicationAttemptEventOnTimelineServiceV2(
          DSEvent.DS_APP_ATTEMPT_START);
    }

    if (timelineServiceV1Enabled) {
      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
          DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
    }

    // Setup local RPC Server to accept status requests directly from clients
    // TODO need to setup a protocol for client to be able to communicate to
    // the RPC server
    // TODO use the rpc port info to register with the RM for the client to
    // send requests to this app master

    // Register self with ResourceManager
    // This will start heartbeating to the RM
    appMasterHostname = NetUtils.getHostname();
    Map<Set<String>, PlacementConstraint> placementConstraintMap = null;
    if (this.placementSpecs != null) {
      placementConstraintMap = new HashMap<>();
      for (PlacementSpec spec : this.placementSpecs.values()) {
        if (spec.constraint != null) {
          Set<String> allocationTags = Strings.isNullOrEmpty(spec.sourceTag) ?
              Collections.emptySet() : Collections.singleton(spec.sourceTag);
          placementConstraintMap.put(allocationTags, spec.constraint);
        }
      }
    }

    RegisterApplicationMasterResponse response = amRMClient
        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
            appMasterTrackingUrl, placementConstraintMap);
    resourceProfiles = response.getResourceProfiles();
    ResourceUtils.reinitializeResources(response.getResourceTypes());
    // Dump out information about cluster capability as seen by the
    // resource manager
    long maxMem = response.getMaximumResourceCapability().getMemorySize();
    LOG.info("Max mem capability of resources in this cluster " + maxMem);
    
    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
    LOG.info("Max vcores capability of resources in this cluster " + maxVCores);

    // A resource ask cannot exceed the max.
    if (containerMemory > maxMem) {
      LOG.info("Container memory specified above max threshold of cluster."
          + " Using max value." + ", specified=" + containerMemory + ", max="
          + maxMem);
      containerMemory = maxMem;
    }

    if (containerVirtualCores > maxVCores) {
      LOG.info("Container virtual cores specified above max threshold of cluster."
          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
          + maxVCores);
      containerVirtualCores = maxVCores;
    }

    List<Container> previousAMRunningContainers =
        response.getContainersFromPreviousAttempts();
    LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
      + " previous attempts' running containers on AM registration.");
    for(Container container: previousAMRunningContainers) {
      launchedContainers.add(container.getId());
    }
    numAllocatedContainers.addAndGet(previousAMRunningContainers.size());


    int numTotalContainersToRequest =
        numTotalContainers - previousAMRunningContainers.size();
    // Setup ask for containers from RM
    // Send request for containers to RM
    // Until we get our fully allocated quota, we keep on polling RM for
    // containers
    // Keep looping until all the containers are launched and shell script
    // executed on them ( regardless of success/failure).
    if (this.placementSpecs == null) {
      LOG.info("placementSpecs null");
      for (int i = 0; i < numTotalContainersToRequest; ++i) {
        ContainerRequest containerAsk = setupContainerAskForRM();
        amRMClient.addContainerRequest(containerAsk);
      }
    } else {
      LOG.info("placementSpecs to create req:" + placementSpecs);
      List<SchedulingRequest> schedReqs = new ArrayList<>();
      for (PlacementSpec pSpec : this.placementSpecs.values()) {
        LOG.info("placementSpec :" + pSpec + ", container:" + pSpec
            .getNumContainers());
        for (int i = 0; i < pSpec.getNumContainers(); i++) {
          SchedulingRequest sr = setupSchedulingRequest(pSpec);
          schedReqs.add(sr);
        }
      }
      amRMClient.addSchedulingRequests(schedReqs);
    }
    numRequestedContainers.set(numTotalContainers);
  }

  @VisibleForTesting
  void startTimelineClient(final Configuration conf)
      throws YarnException, IOException, InterruptedException {
    try {
      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws Exception {
          if (YarnConfiguration.timelineServiceEnabled(conf)) {
            timelineServiceV1Enabled =
                YarnConfiguration.timelineServiceV1Enabled(conf);
            timelineServiceV2Enabled =
                YarnConfiguration.timelineServiceV2Enabled(conf);
            // Creating the Timeline Client
            if (timelineServiceV1Enabled) {
              timelineClient = TimelineClient.createTimelineClient();
              timelineClient.init(conf);
              timelineClient.start();
              LOG.info("Timeline service V1 client is enabled");
            }
            if (timelineServiceV2Enabled) {
              timelineV2Client = TimelineV2Client.createTimelineClient(
                  appAttemptID.getApplicationId());
              timelineV2Client.init(conf);
              timelineV2Client.start();
              LOG.info("Timeline service V2 client is enabled");
            }
          } else {
            timelineClient = null;
            timelineV2Client = null;
            LOG.warn("Timeline service is not enabled");
          }
          return null;
        }
      });
    } catch (UndeclaredThrowableException e) {
      throw new YarnException(e.getCause());
    }
  }

  @VisibleForTesting
  NMCallbackHandler createNMCallbackHandler() {
    return new NMCallbackHandler(this);
  }

  @VisibleForTesting
  protected boolean finish() {
    // wait for completion.
    while (!done
        && (numCompletedContainers.get() != numTotalContainers)) {
      try {
        Thread.sleep(200);
      } catch (InterruptedException ex) {}
    }

    if (timelineServiceV1Enabled) {
      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
          DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
    }

    if (timelineServiceV2Enabled) {
      publishApplicationAttemptEventOnTimelineServiceV2(
          DSEvent.DS_APP_ATTEMPT_END);
    }

    // Join all launched threads
    // needed for when we time out
    // and we need to release containers
    for (Thread launchThread : launchThreads) {
      try {
        launchThread.join(10000);
      } catch (InterruptedException e) {
        LOG.info("Exception thrown in thread join: " + e.getMessage());
        e.printStackTrace();
      }
    }

    // When the application completes, it should stop all running containers
    LOG.info("Application completed. Stopping running containers");
    nmClientAsync.stop();

    // When the application completes, it should send a finish application
    // signal to the RM
    LOG.info("Application completed. Signalling finished to RM");

    FinalApplicationStatus appStatus;
    boolean success = true;
    String message = null;
    if (numCompletedContainers.get() - numFailedContainers.get()
        >= numTotalContainers) {
      appStatus = FinalApplicationStatus.SUCCEEDED;
    } else {
      appStatus = FinalApplicationStatus.FAILED;
      message = String.format("Application Failure: desired = %d, " +
              "completed = %d, allocated = %d, failed = %d, " +
              "diagnostics = %s", numRequestedContainers.get(),
          numCompletedContainers.get(), numAllocatedContainers.get(),
          numFailedContainers.get(), diagnostics);
      success = false;
    }
    try {
      amRMClient.unregisterApplicationMaster(appStatus, message, null);
    } catch (YarnException | IOException ex) {
      LOG.error("Failed to unregister application", ex);
    }
    amRMClient.stop();

    // Stop Timeline Client
    if(timelineServiceV1Enabled) {
      timelineClient.stop();
    }
    if (timelineServiceV2Enabled) {
      timelineV2Client.stop();
    }

    return success;
  }

  public static String getRelativePath(String appName,
      String appId, String fileDstPath) {
    return appName + "/" + appId + "/" + fileDstPath;
  }

  @VisibleForTesting
  class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
    @SuppressWarnings("unchecked")
    @Override
    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
      LOG.info("Got response from RM for container ask, completedCnt="
          + completedContainers.size());
      for (ContainerStatus containerStatus : completedContainers) {
        String message = appAttemptID + " got container status for containerID="
            + containerStatus.getContainerId() + ", state="
            + containerStatus.getState() + ", exitStatus="
            + containerStatus.getExitStatus() + ", diagnostics="
            + containerStatus.getDiagnostics();
        if (containerStatus.getExitStatus() != 0) {
          LOG.error(message);
          diagnostics.append(containerStatus.getDiagnostics());
        } else {
          LOG.info(message);
        }

        // non complete containers should not be here
        assert (containerStatus.getState() == ContainerState.COMPLETE);
        // ignore containers we know nothing about - probably from a previous
        // attempt
        if (!launchedContainers.contains(containerStatus.getContainerId())) {
          LOG.info("Ignoring completed status of "
              + containerStatus.getContainerId()
              + "; unknown container(probably launched by previous attempt)");
          continue;
        }

        // increment counters for completed/failed containers
        int exitStatus = containerStatus.getExitStatus();
        if (0 != exitStatus) {
          // container failed
          if (ContainerExitStatus.ABORTED != exitStatus) {
            // shell script failed
            // counts as completed
            numCompletedContainers.incrementAndGet();
            numFailedContainers.incrementAndGet();
          } else {
            // container was killed by framework, possibly preempted
            // we should re-try as the container was lost for some reason
            numAllocatedContainers.decrementAndGet();
            numRequestedContainers.decrementAndGet();
            // we do not need to release the container as it would be done
            // by the RM

            // Ignore these containers if placementspec is enabled
            // for the time being.
            if (placementSpecs != null) {
              numIgnore.incrementAndGet();
            }
          }
        } else {
          // nothing to do
          // container completed successfully
          numCompletedContainers.incrementAndGet();
          LOG.info("Container completed successfully." + ", containerId="
              + containerStatus.getContainerId());
        }
        if (timelineServiceV2Enabled) {
          Long containerStartTime =
              containerStartTimes.get(containerStatus.getContainerId());
          if (containerStartTime == null) {
            containerStartTime = SystemClock.getInstance().getTime();
            containerStartTimes.put(containerStatus.getContainerId(),
                containerStartTime);
          }
          publishContainerEndEventOnTimelineServiceV2(containerStatus,
              containerStartTime);
        }
        if (timelineServiceV1Enabled) {
          publishContainerEndEvent(timelineClient, containerStatus, domainId,
              appSubmitterUgi);
        }
      }

      // ask for more containers if any failed
      int askCount = numTotalContainers - numRequestedContainers.get();
      numRequestedContainers.addAndGet(askCount);

      // Dont bother re-asking if we are using placementSpecs
      if (placementSpecs == null) {
        if (askCount > 0) {
          for (int i = 0; i < askCount; ++i) {
            ContainerRequest containerAsk = setupContainerAskForRM();
            amRMClient.addContainerRequest(containerAsk);
          }
        }
      }

      if (numCompletedContainers.get() + numIgnore.get() >=
          numTotalContainers) {
        done = true;
      }
    }

    @Override
    public void onContainersAllocated(List<Container> allocatedContainers) {
      LOG.info("Got response from RM for container ask, allocatedCnt="
          + allocatedContainers.size());
      for (Container allocatedContainer : allocatedContainers) {
        if (numAllocatedContainers.get() == numTotalContainers) {
          LOG.info("The requested number of containers have been allocated."
              + " Releasing the extra container allocation from the RM.");
          amRMClient.releaseAssignedContainer(allocatedContainer.getId());
        } else {
          numAllocatedContainers.addAndGet(1);
          String yarnShellId = Integer.toString(yarnShellIdCounter);
          yarnShellIdCounter++;
          LOG.info(
              "Launching shell command on a new container."
                  + ", containerId=" + allocatedContainer.getId()
                  + ", yarnShellId=" + yarnShellId
                  + ", containerNode="
                  + allocatedContainer.getNodeId().getHost()
                  + ":" + allocatedContainer.getNodeId().getPort()
                  + ", containerNodeURI="
                  + allocatedContainer.getNodeHttpAddress()
                  + ", containerResourceMemory"
                  + allocatedContainer.getResource().getMemorySize()
                  + ", containerResourceVirtualCores"
                  + allocatedContainer.getResource().getVirtualCores());

          Thread launchThread =
              createLaunchContainerThread(allocatedContainer, yarnShellId);

          // launch and start the container on a separate thread to keep
          // the main thread unblocked
          // as all containers may not be allocated at one go.
          launchThreads.add(launchThread);
          launchedContainers.add(allocatedContainer.getId());
          launchThread.start();

          // Remove the corresponding request
          Collection<AMRMClient.ContainerRequest> requests =
              amRMClient.getMatchingRequests(
                  allocatedContainer.getAllocationRequestId());
          if (requests.iterator().hasNext()) {
            AMRMClient.ContainerRequest request = requests.iterator().next();
            amRMClient.removeContainerRequest(request);
          }
        }
      }
    }

    @Override
    public void onContainersUpdated(
        List<UpdatedContainer> containers) {
      for (UpdatedContainer container : containers) {
        LOG.info("Container {} updated, updateType={}, resource={}, "
                + "execType={}",
            container.getContainer().getId(),
            container.getUpdateType().toString(),
            container.getContainer().getResource().toString(),
            container.getContainer().getExecutionType());

        // TODO Remove this line with finalized updateContainer API.
        // Currently nm client needs to notify the NM to update container
        // execution type via NMClient#updateContainerResource() or
        // NMClientAsync#updateContainerResourceAsync() when
        // auto-update.containers is disabled, but this API is
        // under evolving and will need to be replaced by a proper new API.
        nmClientAsync.updateContainerResourceAsync(container.getContainer());
      }
    }

    @Override
    public void onRequestsRejected(List<RejectedSchedulingRequest> rejReqs) {
      List<SchedulingRequest> reqsToRetry = new ArrayList<>();
      for (RejectedSchedulingRequest rejReq : rejReqs) {
        LOG.info("Scheduling Request {} has been rejected. Reason {}",
            rejReq.getRequest(), rejReq.getReason());
        reqsToRetry.add(rejReq.getRequest());
      }
      totalRetries.addAndGet(-1 * reqsToRetry.size());
      if (totalRetries.get() <= 0) {
        LOG.info("Exiting, since retries are exhausted !!");
        done = true;
      } else {
        amRMClient.addSchedulingRequests(reqsToRetry);
      }
    }

    @Override public void onShutdownRequest() {
      if (keepContainersAcrossAttempts) {
        LOG.info("Shutdown request received. Ignoring since "
            + "keep_containers_across_application_attempts is enabled");
      } else{
        LOG.info("Shutdown request received. Processing since "
            + "keep_containers_across_application_attempts is disabled");
        done = true;
      }
    }

    @Override
    public void onNodesUpdated(List<NodeReport> updatedNodes) {}

    @Override
    public float getProgress() {
      // set progress to deliver to RM on next heartbeat
      float progress = (float) numCompletedContainers.get()
          / numTotalContainers;
      return progress;
    }

    @Override
    public void onError(Throwable e) {
      LOG.error("Error in RMCallbackHandler: ", e);
      done = true;
    }
  }

  @VisibleForTesting
  class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler {

    private ConcurrentMap<ContainerId, Container> containers =
        new ConcurrentHashMap<ContainerId, Container>();
    private final ApplicationMaster applicationMaster;

    public NMCallbackHandler(ApplicationMaster applicationMaster) {
      this.applicationMaster = applicationMaster;
    }

    public void addContainer(ContainerId containerId, Container container) {
      containers.putIfAbsent(containerId, container);
    }

    @Override
    public void onContainerStopped(ContainerId containerId) {
      LOG.debug("Succeeded to stop Container {}", containerId);
      containers.remove(containerId);
    }

    @Override
    public void onContainerStatusReceived(ContainerId containerId,
        ContainerStatus containerStatus) {
      LOG.debug("Container Status: id={}, status={}", containerId,
          containerStatus);

      // If promote_opportunistic_after_start is set, automatically promote
      // opportunistic containers to guaranteed.
      if (autoPromoteContainers) {
        if (containerStatus.getState() == ContainerState.RUNNING) {
          Container container = containers.get(containerId);
          if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
            // Promote container
            LOG.info("Promoting container {} to {}", container.getId(),
                container.getExecutionType());
            UpdateContainerRequest updateRequest = UpdateContainerRequest
                .newInstance(container.getVersion(), container.getId(),
                    ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null,
                    ExecutionType.GUARANTEED);
            amRMClient.requestContainerUpdate(container, updateRequest);
          }
        }
      }
    }

    @Override
    public void onContainerStarted(ContainerId containerId,
        Map<String, ByteBuffer> allServiceResponse) {
      LOG.debug("Succeeded to start Container {}", containerId);
      Container container = containers.get(containerId);
      if (container != null) {
        applicationMaster.nmClientAsync.getContainerStatusAsync(
            containerId, container.getNodeId());
      }
      if (applicationMaster.timelineServiceV2Enabled) {
        long startTime = SystemClock.getInstance().getTime();
        applicationMaster.getContainerStartTimes().put(containerId, startTime);
        applicationMaster.publishContainerStartEventOnTimelineServiceV2(
            container, startTime);
      }
      if (applicationMaster.timelineServiceV1Enabled) {
        applicationMaster.publishContainerStartEvent(
            applicationMaster.timelineClient, container,
            applicationMaster.domainId, applicationMaster.appSubmitterUgi);
      }
    }

    @Override
    public void onStartContainerError(ContainerId containerId, Throwable t) {
      LOG.error("Failed to start Container {}", containerId, t);
      containers.remove(containerId);
      applicationMaster.numCompletedContainers.incrementAndGet();
      applicationMaster.numFailedContainers.incrementAndGet();
      if (timelineServiceV2Enabled) {
        publishContainerStartFailedEventOnTimelineServiceV2(containerId,
            t.getMessage());
      }
      if (timelineServiceV1Enabled) {
        publishContainerStartFailedEvent(containerId, t.getMessage());
      }
    }

    @Override
    public void onGetContainerStatusError(
        ContainerId containerId, Throwable t) {
      LOG.error("Failed to query the status of Container " + containerId);
    }

    @Override
    public void onStopContainerError(ContainerId containerId, Throwable t) {
      LOG.error("Failed to stop Container " + containerId);
      containers.remove(containerId);
    }

    @Deprecated
    @Override
    public void onIncreaseContainerResourceError(
        ContainerId containerId, Throwable t) {}

    @Deprecated
    @Override
    public void onContainerResourceIncreased(
        ContainerId containerId, Resource resource) {}

    @Override
    public void onUpdateContainerResourceError(
        ContainerId containerId, Throwable t) {
    }

    @Override
    public void onContainerResourceUpdated(ContainerId containerId,
        Resource resource) {
    }
  }

  /**
   * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
   * that will execute the shell command.
   */
  private class LaunchContainerRunnable implements Runnable {

    // Allocated container
    private Container container;
    private String shellId;

    NMCallbackHandler containerListener;

    /**
     * @param lcontainer Allocated container
     * @param containerListener Callback handler of the container
     */
    public LaunchContainerRunnable(Container lcontainer,
        NMCallbackHandler containerListener, String shellId) {
      this.container = lcontainer;
      this.containerListener = containerListener;
      this.shellId = shellId;
    }

    @Override
    /**
     * Connects to CM, sets up container launch context 
     * for shell command and eventually dispatches the container 
     * start request to the CM. 
     */
    public void run() {
      LOG.info("Setting up container launch container for containerid="
          + container.getId() + " with shellid=" + shellId);

      // Set the local resources
      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

      // The container for the eventual shell commands needs its own local
      // resources too.
      // In this scenario, if a shell script is specified, we need to have it
      // copied and made available to the container.
      if (!scriptPath.isEmpty()) {
        Path renamedScriptPath = null;
        if (Shell.WINDOWS) {
          renamedScriptPath = new Path(scriptPath + ".bat");
        } else {
          renamedScriptPath = new Path(scriptPath + ".sh");
        }

        try {
          // rename the script file based on the underlying OS syntax.
          renameScriptFile(renamedScriptPath);
        } catch (Exception e) {
          LOG.error(
              "Not able to add suffix (.bat/.sh) to the shell script filename",
              e);
          // We know we cannot continue launching the container
          // so we should release it.
          numCompletedContainers.incrementAndGet();
          numFailedContainers.incrementAndGet();
          return;
        }

        URL yarnUrl = null;
        try {
          yarnUrl = URL.fromURI(new URI(renamedScriptPath.toString()));
        } catch (URISyntaxException e) {
          LOG.error("Error when trying to use shell script path specified"
              + " in env, path=" + renamedScriptPath, e);
          // A failure scenario on bad input such as invalid shell script path
          // We know we cannot continue launching the container
          // so we should release it.
          // TODO
          numCompletedContainers.incrementAndGet();
          numFailedContainers.incrementAndGet();
          return;
        }
        LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
          shellScriptPathLen, shellScriptPathTimestamp);
        localResources.put(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH :
            EXEC_SHELL_STRING_PATH, shellRsrc);
        shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
      }

      // Set up localization for the container which runs the command
      if (localizableFiles.size() > 0) {
        FileSystem fs;
        try {
          fs = FileSystem.get(conf);
        } catch (IOException e) {
          numCompletedContainers.incrementAndGet();
          numFailedContainers.incrementAndGet();
          throw new UncheckedIOException("Cannot get FileSystem", e);
        }

        localizableFiles.stream().forEach(fileName -> {
          try {
            String relativePath =
                getRelativePath(appName, appId.toString(), fileName);
            Path dst =
                new Path(homeDirectory, relativePath);
            FileStatus fileStatus = fs.getFileStatus(dst);
            LocalResource localRes = LocalResource.newInstance(
                URL.fromURI(dst.toUri()),
                LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
                fileStatus.getLen(), fileStatus.getModificationTime());
            LOG.info("Setting up file for localization: " + dst);
            localResources.put(fileName, localRes);
          } catch (IOException e) {
            numCompletedContainers.incrementAndGet();
            numFailedContainers.incrementAndGet();
            throw new UncheckedIOException(
                "Error during localization setup", e);
          }
        });
      }

      // Set the necessary command to execute on the allocated container
      Vector<CharSequence> vargs = new Vector<CharSequence>(5);

      // Set executable command
      vargs.add(shellCommand);
      // Set shell script path
      if (!scriptPath.isEmpty()) {
        vargs.add(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH
            : EXEC_SHELL_STRING_PATH);
      }

      // Set args for the shell command if any
      vargs.add(shellArgs);
      // Add log redirect params
      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");

      // Get final commmand
      StringBuilder command = new StringBuilder();
      for (CharSequence str : vargs) {
        command.append(str).append(" ");
      }

      List<String> commands = new ArrayList<String>();
      commands.add(command.toString());

      // Set up ContainerLaunchContext, setting local resource, environment,
      // command and token for constructor.

      // Note for tokens: Set up tokens for the container too. Today, for normal
      // shell commands, the container in distribute-shell doesn't need any
      // tokens. We are populating them mainly for NodeManagers to be able to
      // download anyfiles in the distributed file-system. The tokens are
      // otherwise also useful in cases, for e.g., when one is running a
      // "hadoop dfs" command inside the distributed shell.
      Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv);
      myShellEnv.put(YARN_SHELL_ID, shellId);
      ContainerRetryContext containerRetryContext =
          ContainerRetryContext.newInstance(
              containerRetryPolicy, containerRetryErrorCodes,
              containerMaxRetries, containrRetryInterval,
              containerFailuresValidityInterval);
      ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
        localResources, myShellEnv, commands, null, allTokens.duplicate(),
          null, containerRetryContext);
      containerListener.addContainer(container.getId(), container);
      nmClientAsync.startContainerAsync(container, ctx);
    }
  }

  private void renameScriptFile(final Path renamedScriptPath)
      throws IOException, InterruptedException {
    appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws IOException {
        FileSystem fs = renamedScriptPath.getFileSystem(conf);
        fs.rename(new Path(scriptPath), renamedScriptPath);
        return null;
      }
    });
    LOG.info("User " + appSubmitterUgi.getUserName()
        + " added suffix(.sh/.bat) to script file as " + renamedScriptPath);
  }

  /**
   * Setup the request that will be sent to the RM for the container ask.
   *
   * @return the setup ResourceRequest to be sent to RM
   */
  private ContainerRequest setupContainerAskForRM() {
    // setup requirements for hosts
    // using * as any host will do for the distributed shell app
    // set the priority for the request
    // TODO - what is the range for priority? how to decide?
    Priority pri = Priority.newInstance(requestPriority);

    // Set up resource type requirements
    ContainerRequest request = new ContainerRequest(
        getTaskResourceCapability(),
        null, null, pri, 0, true, null,
        ExecutionTypeRequest.newInstance(containerType, enforceExecType),
        containerResourceProfile);
    LOG.info("Requested container ask: " + request.toString());
    return request;
  }

  private SchedulingRequest setupSchedulingRequest(PlacementSpec spec) {
    long allocId = allocIdCounter.incrementAndGet();
    SchedulingRequest sReq = SchedulingRequest.newInstance(
        allocId, Priority.newInstance(requestPriority),
        ExecutionTypeRequest.newInstance(),
        Collections.singleton(spec.sourceTag),
        ResourceSizing.newInstance(
            getTaskResourceCapability()), null);
    sReq.setPlacementConstraint(spec.constraint);
    LOG.info("Scheduling Request made: " + sReq.toString());
    return sReq;
  }

  private boolean fileExist(String filePath) {
    return new File(filePath).exists();
  }

  private String readContent(String filePath) throws IOException {
    try (DataInputStream ds = new DataInputStream(
        new FileInputStream(filePath))) {
      return ds.readUTF();
    }
  }

  private void publishContainerStartEvent(
      final TimelineClient timelineClient, final Container container,
      String domainId, UserGroupInformation ugi) {
    final TimelineEntity entity = new TimelineEntity();
    entity.setEntityId(container.getId().toString());
    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
    entity.setDomainId(domainId);
    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
    entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, container.getId()
        .getApplicationAttemptId().getApplicationId().toString());
    TimelineEvent event = new TimelineEvent();
    event.setTimestamp(System.currentTimeMillis());
    event.setEventType(DSEvent.DS_CONTAINER_START.toString());
    event.addEventInfo("Node", container.getNodeId().toString());
    event.addEventInfo("Resources", container.getResource().toString());
    entity.addEvent(event);

    try {
      processTimelineResponseErrors(
          putContainerEntity(timelineClient,
              container.getId().getApplicationAttemptId(),
              entity));
    } catch (YarnException | IOException | ClientHandlerException e) {
      LOG.error("Container start event could not be published for "
          + container.getId().toString(), e);
    }
  }

  @VisibleForTesting
  void publishContainerEndEvent(
      final TimelineClient timelineClient, ContainerStatus container,
      String domainId, UserGroupInformation ugi) {
    final TimelineEntity entity = new TimelineEntity();
    entity.setEntityId(container.getContainerId().toString());
    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
    entity.setDomainId(domainId);
    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
    entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME,
        container.getContainerId().getApplicationAttemptId()
            .getApplicationId().toString());
    TimelineEvent event = new TimelineEvent();
    event.setTimestamp(System.currentTimeMillis());
    event.setEventType(DSEvent.DS_CONTAINER_END.toString());
    event.addEventInfo("State", container.getState().name());
    event.addEventInfo("Exit Status", container.getExitStatus());
    event.addEventInfo(DIAGNOSTICS, container.getDiagnostics());
    entity.addEvent(event);
    try {
      processTimelineResponseErrors(
          putContainerEntity(timelineClient,
              container.getContainerId().getApplicationAttemptId(),
              entity));
    } catch (YarnException | IOException | ClientHandlerException e) {
      LOG.error("Container end event could not be published for "
          + container.getContainerId().toString(), e);
    }
  }

  private TimelinePutResponse putContainerEntity(
      TimelineClient timelineClient, ApplicationAttemptId currAttemptId,
      TimelineEntity entity)
      throws YarnException, IOException {
    if (TimelineUtils.timelineServiceV1_5Enabled(conf)) {
      TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
          currAttemptId.getApplicationId(),
          CONTAINER_ENTITY_GROUP_ID);
      return timelineClient.putEntities(currAttemptId, groupId, entity);
    } else {
      return timelineClient.putEntities(entity);
    }
  }

  private void publishApplicationAttemptEvent(
      final TimelineClient timelineClient, String appAttemptId,
      DSEvent appEvent, String domainId, UserGroupInformation ugi) {
    final TimelineEntity entity = new TimelineEntity();
    entity.setEntityId(appAttemptId);
    entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
    entity.setDomainId(domainId);
    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
    TimelineEvent event = new TimelineEvent();
    event.setEventType(appEvent.toString());
    event.setTimestamp(System.currentTimeMillis());
    entity.addEvent(event);
    try {
      TimelinePutResponse response = timelineClient.putEntities(entity);
      processTimelineResponseErrors(response);
    } catch (YarnException | IOException | ClientHandlerException e) {
      LOG.error("App Attempt "
          + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
          + " event could not be published for "
          + appAttemptID, e);
    }
  }

  private TimelinePutResponse processTimelineResponseErrors(
      TimelinePutResponse response) {
    List<TimelinePutResponse.TimelinePutError> errors = response.getErrors();
    if (errors.size() == 0) {
      LOG.debug("Timeline entities are successfully put");
    } else {
      for (TimelinePutResponse.TimelinePutError error : errors) {
        LOG.error(
            "Error when publishing entity [" + error.getEntityType() + ","
                + error.getEntityId() + "], server side error code: "
                + error.getErrorCode());
      }
    }
    return response;
  }

  RMCallbackHandler getRMCallbackHandler() {
    return new RMCallbackHandler();
  }

  @VisibleForTesting
  void setAmRMClient(AMRMClientAsync client) {
    this.amRMClient = client;
  }

  @VisibleForTesting
  int getNumCompletedContainers() {
    return numCompletedContainers.get();
  }

  @VisibleForTesting
  boolean getDone() {
    return done;
  }

  @VisibleForTesting
  Thread createLaunchContainerThread(Container allocatedContainer,
      String shellId) {
    LaunchContainerRunnable runnableLaunchContainer =
        new LaunchContainerRunnable(allocatedContainer, containerListener,
            shellId);
    return new Thread(runnableLaunchContainer);
  }

  private void publishContainerStartEventOnTimelineServiceV2(
      Container container, long startTime) {
    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
        entity =
            new org.apache.hadoop.yarn.api.records.timelineservice.
            TimelineEntity();
    entity.setId(container.getId().toString());
    entity.setType(DSEntity.DS_CONTAINER.toString());
    entity.setCreatedTime(startTime);
    entity.addInfo("user", appSubmitterUgi.getShortUserName());

    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
    event.setTimestamp(startTime);
    event.setId(DSEvent.DS_CONTAINER_START.toString());
    event.addInfo("Node", container.getNodeId().toString());
    event.addInfo("Resources", container.getResource().toString());
    entity.addEvent(event);
    entity.setIdPrefix(TimelineServiceHelper.invertLong(startTime));

    try {
      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public TimelinePutResponse run() throws Exception {
          timelineV2Client.putEntitiesAsync(entity);
          return null;
        }
      });
    } catch (Exception e) {
      LOG.error("Container start event could not be published for "
          + container.getId().toString(),
          e instanceof UndeclaredThrowableException ? e.getCause() : e);
    }
  }

  private void publishContainerStartFailedEventOnTimelineServiceV2(
      final ContainerId containerId, String diagnostics) {
    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
        entity = new org.apache.hadoop.yarn.api.records.timelineservice.
        TimelineEntity();
    entity.setId(containerId.toString());
    entity.setType(DSEntity.DS_CONTAINER.toString());
    entity.addInfo("user", appSubmitterUgi.getShortUserName());
    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
        new org.apache.hadoop.yarn.api.records.timelineservice
            .TimelineEvent();
    event.setTimestamp(System.currentTimeMillis());
    event.setId(DSEvent.DS_CONTAINER_END.toString());
    event.addInfo(DIAGNOSTICS, diagnostics);
    entity.addEvent(event);
    try {
      appSubmitterUgi.doAs((PrivilegedExceptionAction<Object>) () -> {
        timelineV2Client.putEntitiesAsync(entity);
        return null;
      });
    } catch (Exception e) {
      LOG.error("Container start failed event could not be published for {}",
          containerId,
          e instanceof UndeclaredThrowableException ? e.getCause() : e);
    }
  }

  private void publishContainerStartFailedEvent(final ContainerId containerId,
      String diagnostics) {
    final TimelineEntity entityV1 = new TimelineEntity();
    entityV1.setEntityId(containerId.toString());
    entityV1.setEntityType(DSEntity.DS_CONTAINER.toString());
    entityV1.setDomainId(domainId);
    entityV1.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, appSubmitterUgi
        .getShortUserName());
    entityV1.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME,
        containerId.getApplicationAttemptId().getApplicationId().toString());

    TimelineEvent eventV1 = new TimelineEvent();
    eventV1.setTimestamp(System.currentTimeMillis());
    eventV1.setEventType(DSEvent.DS_CONTAINER_END.toString());
    eventV1.addEventInfo(DIAGNOSTICS, diagnostics);
    entityV1.addEvent(eventV1);
    try {
      processTimelineResponseErrors(putContainerEntity(timelineClient,
          containerId.getApplicationAttemptId(), entityV1));
    } catch (YarnException | IOException | ClientHandlerException e) {
      LOG.error("Container end event could not be published for {}",
          containerId, e);
    }
  }

  private void publishContainerEndEventOnTimelineServiceV2(
      final ContainerStatus container, long containerStartTime) {
    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
        entity =
            new org.apache.hadoop.yarn.api.records.timelineservice.
            TimelineEntity();
    entity.setId(container.getContainerId().toString());
    entity.setType(DSEntity.DS_CONTAINER.toString());
    //entity.setDomainId(domainId);
    entity.addInfo("user", appSubmitterUgi.getShortUserName());
    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
        new  org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
    event.setTimestamp(System.currentTimeMillis());
    event.setId(DSEvent.DS_CONTAINER_END.toString());
    event.addInfo("State", container.getState().name());
    event.addInfo("Exit Status", container.getExitStatus());
    event.addInfo(DIAGNOSTICS, container.getDiagnostics());
    entity.addEvent(event);
    entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));

    try {
      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public TimelinePutResponse run() throws Exception {
          timelineV2Client.putEntitiesAsync(entity);
          return null;
        }
      });
    } catch (Exception e) {
      LOG.error("Container end event could not be published for "
          + container.getContainerId().toString(),
          e instanceof UndeclaredThrowableException ? e.getCause() : e);
    }
  }

  private void publishApplicationAttemptEventOnTimelineServiceV2(
      DSEvent appEvent) {
    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
        entity =
            new org.apache.hadoop.yarn.api.records.timelineservice.
            TimelineEntity();
    entity.setId(appAttemptID.toString());
    entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
    long ts = System.currentTimeMillis();
    if (appEvent == DSEvent.DS_APP_ATTEMPT_START) {
      entity.setCreatedTime(ts);
    }
    entity.addInfo("user", appSubmitterUgi.getShortUserName());
    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
    event.setId(appEvent.toString());
    event.setTimestamp(ts);
    entity.addEvent(event);
    entity.setIdPrefix(
        TimelineServiceHelper.invertLong(appAttemptID.getAttemptId()));

    try {
      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public TimelinePutResponse run() throws Exception {
          timelineV2Client.putEntitiesAsync(entity);
          return null;
        }
      });
    } catch (Exception e) {
      LOG.error("App Attempt "
          + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
          + " event could not be published for "
          + appAttemptID,
          e instanceof UndeclaredThrowableException ? e.getCause() : e);
    }
  }

  private Resource getTaskResourceCapability()
      throws YarnRuntimeException {
    if (containerMemory < -1 || containerMemory == 0) {
      throw new YarnRuntimeException("Value of AM memory '" + containerMemory
          + "' has to be greater than 0");
    }
    if (containerVirtualCores < -1 || containerVirtualCores == 0) {
      throw new YarnRuntimeException(
          "Value of AM vcores '" + containerVirtualCores
              + "' has to be greater than 0");
    }

    Resource resourceCapability =
        Resource.newInstance(containerMemory, containerVirtualCores);
    containerMemory =
        containerMemory == -1 ? DEFAULT_CONTAINER_MEMORY : containerMemory;
    containerVirtualCores = containerVirtualCores == -1 ?
        DEFAULT_CONTAINER_VCORES :
        containerVirtualCores;
    resourceCapability.setMemorySize(containerMemory);
    resourceCapability.setVirtualCores(containerVirtualCores);
    for (Map.Entry<String, Long> entry : containerResources.entrySet()) {
      resourceCapability.setResourceValue(entry.getKey(), entry.getValue());
    }

    return resourceCapability;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop Client 源码

hadoop DSConstants 源码

hadoop DistributedShellTimelinePlugin 源码

hadoop Log4jPropertyHelper 源码

hadoop PlacementSpec 源码

hadoop package-info 源码

0  赞