hadoop GenerateData 源码

  • 2022-10-20
  • 浏览 (101)

haddop GenerateData 代码

文件路径:/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/GenerateData.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred.gridmix;

import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.OutputStream;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Utils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;

// TODO can replace with form of GridmixJob
class GenerateData extends GridmixJob {

  /**
   * Total bytes to write.
   */
  public static final String GRIDMIX_GEN_BYTES = "gridmix.gen.bytes";

  /**
   * Maximum size per file written.
   */
  public static final String GRIDMIX_GEN_CHUNK = "gridmix.gen.bytes.per.file";

  /**
   * Size of writes to output file.
   */
  public static final String GRIDMIX_VAL_BYTES = "gendata.val.bytes";

  /**
   * Status reporting interval, in megabytes.
   */
  public static final String GRIDMIX_GEN_INTERVAL = "gendata.interval.mb";

  /**
   * Blocksize of generated data.
   */
  public static final String GRIDMIX_GEN_BLOCKSIZE = "gridmix.gen.blocksize";

  /**
   * Replication of generated data.
   */
  public static final String GRIDMIX_GEN_REPLICATION = "gridmix.gen.replicas";
  static final String JOB_NAME = "GRIDMIX_GENERATE_INPUT_DATA";

  public GenerateData(Configuration conf, Path outdir, long genbytes)
      throws IOException {
    super(conf, 0L, JOB_NAME);
    job.getConfiguration().setLong(GRIDMIX_GEN_BYTES, genbytes);
    FileOutputFormat.setOutputPath(job, outdir);
  }

  /**
   * Represents the input data characteristics.
   */
  static class DataStatistics {
    private long dataSize;
    private long numFiles;
    private boolean isDataCompressed;
    
    DataStatistics(long dataSize, long numFiles, boolean isCompressed) {
      this.dataSize = dataSize;
      this.numFiles = numFiles;
      this.isDataCompressed = isCompressed;
    }
    
    long getDataSize() {
      return dataSize;
    }
    
    long getNumFiles() {
      return numFiles;
    }
    
    boolean isDataCompressed() {
      return isDataCompressed;
    }
  }
  
