hadoop JobResourceUploader 源码

  • 2022-10-20
  • 浏览 (107)

hadoop JobResourceUploader 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcNoSuchMethodException;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.SharedCacheClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;

/**
 * This class is responsible for uploading resources from the client to HDFS
 * that are associated with a MapReduce job.
 */
@Private
@Unstable
class JobResourceUploader {
  /** Logger for this class. */
  protected static final Logger LOG =
      LoggerFactory.getLogger(JobResourceUploader.class);
  /** Path string of the filesystem root; special-cased in copyRemoteFiles. */
  private static final String ROOT_PATH = "/";

  /**
   * Whether libjars staged in the submission directory may be added to the
   * distributed cache as a single wildcard entry (see uploadLibJars).
   */
  private final boolean useWildcard;
  /** Filesystem passed to the constructor; uploaded resources go here. */
  private final FileSystem jtFs;
  /** Shared cache client; stays null unless initSharedCache creates one. */
  private SharedCacheClient scClient = null;
  /** Shared cache settings; initialized from the job conf in initSharedCache. */
  private SharedCacheConfig scConfig = new SharedCacheConfig();
  /** Application id derived from the job id when the shared cache is enabled. */
  private ApplicationId appId = null;

  /**
   * Creates an uploader that stages job resources on the given filesystem.
   *
   * @param submitFs the filesystem that uploaded resources are written to
   * @param useWildcard whether staged libjars may be registered in the
   *          distributed cache through a single wildcard entry
   */
  JobResourceUploader(FileSystem submitFs, boolean useWildcard) {
    this.useWildcard = useWildcard;
    this.jtFs = submitFs;
  }

  /**
   * Loads the shared cache settings from the configuration and, only when the
   * shared cache is enabled, creates the client and derives the application
   * id used for shared cache calls.
   */
  private void initSharedCache(JobID jobid, Configuration conf) {
    scConfig.init(conf);
    if (!scConfig.isSharedCacheEnabled()) {
      return;
    }
    scClient = createSharedCacheClient(conf);
    appId = jobIDToAppId(jobid);
  }

  /**
   * Converts a {@link JobID} into the {@link ApplicationId} handed to the
   * shared cache client.
   * <p>
   * The logic is very similar to
   * org.apache.hadoop.mapreduce.TypeConverter#toYarn. That method is not used
   * here because mapreduce-client-core can not depend on
   * mapreduce-client-common.
   */
  private ApplicationId jobIDToAppId(JobID jobId) {
    long clusterTimestamp = Long.parseLong(jobId.getJtIdentifier());
    int sequenceNumber = jobId.getId();
    return ApplicationId.newInstance(clusterTimestamp, sequenceNumber);
  }

  /**
   * Stops the shared cache client, if one exists, and clears the reference so
   * later calls to useSharedCache return immediately.
   */
  private void stopSharedCache() {
    if (scClient == null) {
      return;
    }
    scClient.stop();
    scClient = null;
  }

  /**
   * Builds a shared cache client, initializes it with the given configuration
   * and starts it before handing it back.
   *
   * @param conf the configuration used to initialize the client
   * @return the started client
   */
  @VisibleForTesting
  protected SharedCacheClient createSharedCacheClient(Configuration conf) {
    SharedCacheClient client = SharedCacheClient.createSharedCacheClient();
    client.init(conf);
    client.start();
    return client;
  }

  /**
   * Upload and configure the files, libjars, jobjar and archives that belong
   * to the passed job.
   * <p>
   * When the shared cache is enabled, each resource is first looked up in the
   * shared cache. If the shared cache manager cannot be contacted, the client
   * is stopped and the remaining resources follow the default path of being
   * copied to the submission directory. Resources that were already resolved
   * through the shared cache keep their shared cache URIs.
   * <p>
   * The shared cache client is always stopped before this method returns,
   * whether or not the upload succeeded.
   * <p>
   * NOTE(review): earlier documentation stated that resources missing from
   * the cache are not uploaded by the NM in the fallback case; confirm that
   * against the code consuming the upload policies.
   *
   * @param job the job containing the files to be uploaded
   * @param submitJobDir the submission directory of the job
   * @throws IOException if the submission directory already exists or a
   *           filesystem operation fails
   */
  public void uploadResources(Job job, Path submitJobDir) throws IOException {
    try {
      JobID jobId = job.getJobID();
      Configuration conf = job.getConfiguration();
      initSharedCache(jobId, conf);
      uploadResourcesInternal(job, submitJobDir);
    } finally {
      stopSharedCache();
    }
  }

