hadoop GenerateData 源码

  • 2022-10-20
  • 浏览 (101)

hadoop GenerateData 代码


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred.gridmix;

import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.OutputStream;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Utils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;

// TODO can replace with form of GridmixJob
class GenerateData extends GridmixJob {

   * Total bytes to write.
  public static final String GRIDMIX_GEN_BYTES = "gridmix.gen.bytes";

   * Maximum size per file written.
  public static final String GRIDMIX_GEN_CHUNK = "gridmix.gen.bytes.per.file";

   * Size of writes to output file.
  public static final String GRIDMIX_VAL_BYTES = "gendata.val.bytes";

   * Status reporting interval, in megabytes.
  public static final String GRIDMIX_GEN_INTERVAL = "gendata.interval.mb";

   * Blocksize of generated data.
  public static final String GRIDMIX_GEN_BLOCKSIZE = "gridmix.gen.blocksize";

   * Replication of generated data.
  public static final String GRIDMIX_GEN_REPLICATION = "gridmix.gen.replicas";

  public GenerateData(Configuration conf, Path outdir, long genbytes)
      throws IOException {
    super(conf, 0L, JOB_NAME);
    job.getConfiguration().setLong(GRIDMIX_GEN_BYTES, genbytes);
    FileOutputFormat.setOutputPath(job, outdir);

   * Represents the input data characteristics.
  static class DataStatistics {
    private long dataSize;
    private long numFiles;
    private boolean isDataCompressed;
    DataStatistics(long dataSize, long numFiles, boolean isCompressed) {
      this.dataSize = dataSize;
      this.numFiles = numFiles;
      this.isDataCompressed = isCompressed;
    long getDataSize() {
      return dataSize;
    long getNumFiles() {
      return numFiles;
    boolean isDataCompressed() {
      return isDataCompressed;
   * Publish the data statistics.
  static DataStatistics publishDataStatistics(Path inputDir, long genBytes, 
                                              Configuration conf) 
  throws IOException {
    if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)) {
      return CompressionEmulationUtil.publishCompressedDataStatistics(inputDir, 
                                        conf, genBytes);
    } else {
      return publishPlainDataStatistics(conf, inputDir);
  static DataStatistics publishPlainDataStatistics(Configuration conf, 
                                                   Path inputDir) 
  throws IOException {
    FileSystem fs = inputDir.getFileSystem(conf);

    // obtain input data file statuses
    long dataSize = 0;
    long fileCount = 0;
    RemoteIterator<LocatedFileStatus> iter = fs.listFiles(inputDir, true);
    PathFilter filter = new Utils.OutputFileUtils.OutputFilesFilter();
    while (iter.hasNext()) {
      LocatedFileStatus lStatus = iter.next();
      if (filter.accept(lStatus.getPath())) {
        dataSize += lStatus.getLen();

    // publish the plain data statistics
    LOG.info("Total size of input data : " 
             + StringUtils.humanReadableInt(dataSize));
    LOG.info("Total number of input data files : " + fileCount);
    return new DataStatistics(dataSize, fileCount, false);
  public Job call() throws IOException, InterruptedException,
                           ClassNotFoundException {
    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
    ugi.doAs( new PrivilegedExceptionAction <Job>() {
       public Job run() throws IOException, ClassNotFoundException,
                               InterruptedException {
         // check if compression emulation is enabled
         if (CompressionEmulationUtil
             .isCompressionEmulationEnabled(job.getConfiguration())) {
         } else {
         return job;
       private void configureRandomBytesDataGenerator() {
        try {
          FileInputFormat.addInputPath(job, new Path("ignored"));
        } catch (IOException e) {
          LOG.error("Error while adding input path ", e);
    return job;
  protected boolean canEmulateCompression() {
    return false;

  public static class GenDataMapper
      extends Mapper<NullWritable,LongWritable,NullWritable,BytesWritable> {

    private BytesWritable val;
    private final Random r = new Random();

    protected void setup(Context context)
        throws IOException, InterruptedException {
      val = new BytesWritable(new byte[
          context.getConfiguration().getInt(GRIDMIX_VAL_BYTES, 1024 * 1024)]);

    public void map(NullWritable key, LongWritable value, Context context)
        throws IOException, InterruptedException {
      for (long bytes = value.get(); bytes > 0; bytes -= val.getLength()) {
        val.setSize((int)Math.min(val.getLength(), bytes));
        context.write(key, val);


  static class GenDataFormat extends InputFormat<NullWritable,LongWritable> {

    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
      final JobClient client =
        new JobClient(new JobConf(jobCtxt.getConfiguration()));
      ClusterStatus stat = client.getClusterStatus(true);
      final long toGen =
        jobCtxt.getConfiguration().getLong(GRIDMIX_GEN_BYTES, -1);
      if (toGen < 0) {
        throw new IOException("Invalid/missing generation bytes: " + toGen);
      final int nTrackers = stat.getTaskTrackers();
      final long bytesPerTracker = toGen / nTrackers;
      final ArrayList<InputSplit> splits = new ArrayList<InputSplit>(nTrackers);
      final Pattern trackerPattern = Pattern.compile("tracker_([^:]*):.*");
      final Matcher m = trackerPattern.matcher("");
      for (String tracker : stat.getActiveTrackerNames()) {
        if (!m.find()) {
          System.err.println("Skipping node: " + tracker);
        final String name = m.group(1);
        splits.add(new GenSplit(bytesPerTracker, new String[] { name }));
      return splits;

    public RecordReader<NullWritable,LongWritable> createRecordReader(
        InputSplit split, final TaskAttemptContext taskContext)
        throws IOException {
      return new RecordReader<NullWritable,LongWritable>() {
        long written = 0L;
        long write = 0L;
        long RINTERVAL;
        long toWrite;
        final NullWritable key = NullWritable.get();
        final LongWritable val = new LongWritable();

        public void initialize(InputSplit split, TaskAttemptContext ctxt)
            throws IOException, InterruptedException {
          toWrite = split.getLength();
          RINTERVAL = ctxt.getConfiguration().getInt(
              GRIDMIX_GEN_INTERVAL, 10) << 20;
        public boolean nextKeyValue() throws IOException {
          written += write;
          write = Math.min(toWrite - written, RINTERVAL);
          return written < toWrite;
        public float getProgress() throws IOException {
          return written / ((float)toWrite);
        public NullWritable getCurrentKey() { return key; }
        public LongWritable getCurrentValue() { return val; }
        public void close() throws IOException {
          taskContext.setStatus("Wrote " + toWrite);

  static class GenSplit extends InputSplit implements Writable {
    private long bytes;
    private int nLoc;
    private String[] locations;

    public GenSplit() { }
    public GenSplit(long bytes, String[] locations) {
      this(bytes, locations.length, locations);
    public GenSplit(long bytes, int nLoc, String[] locations) {
      this.bytes = bytes;
      this.nLoc = nLoc;
      this.locations = Arrays.copyOf(locations, nLoc);
    public long getLength() {
      return bytes;
    public String[] getLocations() {
      return locations;
    public void readFields(DataInput in) throws IOException {
      bytes = in.readLong();
      nLoc = in.readInt();
      if (null == locations || locations.length < nLoc) {
        locations = new String[nLoc];
      for (int i = 0; i < nLoc; ++i) {
        locations[i] = Text.readString(in);
    public void write(DataOutput out) throws IOException {
      for (int i = 0; i < nLoc; ++i) {
        Text.writeString(out, locations[i]);

  static class RawBytesOutputFormat
      extends FileOutputFormat<NullWritable,BytesWritable> {

    public RecordWriter<NullWritable,BytesWritable> getRecordWriter(
        TaskAttemptContext job) throws IOException {

      return new ChunkWriter(getDefaultWorkFile(job, ""),

    static class ChunkWriter extends RecordWriter<NullWritable,BytesWritable> {
      private final Path outDir;
      private final FileSystem fs;
      private final int blocksize;
      private final short replicas;
      private final FsPermission genPerms = new FsPermission((short) 0777);
      private final long maxFileBytes;

      private long accFileBytes = 0L;
      private long fileIdx = -1L;
      private OutputStream fileOut = null;

      public ChunkWriter(Path outDir, Configuration conf) throws IOException {
        this.outDir = outDir;
        fs = outDir.getFileSystem(conf);
        blocksize = conf.getInt(GRIDMIX_GEN_BLOCKSIZE, 1 << 28);
        replicas = (short) conf.getInt(GRIDMIX_GEN_REPLICATION, 3);
        maxFileBytes = conf.getLong(GRIDMIX_GEN_CHUNK, 1L << 30);
      private void nextDestination() throws IOException {
        if (fileOut != null) {
        fileOut = fs.create(new Path(outDir, "segment-" + (++fileIdx)),
                            genPerms, false, 64 * 1024, replicas, 
                            blocksize, null);
        accFileBytes = 0L;
      public void write(NullWritable key, BytesWritable value)
          throws IOException {
        int written = 0;
        final int total = value.getLength();
        while (written < total) {
          if (accFileBytes >= maxFileBytes) {
          final int write = (int)
            Math.min(total - written, maxFileBytes - accFileBytes);
          fileOut.write(value.getBytes(), written, write);
          written += write;
          accFileBytes += write;
      public void close(TaskAttemptContext ctxt) throws IOException {



hadoop 源码目录


hadoop AvgRecordFactory 源码

hadoop ClusterSummarizer 源码

hadoop CompressionEmulationUtil 源码

hadoop DistributedCacheEmulator 源码

hadoop EchoUserResolver 源码

hadoop ExecutionSummarizer 源码

hadoop FilePool 源码

hadoop FileQueue 源码

hadoop GenerateDistCacheData 源码

hadoop Gridmix 源码

0  赞