hadoop LoggedJob 源码

  • 2022-10-20
  • 浏览 (212)

hadoop LoggedJob 代码

文件路径:/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/LoggedJob.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * 
 */
package org.apache.hadoop.tools.rumen;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

import com.fasterxml.jackson.annotation.JsonAnySetter;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.tools.rumen.datatypes.*;

/**
 * A {@link LoggedJob} is a representation of a hadoop job, with the
 * details of this class set up to meet the requirements of the Jackson JSON
 * parser/generator.
 * 
 * Most of the public methods are simply accessors for the instance variables
 * we want to write out in the JSON files. The exceptions are
 * {@link #setUnknownAttribute(String, Object)}, which receives JSON attributes
 * that have no matching setter, and
 * {@link #deepCompare(DeepCompare, TreePath)}, which compares two jobs field
 * by field and reports the first difference as a
 * {@link DeepInequalityException}.
 */
public class LoggedJob implements DeepCompare {
  public enum JobType {
    JAVA, PIG, STREAMING, PIPES, OVERALL
  }

  public enum JobPriority {
    VERY_LOW, LOW, NORMAL, HIGH, VERY_HIGH
  }

  // Names of unknown JSON attributes that have already been reported by
  // setUnknownAttribute. Shared by all instances so that each name is printed
  // only once.
  // NOTE(review): this set is not synchronized -- confirm that it is never
  // touched from several threads at once.
  private static final Set<String> alreadySeenAnySetterAttributes =
      new TreeSet<String>();

  // Numeric fields start at -1 and reference fields at null.
  // NOTE(review): -1 presumably stands for "not set" -- confirm with callers.
  JobID jobID;
  UserName user;
  long computonsPerMapInputByte = -1L;
  long computonsPerMapOutputByte = -1L;
  long computonsPerReduceInputByte = -1L;
  long computonsPerReduceOutputByte = -1L;
  long submitTime = -1L;
  long launchTime = -1L;
  long finishTime = -1L;

  int heapMegabytes = -1;
  int totalMaps = -1;
  int totalReduces = -1;
  Pre21JobHistoryConstants.Values outcome = null;
  JobType jobtype = JobType.JAVA;
  JobPriority priority = JobPriority.NORMAL;

  List<String> directDependantJobs = new ArrayList<String>();
  List<LoggedTask> mapTasks = new ArrayList<LoggedTask>();
  List<LoggedTask> reduceTasks = new ArrayList<LoggedTask>();
  List<LoggedTask> otherTasks = new ArrayList<LoggedTask>();

  // There are CDFs for each level of locality -- most local first
  ArrayList<LoggedDiscreteCDF> successfulMapAttemptCDFs;
  // There are CDFs for each level of locality -- most local first
  ArrayList<LoggedDiscreteCDF> failedMapAttemptCDFs;

  LoggedDiscreteCDF successfulReduceAttemptCDF;
  LoggedDiscreteCDF failedReduceAttemptCDF;

  QueueName queue = null;

  JobName jobName = null;

  int clusterMapMB = -1;
  int clusterReduceMB = -1;
  int jobMapMB = -1;
  int jobReduceMB = -1;

  long relativeTime = 0;

  double[] mapperTriesToSucceed;
  // The original author flagged this field with "!!!!!"; the reason is not
  // recorded in this file.
  double failedMapperFraction;

  private JobProperties jobProperties = new JobProperties();

  /**
   * Creates a job with every field at its initial value; in particular the
   * job ID is left {@code null}.
   */
  LoggedJob() {

  }

  /**
   * Creates a job whose ID is parsed from the given string.
   * 
   * @param jobID textual job ID, handed to {@link #setJobID(String)}
   */
  LoggedJob(String jobID) {
    setJobID(jobID);
  }

  /**
   * Set the configuration properties of the job.
   * 
   * @param conf the properties to wrap in a new {@link JobProperties}
   */
  void setJobProperties(Properties conf) {
    this.jobProperties = new JobProperties(conf);
  }

  /**
   * Get the configuration properties of the job.
   * 
   * @return the job's {@link JobProperties} wrapper
   */
  public JobProperties getJobProperties() {
    return jobProperties;
  }

