hadoop DistCpOptions 源码

  • 2022-10-20
  • 浏览 (25)

hadoop DistCpOptions 代码

文件路径:/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.tools;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.util.DistCpUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;

/**
 * The Options class encapsulates all DistCp options.
 *
 * When you add a new option, please:
 *  - Add the field along with javadoc in DistCpOptions and its Builder
 *  - Add setter method in the {@link Builder} class
 *
 * This class is immutable.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class DistCpOptions {
  // NOTE(review): the logger is registered under Builder.class rather than
  // DistCpOptions.class; left unchanged so existing log categories stay the
  // same.
  private static final Logger LOG = LoggerFactory.getLogger(Builder.class);

  /** Largest number of listStatus threads the Builder will accept. */
  public static final int MAX_NUM_LISTSTATUS_THREADS = 40;

  /** File path (hdfs:// or file://) that contains the list of actual files to
   * copy.
   */
  private final Path sourceFileListing;

  /** List of source-paths (including wildcards) to be copied to target. */
  private final List<Path> sourcePaths;

  /** Destination path for the dist-copy. */
  private final Path targetPath;

  /** Whether data need to be committed automatically. */
  private final boolean atomicCommit;

  /** the work path for atomic commit. If null, the work
   * path would be parentOf(targetPath) + "/._WIP_" + nameOf(targetPath). */
  private final Path atomicWorkPath;

  /** Whether source and target folder contents be sync'ed up. */
  private final boolean syncFolder;

  /** Path to save source/dest sequence files to, if non-null. */
  private final Path trackPath;

  /** Whether files only present in target should be deleted. */
  // Declared final like every other option: it is assigned exactly once, in
  // the private constructor, and the class is documented as immutable.
  private final boolean deleteMissing;

  /** Whether failures during copy be ignored. */
  private final boolean ignoreFailures;

  /** Whether files should always be overwritten on target. */
  private final boolean overwrite;

  /** Whether we want to append new data to target files. This is valid only
   * with update option and CRC is not skipped. */
  private final boolean append;

  /** Whether checksum comparison should be skipped while determining if source
   * and destination files are identical. */
  private final boolean skipCRC;

  /** Whether to run blocking or non-blocking. */
  private final boolean blocking;

  // When "-diff s1 s2 src tgt" is passed, apply forward snapshot diff (from s1
  // to s2) of source cluster to the target cluster to sync target cluster with
  // the source cluster. Referred to as "Fdiff" in the code.
  // It's required that s2 is newer than s1.
  private final boolean useDiff;

  // When "-rdiff s2 s1 src tgt" is passed, apply reversed snapshot diff (from
  // s2 to s1) of target cluster to the target cluster, so to make target
  // cluster go back to s1. Referred to as "Rdiff" in the code.
  // It's required that s2 is newer than s1, and src and tgt have exact same
  // content at their s1, if src is not the same as tgt.
  private final boolean useRdiff;

  /** Whether to log additional info (path, size) in the SKIP/COPY log. */
  private final boolean verboseLog;

  // For both -diff and -rdiff, given the example command line switches, two
  // steps are taken:
  //   1. Sync Step. This step does renaming/deletion ops in the snapshot diff,
  //      so to avoid copying files copied already but renamed later(HDFS-7535)
  //   2. Copy Step. This step copy the necessary files from src to tgt
  //      2.1 For -diff, it copies from snapshot s2 of src (HDFS-8828)
  //      2.2 For -rdiff, it copies from snapshot s1 of src, where the src
  //          could be the tgt itself (HDFS-9820).

  /** Starting snapshot name supplied together with -diff/-rdiff. */
  private final String fromSnapshot;

  /** Ending snapshot name supplied together with -diff/-rdiff. */
  private final String toSnapshot;

  /** The path to a file containing a list of paths to filter out of copy. */
  private final String filtersFile;

  /** Path where output logs are stored. If not specified, it will use the
   * default value JobStagingDir/_logs and delete upon job completion. */
  private final Path logPath;

  /** Set the copy strategy to use. Should map to a strategy implementation
   * in distcp-default.xml. */
  private final String copyStrategy;

  /** per map bandwidth in MB. */
  private final float mapBandwidth;

  /** The number of threads to use for listStatus. We allow max
   * {@link #MAX_NUM_LISTSTATUS_THREADS} threads. Setting numThreads to zero
   * signify we should use the value from conf properties. */
  private final int numListstatusThreads;

  /** The max number of maps to use for copy. */
  private final int maxMaps;

  /** File attributes that need to be preserved. */
  private final EnumSet<FileAttribute> preserveStatus;

  // Size of chunk in number of blocks when splitting large file into chunks
  // to copy in parallel. Default is 0 and files are not split.
  private final int blocksPerChunk;

  /** Copy buffer size; the Builder substitutes
   * {@code DistCpConstants.COPY_BUFFER_SIZE_DEFAULT} for non-positive
   * values. */
  private final int copyBufferSize;

  /** Whether data should be written directly to the target paths. */
  private final boolean directWrite;

  /** Value published for the {@code USE_ITERATOR} option switch. */
  private final boolean useIterator;

  /** Value published for the {@code UPDATE_ROOT} option switch. */
  private final boolean updateRoot;

  /**
   * File attributes that can be preserved by a copy.
   *
   * Every constant is addressed by the first character of its name; that
   * character is shown in the trailing comment of each entry.
   */
  public enum FileAttribute {
    REPLICATION,    // R
    BLOCKSIZE,      // B
    USER,           // U
    GROUP,          // G
    PERMISSION,     // P
    CHECKSUMTYPE,   // C
    ACL,            // A
    XATTR,          // X
    TIMES,          // T
    ERASURECODINGPOLICY; // E

    /**
     * Looks up the attribute whose name starts with the given symbol.
     * The comparison ignores the case of {@code symbol}.
     *
     * @param symbol single-character symbol of the attribute
     * @return the first attribute, in declaration order, whose name starts
     *         with the upper-cased symbol
     * @throws NoSuchElementException if no attribute starts with the symbol
     */
    public static FileAttribute getAttribute(char symbol) {
      final char wanted = Character.toUpperCase(symbol);
      final FileAttribute[] candidates = values();
      for (int i = 0; i < candidates.length; i++) {
        if (wanted == candidates[i].name().charAt(0)) {
          return candidates[i];
        }
      }
      throw new NoSuchElementException("No attribute for " + symbol);
    }
  }

  /**
   * Creates the options from the current state of the given builder.
   *
   * The two mutable collections held by the builder (source paths and
   * preserved attributes) are copied, so that later changes to the builder or
   * to the list originally handed to it cannot alter this instance.
   *
   * @param builder the builder whose values are captured
   */
  private DistCpOptions(Builder builder) {
    this.sourceFileListing = builder.sourceFileListing;
    // sourcePaths is null when the builder was created from a file listing.
    this.sourcePaths = builder.sourcePaths == null
        ? null
        : new ArrayList<>(builder.sourcePaths);
    this.targetPath = builder.targetPath;

    this.atomicCommit = builder.atomicCommit;
    this.atomicWorkPath = builder.atomicWorkPath;
    this.syncFolder = builder.syncFolder;
    this.deleteMissing = builder.deleteMissing;
    this.ignoreFailures = builder.ignoreFailures;
    this.overwrite = builder.overwrite;
    this.append = builder.append;
    this.skipCRC = builder.skipCRC;
    this.blocking = builder.blocking;

    this.useDiff = builder.useDiff;
    this.useRdiff = builder.useRdiff;
    this.fromSnapshot = builder.fromSnapshot;
    this.toSnapshot = builder.toSnapshot;

    this.filtersFile = builder.filtersFile;
    this.logPath = builder.logPath;
    this.copyStrategy = builder.copyStrategy;

    this.mapBandwidth = builder.mapBandwidth;
    this.numListstatusThreads = builder.numListstatusThreads;
    this.maxMaps = builder.maxMaps;

    // The builder's set is never null (it starts empty and is only ever
    // replaced by EnumSet.allOf); copy it so Builder#preserve calls made
    // after build() do not leak into this instance.
    this.preserveStatus = EnumSet.copyOf(builder.preserveStatus);

    this.blocksPerChunk = builder.blocksPerChunk;

    this.copyBufferSize = builder.copyBufferSize;
    this.verboseLog = builder.verboseLog;
    this.trackPath = builder.trackPath;

    this.directWrite = builder.directWrite;

    this.useIterator = builder.useIterator;

    this.updateRoot = builder.updateRoot;
  }

  /**
   * @return the path of the file that lists the files to copy; null when the
   *         options were built from a list of source paths instead
   */
  public Path getSourceFileListing() {
    return this.sourceFileListing;
  }

  /**
   * Returns the source paths with duplicates removed, keeping the order of
   * first occurrence.
   *
   * @return an unmodifiable list of the distinct source paths, or null when
   *         no source paths were configured
   */
  public List<Path> getSourcePaths() {
    if (sourcePaths == null) {
      return null;
    }
    final List<Path> distinctPaths = getUniquePaths(sourcePaths);
    return Collections.unmodifiableList(distinctPaths);
  }

  /**
   * Drops repeated entries from the given paths, preserving the order in
   * which each path was first seen. Every repeated entry is reported at INFO
   * level.
   *
   * @param srcPaths paths that may contain duplicates
   * @return a new list holding each distinct path once
   */
  private List<Path> getUniquePaths(List<Path> srcPaths) {
    final Set<Path> seen = new LinkedHashSet<>();
    for (Path candidate : srcPaths) {
      final boolean firstOccurrence = seen.add(candidate);
      if (firstOccurrence) {
        continue;
      }
      LOG.info("Path: {} added multiple times, ignoring the redundant entry.", candidate);
    }
    return new ArrayList<>(seen);
  }

  /**
   * @return the destination path for the dist-copy
   */
  public Path getTargetPath() {
    return this.targetPath;
  }

  /**
   * @return the value of the atomic-commit option
   */
  public boolean shouldAtomicCommit() {
    return this.atomicCommit;
  }

  /**
   * @return the work path for atomic commit, or null if none was configured
   */
  public Path getAtomicWorkPath() {
    return this.atomicWorkPath;
  }

  /**
   * @return whether source and target folder contents are to be synced up
   */
  public boolean shouldSyncFolder() {
    return this.syncFolder;
  }

  /**
   * @return whether files only present in the target should be deleted
   */
  public boolean shouldDeleteMissing() {
    return this.deleteMissing;
  }

  /**
   * @return whether failures during copy are to be ignored
   */
  public boolean shouldIgnoreFailures() {
    return this.ignoreFailures;
  }

  /**
   * @return whether files should always be overwritten on the target
   */
  public boolean shouldOverwrite() {
    return this.overwrite;
  }

  /**
   * @return whether new data should be appended to target files
   */
  public boolean shouldAppend() {
    return this.append;
  }

  /**
   * @return whether checksum comparison should be skipped when deciding if
   *         source and destination files are identical
   */
  public boolean shouldSkipCRC() {
    return this.skipCRC;
  }

  /**
   * @return true to run blocking, false to run non-blocking
   */
  public boolean shouldBlock() {
    return this.blocking;
  }

  /**
   * @return whether the forward snapshot diff (-diff) was requested
   */
  public boolean shouldUseDiff() {
    return useDiff;
  }

  /**
   * @return whether the reversed snapshot diff (-rdiff) was requested
   */
  public boolean shouldUseRdiff() {
    return useRdiff;
  }

  /**
   * @return true when either the forward (-diff) or the reversed (-rdiff)
   *         snapshot diff was requested
   */
  public boolean shouldUseSnapshotDiff() {
    // The class is final, so reading the fields directly is equivalent to
    // going through shouldUseDiff()/shouldUseRdiff().
    return useDiff || useRdiff;
  }

  /**
   * @return the starting snapshot name given with -diff/-rdiff, or null if
   *         neither was requested
   */
  public String getFromSnapshot() {
    return fromSnapshot;
  }

  /**
   * @return the ending snapshot name given with -diff/-rdiff, or null if
   *         neither was requested
   */
  public String getToSnapshot() {
    return toSnapshot;
  }

  /**
   * @return the path to the file listing paths to filter out of the copy, or
   *         null if none was configured
   */
  public String getFiltersFile() {
    return this.filtersFile;
  }

  /**
   * @return the path where output logs are stored, or null if not specified
   */
  public Path getLogPath() {
    return this.logPath;
  }

  /**
   * @return the name of the copy strategy to use
   */
  public String getCopyStrategy() {
    return this.copyStrategy;
  }

  /**
   * @return the number of threads to use for listStatus; zero signifies that
   *         the value from conf properties should be used
   */
  public int getNumListstatusThreads() {
    return this.numListstatusThreads;
  }

  /**
   * @return the max number of maps to use for copy
   */
  public int getMaxMaps() {
    return this.maxMaps;
  }

  /**
   * @return the per map bandwidth in MB; 0 indicates the default should be
   *         used
   */
  public float getMapBandwidth() {
    return this.mapBandwidth;
  }

  /**
   * @return an unmodifiable view of the file attributes to preserve, or null
   *         if the underlying set is null
   */
  public Set<FileAttribute> getPreserveAttributes() {
    if (preserveStatus == null) {
      return null;
    }
    return Collections.unmodifiableSet(preserveStatus);
  }

  /**
   * Tells whether the given file attribute is among those to preserve.
   *
   * @param attribute the attribute to look up
   * @return true if the attribute is in the preserve set, false otherwise
   */
  public boolean shouldPreserve(FileAttribute attribute) {
    return this.preserveStatus.contains(attribute);
  }

  /**
   * @return the chunk size, in number of blocks, used when splitting large
   *         files for parallel copy
   */
  public int getBlocksPerChunk() {
    return this.blocksPerChunk;
  }

  /**
   * @return the configured copy buffer size
   */
  public int getCopyBufferSize() {
    return this.copyBufferSize;
  }

  /**
   * @return whether additional info (path, size) is logged in the SKIP/COPY
   *         log
   */
  public boolean shouldVerboseLog() {
    return this.verboseLog;
  }

  /**
   * @return the path to save source/dest sequence files to, or null if none
   *         was configured
   */
  public Path getTrackPath() {
    return this.trackPath;
  }

  /**
   * @return whether data should be written directly to the target paths
   */
  public boolean shouldDirectWrite() {
    return this.directWrite;
  }

  /**
   * @return the value of the use-iterator option
   */
  public boolean shouldUseIterator() {
    return this.useIterator;
  }

  /**
   * @return the value of the update-root option
   */
  public boolean shouldUpdateRoot() {
    return this.updateRoot;
  }

  /**
   * Publishes these options into the given configuration through
   * {@code DistCpOptionSwitch.addToConf}, for later use in the
   * Mapper/committer.
   *
   * Boolean options, the preserve attributes, blocks-per-chunk and the copy
   * buffer size are always written. Bandwidth and the listStatus thread count
   * are written only when positive; the filters file and the track path only
   * when non-null.
   *
   * @param conf - Configuration object to which the options need to be added
   */
  public void appendToConf(Configuration conf) {
    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT,
        Boolean.toString(atomicCommit));
    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.IGNORE_FAILURES,
        Boolean.toString(ignoreFailures));
    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SYNC_FOLDERS,
        Boolean.toString(syncFolder));
    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_MISSING,
        Boolean.toString(deleteMissing));
    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE,
        Boolean.toString(overwrite));
    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.APPEND,
        Boolean.toString(append));
    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIFF,
        Boolean.toString(useDiff));
    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.RDIFF,
        Boolean.toString(useRdiff));
    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
        Boolean.toString(skipCRC));

    // A non-positive bandwidth is not written to the configuration.
    if (mapBandwidth > 0) {
      DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH,
          Float.toString(mapBandwidth));
    }

    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS,
        DistCpUtils.packAttributes(preserveStatus));

    if (filtersFile != null) {
      DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.FILTERS,
          filtersFile);
    }

    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BLOCKS_PER_CHUNK,
        Integer.toString(blocksPerChunk));
    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.COPY_BUFFER_SIZE,
        Integer.toString(copyBufferSize));
    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.VERBOSE_LOG,
        Boolean.toString(verboseLog));

    if (trackPath != null) {
      DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.TRACK_MISSING,
          trackPath.toString());
    }

    // Zero threads is not written to the configuration.
    if (numListstatusThreads > 0) {
      DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.NUM_LISTSTATUS_THREADS,
          String.valueOf(numListstatusThreads));
    }

    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIRECT_WRITE,
        Boolean.toString(directWrite));
    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.USE_ITERATOR,
        Boolean.toString(useIterator));
    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.UPDATE_ROOT,
        Boolean.toString(updateRoot));
  }

  /**
   * Renders the options as a single-line {@code DistCpOptions{name=value, ...}}
   * string, for logging. The track path is not part of the output.
   *
   * @return String representation of the Options.
   */
  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder("DistCpOptions{");
    text.append("atomicCommit=").append(atomicCommit);
    text.append(", syncFolder=").append(syncFolder);
    text.append(", deleteMissing=").append(deleteMissing);
    text.append(", ignoreFailures=").append(ignoreFailures);
    text.append(", overwrite=").append(overwrite);
    text.append(", append=").append(append);
    text.append(", useDiff=").append(useDiff);
    text.append(", useRdiff=").append(useRdiff);
    text.append(", fromSnapshot=").append(fromSnapshot);
    text.append(", toSnapshot=").append(toSnapshot);
    text.append(", skipCRC=").append(skipCRC);
    text.append(", blocking=").append(blocking);
    text.append(", numListstatusThreads=").append(numListstatusThreads);
    text.append(", maxMaps=").append(maxMaps);
    text.append(", mapBandwidth=").append(mapBandwidth);
    // The two String-valued options are wrapped in single quotes.
    text.append(", copyStrategy='").append(copyStrategy).append('\'');
    text.append(", preserveStatus=").append(preserveStatus);
    text.append(", atomicWorkPath=").append(atomicWorkPath);
    text.append(", logPath=").append(logPath);
    text.append(", sourceFileListing=").append(sourceFileListing);
    text.append(", sourcePaths=").append(sourcePaths);
    text.append(", targetPath=").append(targetPath);
    text.append(", filtersFile='").append(filtersFile).append('\'');
    text.append(", blocksPerChunk=").append(blocksPerChunk);
    text.append(", copyBufferSize=").append(copyBufferSize);
    text.append(", verboseLog=").append(verboseLog);
    text.append(", directWrite=").append(directWrite);
    text.append(", useiterator=").append(useIterator);
    text.append(", updateRoot=").append(updateRoot);
    text.append('}');
    return text.toString();
  }

  /**
   * The builder of the {@link DistCpOptions}.
   *
   * This is designed to be the only public interface to create a
   * {@link DistCpOptions} object for users. It follows a simple Builder design
   * pattern: create it from the sources and the target, chain the setter
   * calls, then call {@link #build()}.
   */
  public static class Builder {
    private Path sourceFileListing;
    private List<Path> sourcePaths;
    private Path targetPath;

    private boolean atomicCommit = false;
    private Path atomicWorkPath;
    private boolean syncFolder = false;
    private boolean deleteMissing = false;
    private boolean ignoreFailures = false;
    private boolean overwrite = false;
    private boolean append = false;
    private boolean skipCRC = false;
    private boolean blocking = true;
    private boolean verboseLog = false;

    private boolean useDiff = false;
    private boolean useRdiff = false;
    private String fromSnapshot;
    private String toSnapshot;

    private String filtersFile;

    private Path logPath;
    private Path trackPath;
    private String copyStrategy = DistCpConstants.UNIFORMSIZE;

    private int numListstatusThreads = 0;  // 0 indicates that flag is not set.
    private int maxMaps = DistCpConstants.DEFAULT_MAPS;
    private float mapBandwidth = 0; // 0 indicates we should use the default

    private EnumSet<FileAttribute> preserveStatus =
        EnumSet.noneOf(FileAttribute.class);

    private int blocksPerChunk = 0;

    private int copyBufferSize =
            DistCpConstants.COPY_BUFFER_SIZE_DEFAULT;

    private boolean directWrite = false;

    private boolean useIterator = false;

    private boolean updateRoot = false;

    /**
     * Creates a builder that copies the given source paths to the target.
     *
     * @param sourcePaths the paths to copy; must be neither null nor empty
     * @param targetPath the destination path; must not be null
     * @throws IllegalArgumentException if either argument is invalid
     */
    public Builder(List<Path> sourcePaths, Path targetPath) {
      Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(),
          "Source paths should not be null or empty!");
      Preconditions.checkArgument(targetPath != null,
          "Target path should not be null!");
      this.sourcePaths = sourcePaths;
      this.targetPath = targetPath;
    }

    /**
     * Creates a builder that copies the files named in a listing file to the
     * target.
     *
     * @param sourceFileListing path of the file listing the files to copy;
     *                          must not be null
     * @param targetPath the destination path; must not be null
     * @throws IllegalArgumentException if either argument is null
     */
    public Builder(Path sourceFileListing, Path targetPath) {
      Preconditions.checkArgument(sourceFileListing != null,
          "Source file listing should not be null!");
      Preconditions.checkArgument(targetPath != null,
          "Target path should not be null!");

      this.sourceFileListing = sourceFileListing;
      this.targetPath = targetPath;
    }

    /**
     * This is the single entry point for constructing DistCpOptions objects.
     *
     * Before a new DistCpOptions object is returned, it will set the dependent
     * options, validate the option combinations. After constructing, the
     * DistCpOptions instance is immutable.
     *
     * @return the options built from the current state of this builder
     * @throws IllegalArgumentException if the configured options conflict
     */
    public DistCpOptions build() {
      setOptionsForSplitLargeFile();

      validate();

      return new DistCpOptions(this);
    }

    /**
     * Override options for split large files.
     *
     * Does nothing unless blocksPerChunk is positive; otherwise block size
     * preservation is switched on and append is switched off.
     */
    private void setOptionsForSplitLargeFile() {
      if (blocksPerChunk <= 0) {
        return;
      }

      LOG.info("Enabling preserving blocksize since {} is passed.",
          DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch());
      preserve(FileAttribute.BLOCKSIZE);

      LOG.info("Set {} to false since {} is passed.",
          DistCpOptionSwitch.APPEND.getSwitch(),
          DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch());
      this.append = false;
    }

    /**
     * Rejects option combinations that cannot be used together.
     *
     * @throws IllegalArgumentException on the first conflict found
     */
    private void validate() {
      if ((useDiff || useRdiff) && deleteMissing) {
        // The build is rejected, so the message must not claim that -delete
        // is merely ignored.
        throw new IllegalArgumentException("-delete and -diff/-rdiff are "
            + "mutually exclusive.");
      }

      if (!atomicCommit && atomicWorkPath != null) {
        throw new IllegalArgumentException(
            "-tmp work-path can only be specified along with -atomic");
      }

      // NOTE(review): the message also mentions overwrite, but only the
      // sync-folder combination is checked here — confirm which is intended.
      if (syncFolder && atomicCommit) {
        throw new IllegalArgumentException("Atomic commit can't be used with "
            + "sync folder or overwrite options");
      }

      if (deleteMissing && !(overwrite || syncFolder)) {
        throw new IllegalArgumentException("Delete missing is applicable "
            + "only with update or overwrite options");
      }

      if (overwrite && syncFolder) {
        throw new IllegalArgumentException("Overwrite and update options are "
            + "mutually exclusive");
      }

      if (!syncFolder && append) {
        throw new IllegalArgumentException(
            "Append is valid only with update options");
      }
      if (skipCRC && append) {
        throw new IllegalArgumentException(
            "Append is disallowed when skipping CRC");
      }
      if (!syncFolder && (useDiff || useRdiff)) {
        throw new IllegalArgumentException(
            "-diff/-rdiff is valid only with -update option");
      }

      if (useDiff || useRdiff) {
        if (StringUtils.isBlank(fromSnapshot) ||
            StringUtils.isBlank(toSnapshot)) {
          throw new IllegalArgumentException(
              "Must provide both the starting and ending " +
                  "snapshot names for -diff/-rdiff");
        }
      }
      if (useDiff && useRdiff) {
        throw new IllegalArgumentException(
            "-diff and -rdiff are mutually exclusive");
      }

      if (verboseLog && logPath == null) {
        throw new IllegalArgumentException(
            "-v is valid only with -log option");
      }
    }

    /**
     * Replaces the source paths without any validation; test use only.
     *
     * @param newSourcePaths the source paths to use
     * @return this builder
     */
    @VisibleForTesting
    Builder withSourcePaths(List<Path> newSourcePaths) {
      this.sourcePaths = newSourcePaths;
      return this;
    }

    /**
     * Sets the atomic-commit option.
     *
     * @param newAtomicCommit the new value of the option
     * @return this builder
     */
    public Builder withAtomicCommit(boolean newAtomicCommit) {
      this.atomicCommit = newAtomicCommit;
      return this;
    }

    /**
     * Sets the work path for atomic commit. {@link #build()} rejects a
     * non-null work path unless atomic commit is enabled.
     *
     * @param newAtomicWorkPath the work path, may be null
     * @return this builder
     */
    public Builder withAtomicWorkPath(Path newAtomicWorkPath) {
      this.atomicWorkPath = newAtomicWorkPath;
      return this;
    }

    /**
     * Sets whether source and target folder contents are to be synced up.
     *
     * @param newSyncFolder the new value of the option
     * @return this builder
     */
    public Builder withSyncFolder(boolean newSyncFolder) {
      this.syncFolder = newSyncFolder;
      return this;
    }

    /**
     * Sets whether files only present in the target should be deleted.
     *
     * @param newDeleteMissing the new value of the option
     * @return this builder
     */
    public Builder withDeleteMissing(boolean newDeleteMissing) {
      this.deleteMissing = newDeleteMissing;
      return this;
    }

    /**
     * Sets whether failures during copy are to be ignored.
     *
     * @param newIgnoreFailures the new value of the option
     * @return this builder
     */
    public Builder withIgnoreFailures(boolean newIgnoreFailures) {
      this.ignoreFailures = newIgnoreFailures;
      return this;
    }

    /**
     * Sets whether files should always be overwritten on the target.
     *
     * @param newOverwrite the new value of the option
     * @return this builder
     */
    public Builder withOverwrite(boolean newOverwrite) {
      this.overwrite = newOverwrite;
      return this;
    }

    /**
     * Sets whether new data should be appended to target files.
     * {@link #build()} forces this back to false when blocksPerChunk is
     * positive.
     *
     * @param newAppend the new value of the option
     * @return this builder
     */
    public Builder withAppend(boolean newAppend) {
      this.append = newAppend;
      return this;
    }

    /**
     * Sets whether the CRC check is skipped. Despite its name, a true
     * argument skips the check; it behaves exactly like
     * {@link #withSkipCRC(boolean)}.
     *
     * @param newSkipCRC whether to skip crc check
     * @return this builder
     * @deprecated Use {@link #withSkipCRC(boolean)} instead.
     */
    @Deprecated
    public Builder withCRC(boolean newSkipCRC) {
      this.skipCRC = newSkipCRC;
      return this;
    }

    /**
     * Sets whether the CRC check is skipped.
     *
     * @param newSkipCRC whether to skip crc check
     * @return this builder
     */
    public Builder withSkipCRC(boolean newSkipCRC) {
      this.skipCRC = newSkipCRC;
      return this;
    }

    /**
     * Sets whether to run blocking (the default) or non-blocking.
     *
     * @param newBlocking true to run blocking
     * @return this builder
     */
    public Builder withBlocking(boolean newBlocking) {
      this.blocking = newBlocking;
      return this;
    }

    /**
     * Enables the forward snapshot diff (-diff) between the two snapshots.
     *
     * @param newFromSnapshot starting snapshot name
     * @param newToSnapshot ending snapshot name
     * @return this builder
     */
    public Builder withUseDiff(String newFromSnapshot,  String newToSnapshot) {
      this.useDiff = true;
      this.fromSnapshot = newFromSnapshot;
      this.toSnapshot = newToSnapshot;
      return this;
    }

    /**
     * Enables the reversed snapshot diff (-rdiff) between the two snapshots.
     *
     * @param newFromSnapshot starting snapshot name
     * @param newToSnapshot ending snapshot name
     * @return this builder
     */
    public Builder withUseRdiff(String newFromSnapshot, String newToSnapshot) {
      this.useRdiff = true;
      this.fromSnapshot = newFromSnapshot;
      this.toSnapshot = newToSnapshot;
      return this;
    }

    /**
     * Sets the path to a file containing the paths to filter out of the copy.
     *
     * @param newFiletersFile path of the filters file, may be null
     * @return this builder
     */
    public Builder withFiltersFile(String newFiletersFile) {
      this.filtersFile = newFiletersFile;
      return this;
    }

    /**
     * Sets the path where output logs are stored.
     *
     * @param newLogPath the log path, may be null
     * @return this builder
     */
    public Builder withLogPath(Path newLogPath) {
      this.logPath = newLogPath;
      return this;
    }

    /**
     * Sets the path to save source/dest sequence files to.
     *
     * @param path the track path, may be null
     * @return this builder
     */
    public Builder withTrackMissing(Path path) {
      this.trackPath = path;
      return this;
    }

    /**
     * Sets the copy strategy to use.
     *
     * @param newCopyStrategy name of the copy strategy
     * @return this builder
     */
    public Builder withCopyStrategy(String newCopyStrategy) {
      this.copyStrategy = newCopyStrategy;
      return this;
    }

    /**
     * Sets the per map bandwidth.
     *
     * @param newMapBandwidth the bandwidth; must be greater than zero
     * @return this builder
     * @throws IllegalArgumentException if the bandwidth is not positive
     */
    public Builder withMapBandwidth(float newMapBandwidth) {
      Preconditions.checkArgument(newMapBandwidth > 0,
          "Bandwidth " + newMapBandwidth + " is invalid (should be > 0)");
      this.mapBandwidth = newMapBandwidth;
      return this;
    }

    /**
     * Sets the number of threads to use for listStatus. Values above
     * {@link #MAX_NUM_LISTSTATUS_THREADS} are capped to that maximum, and
     * non-positive values are stored as 0.
     *
     * @param newNumListstatusThreads requested number of threads
     * @return this builder
     */
    public Builder withNumListstatusThreads(int newNumListstatusThreads) {
      if (newNumListstatusThreads > MAX_NUM_LISTSTATUS_THREADS) {
        this.numListstatusThreads = MAX_NUM_LISTSTATUS_THREADS;
      } else if (newNumListstatusThreads > 0) {
        this.numListstatusThreads = newNumListstatusThreads;
      } else {
        this.numListstatusThreads = 0;
      }
      return this;
    }

    /**
     * Sets the max number of maps to use for copy. Values below 1 are raised
     * to 1.
     *
     * @param newMaxMaps requested number of maps
     * @return this builder
     */
    public Builder maxMaps(int newMaxMaps) {
      this.maxMaps = Math.max(newMaxMaps, 1);
      return this;
    }

    /**
     * Adds the attributes named by the given symbols to the preserve set.
     * A null or empty string selects every {@link FileAttribute}.
     *
     * @param attributes one symbol character per attribute
     * @return this builder
     * @throws NoSuchElementException if a character matches no attribute
     */
    public Builder preserve(String attributes) {
      if (attributes == null || attributes.isEmpty()) {
        preserveStatus = EnumSet.allOf(FileAttribute.class);
      } else {
        for (int index = 0; index < attributes.length(); index++) {
          preserveStatus.add(FileAttribute.
              getAttribute(attributes.charAt(index)));
        }
      }
      return this;
    }

    /**
     * Adds a single attribute to the preserve set.
     *
     * @param attribute the attribute to preserve
     * @return this builder
     */
    public Builder preserve(FileAttribute attribute) {
      preserveStatus.add(attribute);
      return this;
    }

    /**
     * Sets the chunk size, in number of blocks, used to split large files.
     * A positive value makes {@link #build()} preserve the block size and
     * disable append.
     *
     * @param newBlocksPerChunk number of blocks per chunk
     * @return this builder
     */
    public Builder withBlocksPerChunk(int newBlocksPerChunk) {
      this.blocksPerChunk = newBlocksPerChunk;
      return this;
    }

    /**
     * Sets the copy buffer size. Non-positive values select
     * {@code DistCpConstants.COPY_BUFFER_SIZE_DEFAULT}.
     *
     * @param newCopyBufferSize requested buffer size
     * @return this builder
     */
    public Builder withCopyBufferSize(int newCopyBufferSize) {
      this.copyBufferSize =
          newCopyBufferSize > 0 ? newCopyBufferSize
              : DistCpConstants.COPY_BUFFER_SIZE_DEFAULT;
      return this;
    }

    /**
     * Sets whether additional info is logged in the SKIP/COPY log.
     * {@link #build()} rejects verbose logging without a log path.
     *
     * @param newVerboseLog the new value of the option
     * @return this builder
     */
    public Builder withVerboseLog(boolean newVerboseLog) {
      this.verboseLog = newVerboseLog;
      return this;
    }

    /**
     * Sets whether data should be written directly to the target paths.
     *
     * @param newDirectWrite the new value of the option
     * @return this builder
     */
    public Builder withDirectWrite(boolean newDirectWrite) {
      this.directWrite = newDirectWrite;
      return this;
    }

    /**
     * Sets the use-iterator option.
     *
     * @param useItr the new value of the option
     * @return this builder
     */
    public Builder withUseIterator(boolean useItr) {
      this.useIterator = useItr;
      return this;
    }

    /**
     * Sets the update-root option.
     *
     * @param updateRootAttrs the new value of the option
     * @return this builder
     */
    public Builder withUpdateRoot(boolean updateRootAttrs) {
      this.updateRoot = updateRootAttrs;
      return this;
    }
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop CopyFilter 源码

hadoop CopyListing 源码

hadoop CopyListingFileStatus 源码

hadoop DiffInfo 源码

hadoop DistCp 源码

hadoop DistCpConstants 源码

hadoop DistCpContext 源码

hadoop DistCpOptionSwitch 源码

hadoop DistCpSync 源码

hadoop FileBasedCopyListing 源码

0  赞