hadoop GenerateDistCacheData source code

  • 2022-10-20

hadoop GenerateDistCacheData code

File path: /hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred.gridmix;

import java.io.IOException;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * GridmixJob that generates distributed cache files.
 * {@link GenerateDistCacheData} expects a list of distributed cache files to be
 * generated as input. This list is expected to be stored as a sequence file
 * and the filename is expected to be configured using
 * {@code gridmix.distcache.file.list}.
 * This input file contains the list of distributed cache files and their sizes.
 * For each record (i.e. file size and file path) in this input file,
 * a file with the specific file size at the specific path is created.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class GenerateDistCacheData extends GridmixJob {

  /**
   * Number of distributed cache files to be created by gridmix
   */
  static final String GRIDMIX_DISTCACHE_FILE_COUNT =
      "gridmix.distcache.file.count";
  /**
   * Total number of bytes to be written to the distributed cache files by
   * gridmix. i.e. Sum of sizes of all unique distributed cache files to be
   * created by gridmix.
   */
  static final String GRIDMIX_DISTCACHE_BYTE_COUNT =
      "gridmix.distcache.byte.count";
  /**
   * The special file created(and used) by gridmix, that contains the list of
   * unique distributed cache files that are to be created and their sizes.
   */
  static final String GRIDMIX_DISTCACHE_FILE_LIST =
      "gridmix.distcache.file.list";
  static final String JOB_NAME = "GRIDMIX_GENERATE_DISTCACHE_DATA";

  /**
   * Create distributed cache file with the permissions 0644.
   * Since the private distributed cache directory doesn't have execute
   * permission for others, it is OK to set read permission for others for
   * the files under that directory and still they will become 'private'
   * distributed cache files on the simulated cluster.
   */
  static final short GRIDMIX_DISTCACHE_FILE_PERM = 0644;

  private static final Charset charsetUTF8 = Charset.forName("UTF-8");

  public GenerateDistCacheData(Configuration conf) throws IOException {
    super(conf, 0L, JOB_NAME);
  }

  @Override
  public Job call() throws IOException, InterruptedException,
                           ClassNotFoundException {
    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
    ugi.doAs(new PrivilegedExceptionAction<Job>() {
       public Job run() throws IOException, ClassNotFoundException,
                               InterruptedException {
        job.setMapperClass(GenDCDataMapper.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setInputFormatClass(GenDCDataFormat.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setJarByClass(GenerateDistCacheData.class);
        try {
          FileInputFormat.addInputPath(job, new Path("ignored"));
        } catch (IOException e) {
          LOG.error("Error while adding input path ", e);
        }
        job.submit();
        return job;
      }
    });
    return job;
  }

  @Override
  protected boolean canEmulateCompression() {
    return false;
  }

  public static class GenDCDataMapper
      extends Mapper<LongWritable, BytesWritable, NullWritable, BytesWritable> {

    private BytesWritable val;
    private final Random r = new Random();
    private FileSystem fs;

    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      val = new BytesWritable(new byte[context.getConfiguration().getInt(
              GenerateData.GRIDMIX_VAL_BYTES, 1024 * 1024)]);
      fs = FileSystem.get(context.getConfiguration());
    }

    // Create one distributed cache file with the needed file size.
    // key is distributed cache file size and
    // value is distributed cache file path.
    @Override
    public void map(LongWritable key, BytesWritable value, Context context)
        throws IOException, InterruptedException {

      String fileName = new String(value.getBytes(), 0,
          value.getLength(), charsetUTF8);
      Path path = new Path(fileName);

      FSDataOutputStream dos =
          FileSystem.create(fs, path, new FsPermission(GRIDMIX_DISTCACHE_FILE_PERM));

      int size = 0;
      for (long bytes = key.get(); bytes > 0; bytes -= size) {
        r.nextBytes(val.getBytes());
        size = (int)Math.min(val.getLength(), bytes);
        dos.write(val.getBytes(), 0, size);// Write to distCache file
      }
      dos.close();
    }
  }

  /**
   * InputFormat for GenerateDistCacheData.
   * Input to GenerateDistCacheData is the special file(in SequenceFile format)
   * that contains the list of distributed cache files to be generated along
   * with their file sizes.
   */
  static class GenDCDataFormat
      extends InputFormat<LongWritable, BytesWritable> {

    // Split the special file that contains the list of distributed cache file
    // paths and their file sizes such that each split corresponds to
    // approximately same amount of distributed cache data to be generated.
    // Consider numTaskTrackers * numMapSlotsPerTracker as the number of maps
    // for this job, if there is lot of data to be generated.
    @Override
    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
      final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
      final JobClient client = new JobClient(jobConf);
      ClusterStatus stat = client.getClusterStatus(true);
      int numTrackers = stat.getTaskTrackers();
      final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);

      // Total size of distributed cache files to be generated
      final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
      // Get the path of the special file
      String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
      if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
        throw new RuntimeException("Invalid metadata: #files (" + fileCount
            + "), total_size (" + totalSize + "), filelisturi ("
            + distCacheFileList + ")");
      }

      Path sequenceFile = new Path(distCacheFileList);
      FileSystem fs = sequenceFile.getFileSystem(jobConf);
      FileStatus srcst = fs.getFileStatus(sequenceFile);
      // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
      int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
      int numSplits = numTrackers * numMapSlotsPerTracker;

      List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
      LongWritable key = new LongWritable();
      BytesWritable value = new BytesWritable();

      // Average size of data to be generated by each map task
      final long targetSize = Math.max(totalSize / numSplits,
                                DistributedCacheEmulator.AVG_BYTES_PER_MAP);
      long splitStartPosition = 0L;
      long splitEndPosition = 0L;
      long acc = 0L;
      long bytesRemaining = srcst.getLen();
      SequenceFile.Reader reader = null;
      try {
        reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
        while (reader.next(key, value)) {

          // If adding this file would put this split past the target size,
          // cut the last split and put this file in the next split.
          if (acc + key.get() > targetSize && acc != 0) {
            long splitSize = splitEndPosition - splitStartPosition;
            splits.add(new FileSplit(
                sequenceFile, splitStartPosition, splitSize, (String[])null));
            bytesRemaining -= splitSize;
            splitStartPosition = splitEndPosition;
            acc = 0L;
          }
          acc += key.get();
          splitEndPosition = reader.getPosition();
        }
      } finally {
        if (reader != null) {
          reader.close();
        }
      }
      if (bytesRemaining != 0) {
        splits.add(new FileSplit(
            sequenceFile, splitStartPosition, bytesRemaining, (String[])null));
      }

      return splits;
    }

    /**
     * Returns a reader for this split of the distributed cache file list.
     */
    @Override
    public RecordReader<LongWritable, BytesWritable> createRecordReader(
        InputSplit split, final TaskAttemptContext taskContext)
        throws IOException, InterruptedException {
      return new SequenceFileRecordReader<LongWritable, BytesWritable>();
    }
  }
}
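
A note on the expected input, for readers following the code above: the file configured via gridmix.distcache.file.list must be a SequenceFile whose key is the size of a cache file to generate (LongWritable) and whose value is the UTF-8 encoded target path (BytesWritable); GenDCDataMapper decodes exactly this pair. The sketch below is not part of the Hadoop source, and the class name, paths, and sizes are illustrative assumptions; it only shows how such a list file could be written by hand. In normal GridMix runs this file is produced by DistributedCacheEmulator rather than by user code.

import java.nio.charset.Charset;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

// Illustrative helper, not part of GridMix: writes a distributed cache file
// list in the record layout that GenerateDistCacheData reads.
public class DistCacheFileListWriter {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Hypothetical locations and sizes; adjust for a real cluster.
    Path fileList = new Path("/tmp/gridmix/distCacheFileList.seq");
    long[] sizes = {1L << 20, 4L << 20};               // 1 MB and 4 MB
    String[] paths = {"/tmp/gridmix/cache/file-0",
                      "/tmp/gridmix/cache/file-1"};

    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, fileList, LongWritable.class, BytesWritable.class);
    try {
      for (int i = 0; i < sizes.length; i++) {
        // Key = size of the cache file to generate,
        // value = UTF-8 bytes of its path (what GenDCDataMapper decodes).
        byte[] pathBytes = paths[i].getBytes(Charset.forName("UTF-8"));
        writer.append(new LongWritable(sizes[i]), new BytesWritable(pathBytes));
      }
    } finally {
      writer.close();
    }

    long totalBytes = 0;
    for (long size : sizes) {
      totalBytes += size;
    }
    // The generator job locates the list and validates the totals through
    // these keys (the GRIDMIX_DISTCACHE_* constants defined above).
    conf.set("gridmix.distcache.file.list", fileList.toString());
    conf.setInt("gridmix.distcache.file.count", sizes.length);
    conf.setLong("gridmix.distcache.byte.count", totalBytes);
  }
}

GenDCDataFormat.getSplits reads the same SequenceFile and groups consecutive records into splits of roughly equal generated-byte totals, so each map task writes about the same amount of cache data.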

Related information

hadoop source code directory

Related articles

hadoop AvgRecordFactory source code

hadoop ClusterSummarizer source code

hadoop CompressionEmulationUtil source code

hadoop DistributedCacheEmulator source code

hadoop EchoUserResolver source code

hadoop ExecutionSummarizer source code

hadoop FilePool source code

hadoop FileQueue source code

hadoop GenerateData source code

hadoop Gridmix source code