  /**
   * Shifts the submit, launch and finish times of this job, and the times of
   * every map, reduce and other task, by the same amount.
   * 
   * NOTE(review): the shift is applied unconditionally, so a time that is
   * still at its initial -1 is shifted as well.
   * 
   * @param adjustment the amount added to each time
   */
  void adjustTimes(long adjustment) {
    submitTime += adjustment;
    launchTime += adjustment;
    finishTime += adjustment;

    for (LoggedTask task : mapTasks) {
      task.adjustTimes(adjustment);
    }

    for (LoggedTask task : reduceTasks) {
      task.adjustTimes(adjustment);
    }

    for (LoggedTask task : otherTasks) {
      task.adjustTimes(adjustment);
    }
  }

  /**
   * Catch-all setter for JSON attributes that have no dedicated setter. The
   * value is discarded; the attribute name is printed to standard error the
   * first time it is seen.
   * 
   * @param attributeName name of the unrecognised attribute
   * @param ignored the attribute's value, which is not used
   */
  @JsonAnySetter
  public void setUnknownAttribute(String attributeName, Object ignored) {
    // Set.add returns true only when the name was not already present.
    if (alreadySeenAnySetterAttributes.add(attributeName)) {
      System.err.println("In LoggedJob, we saw the unknown attribute "
          + attributeName + ".");
    }
  }

  /** @return the job's user, or null if none has been set */
  public UserName getUser() {
    return user;
  }

  void setUser(String user) {
    this.user = new UserName(user);
  }

  /** @return the job's ID, or null if none has been set */
  public JobID getJobID() {
    return jobID;
  }

  void setJobID(String jobID) {
    this.jobID = JobID.forName(jobID);
  }

  /** @return the job's priority; {@link JobPriority#NORMAL} initially */
  public JobPriority getPriority() {
    return priority;
  }

  void setPriority(JobPriority priority) {
    this.priority = priority;
  }

  /** @return the {@code computonsPerMapInputByte} value; -1 initially */
  public long getComputonsPerMapInputByte() {
    return computonsPerMapInputByte;
  }

  void setComputonsPerMapInputByte(long computonsPerMapInputByte) {
    this.computonsPerMapInputByte = computonsPerMapInputByte;
  }

  /** @return the {@code computonsPerMapOutputByte} value; -1 initially */
  public long getComputonsPerMapOutputByte() {
    return computonsPerMapOutputByte;
  }

  void setComputonsPerMapOutputByte(long computonsPerMapOutputByte) {
    this.computonsPerMapOutputByte = computonsPerMapOutputByte;
  }

  /** @return the {@code computonsPerReduceInputByte} value; -1 initially */
  public long getComputonsPerReduceInputByte() {
    return computonsPerReduceInputByte;
  }

  void setComputonsPerReduceInputByte(long computonsPerReduceInputByte) {
    this.computonsPerReduceInputByte = computonsPerReduceInputByte;
  }

  /** @return the {@code computonsPerReduceOutputByte} value; -1 initially */
  public long getComputonsPerReduceOutputByte() {
    return computonsPerReduceOutputByte;
  }

  void setComputonsPerReduceOutputByte(long computonsPerReduceOutputByte) {
    this.computonsPerReduceOutputByte = computonsPerReduceOutputByte;
  }

  /** @return the job's submit time; -1 initially */
  public long getSubmitTime() {
    return submitTime;
  }

  void setSubmitTime(long submitTime) {
    this.submitTime = submitTime;
  }

  /** @return the job's launch time; -1 initially */
  public long getLaunchTime() {
    return launchTime;
  }

  void setLaunchTime(long startTime) {
    this.launchTime = startTime;
  }

  /** @return the job's finish time; -1 initially */
  public long getFinishTime() {
    return finishTime;
  }

  void setFinishTime(long finishTime) {
    this.finishTime = finishTime;
  }

  /** @return the {@code heapMegabytes} value; -1 initially */
  public int getHeapMegabytes() {
    return heapMegabytes;
  }

  void setHeapMegabytes(int heapMegabytes) {
    this.heapMegabytes = heapMegabytes;
  }

  /** @return the {@code totalMaps} value; -1 initially */
  public int getTotalMaps() {
    return totalMaps;
  }

  void setTotalMaps(int totalMaps) {
    this.totalMaps = totalMaps;
  }

  /** @return the {@code totalReduces} value; -1 initially */
  public int getTotalReduces() {
    return totalReduces;
  }

