hadoop WordMedian source code

  • 2022-10-20
  • Views (188)

hadoop WordMedian code

File path: /hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java
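
WordMedian computes the median length of the words in a text input. A single
MapReduce job maps every word to the pair (length, 1) and reduces those pairs
to a count per word length; the driver then combines the job output with the
MAP_OUTPUT_RECORDS counter to locate the middle position(s) of the sorted word
lengths and read off the median.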

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.examples;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;

public class WordMedian extends Configured implements Tool {

  private double median = 0;
  private final static IntWritable ONE = new IntWritable(1);

  /**
   * Maps each word in a line of text to a key-value pair, with the length of
   * the word as the key and 1 as the value.
   */
  public static class WordMedianMapper extends
      Mapper<Object, Text, IntWritable, IntWritable> {

    private IntWritable length = new IntWritable();

    /**
     * Emits a key-value pair for counting the word. Outputs are (IntWritable,
     * IntWritable).
     * 
     * @param value
     *          This will be a line of text coming in from our input file.
     */
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
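      // e.g. the line "hello hadoop" emits (5, 1) and (6, 1)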
      while (itr.hasMoreTokens()) {
        String string = itr.nextToken();
        length.set(string.length());
        context.write(length, ONE);
      }
    }
  }

  /**
   * Performs integer summation of all the values for each key.
   */
  public static class WordMedianReducer extends
      Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    private IntWritable val = new IntWritable();

    /**
     * Sums all the individual values within the iterator and writes them to the
     * same key.
     * 
     * @param key
     *          This will be a length of a word that was read.
     * @param values
     *          This will be an iterator of all the values associated with that
     *          key.
     */
    public void reduce(IntWritable key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
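      // e.g. key = 5 with values [1, 1, 1] is written out as (5, 3)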

      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      val.set(sum);
      context.write(key, val);
    }
  }

  /**
   * This is a standard program to read and find a median value based on a file
   * of word counts such as: 1 456, 2 132, 3 56... where the first values are
   * the word lengths and the following values are the number of times that
   * words of that length appear.
   * 
   * @param path
   *          The path to read the HDFS file from (part-r-00000...00001...etc).
   * @param medianIndex1
   *          The first length value to look for.
   * @param medianIndex2
   *          The second length value to look for (will be the same as the first
   *          if there are an even number of words total).
   * @throws IOException
   *           If file cannot be found, we throw an exception.
   */
  private double readAndFindMedian(String path, int medianIndex1,
      int medianIndex2, Configuration conf) throws IOException {
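    // Example: if part-r-00000 holds the lines "1 2", "3 5" and "8 1"
    // (8 words in total, so medianIndex1 == medianIndex2 == 4), the first
    // branch below matches on the "3 5" line (4 >= 2 && 4 <= 7) and the
    // method returns 3.0.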
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path(path, "part-r-00000");

    if (!fs.exists(file))
      throw new IOException("Output not found!");

    BufferedReader br = null;

    try {
      br = new BufferedReader(new InputStreamReader(fs.open(file), Charsets.UTF_8));
      int num = 0;

      String line;
      while ((line = br.readLine()) != null) {
        StringTokenizer st = new StringTokenizer(line);

        // grab length
        String currLen = st.nextToken();

        // grab count
        String lengthFreq = st.nextToken();

        int prevNum = num;
        num += Integer.parseInt(lengthFreq);

        if (medianIndex2 >= prevNum && medianIndex1 <= num) {
          System.out.println("The median is: " + currLen);
          br.close();
          return Double.parseDouble(currLen);
        } else if (medianIndex2 >= prevNum && medianIndex1 < num) {
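          // Note: this branch is unreachable, because "medianIndex1 < num"
          // implies "medianIndex1 <= num", so the branch above always matches
          // first. It appears intended to average two neighboring lengths
          // when the two median positions fall in different length buckets.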
          String nextCurrLen = st.nextToken();
          double theMedian = (Integer.parseInt(currLen) + Integer
              .parseInt(nextCurrLen)) / 2.0;
          System.out.println("The median is: " + theMedian);
          br.close();
          return theMedian;
        }
      }
    } finally {
      if (br != null) {
        br.close();
      }
    }
    // error, no median found
    return -1;
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new WordMedian(), args);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs =
        new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordmedian <in> <out>");
      return 0;
    }

    setConf(conf);

    Job job = Job.getInstance(conf, "word median");
    job.setJarByClass(WordMedian.class);
    job.setMapperClass(WordMedianMapper.class);
    job.setCombinerClass(WordMedianReducer.class);
    job.setReducerClass(WordMedianReducer.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    boolean result = job.waitForCompletion(true);

    // Wait for JOB 1 -- get middle value to check for Median

    long totalWords = job.getCounters()
        .getGroup(TaskCounter.class.getCanonicalName())
        .findCounter("MAP_OUTPUT_RECORDS", "Map output records").getValue();
    int medianIndex1 = (int) Math.ceil((totalWords / 2.0));
    int medianIndex2 = (int) Math.floor((totalWords / 2.0));

    median = readAndFindMedian(otherArgs[1], medianIndex1, medianIndex2, conf);

    return (result ? 0 : 1);
  }

  public double getMedian() {
    return median;
  }
}
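
Because run() stores the result in a field exposed through getMedian(), the
class can also be driven from other Java code. Below is a minimal sketch,
assuming the examples jar is on the classpath; the input and output paths are
hypothetical placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordMedian;
import org.apache.hadoop.util.ToolRunner;

public class WordMedianDriver {
  public static void main(String[] args) throws Exception {
    WordMedian wm = new WordMedian();
    // Hypothetical HDFS paths; replace with real locations. The output
    // directory must not exist yet, or FileOutputFormat rejects the job.
    int exitCode = ToolRunner.run(new Configuration(), wm,
        new String[] { "/user/me/books", "/user/me/wordmedian-out" });
    System.out.println("Median word length: " + wm.getMedian());
    System.exit(exitCode);
  }
}

From the command line, the class is normally reached through the examples
driver, e.g. hadoop jar hadoop-mapreduce-examples-<version>.jar wordmedian <in> <out>.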

Related information

hadoop source code directory

Related articles

hadoop AggregateWordCount source code

hadoop AggregateWordHistogram source code

hadoop BaileyBorweinPlouffe source code

hadoop DBCountPageView source code

hadoop ExampleDriver source code

hadoop Grep source code

hadoop Join source code

hadoop MultiFileWordCount source code

hadoop QuasiMonteCarlo source code

hadoop RandomTextWriter source code