  /**
   * Publish the data statistics.
   */
  static DataStatistics publishDataStatistics(Path inputDir, long genBytes, 
                                              Configuration conf) 
  throws IOException {
    if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)) {
      return CompressionEmulationUtil.publishCompressedDataStatistics(inputDir, 
                                        conf, genBytes);
    } else {
      return publishPlainDataStatistics(conf, inputDir);
    }
  }
  
  static DataStatistics publishPlainDataStatistics(Configuration conf, 
                                                   Path inputDir) 
  throws IOException {
    FileSystem fs = inputDir.getFileSystem(conf);

    // obtain input data file statuses
    long dataSize = 0;
    long fileCount = 0;
    RemoteIterator<LocatedFileStatus> iter = fs.listFiles(inputDir, true);
    PathFilter filter = new Utils.OutputFileUtils.OutputFilesFilter();
    while (iter.hasNext()) {
      LocatedFileStatus lStatus = iter.next();
      if (filter.accept(lStatus.getPath())) {
        dataSize += lStatus.getLen();
        ++fileCount;
      }
    }

    // publish the plain data statistics
    LOG.info("Total size of input data : " 
             + StringUtils.humanReadableInt(dataSize));
    LOG.info("Total number of input data files : " + fileCount);
    
    return new DataStatistics(dataSize, fileCount, false);
  }
  
  @Override
  public Job call() throws IOException, InterruptedException,
                           ClassNotFoundException {
    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
    ugi.doAs( new PrivilegedExceptionAction <Job>() {
       public Job run() throws IOException, ClassNotFoundException,
                               InterruptedException {
         // check if compression emulation is enabled
         if (CompressionEmulationUtil
             .isCompressionEmulationEnabled(job.getConfiguration())) {
           CompressionEmulationUtil.configure(job);
         } else {
           configureRandomBytesDataGenerator();
         }
         job.submit();
         return job;
       }
       
       private void configureRandomBytesDataGenerator() {
        job.setMapperClass(GenDataMapper.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setInputFormatClass(GenDataFormat.class);
        job.setOutputFormatClass(RawBytesOutputFormat.class);
        job.setJarByClass(GenerateData.class);
        try {
          FileInputFormat.addInputPath(job, new Path("ignored"));
        } catch (IOException e) {
          LOG.error("Error while adding input path ", e);
        }
      }
    });
    return job;
  }
  
  @Override
  protected boolean canEmulateCompression() {
    return false;
  }

  public static class GenDataMapper
      extends Mapper<NullWritable,LongWritable,NullWritable,BytesWritable> {

    private BytesWritable val;
    private final Random r = new Random();

    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      val = new BytesWritable(new byte[
          context.getConfiguration().getInt(GRIDMIX_VAL_BYTES, 1024 * 1024)]);
    }

    @Override
    public void map(NullWritable key, LongWritable value, Context context)
        throws IOException, InterruptedException {
      for (long bytes = value.get(); bytes > 0; bytes -= val.getLength()) {
        r.nextBytes(val.getBytes());
        val.setSize((int)Math.min(val.getLength(), bytes));
        context.write(key, val);
      }
    }

  }

  static class GenDataFormat extends InputFormat<NullWritable,LongWritable> {

    @Override
    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
      final JobClient client =
        new JobClient(new JobConf(jobCtxt.getConfiguration()));
      ClusterStatus stat = client.getClusterStatus(true);
      final long toGen =
        jobCtxt.getConfiguration().getLong(GRIDMIX_GEN_BYTES, -1);
      if (toGen < 0) {
        throw new IOException("Invalid/missing generation bytes: " + toGen);
      }
      final int nTrackers = stat.getTaskTrackers();
      final long bytesPerTracker = toGen / nTrackers;
      final ArrayList<InputSplit> splits = new ArrayList<InputSplit>(nTrackers);
      final Pattern trackerPattern = Pattern.compile("tracker_([^:]*):.*");
      final Matcher m = trackerPattern.matcher("");
      for (String tracker : stat.getActiveTrackerNames()) {
        m.reset(tracker);
        if (!m.find()) {
          System.err.println("Skipping node: " + tracker);
          continue;
        }
        final String name = m.group(1);
        splits.add(new GenSplit(bytesPerTracker, new String[] { name }));
      }
      return splits;
    }

    @Override
    public RecordReader<NullWritable,LongWritable> createRecordReader(
        InputSplit split, final TaskAttemptContext taskContext)
        throws IOException {
      return new RecordReader<NullWritable,LongWritable>() {
        long written = 0L;
        long write = 0L;
        long RINTERVAL;
        long toWrite;
        final NullWritable key = NullWritable.get();
        final LongWritable val = new LongWritable();

        @Override
        public void initialize(InputSplit split, TaskAttemptContext ctxt)
            throws IOException, InterruptedException {
          toWrite = split.getLength();
          RINTERVAL = ctxt.getConfiguration().getInt(
              GRIDMIX_GEN_INTERVAL, 10) << 20;
        }
        @Override
        public boolean nextKeyValue() throws IOException {
          written += write;
          write = Math.min(toWrite - written, RINTERVAL);
          val.set(write);
          return written < toWrite;
        }
        @Override
        public float getProgress() throws IOException {
          return written / ((float)toWrite);
        }
        @Override
        public NullWritable getCurrentKey() { return key; }
        @Override
        public LongWritable getCurrentValue() { return val; }
        @Override
        public void close() throws IOException {
          taskContext.setStatus("Wrote " + toWrite);
        }
      };
    }
  }

  static class GenSplit extends InputSplit implements Writable {
    private long bytes;
    private int nLoc;
    private String[] locations;

    public GenSplit() { }
    public GenSplit(long bytes, String[] locations) {
      this(bytes, locations.length, locations);
    }
    public GenSplit(long bytes, int nLoc, String[] locations) {
      this.bytes = bytes;
      this.nLoc = nLoc;
      this.locations = Arrays.copyOf(locations, nLoc);
    }
    @Override
    public long getLength() {
      return bytes;
    }
    @Override
    public String[] getLocations() {
      return locations;
    }
    @Override
    public void readFields(DataInput in) throws IOException {
      bytes = in.readLong();
      nLoc = in.readInt();
      if (null == locations || locations.length < nLoc) {
        locations = new String[nLoc];
      }
      for (int i = 0; i < nLoc; ++i) {
        locations[i] = Text.readString(in);
      }
    }
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeLong(bytes);
      out.writeInt(nLoc);
      for (int i = 0; i < nLoc; ++i) {
        Text.writeString(out, locations[i]);
      }
    }
  }

  static class RawBytesOutputFormat
      extends FileOutputFormat<NullWritable,BytesWritable> {

    @Override
    public RecordWriter<NullWritable,BytesWritable> getRecordWriter(
        TaskAttemptContext job) throws IOException {

      return new ChunkWriter(getDefaultWorkFile(job, ""),
          job.getConfiguration());
    }

    static class ChunkWriter extends RecordWriter<NullWritable,BytesWritable> {
      private final Path outDir;
      private final FileSystem fs;
      private final int blocksize;
      private final short replicas;
      private final FsPermission genPerms = new FsPermission((short) 0777);
      private final long maxFileBytes;

      private long accFileBytes = 0L;
      private long fileIdx = -1L;
      private OutputStream fileOut = null;

      public ChunkWriter(Path outDir, Configuration conf) throws IOException {
        this.outDir = outDir;
        fs = outDir.getFileSystem(conf);
        blocksize = conf.getInt(GRIDMIX_GEN_BLOCKSIZE, 1 << 28);
        replicas = (short) conf.getInt(GRIDMIX_GEN_REPLICATION, 3);
        maxFileBytes = conf.getLong(GRIDMIX_GEN_CHUNK, 1L << 30);
        nextDestination();
      }
      private void nextDestination() throws IOException {
        if (fileOut != null) {
          fileOut.close();
        }
        fileOut = fs.create(new Path(outDir, "segment-" + (++fileIdx)),
                            genPerms, false, 64 * 1024, replicas, 
                            blocksize, null);
        accFileBytes = 0L;
      }
      @Override
      public void write(NullWritable key, BytesWritable value)
          throws IOException {
        int written = 0;
        final int total = value.getLength();
        while (written < total) {
          if (accFileBytes >= maxFileBytes) {
            nextDestination();
          }
          final int write = (int)
            Math.min(total - written, maxFileBytes - accFileBytes);
          fileOut.write(value.getBytes(), written, write);
          written += write;
          accFileBytes += write;
        }
      }
      @Override
      public void close(TaskAttemptContext ctxt) throws IOException {
        fileOut.close();
      }
    }
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop AvgRecordFactory 源码

hadoop ClusterSummarizer 源码

hadoop CompressionEmulationUtil 源码

hadoop DistributedCacheEmulator 源码

hadoop EchoUserResolver 源码

hadoop ExecutionSummarizer 源码

hadoop FilePool 源码

hadoop FileQueue 源码

hadoop GenerateDistCacheData 源码

hadoop Gridmix 源码

0  赞