  void setTotalReduces(int totalReduces) {
    this.totalReduces = totalReduces;
  }

  /** @return the job's outcome, or null if none has been set */
  public Pre21JobHistoryConstants.Values getOutcome() {
    return outcome;
  }

  void setOutcome(Pre21JobHistoryConstants.Values outcome) {
    this.outcome = outcome;
  }

  /** @return the job's type; {@link JobType#JAVA} initially */
  public JobType getJobtype() {
    return jobtype;
  }

  void setJobtype(JobType jobtype) {
    this.jobtype = jobtype;
  }

  /** @return the stored list itself (not a copy) of direct dependant jobs */
  public List<String> getDirectDependantJobs() {
    return directDependantJobs;
  }

  void setDirectDependantJobs(List<String> directDependantJobs) {
    this.directDependantJobs = directDependantJobs;
  }

  /** @return the stored list itself (not a copy) of map tasks */
  public List<LoggedTask> getMapTasks() {
    return mapTasks;
  }

  void setMapTasks(List<LoggedTask> mapTasks) {
    this.mapTasks = mapTasks;
  }

  /** @return the stored list itself (not a copy) of reduce tasks */
  public List<LoggedTask> getReduceTasks() {
    return reduceTasks;
  }

  void setReduceTasks(List<LoggedTask> reduceTasks) {
    this.reduceTasks = reduceTasks;
  }

  /** @return the stored list itself (not a copy) of other tasks */
  public List<LoggedTask> getOtherTasks() {
    return otherTasks;
  }

  void setOtherTasks(List<LoggedTask> otherTasks) {
    this.otherTasks = otherTasks;
  }

  /** @return the successful map attempt CDFs, or null if never set */
  public ArrayList<LoggedDiscreteCDF> getSuccessfulMapAttemptCDFs() {
    return successfulMapAttemptCDFs;
  }

  void setSuccessfulMapAttemptCDFs(
      ArrayList<LoggedDiscreteCDF> successfulMapAttemptCDFs) {
    this.successfulMapAttemptCDFs = successfulMapAttemptCDFs;
  }

  /** @return the failed map attempt CDFs, or null if never set */
  public ArrayList<LoggedDiscreteCDF> getFailedMapAttemptCDFs() {
    return failedMapAttemptCDFs;
  }

  void setFailedMapAttemptCDFs(ArrayList<LoggedDiscreteCDF> failedMapAttemptCDFs) {
    this.failedMapAttemptCDFs = failedMapAttemptCDFs;
  }

  /** @return the successful reduce attempt CDF, or null if never set */
  public LoggedDiscreteCDF getSuccessfulReduceAttemptCDF() {
    return successfulReduceAttemptCDF;
  }

  void setSuccessfulReduceAttemptCDF(
      LoggedDiscreteCDF successfulReduceAttemptCDF) {
    this.successfulReduceAttemptCDF = successfulReduceAttemptCDF;
  }

  /** @return the failed reduce attempt CDF, or null if never set */
  public LoggedDiscreteCDF getFailedReduceAttemptCDF() {
    return failedReduceAttemptCDF;
  }

  void setFailedReduceAttemptCDF(LoggedDiscreteCDF failedReduceAttemptCDF) {
    this.failedReduceAttemptCDF = failedReduceAttemptCDF;
  }

  /** @return the stored {@code mapperTriesToSucceed} array, or null */
  public double[] getMapperTriesToSucceed() {
    return mapperTriesToSucceed;
  }

  void setMapperTriesToSucceed(double[] mapperTriesToSucceed) {
    this.mapperTriesToSucceed = mapperTriesToSucceed;
  }

  /** @return the {@code failedMapperFraction} value */
  public double getFailedMapperFraction() {
    return failedMapperFraction;
  }

  void setFailedMapperFraction(double failedMapperFraction) {
    this.failedMapperFraction = failedMapperFraction;
  }

  /** @return the {@code relativeTime} value; 0 initially */
  public long getRelativeTime() {
    return relativeTime;
  }

  void setRelativeTime(long relativeTime) {
    this.relativeTime = relativeTime;
  }

  /**
   * @return job queue name if it is available in job history file or
   *         job history conf file. Returns null otherwise.
   */
  public QueueName getQueue() {
    return queue;
  }

