hadoop MagicCommitIntegration 源码

  • 2022-10-20
  • 浏览 (150)

haddop MagicCommitIntegration 代码

文件路径:/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.commit;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;

import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;

/**
 * Adds the code needed for S3A to support magic committers.
 * It's pulled out to keep S3A FS class slightly less complex.
 * This class can be instantiated even when magic commit is disabled;
 * in this case:
 * <ol>
 *   <li>{@link #isMagicCommitPath(Path)} will always return false.</li>
 *   <li>{@link #isUnderMagicPath(Path)} will always return false.</li>
 *   <li>{@link #createTracker(Path, String, PutTrackerStatistics)} will always
 *   return an instance of {@link PutTracker}.</li>
 * </ol>
 *
 * <p>Important</p>: must not directly or indirectly import a class which
 * uses any datatype in hadoop-mapreduce.
 */
public class MagicCommitIntegration extends AbstractStoreOperation {
  private static final Logger LOG =
      LoggerFactory.getLogger(MagicCommitIntegration.class);
  private final S3AFileSystem owner;
  private final boolean magicCommitEnabled;

  /**
   * Instantiate.
   * @param owner owner class
   * @param magicCommitEnabled is magic commit enabled.
   */
  public MagicCommitIntegration(S3AFileSystem owner,
      boolean magicCommitEnabled) {
    super(owner.createStoreContext());
    this.owner = owner;
    this.magicCommitEnabled = magicCommitEnabled;
  }

  /**
   * Given an (elements, key) pair, return the key of the final destination of
   * the PUT, that is: where the final path is expected to go?
   * @param elements path split to elements
   * @param key key
   * @return key for final put. If this is not a magic commit, the
   * same as the key in.
   */
  public String keyOfFinalDestination(List<String> elements, String key) {
    if (isMagicCommitPath(elements)) {
      return elementsToKey(finalDestination(elements));
    } else {
      return key;
    }
  }

  /**
   * Given a path and a key to that same path, create a tracker for it.
   * This specific tracker will be chosen based on whether or not
   * the path is a magic one.
   * Auditing: the span used to invoke
   * this method will be the one used to create the write operation helper
   * for the commit tracker.
   * @param path path of nominal write
   * @param key key of path of nominal write
   * @param trackerStatistics tracker statistics
   * @return the tracker for this operation.
   */
  public PutTracker createTracker(Path path, String key,
      PutTrackerStatistics trackerStatistics) {
    final List<String> elements = splitPathToElements(path);
    PutTracker tracker;

    if(isMagicFile(elements)) {
      // path is of a magic file
      if (isMagicCommitPath(elements)) {
        final String destKey = keyOfFinalDestination(elements, key);
        String pendingsetPath = key + CommitConstants.PENDING_SUFFIX;
        getStoreContext().incrementStatistic(
            Statistic.COMMITTER_MAGIC_FILES_CREATED);
        tracker = new MagicCommitTracker(path,
            getStoreContext().getBucket(),
            key,
            destKey,
            pendingsetPath,
            owner.getWriteOperationHelper(),
            trackerStatistics);
        LOG.debug("Created {}", tracker);
      } else {
        LOG.warn("File being created has a \"magic\" path, but the filesystem"
            + " has magic file support disabled: {}", path);
        // downgrade to standard multipart tracking
        tracker = new PutTracker(key);
      }
    } else {
      // standard multipart tracking
      tracker = new PutTracker(key);
    }
    return tracker;
  }

  /**
   * This performs the calculation of the final destination of a set
   * of elements.
   *
   * @param elements original (do not edit after this call)
   * @return a list of elements, possibly empty
   */
  private List<String> finalDestination(List<String> elements) {
    return magicCommitEnabled ?
        MagicCommitPaths.finalDestination(elements)
        : elements;
  }

  /**
   * Is magic commit enabled?
   * @return true if magic commit is turned on.
   */
  public boolean isMagicCommitEnabled() {
    return magicCommitEnabled;
  }

  /**
   * Predicate: is a path a magic commit path?
   * @param path path to examine
   * @return true if the path is or is under a magic directory
   */
  public boolean isMagicCommitPath(Path path) {
    return isMagicCommitPath(splitPathToElements(path));
  }

  /**
   * Is this path a magic commit path in this filesystem?
   * True if magic commit is enabled, the path is magic
   * and the path is not actually a commit metadata file.
   * @param elements element list
   * @return true if writing path is to be uprated to a magic file write
   */
  private boolean isMagicCommitPath(List<String> elements) {
    return magicCommitEnabled && isMagicFile(elements);
  }

  /**
   * Is the file a magic file: this predicate doesn't check
   * for the FS actually having the magic bit being set.
   * @param elements path elements
   * @return true if the path is one a magic file write expects.
   */
  private boolean isMagicFile(List<String> elements) {
    return isMagicPath(elements) &&
        !isCommitMetadataFile(elements);
  }

  /**
   * Does this file contain all the commit metadata?
   * @param elements path element list
   * @return true if this file is one of the commit metadata files.
   */
  private boolean isCommitMetadataFile(List<String> elements) {
    String last = elements.get(elements.size() - 1);
    return last.endsWith(CommitConstants.PENDING_SUFFIX)
        || last.endsWith(CommitConstants.PENDINGSET_SUFFIX);
  }

  /**
   * Is this path in/under a magic path...regardless of file type.
   * This is used to optimize create() operations.
   * @param path path to check
   * @return true if the path is one a magic file write expects.
   */
  public boolean isUnderMagicPath(Path path) {
    return magicCommitEnabled && isMagicPath(splitPathToElements(path));
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractS3ACommitter 源码

hadoop AbstractS3ACommitterFactory 源码

hadoop CommitConstants 源码

hadoop CommitUtils 源码

hadoop CommitterStatisticNames 源码

hadoop InternalCommitterConstants 源码

hadoop LocalTempDir 源码

hadoop MagicCommitPaths 源码

hadoop PathCommitException 源码

hadoop PutTracker 源码

0  赞