hadoop ExecutionSummarizer source code

  • 2022-10-20

hadoop ExecutionSummarizer code

File path: /hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred.gridmix;

import java.io.IOException;

import org.apache.commons.lang3.time.FastDateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;

/**
 * Summarizes a {@link Gridmix} run. Statistics that are reported are
 * <ul>
 *   <li>Total number of jobs in the input trace</li>
 *   <li>Trace signature</li>
 *   <li>Total number of jobs processed from the input trace</li>
 *   <li>Total number of jobs submitted</li>
 *   <li>Total number of successful and failed jobs</li>
 *   <li>Total number of map/reduce tasks launched</li>
 *   <li>Gridmix start & end time</li>
 *   <li>Total time for the Gridmix run (data-generation and simulation)</li>
 *   <li>Gridmix Configuration (i.e job-type, submission-type, resolver)</li>
 * </ul>
 */
class ExecutionSummarizer implements StatListener<JobStats> {
  static final Logger LOG = LoggerFactory.getLogger(ExecutionSummarizer.class);
  private static final FastDateFormat UTIL = FastDateFormat.getInstance();
  
  private int numJobsInInputTrace;
  private int totalSuccessfulJobs;
  private int totalFailedJobs;
  private int totalLostJobs;
  private int totalMapTasksLaunched;
  private int totalReduceTasksLaunched;
  private long totalSimulationTime;
  private long totalRuntime;
  private final String commandLineArgs;
  private long startTime;
  private long endTime;
  private long simulationStartTime;
  private String inputTraceLocation;
  private String inputTraceSignature;
  private String jobSubmissionPolicy;
  private String resolver;
  private DataStatistics dataStats;
  private String expectedDataSize;
  
  /**
   * Basic constructor initialized with the runtime arguments. 
   */
  ExecutionSummarizer(String[] args) {
    startTime = System.currentTimeMillis();
    // flatten the args string and store it
    commandLineArgs = 
      org.apache.commons.lang3.StringUtils.join(args, ' ');
  }
  
  /**
   * Default constructor. 
   */
  ExecutionSummarizer() {
    startTime = System.currentTimeMillis();
    commandLineArgs = Summarizer.NA; 
  }
  
  void start(Configuration conf) {
    simulationStartTime = System.currentTimeMillis();
  }
  
  private void processJobState(JobStats stats) {
    Job job = stats.getJob();
    try {
      if (job.isSuccessful()) {
        ++totalSuccessfulJobs;
      } else {
        ++totalFailedJobs;
      }
    } catch (Exception e) {
      // this behavior is consistent with job-monitor which marks the job as 
      // complete (lost) if the status polling bails out
      ++totalLostJobs; 
    }
  }
  
  private void processJobTasks(JobStats stats) {
    totalMapTasksLaunched += stats.getNoOfMaps();
    totalReduceTasksLaunched += stats.getNoOfReds();
  }
  
  private void process(JobStats stats) {
    // process the job run state
    processJobState(stats);

    // process the tasks information
    processJobTasks(stats);
  }
  
  @Override
  public void update(JobStats item) {
    // process only if the simulation has started
    if (simulationStartTime > 0) {
      process(item);
      totalSimulationTime = 
        System.currentTimeMillis() - getSimulationStartTime();
    }
  }
  
  // Generates a signature for the trace file based on
  //   - filename
  //   - modification time
  //   - file length
  //   - owner
  protected static String getTraceSignature(String input) throws IOException {
    Path inputPath = new Path(input);
    FileSystem fs = inputPath.getFileSystem(new Configuration());
    FileStatus status = fs.getFileStatus(inputPath);
    Path qPath = fs.makeQualified(status.getPath());
    String traceID = status.getModificationTime() + qPath.toString()
                     + status.getOwner() + status.getLen();
    return MD5Hash.digest(traceID).toString();
  }
  
  @SuppressWarnings("unchecked")
  void finalize(JobFactory factory, String inputPath, long dataSize, 
                UserResolver userResolver, DataStatistics stats,
                Configuration conf) 
  throws IOException {
    numJobsInInputTrace = factory.numJobsInTrace;
    endTime = System.currentTimeMillis();
     if ("-".equals(inputPath)) {
      inputTraceLocation = Summarizer.NA;
      inputTraceSignature = Summarizer.NA;
    } else {
      Path inputTracePath = new Path(inputPath);
      FileSystem fs = inputTracePath.getFileSystem(conf);
      inputTraceLocation = fs.makeQualified(inputTracePath).toString();
      inputTraceSignature = getTraceSignature(inputPath);
    }
    jobSubmissionPolicy = Gridmix.getJobSubmissionPolicy(conf).name();
    resolver = userResolver.getClass().getName();
    if (dataSize > 0) {
      expectedDataSize = StringUtils.humanReadableInt(dataSize);
    } else {
      expectedDataSize = Summarizer.NA;
    }
    dataStats = stats;
    totalRuntime = System.currentTimeMillis() - getStartTime();
  }
  
  /**
   * Summarizes the current {@link Gridmix} run.
   */
  @Override
  public String toString() {
    StringBuilder builder = new StringBuilder();
    builder.append("Execution Summary:-");
    builder.append("\nInput trace: ").append(getInputTraceLocation());
    builder.append("\nInput trace signature: ")
           .append(getInputTraceSignature());
    builder.append("\nTotal number of jobs in trace: ")
           .append(getNumJobsInTrace());
    builder.append("\nExpected input data size: ")
           .append(getExpectedDataSize());
    builder.append("\nInput data statistics: ")
           .append(getInputDataStatistics());
    builder.append("\nTotal number of jobs processed: ")
           .append(getNumSubmittedJobs());
    builder.append("\nTotal number of successful jobs: ")
           .append(getNumSuccessfulJobs());
    builder.append("\nTotal number of failed jobs: ")
           .append(getNumFailedJobs());
    builder.append("\nTotal number of lost jobs: ")
           .append(getNumLostJobs());
    builder.append("\nTotal number of map tasks launched: ")
           .append(getNumMapTasksLaunched());
    builder.append("\nTotal number of reduce task launched: ")
           .append(getNumReduceTasksLaunched());
    builder.append("\nGridmix start time: ")
           .append(UTIL.format(getStartTime()));
    builder.append("\nGridmix end time: ").append(UTIL.format(getEndTime()));
    builder.append("\nGridmix simulation start time: ")
           .append(UTIL.format(getStartTime()));
    builder.append("\nGridmix runtime: ")
           .append(StringUtils.formatTime(getRuntime()));
    builder.append("\nTime spent in initialization (data-gen etc): ")
           .append(StringUtils.formatTime(getInitTime()));
    builder.append("\nTime spent in simulation: ")
           .append(StringUtils.formatTime(getSimulationTime()));
    builder.append("\nGridmix configuration parameters: ")
           .append(getCommandLineArgsString());
    builder.append("\nGridmix job submission policy: ")
           .append(getJobSubmissionPolicy());
    builder.append("\nGridmix resolver: ").append(getUserResolver());
    builder.append("\n\n");
    return builder.toString();
  }
  
  // Gets the stringified version of DataStatistics
  static String stringifyDataStatistics(DataStatistics stats) {
    if (stats != null) {
      StringBuffer buffer = new StringBuffer();
      String compressionStatus = stats.isDataCompressed() 
                                 ? "Compressed" 
                                 : "Uncompressed";
      buffer.append(compressionStatus).append(" input data size: ");
      buffer.append(StringUtils.humanReadableInt(stats.getDataSize()));
      buffer.append(", ");
      buffer.append("Number of files: ").append(stats.getNumFiles());

      return buffer.toString();
    } else {
      return Summarizer.NA;
    }
  }
  
  // Getters
  protected String getExpectedDataSize() {
    return expectedDataSize;
  }
  
  protected String getUserResolver() {
    return resolver;
  }
  
  protected String getInputDataStatistics() {
    return stringifyDataStatistics(dataStats);
  }
  
  protected String getInputTraceSignature() {
    return inputTraceSignature;
  }
  
  protected String getInputTraceLocation() {
    return inputTraceLocation;
  }
  
  protected int getNumJobsInTrace() {
    return numJobsInInputTrace;
  }
  
  protected int getNumSuccessfulJobs() {
    return totalSuccessfulJobs;
  }
  
  protected int getNumFailedJobs() {
    return totalFailedJobs;
  }
  
  protected int getNumLostJobs() {
    return totalLostJobs;
  }
  
  protected int getNumSubmittedJobs() {
    return totalSuccessfulJobs + totalFailedJobs + totalLostJobs;
  }
  
  protected int getNumMapTasksLaunched() {
    return totalMapTasksLaunched;
  }
  
  protected int getNumReduceTasksLaunched() {
    return totalReduceTasksLaunched;
  }
  
  protected long getStartTime() {
    return startTime;
  }
  
  protected long getEndTime() {
    return endTime;
  }
  
  protected long getInitTime() {
    return simulationStartTime - startTime;
  }
  
  protected long getSimulationStartTime() {
    return simulationStartTime;
  }
  
  protected long getSimulationTime() {
    return totalSimulationTime;
  }
  
  protected long getRuntime() {
    return totalRuntime;
  }
  
  protected String getCommandLineArgsString() {
    return commandLineArgs;
  }
  
  protected String getJobSubmissionPolicy() {
    return jobSubmissionPolicy;
  }
}
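
The trace signature computed in getTraceSignature() above identifies the input trace by hashing its qualified path, modification time, owner, and length into an MD5 digest, so separate runs can be matched to the same trace file. Below is a minimal standalone sketch of that idea using only public Hadoop APIs; the class name TraceSignatureDemo is a hypothetical name for illustration and is not part of Gridmix.

// Minimal sketch of the trace-signature idea from getTraceSignature() above.
// TraceSignatureDemo is a hypothetical illustration class, not part of Gridmix.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MD5Hash;

public class TraceSignatureDemo {
  public static void main(String[] args) throws IOException {
    // Path of the trace file is taken from the command line.
    Path tracePath = new Path(args[0]);
    FileSystem fs = tracePath.getFileSystem(new Configuration());
    FileStatus status = fs.getFileStatus(tracePath);
    Path qualified = fs.makeQualified(status.getPath());

    // Same ingredients as ExecutionSummarizer: modification time, qualified
    // path, owner, and length, concatenated and hashed with MD5.
    String traceID = status.getModificationTime() + qualified.toString()
                     + status.getOwner() + status.getLen();
    System.out.println("Trace signature: " + MD5Hash.digest(traceID).toString());
  }
}

For a file-based trace (not the "-" stdin case handled in finalize()), running this sketch against the same trace file that Gridmix consumed should reproduce the "Input trace signature" line of the execution summary.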

Related information

hadoop source code directory

Related articles

hadoop AvgRecordFactory source code

hadoop ClusterSummarizer source code

hadoop CompressionEmulationUtil source code

hadoop DistributedCacheEmulator source code

hadoop EchoUserResolver source code

hadoop FilePool source code

hadoop FileQueue source code

hadoop GenerateData source code

hadoop GenerateDistCacheData source code

hadoop Gridmix source code