  void setQueue(String queue) {
    this.queue = new QueueName(queue);
  }

  /** @return the job's name, or null if none has been set */
  public JobName getJobName() {
    return jobName;
  }

  void setJobName(String jobName) {
    this.jobName = new JobName(jobName);
  }

  /** @return the {@code clusterMapMB} value; -1 initially */
  public int getClusterMapMB() {
    return clusterMapMB;
  }

  void setClusterMapMB(int clusterMapMB) {
    this.clusterMapMB = clusterMapMB;
  }

  /** @return the {@code clusterReduceMB} value; -1 initially */
  public int getClusterReduceMB() {
    return clusterReduceMB;
  }

  void setClusterReduceMB(int clusterReduceMB) {
    this.clusterReduceMB = clusterReduceMB;
  }

  /** @return the {@code jobMapMB} value; -1 initially */
  public int getJobMapMB() {
    return jobMapMB;
  }

  void setJobMapMB(int jobMapMB) {
    this.jobMapMB = jobMapMB;
  }

  /** @return the {@code jobReduceMB} value; -1 initially */
  public int getJobReduceMB() {
    return jobReduceMB;
  }

  void setJobReduceMB(int jobReduceMB) {
    this.jobReduceMB = jobReduceMB;
  }

  /**
   * Builds the exception used when the element {@code eltname} located under
   * {@code loc} differs between the two objects being compared.
   */
  private static DeepInequalityException miscompared(TreePath loc,
      String eltname) {
    return new DeepInequalityException(eltname + " miscompared", new TreePath(
        loc, eltname));
  }

  /** Compares two strings; two nulls are equal, one null is a mismatch. */
  private void compare1(String c1, String c2, TreePath loc, String eltname)
      throws DeepInequalityException {
    if (c1 == null && c2 == null) {
      return;
    }

    if (c1 == null || c2 == null || !c1.equals(c2)) {
      throw miscompared(loc, eltname);
    }
  }

  private void compare1(long c1, long c2, TreePath loc, String eltname)
      throws DeepInequalityException {
    if (c1 != c2) {
      throw miscompared(loc, eltname);
    }
  }

  // The enum overloads below compare by identity.
  private void compare1(Pre21JobHistoryConstants.Values c1,
      Pre21JobHistoryConstants.Values c2, TreePath loc, String eltname)
      throws DeepInequalityException {
    if (c1 != c2) {
      throw miscompared(loc, eltname);
    }
  }

  private void compare1(JobType c1, JobType c2, TreePath loc, String eltname)
      throws DeepInequalityException {
    if (c1 != c2) {
      throw miscompared(loc, eltname);
    }
  }

  private void compare1(JobPriority c1, JobPriority c2, TreePath loc,
      String eltname) throws DeepInequalityException {
    if (c1 != c2) {
      throw miscompared(loc, eltname);
    }
  }

  private void compare1(int c1, int c2, TreePath loc, String eltname)
      throws DeepInequalityException {
    if (c1 != c2) {
      throw miscompared(loc, eltname);
    }
  }

  // Exact comparison with no tolerance; note that NaN never equals NaN here.
  private void compare1(double c1, double c2, TreePath loc, String eltname)
      throws DeepInequalityException {
    if (c1 != c2) {
      throw miscompared(loc, eltname);
    }
  }

  /**
   * Compares two double arrays element by element. A length or null mismatch
   * is reported at the array's path, a differing element at its index.
   */
  private void compare1(double[] c1, double[] c2, TreePath loc, String eltname)
      throws DeepInequalityException {
    if (c1 == null && c2 == null) {
      return;
    }

    if (c1 == null || c2 == null || c1.length != c2.length) {
      throw miscompared(loc, eltname);
    }

    for (int i = 0; i < c1.length; ++i) {
      if (c1[i] != c2[i]) {
        throw new DeepInequalityException(eltname + " miscompared",
            new TreePath(loc, eltname, i));
      }
    }
  }