  /**
   * Creates the submission directory and uploads every resource of the job
   * into it (or resolves it through the shared cache), then records upload
   * policies, timestamps, visibilities and delegation tokens in the job
   * configuration.
   *
   * @param job the job whose resources are uploaded
   * @param submitJobDir the submission directory; must not exist yet
   * @throws IOException if the directory already exists or an upload fails
   */
  private void uploadResourcesInternal(Job job, Path submitJobDir)
      throws IOException {
    final Configuration conf = job.getConfiguration();
    final int configuredReplication =
        conf.getInt(Job.SUBMIT_REPLICATION, Job.DEFAULT_SUBMIT_REPLICATION);
    final short replication = (short) configuredReplication;

    boolean usedGenericParser = conf.getBoolean(Job.USED_GENERIC_PARSER, false);
    if (!usedGenericParser) {
      LOG.warn("Hadoop command-line option parsing not performed. "
          + "Implement the Tool interface and execute your application "
          + "with ToolRunner to remedy this.");
    }

    // The job is copied to the filesystem the JobTracker is using, under a
    // temporary name. This allows DFS to work, and under the local fs also
    // provides UNIX-like object loading semantics (that is, if the job file
    // is deleted right after submission, the submission can still run to
    // completion).
    LOG.debug("default FileSystem: " + jtFs.getUri());
    if (jtFs.exists(submitJobDir)) {
      throw new IOException("Not submitting job. Job directory " + submitJobDir
          + " already exists!! This is unexpected.Please check what's there in"
          + " that directory");
    }

    // Reduce the submission directory to its path component and create it.
    Path qualifiedDir = jtFs.makeQualified(submitJobDir);
    submitJobDir = new Path(qualifiedDir.toUri().getPath());
    FsPermission stagingPerms =
        new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
    mkdirs(jtFs, submitJobDir, stagingPerms);

    boolean erasureCodingEnabled = conf.getBoolean(
        MRJobConfig.MR_AM_STAGING_DIR_ERASURECODING_ENABLED,
        MRJobConfig.DEFAULT_MR_AM_STAGING_ERASURECODING_ENABLED);
    if (!erasureCodingEnabled) {
      disableErasureCodingForPath(submitJobDir);
    }

    // Resources added via command line arguments in the GenericOptionsParser
    // (i.e. files, libjars, archives), plus the job jar.
    Collection<String> files = conf.getStringCollection("tmpfiles");
    Collection<String> libjars = conf.getStringCollection("tmpjars");
    Collection<String> archives = conf.getStringCollection("tmparchives");
    String jobJar = job.getJar();

    // Append the resources that were programmatically specified for the
    // shared cache via the Job API.
    Collection<String> sharedFiles =
        conf.getStringCollection(MRJobConfig.FILES_FOR_SHARED_CACHE);
    files.addAll(sharedFiles);
    Collection<String> sharedLibjars = conf.getStringCollection(
        MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE);
    libjars.addAll(sharedLibjars);
    Collection<String> sharedArchives =
        conf.getStringCollection(MRJobConfig.ARCHIVES_FOR_SHARED_CACHE);
    archives.addAll(sharedArchives);

    Map<URI, FileStatus> statCache = new HashMap<>();
    checkLocalizationLimits(conf, files, libjars, archives, jobJar, statCache);

    // Files and libjars share one policy map; archives get their own.
    Map<String, Boolean> fileSCUploadPolicies = new LinkedHashMap<>();
    Map<String, Boolean> archiveSCUploadPolicies = new LinkedHashMap<>();

    uploadFiles(job, files, submitJobDir, stagingPerms, replication,
        fileSCUploadPolicies, statCache);
    uploadLibJars(job, libjars, submitJobDir, stagingPerms, replication,
        fileSCUploadPolicies, statCache);
    uploadArchives(job, archives, submitJobDir, stagingPerms, replication,
        archiveSCUploadPolicies, statCache);
    uploadJobJar(job, jobJar, submitJobDir, replication, statCache);
    addLog4jToDistributedCache(job, submitJobDir);

    // Note, resources in the distributed cache are not considered for the
    // shared cache at this time. Only resources specified via the
    // GenericOptionsParser or the jobjar.
    Job.setFileSharedCacheUploadPolicies(conf, fileSCUploadPolicies);
    Job.setArchiveSharedCacheUploadPolicies(conf, archiveSCUploadPolicies);

    // Record timestamps and public/private visibility of archives and files.
    ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf,
        statCache);
    // Obtain delegation tokens for the cached files.
    ClientDistributedCacheManager.getDelegationTokens(conf,
        job.getCredentials());
  }

  /**
   * Registers each entry of {@code files} as a cache file of the job. An entry
   * is taken from the shared cache when that is enabled for files and the
   * lookup succeeds; otherwise it is copied into the files directory under
   * the submission directory.
   *
   * @param fileSCUploadPolicies receives, per resulting URI, whether the
   *          resource should be uploaded to the shared cache; only written
   *          when the shared cache is enabled for files
   * @throws IllegalArgumentException if an entry is not a valid URI
   * @throws IOException if copying a file fails
   */
  @VisibleForTesting
  void uploadFiles(Job job, Collection<String> files,
      Path submitJobDir, FsPermission mapredSysPerms, short submitReplication,
      Map<String, Boolean> fileSCUploadPolicies, Map<URI, FileStatus> statCache)
      throws IOException {
    Configuration conf = job.getConfiguration();
    Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
    if (files.isEmpty()) {
      return;
    }

    mkdirs(jtFs, filesDir, mapredSysPerms);
    for (String fileSpec : files) {
      final URI sourceUri;
      try {
        sourceUri = new URI(fileSpec);
      } catch (URISyntaxException e) {
        throw new IllegalArgumentException("Error parsing files argument."
            + " Argument must be a valid URI: " + fileSpec, e);
      }
      Path sourcePath = new Path(sourceUri);

      // Try the shared cache first; a miss marks the file for upload to it.
      URI cacheUri = null;
      boolean uploadToSharedCache = false;
      if (scConfig.isSharedCacheFilesEnabled()) {
        cacheUri = useSharedCache(sourceUri, sourcePath.getName(), statCache,
            conf, true);
        uploadToSharedCache = (cacheUri == null);
      }

      // Not served by the shared cache: stage it in the submission directory.
      if (cacheUri == null) {
        Path stagedPath =
            copyRemoteFiles(filesDir, sourcePath, conf, submitReplication);
        try {
          cacheUri = getPathURI(stagedPath, sourceUri.getFragment());
        } catch (URISyntaxException ue) {
          // should not throw a uri exception
          throw new IOException(
              "Failed to create a URI (URISyntaxException) for the"
                  + " remote path " + stagedPath
                  + ". This was based on the files parameter: " + fileSpec,
              ue);
        }
      }

      job.addCacheFile(cacheUri);
      if (scConfig.isSharedCacheFilesEnabled()) {
        fileSCUploadPolicies.put(cacheUri.toString(), uploadToSharedCache);
      }
    }
  }

  /**
   * Adds each entry of {@code libjars} to the job classpath and to the
   * distributed cache.
   * <p>
   * An entry is taken from the shared cache when that is enabled for libjars
   * and the lookup succeeds; otherwise it is copied into the libjars directory
   * under the submission directory. Shared cache hits are added to the
   * distributed cache one by one. The remaining jars are added either through
   * a single wildcard entry on the libjars directory (when wildcards are
   * enabled and none of those jars carries a fragment) or one by one.
   *
   * @param fileSCUploadPolicies receives, per resulting URI, whether the
   *          resource should be uploaded to the shared cache; only written
   *          when the shared cache is enabled for libjars
   * @throws IllegalArgumentException if an entry is not a valid URI
   * @throws IOException if copying a jar fails
   */
  // Suppress warning for use of DistributedCache (it is everywhere).
  @SuppressWarnings("deprecation")
  @VisibleForTesting
  void uploadLibJars(Job job, Collection<String> libjars, Path submitJobDir,
      FsPermission mapredSysPerms, short submitReplication,
      Map<String, Boolean> fileSCUploadPolicies, Map<URI, FileStatus> statCache)
      throws IOException {
    Configuration conf = job.getConfiguration();
    Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
    if (!libjars.isEmpty()) {
      mkdirs(jtFs, libjarsDir, mapredSysPerms);
      // URIs of jars staged in the submission directory (not shared cache hits)
      Collection<URI> libjarURIs = new LinkedList<>();
      boolean foundFragment = false;
      for (String tmpjars : libjars) {
        URI tmpURI = null;
        try {
          tmpURI = new URI(tmpjars);
        } catch (URISyntaxException e) {
          throw new IllegalArgumentException("Error parsing libjars argument."
              + " Argument must be a valid URI: " + tmpjars, e);
        }
        Path tmp = new Path(tmpURI);
        URI newURI = null;
        boolean uploadToSharedCache = false;
        boolean fromSharedCache = false;
        if (scConfig.isSharedCacheLibjarsEnabled()) {
          newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true);
          if (newURI == null) {
            // shared cache miss: mark the jar for upload to the shared cache
            uploadToSharedCache = true;
          } else {
            fromSharedCache = true;
          }
        }

        if (newURI == null) {
          // not served by the shared cache: stage it in the libjars directory
          Path newPath =
              copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
          try {
            newURI = getPathURI(newPath, tmpURI.getFragment());
          } catch (URISyntaxException ue) {
            // should not throw a uri exception
            throw new IOException(
                "Failed to create a URI (URISyntaxException) for the"
                    + " remote path " + newPath
                    + ". This was based on the libjar parameter: " + tmpjars,
                ue);
          }
        }

        if (!foundFragment) {
          // We do not count shared cache paths containing fragments as a
          // "foundFragment." This is because these resources are not in the
          // staging directory and will be added to the distributed cache
          // separately.
          foundFragment = (newURI.getFragment() != null) && !fromSharedCache;
        }
        // Only the path component of the URI goes onto the classpath.
        Job.addFileToClassPath(new Path(newURI.getPath()), conf, jtFs, false);
        if (fromSharedCache) {
          // We simply add this URI to the distributed cache. It will not come
          // from the staging directory (it is in the shared cache), so we
          // must add it to the cache regardless of the wildcard feature.
          Job.addCacheFile(newURI, conf);
        } else {
          libjarURIs.add(newURI);
        }

        if (scConfig.isSharedCacheLibjarsEnabled()) {
          fileSCUploadPolicies.put(newURI.toString(), uploadToSharedCache);
        }
      }

      if (useWildcard && !foundFragment) {
        // Add the whole directory to the cache using a wild card
        Path libJarsDirWildcard =
            jtFs.makeQualified(new Path(libjarsDir, DistributedCache.WILDCARD));
        Job.addCacheFile(libJarsDirWildcard.toUri(), conf);
      } else {
        // Wildcard not usable: add every staged jar individually.
        for (URI uri : libjarURIs) {
          Job.addCacheFile(uri, conf);
        }
      }
    }
  }

  /**
   * Registers each entry of {@code archives} as a cache archive of the job. An
   * entry is taken from the shared cache when that is enabled for archives
   * and the lookup succeeds; otherwise it is copied into the archives
   * directory under the submission directory.
   *
   * @param archiveSCUploadPolicies receives, per resulting URI, whether the
   *          resource should be uploaded to the shared cache; only written
   *          when the shared cache is enabled for archives
   * @throws IllegalArgumentException if an entry is not a valid URI
   * @throws IOException if copying an archive fails
   */
  @VisibleForTesting
  void uploadArchives(Job job, Collection<String> archives,
      Path submitJobDir, FsPermission mapredSysPerms, short submitReplication,
      Map<String, Boolean> archiveSCUploadPolicies,
      Map<URI, FileStatus> statCache) throws IOException {
    Configuration conf = job.getConfiguration();
    Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
    if (archives.isEmpty()) {
      return;
    }

    mkdirs(jtFs, archivesDir, mapredSysPerms);
    for (String archiveSpec : archives) {
      final URI sourceUri;
      try {
        sourceUri = new URI(archiveSpec);
      } catch (URISyntaxException e) {
        throw new IllegalArgumentException("Error parsing archives argument."
            + " Argument must be a valid URI: " + archiveSpec, e);
      }
      Path sourcePath = new Path(sourceUri);

      // Try the shared cache first; a miss marks the archive for upload to it.
      URI cacheUri = null;
      boolean uploadToSharedCache = false;
      if (scConfig.isSharedCacheArchivesEnabled()) {
        cacheUri = useSharedCache(sourceUri, sourcePath.getName(), statCache,
            conf, true);
        uploadToSharedCache = (cacheUri == null);
      }

      // Not served by the shared cache: stage it in the submission directory.
      if (cacheUri == null) {
        Path stagedPath =
            copyRemoteFiles(archivesDir, sourcePath, conf, submitReplication);
        try {
          cacheUri = getPathURI(stagedPath, sourceUri.getFragment());
        } catch (URISyntaxException ue) {
          // should not throw a uri exception
          throw new IOException(
              "Failed to create a URI (URISyntaxException) for the"
                  + " remote path " + stagedPath
                  + ". This was based on the archive parameter: "
                  + archiveSpec,
              ue);
        }
      }

      job.addCacheArchive(cacheUri);
      if (scConfig.isSharedCacheArchivesEnabled()) {
        archiveSCUploadPolicies.put(cacheUri.toString(), uploadToSharedCache);
      }
    }
  }

  /**
   * Decides where the job jar will be read from and records that location on
   * the job.
   * <p>
   * A jar on the local filesystem (no scheme, or scheme "file") is resolved
   * through the shared cache when that is enabled for the job jar and the
   * lookup succeeds; otherwise it is copied into the submission directory. A
   * jar on any other filesystem is never copied: it is used from the shared
   * cache on a hit, or from its original location otherwise. When the jar
   * comes from the shared cache, the job jar visibility is set to public.
   * <p>
   * If {@code jobJar} is null only a warning is logged. If the job name is the
   * empty string it is set to the jar's file name.
   *
   * @param job the job whose jar is being uploaded
   * @param jobJar the job jar location as a string, or null if none is set
   * @param submitJobDir the submission directory of the job
   * @param submitReplication replication passed to copyJar for a staged jar
   * @param statCache cache of file statuses shared across the upload
   * @throws IOException if copying the jar or a filesystem lookup fails
   */
  @VisibleForTesting
  void uploadJobJar(Job job, String jobJar, Path submitJobDir,
      short submitReplication, Map<URI, FileStatus> statCache)
      throws IOException {
    Configuration conf = job.getConfiguration();
    if (jobJar != null) { // copy jar to JobTracker's fs
      // use jar name if job is not named.
      if ("".equals(job.getJobName())) {
        job.setJobName(new Path(jobJar).getName());
      }
      Path jobJarPath = new Path(jobJar);
      URI jobJarURI = jobJarPath.toUri();
      // final location of the jar; assigned on every path below
      Path newJarPath = null;
      boolean uploadToSharedCache = false;
      if (jobJarURI.getScheme() == null ||
          jobJarURI.getScheme().equals("file")) {
        // job jar is on the local file system
        if (scConfig.isSharedCacheJobjarEnabled()) {
          // We must have a qualified path for the shared cache client. We can
          // assume this is for the local filesystem
          jobJarPath = FileSystem.getLocal(conf).makeQualified(jobJarPath);
          // Don't add a resource name here because the resource name (i.e.
          // job.jar directory symlink) will always be hard coded to job.jar for
          // the job.jar
          URI newURI =
              useSharedCache(jobJarPath.toUri(), null, statCache, conf, false);
          if (newURI == null) {
            uploadToSharedCache = true;
          } else {
            newJarPath = stringToPath(newURI.toString());
            // The job jar is coming from the shared cache (i.e. a public
            // place), so we want the job.jar to have a public visibility.
            conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true);
          }
        }
        if (newJarPath == null) {
          // no shared cache hit: stage the local jar in the submission dir
          newJarPath = JobSubmissionFiles.getJobJar(submitJobDir);
          copyJar(jobJarPath, newJarPath, submitReplication);
        }
      } else {
        // job jar is in a remote file system
        if (scConfig.isSharedCacheJobjarEnabled()) {
          // Don't add a resource name here because the resource name (i.e.
          // job.jar directory symlink) will always be hard coded to job.jar for
          // the job.jar
          URI newURI = useSharedCache(jobJarURI, null, statCache, conf, false);
          if (newURI == null) {
            uploadToSharedCache = true;
            newJarPath = jobJarPath;
          } else {
            newJarPath = stringToPath(newURI.toString());
            // The job jar is coming from the shared cache (i.e. a public
            // place), so we want the job.jar to have a public visibility.
            conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true);
          }
        } else {
          // we don't need to upload the jobjar to the staging directory because
          // it is already in an accessible place
          newJarPath = jobJarPath;
        }
      }
      job.setJar(newJarPath.toString());
      if (scConfig.isSharedCacheJobjarEnabled()) {
        conf.setBoolean(MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY,
            uploadToSharedCache);
      }
    } else {
      LOG.warn("No job jar file set.  User classes may not be found. "
          + "See Job or Job#setJar(String).");
    }
  }

  /**
   * Checks that everything this job will localize stays within the configured
   * localization limits. All resources count towards the limits no matter
   * where they come from (i.e. local, distributed cache, or shared cache).
   * Returns immediately when no limit is configured.
   *
   * @throws IOException if a limit is exceeded or a file status lookup fails
   */
  @VisibleForTesting
  void checkLocalizationLimits(Configuration conf, Collection<String> files,
      Collection<String> libjars, Collection<String> archives, String jobJar,
      Map<URI, FileStatus> statCache) throws IOException {

    LimitChecker limitChecker = new LimitChecker(conf);
    if (!limitChecker.hasLimits()) {
      // nothing to enforce
      return;
    }

    // Files and archives that are already in the distributed cache.
    Collection<String> dcFiles =
        conf.getStringCollection(MRJobConfig.CACHE_FILES);
    Collection<String> dcArchives =
        conf.getStringCollection(MRJobConfig.CACHE_ARCHIVES);

    exploreResourceUris(conf, dcFiles, limitChecker, statCache);
    exploreResourceUris(conf, dcArchives, limitChecker, statCache);
    exploreResourceUris(conf, files, limitChecker, statCache);
    exploreResourceUris(conf, libjars, limitChecker, statCache);
    exploreResourceUris(conf, archives, limitChecker, statCache);

    if (jobJar != null) {
      explorePath(conf, stringToPath(jobJar), limitChecker, statCache);
    }
  }

  /** Feeds every URI string of the collection, in order, to explorePath. */
  private void exploreResourceUris(Configuration conf,
      Collection<String> uris, LimitChecker limitChecker,
      Map<URI, FileStatus> statCache) throws IOException {
    for (String uri : uris) {
      explorePath(conf, stringToPath(uri), limitChecker, statCache);
    }
  }

  /**
   * Parses the string as a URI and builds a Path from its scheme, authority
   * and path only, so any fragment or query in the string is dropped.
   *
   * @throws IllegalArgumentException if the string is not a valid URI
   */
  @VisibleForTesting
  Path stringToPath(String s) {
    final URI parsed;
    try {
      parsed = new URI(s);
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException(
          "Error parsing argument." + " Argument must be a valid URI: " + s, e);
    }
    return new Path(parsed.getScheme(), parsed.getAuthority(),
        parsed.getPath());
  }

  /** Message prefix used when the number of resources exceeds its limit. */
  @VisibleForTesting
  protected static final String MAX_RESOURCE_ERR_MSG =
      "This job has exceeded the maximum number of submitted resources";
  /** Message prefix used when the total size of resources exceeds its limit. */
  @VisibleForTesting
  protected static final String MAX_TOTAL_RESOURCE_MB_ERR_MSG =
      "This job has exceeded the maximum size of submitted resources";
  /** Message prefix used when a single resource exceeds the per-file limit. */
  @VisibleForTesting
  protected static final String MAX_SINGLE_RESOURCE_MB_ERR_MSG =
      "This job has exceeded the maximum size of a single submitted resource";

  /**
   * Accumulates the number and size of the resources of one job and throws as
   * soon as one of the configured limits is exceeded. A limit whose configured
   * value is not positive is not enforced.
   */
  private static class LimitChecker {
    // Configured limits, read once from the job configuration.
    private final int maxNumOfResources;
    private final long maxSizeMB;
    private final long maxSizeOfResourceMB;
    // The two size limits converted from megabytes to bytes.
    private final long totalConfigSizeBytes;
    private final long totalConfigSizeOfResourceBytes;

    // Running totals over all files added so far.
    private int totalNumberOfResources = 0;
    private long totalSizeBytes = 0;
    private long currentMaxSizeOfFileBytes = 0;

    LimitChecker(Configuration conf) {
      maxNumOfResources = conf.getInt(MRJobConfig.MAX_RESOURCES,
          MRJobConfig.MAX_RESOURCES_DEFAULT);
      maxSizeMB = conf.getLong(MRJobConfig.MAX_RESOURCES_MB,
          MRJobConfig.MAX_RESOURCES_MB_DEFAULT);
      maxSizeOfResourceMB = conf.getLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB,
          MRJobConfig.MAX_SINGLE_RESOURCE_MB_DEFAULT);
      totalConfigSizeBytes = maxSizeMB * 1024 * 1024;
      totalConfigSizeOfResourceBytes = maxSizeOfResourceMB * 1024 * 1024;
    }

    /** Returns true when at least one of the three limits is positive. */
    private boolean hasLimits() {
      boolean noLimits = maxNumOfResources <= 0 && maxSizeMB <= 0
          && maxSizeOfResourceMB <= 0;
      return !noLimits;
    }

    /**
     * Counts one more file and checks, in this order, the total size limit,
     * the resource count limit and the single resource size limit.
     *
     * @throws IOException if any enforced limit is now exceeded
     */
    private void addFile(Path p, long fileSizeBytes) throws IOException {
      totalNumberOfResources++;
      totalSizeBytes += fileSizeBytes;
      currentMaxSizeOfFileBytes =
          Math.max(currentMaxSizeOfFileBytes, fileSizeBytes);

      boolean totalSizeExceeded =
          totalConfigSizeBytes > 0 && totalSizeBytes > totalConfigSizeBytes;
      if (totalSizeExceeded) {
        throw new IOException(MAX_TOTAL_RESOURCE_MB_ERR_MSG + " (Max: "
            + maxSizeMB + "MB).");
      }

      boolean countExceeded = maxNumOfResources > 0
          && totalNumberOfResources > maxNumOfResources;
      if (countExceeded) {
        throw new IOException(MAX_RESOURCE_ERR_MSG + " (Max: "
            + maxNumOfResources + ").");
      }

      boolean singleSizeExceeded = totalConfigSizeOfResourceBytes > 0
          && currentMaxSizeOfFileBytes > totalConfigSizeOfResourceBytes;
      if (singleSizeExceeded) {
        throw new IOException(MAX_SINGLE_RESOURCE_MB_ERR_MSG + " (Max: "
            + maxSizeOfResourceMB + "MB, Violating resource: " + p + ").");
      }
    }
  }

  /**
   * Walks the given path recursively and reports every non-directory entry to
   * the limit checker. A path without a scheme is treated as a path on the
   * local filesystem. This method assumes that there are no symlinks in the
   * directory structure.
   *
   * @throws IOException if a limit is exceeded or a filesystem call fails
   */
  private void explorePath(Configuration job, Path p,
      LimitChecker limitChecker, Map<URI, FileStatus> statCache)
      throws IOException {
    Path qualified = p.toUri().isAbsolute()
        ? p
        : FileSystem.getLocal(job).makeQualified(p);

    FileStatus status = getFileStatus(statCache, job, qualified);
    if (!status.isDirectory()) {
      limitChecker.addFile(qualified, status.getLen());
      return;
    }

    FileStatus[] children = qualified.getFileSystem(job).listStatus(qualified);
    for (FileStatus child : children) {
      explorePath(job, child.getPath(), limitChecker, statCache);
    }
  }

  /**
   * Returns the status of the path, consulting the cache first. On a miss the
   * status is fetched from the path's filesystem and stored in the cache
   * under the path's URI.
   */
  @VisibleForTesting
  FileStatus getFileStatus(Map<URI, FileStatus> statCache,
      Configuration job, Path p) throws IOException {
    URI key = p.toUri();
    FileStatus cached = statCache.get(key);
    if (cached != null) {
      return cached;
    }
    FileStatus fetched = p.getFileSystem(job).getFileStatus(p);
    statCache.put(key, fetched);
    return fetched;
  }

  /**
   * Creates a directory with the given permission by delegating to
   * {@code FileSystem.mkdirs}. Kept as an instance method so that tests can
   * override or stub it.
   *
   * @return the value returned by {@code FileSystem.mkdirs}
   */
  @VisibleForTesting
  boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
      throws IOException {
    boolean created = FileSystem.mkdirs(fs, dir, permission);
    return created;
  }

  /**
   * Copies a file to the jobtracker filesystem and returns the path it was
   * copied to. When the source filesystem and the jobtracker filesystem
   * compare as the same, nothing is copied and the original path is returned.
   * <p>
   * The filesystems are compared through {@code FileUtil.compareFs}. The
   * original note on this check says it looks only at URI strings and does no
   * DNS lookups, which is not optimal but avoids name resolution.
   *
   * @param parentDir directory on the jobtracker filesystem to copy into
   * @param originalPath the resource to copy
   * @param conf configuration used to resolve filesystems and for the copy
   * @param replication replication to set on the copied path
   * @return the original path if no copy was needed, else the new path
   * @throws IOException if the copy or setting the replication fails
   */
  @VisibleForTesting
  Path copyRemoteFiles(Path parentDir, Path originalPath,
      Configuration conf, short replication) throws IOException {
    FileSystem sourceFs = originalPath.getFileSystem(conf);
    if (FileUtil.compareFs(sourceFs, jtFs)) {
      return originalPath;
    }

    // "/" needs special treatment: it is copied onto parentDir itself.
    final boolean isRoot = ROOT_PATH.equals(originalPath.toUri().getPath());
    if (!isRoot) {
      // Drop a trailing "/" so that originalPath.getName() does not return
      // an empty string.
      String uriString = originalPath.toUri().toString();
      if (uriString.endsWith("/")) {
        String trimmed = uriString.substring(0, uriString.length() - 1);
        try {
          originalPath = new Path(new URI(trimmed));
        } catch (URISyntaxException e) {
          throw new IllegalArgumentException("Error processing URI", e);
        }
      }
    }

    // This might have name collisions, in which case the copy will throw.
    Path newPath;
    if (isRoot) {
      newPath = parentDir;
    } else {
      newPath = new Path(parentDir, originalPath.getName());
    }
    FileUtil.copy(sourceFs, originalPath, jtFs, newPath, false, conf);
    jtFs.setReplication(newPath, replication);
    // NOTE(review): the qualified path returned here is discarded, so the
    // caller receives newPath as built above. Confirm whether that is
    // intended before changing it, since callers build URIs from the result.
    jtFs.makeQualified(newPath);
    return newPath;
  }

  /**
   * Checksum a resource file and call use for that resource with the scm.
   *
   * @param sourceFile URI of the resource to look up
   * @param resourceName name the resource should be localized as, or null to
   *          keep the name it has in the shared cache
   * @param statCache cache of file statuses shared across the upload
   * @param conf the job configuration
   * @param honorFragment whether a fragment on {@code sourceFile} overrides
   *          {@code resourceName}
   * @return the shared cache URI of the resource, or null when there is no
   *         shared cache client, the resource is a directory, the shared
   *         cache manager could not be contacted, the resource is not in the
   *         cache, or the returned URL cannot be converted to a URI
   * @throws IOException if the file status or the checksum cannot be obtained
   */
  private URI useSharedCache(URI sourceFile, String resourceName,
      Map<URI, FileStatus> statCache, Configuration conf, boolean honorFragment)
      throws IOException {
    if (scClient == null) {
      return null;
    }
    Path filePath = new Path(sourceFile);
    if (getFileStatus(statCache, conf, filePath).isDirectory()) {
      LOG.warn("Shared cache does not support directories"
          + " (see YARN-6097)." + " Will not upload " + filePath
          + " to the shared cache.");
      return null;
    }

    // A fragment on the source URI, when honored, is the user's desired name.
    String rn = resourceName;
    if (honorFragment && sourceFile.getFragment() != null) {
      rn = sourceFile.getFragment();
    }

    // If for whatever reason, we can't even calculate checksum for
    // a resource, something is really wrong with the file system;
    // even non-SCM approach won't work. Let us just throw the exception.
    String checksum = scClient.getFileChecksum(filePath);
    URL url = null;
    try {
      url = scClient.use(this.appId, checksum);
    } catch (YarnException e) {
      LOG.warn("Error trying to contact the shared cache manager,"
          + " disabling the SCMClient for the rest of this job submission", e);
      /*
       * If we fail to contact the SCM, we do not use it for the rest of this
       * JobResourceUploader's life. This prevents us from having to timeout
       * each time we try to upload a file while the SCM is unavailable. Instead
       * we timeout/error the first time and quickly revert to the default
       * behavior without the shared cache. We do this by stopping the shared
       * cache client and setting it to null.
       */
      stopSharedCache();
    }

    if (url == null) {
      return null;
    }

    // Because we deal with URI's in mapreduce, we need to convert the URL to
    // a URI and add a fragment if necessary.
    try {
      String name = new Path(url.getFile()).getName();
      // A name that differs from the one in the shared cache is preserved in
      // the fragment portion of the URI. We assume that there is no existing
      // fragment in the URL since the shared cache manager does not use
      // fragments.
      String fragment = (rn != null && !name.equals(rn)) ? rn : null;
      return new URI(url.getScheme(), url.getUserInfo(), url.getHost(),
          url.getPort(), url.getFile(), null, fragment);
    } catch (URISyntaxException e) {
      // Log the exception as well so the cause of the failed conversion is
      // not lost.
      LOG.warn("Error trying to convert URL received from shared cache to"
          + " a URI: " + url.toString(), e);
      return null;
    }
  }

  /**
   * Copies the local job jar to the jobtracker filesystem, then sets the job
   * file permission and the replication on the copy.
   */
  @VisibleForTesting
  void copyJar(Path originalJarPath, Path submitJarFile,
      short replication) throws IOException {
    jtFs.copyFromLocalFile(originalJarPath, submitJarFile);
    // The operation of setReplication requires certain permissions,
    // so the permission is set before the replication.
    FsPermission jobFilePermission =
        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION);
    jtFs.setPermission(submitJarFile, jobFilePermission);
    jtFs.setReplication(submitJarFile, replication);
  }

  /**
   * Uploads the user specified log4j properties file, if one is configured.
   * Does nothing when the property is unset or empty.
   *
   * @param job the job being submitted
   * @param jobSubmitDir the submission directory of the job
   * @throws IOException if copying the properties file fails
   */
  private void addLog4jToDistributedCache(Job job, Path jobSubmitDir)
      throws IOException {
    Configuration conf = job.getConfiguration();
    String log4jPropertyFile =
        conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
    if (!log4jPropertyFile.isEmpty()) {
      // Use the same default as the rest of the upload instead of a separate
      // hard-coded value.
      short replication = (short) conf.getInt(Job.SUBMIT_REPLICATION,
          Job.DEFAULT_SUBMIT_REPLICATION);
      copyLog4jPropertyFile(job, jobSubmitDir, replication);
    }
  }

  /**
   * Returns the URI of the destination path, with the given fragment appended
   * when the path's own URI has none and a fragment was supplied.
   *
   * @throws URISyntaxException if the URI with the fragment cannot be parsed
   */
  private URI getPathURI(Path destPath, String fragment)
      throws URISyntaxException {
    URI pathURI = destPath.toUri();
    boolean needsFragment = pathURI.getFragment() == null && fragment != null;
    if (!needsFragment) {
      return pathURI;
    }
    return new URI(pathURI.toString() + "#" + fragment);
  }

  /**
   * Copies the user specified log4j properties file into the log4j directory
   * under the submission directory and adds the copied file to the job
   * classpath. Nothing is copied when no file is configured.
   *
   * @throws IOException if the submission directory does not exist or the
   *           copy fails
   * @throws IllegalArgumentException if the validated file is not a valid URI
   */
  @SuppressWarnings("deprecation")
  private void copyLog4jPropertyFile(Job job, Path submitJobDir,
      short replication) throws IOException {
    Configuration conf = job.getConfiguration();

    String configuredFile =
        conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE);
    String file = validateFilePath(configuredFile, conf);
    LOG.debug("default FileSystem: " + jtFs.getUri());
    FsPermission dirPerms =
        new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);

    // The submission directory must already exist at this point.
    try {
      jtFs.getFileStatus(submitJobDir);
    } catch (FileNotFoundException e) {
      throw new IOException("Cannot find job submission directory! "
          + "It should just be created, so something wrong here.", e);
    }

    Path fileDir = JobSubmissionFiles.getJobLog4jFile(submitJobDir);
    if (file == null) {
      return;
    }

    // Copy the log4j.properties file into the submission directory.
    FileSystem.mkdirs(jtFs, fileDir, dirPerms);
    final URI sourceUri;
    try {
      sourceUri = new URI(file);
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException(e);
    }
    Path stagedPath =
        copyRemoteFiles(fileDir, new Path(sourceUri), conf, replication);
    Path classpathEntry = new Path(stagedPath.toUri().getPath());
    Job.addFileToClassPath(classpathEntry, conf,
        classpathEntry.getFileSystem(conf));
  }

  /**
   * Validates a file path string and returns it in fully qualified form.
   * A path without a scheme is resolved against the local file system, so an
   * input of /home/user/file1 would return file:///home/user/file1. A path
   * with a scheme is resolved against the file system that owns it. In both
   * cases the file status is looked up on that file system before the path
   * is qualified, so a failing lookup propagates to the caller.
   *
   * @param file the path string to validate; may be {@code null}
   * @param conf the configuration used to obtain the file system
   * @return the qualified path as a string, or {@code null} if {@code file}
   *         is {@code null}
   * @throws IOException if obtaining the file system or looking up the file
   *           status fails
   * @throws IllegalArgumentException if {@code file} is empty or is not a
   *           parsable URI
   */
  private String validateFilePath(String file, Configuration conf)
      throws IOException {
    if (file == null) {
      return null;
    }
    if (file.isEmpty()) {
      throw new IllegalArgumentException("File name can't be empty string");
    }

    URI fileUri;
    try {
      fileUri = new URI(file);
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException(e);
    }
    Path filePath = new Path(fileUri);

    // Paths without a scheme default to the local file system; otherwise the
    // file system is looked up from the path itself.
    FileSystem owningFs = (fileUri.getScheme() == null)
        ? FileSystem.getLocal(conf)
        : filePath.getFileSystem(conf);

    // Look the file up before qualifying the path.
    owningFs.getFileStatus(filePath);
    Path qualified =
        filePath.makeQualified(owningFs.getUri(),
            owningFs.getWorkingDirectory());
    return qualified.toString();
  }

  /**
   * Sets the replication erasure coding policy on {@code path} when
   * {@code jtFs} is a {@link DistributedFileSystem}; does nothing for any
   * other file system. A {@link RemoteException} whose class name is
   * {@link RpcNoSuchMethodException} is logged at debug level and ignored;
   * any other {@link RemoteException} is rethrown.
   *
   * @param path the path to set the policy on
   * @throws IOException if setting the policy fails for any other reason
   */
  private void disableErasureCodingForPath(Path path)
      throws IOException {
    if (!(jtFs instanceof DistributedFileSystem)) {
      return;
    }
    try {
      LOG.info("Disabling Erasure Coding for path: " + path);
      DistributedFileSystem dfs = (DistributedFileSystem) jtFs;
      String replicationPolicyName =
          SystemErasureCodingPolicies.getReplicationPolicy().getName();
      dfs.setErasureCodingPolicy(path, replicationPolicyName);
    } catch (RemoteException e) {
      String noSuchMethod = RpcNoSuchMethodException.class.getName();
      if (!noSuchMethod.equals(e.getClassName())) {
        throw e;
      }
      // NOTE(review): the message below names this client-side method, while
      // the call that fails is setErasureCodingPolicy; consider rewording.
      if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Ignore disabling erasure coding for path {} because method "
                + "disableErasureCodingForPath doesn't exist, probably "
                + "talking to a lower version HDFS.", path.toString(), e);
      }
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop Cluster 源码

hadoop ClusterMetrics 源码

hadoop ContextFactory 源码

hadoop Counter 源码

hadoop CounterGroup 源码

hadoop Counters 源码

hadoop CryptoUtils 源码

hadoop CustomJobEndNotifier 源码

hadoop FileSystemCounter 源码

hadoop ID 源码

0  赞