hadoop GridmixRecord 源码

  • 2022-10-20
  • 浏览 (185)

hadoop GridmixRecord 代码

文件路径:/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred.gridmix;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Writable record whose serialized form is a vint-encoded total size followed
 * by a payload derived from a 64-bit seed.
 *
 * <p>For payloads longer than eight bytes the seed is written first and the
 * remaining bytes are either successive {@link #nextRand(long)} values or,
 * when the record is flagged compressible, random words from a
 * {@code RandomTextDataGenerator}. For shorter payloads the leading bytes of
 * a cached, masked copy of the seed are written instead.
 *
 * <p>Ordering ({@link #compareTo}) is computed from the seed and size alone,
 * without materializing the payload.
 */
class GridmixRecord implements WritableComparable<GridmixRecord> {

  /** Minimum number of bytes taken by the vint size header. */
  private static final int FIXED_BYTES = 1;
  /** Total record size in bytes, including the vint header. */
  private int size = -1;
  /** Seed from which the payload bytes are derived. */
  private long seed;
  /** Scratch buffer used to decode the seed of a short record. */
  private final DataInputBuffer dib =
    new DataInputBuffer();
  /** Holds the big-endian bytes of the masked seed. */
  private final DataOutputBuffer dob =
    new DataOutputBuffer(Long.SIZE / Byte.SIZE);
  /** Backing array of {@link #dob}; source of the payload of short records. */
  private byte[] literal = dob.getData();
  /** Whether long payloads are emitted as random text. */
  private boolean compressible = false;
  private float compressionRatio = 
    CompressionEmulationUtil.DEFAULT_COMPRESSION_RATIO;
  /** Text generator; only assigned when the record is made compressible. */
  private RandomTextDataGenerator rtg = null;

  /** Creates a record of size 1 with seed 0. */
  GridmixRecord() {
    this(1, 0L);
  }

  /**
   * Creates a record with the given size and seed.
   *
   * @param size requested total size; values below 1 are raised to 1
   * @param seed seed for the payload; masked according to {@code size}
   */
  GridmixRecord(int size, long seed) {
    this.seed = seed;
    setSizeInternal(size);
  }

  /** @return the total record size in bytes. */
  public int getSize() {
    return size;
  }

  /**
   * Sets the total record size and re-masks the current seed for that size.
   *
   * @param size requested total size; values below 1 are raised to 1
   */
  public void setSize(int size) {
    setSizeInternal(size);
  }

  /**
   * Configures whether long payloads are written as random text.
   *
   * @param compressible true to emit random text for payloads over 8 bytes
   * @param ratio compression ratio handed to the text generator factory
   */
  void setCompressibility(boolean compressible, float ratio) {
    this.compressible = compressible;
    this.compressionRatio = ratio;
    // Initialize the RandomTextDataGenerator once for every GridMix record
    // Note that RandomTextDataGenerator is needed only when the GridMix record
    // is configured to generate compressible text data.
    if (compressible) {
      rtg = 
        CompressionEmulationUtil.getRandomTextDataGenerator(ratio, 
                                   RandomTextDataGenerator.DEFAULT_SEED);
    }
  }
  
  /**
   * Non-overridable size setter (safe to call from the constructor). Clamps
   * the size to at least 1, masks the seed for that size and refreshes the
   * cached seed bytes in {@link #literal}.
   */
  private void setSizeInternal(int size) {
    this.size = Math.max(1, size);
    try {
      seed = maskSeed(seed, this.size);
      dob.reset();
      dob.writeLong(seed);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Replaces the seed.
   *
   * <p>Only the {@code seed} field is updated: the cached bytes used when
   * writing short records are refreshed by {@link #setSize(int)} (and by
   * {@link #readFields(DataInput)}), not here, so callers that want a short
   * record to reflect the new seed must set the size afterwards.
   *
   * @param seed the new seed
   */
  public final void setSeed(long seed) {
    this.seed = seed;
  }

  /**
   * Advances the pseudo-random sequence by one xorshift step
   * (shifts 13, 7, 17). Marsaglia, 2003.
   *
   * @param x current state
   * @return next state
   */
  long nextRand(long x) {
    x ^= (x << 13);
    x ^= (x >>> 7);
    return (x ^= (x << 17));
  }

  /**
   * Writes the seed followed by random words until exactly {@code size} bytes
   * have been emitted; the final word is truncated to fit. Invoked by
   * {@link #write(DataOutput)} only when the record is flagged compressible
   * (presumably driven by {@link FileOutputFormat#COMPRESS}; confirm against
   * callers); otherwise {@link GridmixRecord#writeRandom(DataOutput, int)}
   * is used.
   *
   * @param out destination
   * @param size number of payload bytes to write, including the 8 seed bytes
   * @throws IOException if writing fails
   */
  private void writeRandomText(DataOutput out, final int size) 
  throws IOException {
    long tmp = seed;
    out.writeLong(tmp);
    int i = size - (Long.SIZE / Byte.SIZE);
    //TODO Should we use long for size. What if the data is more than 4G?
    
    String randomWord = rtg.getRandomWord();
    byte[] bytes = randomWord.getBytes("UTF-8");
    long randomWordSize = bytes.length;
    while (i >= randomWordSize) {
      out.write(bytes);
      i -= randomWordSize;
      
      // get the next random word
      randomWord = rtg.getRandomWord();
      bytes = randomWord.getBytes("UTF-8");
      // determine the random word size
      randomWordSize = bytes.length;
    }
    
    // pad the remaining bytes
    if (i > 0) {
      out.write(bytes, 0, i);
    }
  }
  
  /**
   * Writes the seed followed by the {@link #nextRand(long)} sequence until
   * exactly {@code size} bytes have been emitted. Whole longs are written
   * while at least 8 bytes remain; the tail is taken from the low-order bytes
   * of one further value.
   *
   * @param out destination
   * @param size number of payload bytes to write, including the 8 seed bytes
   * @throws IOException if writing fails
   */
  public void writeRandom(DataOutput out, final int size) throws IOException {
    long tmp = seed;
    out.writeLong(tmp);
    int i = size - (Long.SIZE / Byte.SIZE);
    while (i > Long.SIZE / Byte.SIZE - 1) {
      tmp = nextRand(tmp);
      out.writeLong(tmp);
      i -= Long.SIZE / Byte.SIZE;
    }
    for (tmp = nextRand(tmp); i > 0; --i) {
      out.writeByte((int)(tmp & 0xFF));
      tmp >>>= Byte.SIZE;
    }
  }

  /**
   * Reads the size header and the seed, then discards the rest of the
   * payload. Payloads of 8 bytes or fewer are read into the zero-filled
   * {@link #literal} buffer and the seed is decoded from it.
   *
   * @param in source positioned at the vint size header
   * @throws EOFException if the stream ends before the payload is consumed
   * @throws IOException on any other read failure
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    size = WritableUtils.readVInt(in);
    int payload = size - WritableUtils.getVIntSize(size);
    if (payload > Long.SIZE / Byte.SIZE) {
      seed = in.readLong();
      payload -= Long.SIZE / Byte.SIZE;
    } else {
      Arrays.fill(literal, (byte)0);
      in.readFully(literal, 0, payload);
      dib.reset(literal, 0, literal.length);
      seed = dib.readLong();
      payload = 0;
    }
    skipFully(in, payload);
  }

  /**
   * Skips exactly {@code count} bytes. {@link DataInput#skipBytes(int)} is
   * allowed to skip fewer bytes than requested without the stream being
   * exhausted, so a single short skip must not be reported as EOF.
   *
   * @param in source to advance
   * @param count number of bytes to discard; non-positive values are a no-op
   * @throws EOFException if the stream ends before {@code count} bytes
   * @throws IOException on any other read failure
   */
  private static void skipFully(DataInput in, int count) throws IOException {
    int remaining = count;
    while (remaining > 0) {
      final int skipped = in.skipBytes(remaining);
      if (skipped > 0) {
        remaining -= skipped;
        continue;
      }
      // No progress: a one-byte read either advances or proves end of stream.
      try {
        in.readByte();
      } catch (EOFException e) {
        final EOFException eof = new EOFException(
            "Expected " + count + ", read " + (count - remaining));
        eof.initCause(e);
        throw eof;
      }
      --remaining;
    }
  }

  /**
   * Writes the vint size header followed by the payload. Payloads over 8
   * bytes are generated from the seed (as text when compressible); shorter
   * non-empty payloads are copied from the cached seed bytes.
   *
   * @param out destination
   * @throws IOException if writing fails
   */
  @Override
  public void write(DataOutput out) throws IOException {
    // data bytes including vint encoding
    WritableUtils.writeVInt(out, size);
    final int payload = size - WritableUtils.getVIntSize(size);
    if (payload > Long.SIZE / Byte.SIZE) {
      if (compressible) {
        writeRandomText(out, payload);
      } else {
        writeRandom(out, payload);
      }
    } else if (payload > 0) {
      //TODO What is compressible is turned on? LOG is a bad idea!
      out.write(literal, 0, payload);
    }
  }

  /**
   * Orders records by comparing this record's seed-derived bytes with the
   * other's, via {@link #compareSeed(long, int)}.
   */
  @Override
  public int compareTo(GridmixRecord other) {
    return compareSeed(other.seed,
        Math.max(0, other.getSize() - other.fixedBytes()));
  }

  /** @return the number of header bytes excluded from comparison. */
  int fixedBytes() {
    // min vint size
    return FIXED_BYTES;
  }

  /**
   * Restricts a seed to the bytes that fit in a record of {@code sz} bytes.
   *
   * @param sd seed to mask
   * @param sz record size including {@link #FIXED_BYTES}
   * @return 0 when no payload fits; the seed with only its top
   *         {@code sz - FIXED_BYTES} bytes kept when fewer than 8 payload
   *         bytes fit; otherwise the seed unchanged
   */
  private static long maskSeed(long sd, int sz) {
    // Don't use fixedBytes here; subclasses will set intended random len
    if (sz <= FIXED_BYTES) {
      sd = 0L;
    } else if (sz < Long.SIZE / Byte.SIZE + FIXED_BYTES) {
      final int tmp = sz - FIXED_BYTES;
      // low tmp bytes set, then moved to the high-order end
      final long mask = (1L << (Byte.SIZE * tmp)) - 1;
      sd &= mask << (Byte.SIZE * (Long.SIZE / Byte.SIZE - tmp));
    }
    return sd;
  }

  /**
   * Compares this record's seed-derived byte sequence against another one.
   * Both seeds are masked to the shorter length, then compared byte by byte
   * from the most significant byte of each 8-byte word, advancing both with
   * {@link #nextRand(long)} between words.
   *
   * @param jSeed seed of the other sequence
   * @param jSize length in bytes of the other sequence
   * @return the difference of the first mismatching unsigned bytes, or the
   *         difference in lengths when the common prefix is equal
   */
  int compareSeed(long jSeed, int jSize) {
    final int iSize = Math.max(0, getSize() - fixedBytes());
    final int seedLen = Math.min(iSize, jSize) + FIXED_BYTES;
    jSeed = maskSeed(jSeed, seedLen);
    long iSeed = maskSeed(seed, seedLen);
    final int cmplen = Math.min(iSize, jSize);
    // i counts bytes; each pass covers one 8-byte word (Byte.SIZE == 8)
    for (int i = 0; i < cmplen; i += Byte.SIZE) {
      final int k = cmplen - i;
      // walk from the top byte down, stopping after min(k, 8) bytes
      for (long j = Long.SIZE - Byte.SIZE;
          j >= Math.max(0, Long.SIZE / Byte.SIZE - k) * Byte.SIZE;
          j -= Byte.SIZE) {
        final int xi = (int)((iSeed >>> j) & 0xFFL);
        final int xj = (int)((jSeed >>> j) & 0xFFL);
        if (xi != xj) {
          return xi - xj;
        }
      }
      iSeed = nextRand(iSeed);
      jSeed = nextRand(jSeed);
    }
    return iSize - jSize;
  }

  /**
   * Two records are equal when they are of the same class and have the same
   * size and seed.
   */
  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (other != null && other.getClass() == getClass()) {
      final GridmixRecord o = ((GridmixRecord)other);
      return getSize() == o.getSize() && seed == o.seed;
    }
    return false;
  }

  /** Hash derived from the same fields as {@link #equals(Object)}. */
  @Override
  public int hashCode() {
    return (int)(seed * getSize());
  }

  /**
   * Raw comparator over serialized records; registered as the default
   * comparator for {@link GridmixRecord} when this class is initialized.
   */
  public static class Comparator extends WritableComparator {

    public Comparator() {
      super(GridmixRecord.class);
    }

    /**
     * @param sub record subclass this comparator is created for
     */
    public Comparator(Class<? extends WritableComparable<?>> sub) {
      super(sub);
    }

    /**
     * Compares two serialized records lexicographically after advancing each
     * start offset by a value computed from its vint header: the decoded
     * vint length minus the vint size of that length.
     */
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      int n1 = WritableUtils.decodeVIntSize(b1[s1]);
      int n2 = WritableUtils.decodeVIntSize(b2[s2]);
      n1 -= WritableUtils.getVIntSize(n1);
      n2 -= WritableUtils.getVIntSize(n2);
      return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
    }

    static {
      WritableComparator.define(GridmixRecord.class, new Comparator());
    }
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop AvgRecordFactory 源码

hadoop ClusterSummarizer 源码

hadoop CompressionEmulationUtil 源码

hadoop DistributedCacheEmulator 源码

hadoop EchoUserResolver 源码

hadoop ExecutionSummarizer 源码

hadoop FilePool 源码

hadoop FileQueue 源码

hadoop GenerateData 源码

hadoop GenerateDistCacheData 源码

0  赞