  /**
   * Compares two nested {@link DeepCompare} objects by delegating to
   * {@code c1.deepCompare}. An {@code index} of -1 marks an element that is
   * not part of a list, and only changes the wording of the error message.
   */
  private void compare1(DeepCompare c1, DeepCompare c2, TreePath loc,
      String eltname, int index) throws DeepInequalityException {
    if (c1 == null && c2 == null) {
      return;
    }

    TreePath recursePath = new TreePath(loc, eltname, index);

    if (c1 == null || c2 == null) {
      String label = (index == -1) ? eltname : eltname + "[" + index + "]";
      throw new DeepInequalityException(label + " miscompared", recursePath);
    }

    c1.deepCompare(c2, recursePath);
  }

  // I'll treat this as an atomic object type
  private void compareStrings(List<String> c1, List<String> c2, TreePath loc,
      String eltname) throws DeepInequalityException {
    if (c1 == null && c2 == null) {
      return;
    }

    if (c1 == null || c2 == null || !c1.equals(c2)) {
      throw miscompared(loc, eltname);
    }
  }

  /** Compares two task lists pairwise; the sizes must match. */
  private void compareLoggedTasks(List<LoggedTask> c1, List<LoggedTask> c2,
      TreePath loc, String eltname) throws DeepInequalityException {
    if (c1 == null && c2 == null) {
      return;
    }

    if (c1 == null || c2 == null || c1.size() != c2.size()) {
      throw miscompared(loc, eltname);
    }

    for (int i = 0; i < c1.size(); ++i) {
      c1.get(i).deepCompare(c2.get(i), new TreePath(loc, eltname, i));
    }
  }

  /** Compares two CDF lists pairwise; the sizes must match. */
  private void compareCDFs(List<LoggedDiscreteCDF> c1,
      List<LoggedDiscreteCDF> c2, TreePath loc, String eltname)
      throws DeepInequalityException {
    if (c1 == null && c2 == null) {
      return;
    }

    if (c1 == null || c2 == null || c1.size() != c2.size()) {
      throw miscompared(loc, eltname);
    }

    for (int i = 0; i < c1.size(); ++i) {
      c1.get(i).deepCompare(c2.get(i), new TreePath(loc, eltname, i));
    }
  }

  /**
   * Compares the wrapped {@link Properties} of two {@link JobProperties}: the
   * sizes must match and every key of the first must map to an equal string
   * value in the second.
   * 
   * @throws DeepInequalityException on the first difference found, including
   *         a key that is present in the first set but absent from the second
   */
  private void compareJobProperties(JobProperties jprop1, JobProperties jprop2,
                                    TreePath loc, String eltname) 
  throws DeepInequalityException {
    if (jprop1 == null && jprop2 == null) {
      return;
    }

    if (jprop1 == null || jprop2 == null) {
      throw miscompared(loc, eltname);
    }

    Properties prop1 = jprop1.getValue();
    Properties prop2 = jprop2.getValue();

    if (prop1.size() != prop2.size()) {
      throw new DeepInequalityException(eltname + " miscompared [size]", 
                                        new TreePath(loc, eltname));
    }

    for (Map.Entry<Object, Object> entry : prop1.entrySet()) {
      Object key = entry.getKey();
      // Equal sizes do not guarantee equal key sets, so the key may be absent
      // from prop2. Treat that as a mismatch instead of dereferencing null.
      Object otherValue = prop2.get(key);
      String v1 = entry.getValue().toString();
      String v2 = (otherValue == null) ? null : otherValue.toString();
      compare1(v1, v2, new TreePath(loc, eltname), "key:" + key);
    }
  }

  /**
   * Compares two {@link DataType} wrappers: they must be of the same class
   * (compared by class name) and hold equal string values.
   */
  private void compare1(DataType<String> c1, DataType<String> c2, TreePath loc, 
                        String eltname) 
  throws DeepInequalityException {
    if (c1 == null && c2 == null) {
      return;
    }

    if (c1 == null || c2 == null) {
      throw miscompared(loc, eltname);
    }
    TreePath dtPath = new TreePath(loc, eltname);

    if (!c1.getClass().getName().equals(c2.getClass().getName())) {
      throw new DeepInequalityException(eltname + " miscompared", 
                                        new TreePath(dtPath, "class"));
    }

    compare1(c1.getValue(), c2.getValue(), dtPath, "value");
  }

  /**
   * Compares this job with another one field by field, in declaration order
   * of the checks below, and stops at the first difference.
   * 
   * @param comparand the object to compare against; must be a
   *        {@link LoggedJob}
   * @param loc path of this job within the enclosing comparison, used to
   *        build the location reported in the exception
   * @throws DeepInequalityException if {@code comparand} is not a
   *         {@link LoggedJob} or any compared field differs
   */
  public void deepCompare(DeepCompare comparand, TreePath loc)
      throws DeepInequalityException {
    if (!(comparand instanceof LoggedJob)) {
      throw new DeepInequalityException("comparand has wrong type", loc);
    }

    LoggedJob other = (LoggedJob) comparand;

    // jobID stays null after the no-arg constructor, so convert null-safely;
    // the String overload of compare1 already treats two nulls as equal.
    String thisJobID = (jobID == null) ? null : jobID.toString();
    String otherJobID = (other.jobID == null) ? null : other.jobID.toString();
    compare1(thisJobID, otherJobID, loc, "jobID");
    compare1(user, other.user, loc, "user");

    compare1(computonsPerMapInputByte, other.computonsPerMapInputByte, loc,
        "computonsPerMapInputByte");
    compare1(computonsPerMapOutputByte, other.computonsPerMapOutputByte, loc,
        "computonsPerMapOutputByte");
    compare1(computonsPerReduceInputByte, other.computonsPerReduceInputByte,
        loc, "computonsPerReduceInputByte");
    compare1(computonsPerReduceOutputByte, other.computonsPerReduceOutputByte,
        loc, "computonsPerReduceOutputByte");

    compare1(submitTime, other.submitTime, loc, "submitTime");
    compare1(launchTime, other.launchTime, loc, "launchTime");
    compare1(finishTime, other.finishTime, loc, "finishTime");

    compare1(heapMegabytes, other.heapMegabytes, loc, "heapMegabytes");

    compare1(totalMaps, other.totalMaps, loc, "totalMaps");
    compare1(totalReduces, other.totalReduces, loc, "totalReduces");

    compare1(outcome, other.outcome, loc, "outcome");
    compare1(jobtype, other.jobtype, loc, "jobtype");
    compare1(priority, other.priority, loc, "priority");

    compareStrings(directDependantJobs, other.directDependantJobs, loc,
        "directDependantJobs");

    compareLoggedTasks(mapTasks, other.mapTasks, loc, "mapTasks");
    compareLoggedTasks(reduceTasks, other.reduceTasks, loc, "reduceTasks");
    compareLoggedTasks(otherTasks, other.otherTasks, loc, "otherTasks");

    compare1(relativeTime, other.relativeTime, loc, "relativeTime");

    compareCDFs(successfulMapAttemptCDFs, other.successfulMapAttemptCDFs, loc,
        "successfulMapAttemptCDFs");
    compareCDFs(failedMapAttemptCDFs, other.failedMapAttemptCDFs, loc,
        "failedMapAttemptCDFs");
    compare1(successfulReduceAttemptCDF, other.successfulReduceAttemptCDF, loc,
        "successfulReduceAttemptCDF", -1);
    compare1(failedReduceAttemptCDF, other.failedReduceAttemptCDF, loc,
        "failedReduceAttemptCDF", -1);

    compare1(mapperTriesToSucceed, other.mapperTriesToSucceed, loc,
        "mapperTriesToSucceed");
    compare1(failedMapperFraction, other.failedMapperFraction, loc,
        "failedMapperFraction");

    compare1(queue, other.queue, loc, "queue");
    compare1(jobName, other.jobName, loc, "jobName");

    compare1(clusterMapMB, other.clusterMapMB, loc, "clusterMapMB");
    compare1(clusterReduceMB, other.clusterReduceMB, loc, "clusterReduceMB");
    compare1(jobMapMB, other.jobMapMB, loc, "jobMapMB");
    compare1(jobReduceMB, other.jobReduceMB, loc, "jobReduceMB");

    // compare the job configuration parameters
    compareJobProperties(jobProperties, other.getJobProperties(), loc, 
                         "JobProperties");
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractClusterStory 源码

hadoop Anonymizer 源码

hadoop CDFPiecewiseLinearRandomGenerator 源码

hadoop CDFRandomGenerator 源码

hadoop ClusterStory 源码

hadoop ClusterTopologyReader 源码

hadoop CurrentJHParser 源码

hadoop DeepCompare 源码

hadoop DeepInequalityException 源码

hadoop DefaultInputDemuxer 源码

0  赞