hadoop StressJobFactory 源码

  • 2022-10-20
  • 浏览 (207)

haddop StressJobFactory 代码

文件路径:/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred.gridmix;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
  public static final Logger LOG = LoggerFactory.getLogger(StressJobFactory.class);

  private final LoadStatus loadStatus = new LoadStatus();
  /**
   * The minimum ratio between pending+running map tasks (aka. incomplete map
   * tasks) and cluster map slot capacity for us to consider the cluster is
   * overloaded. For running maps, we only count them partially. Namely, a 40%
   * completed map is counted as 0.6 map tasks in our calculation.
   */
  private static final float OVERLOAD_MAPTASK_MAPSLOT_RATIO = 2.0f;
  public static final String CONF_OVERLOAD_MAPTASK_MAPSLOT_RATIO=
      "gridmix.throttle.maps.task-to-slot-ratio";
  final float overloadMapTaskMapSlotRatio;

  /**
   * The minimum ratio between pending+running reduce tasks (aka. incomplete
   * reduce tasks) and cluster reduce slot capacity for us to consider the
   * cluster is overloaded. For running reduces, we only count them partially.
   * Namely, a 40% completed reduce is counted as 0.6 reduce tasks in our
   * calculation.
   */
  private static final float OVERLOAD_REDUCETASK_REDUCESLOT_RATIO = 2.5f;
  public static final String CONF_OVERLOAD_REDUCETASK_REDUCESLOT_RATIO=
    "gridmix.throttle.reduces.task-to-slot-ratio";
  final float overloadReduceTaskReduceSlotRatio;

  /**
   * The maximum share of the cluster's mapslot capacity that can be counted
   * toward a job's incomplete map tasks in overload calculation.
   */
  private static final float MAX_MAPSLOT_SHARE_PER_JOB=0.1f;
  public static final String CONF_MAX_MAPSLOT_SHARE_PER_JOB=
    "gridmix.throttle.maps.max-slot-share-per-job";  
  final float maxMapSlotSharePerJob;
  
  /**
   * The maximum share of the cluster's reduceslot capacity that can be counted
   * toward a job's incomplete reduce tasks in overload calculation.
   */
  private static final float MAX_REDUCESLOT_SHARE_PER_JOB=0.1f;
  public static final String CONF_MAX_REDUCESLOT_SHARE_PER_JOB=
    "gridmix.throttle.reducess.max-slot-share-per-job";  
  final float maxReduceSlotSharePerJob;

  /**
   * The ratio of the maximum number of pending+running jobs over the number of
   * task trackers.
   */
  private static final float MAX_JOB_TRACKER_RATIO=1.0f;
  public static final String CONF_MAX_JOB_TRACKER_RATIO=
    "gridmix.throttle.jobs-to-tracker-ratio";  
  final float maxJobTrackerRatio;

  /**
   * Represents a list of blacklisted jobs. Jobs are blacklisted when either 
   * they are complete or their status cannot be obtained. Stress mode will 
   * ignore blacklisted jobs from its overload computation.
   */
  private Set<JobID> blacklistedJobs = new HashSet<JobID>();
  
  /**
   * Creating a new instance does not start the thread.
   *
   * @param submitter   Component to which deserialized jobs are passed
   * @param jobProducer Stream of job traces with which to construct a
   *                    {@link org.apache.hadoop.tools.rumen.ZombieJobProducer}
   * @param scratch     Directory into which to write output from simulated jobs
   * @param conf        Config passed to all jobs to be submitted
   * @param startFlag   Latch released from main to start pipeline
   * @throws java.io.IOException
   */
  public StressJobFactory(
    JobSubmitter submitter, JobStoryProducer jobProducer, Path scratch,
    Configuration conf, CountDownLatch startFlag, UserResolver resolver)
    throws IOException {
    super(
      submitter, jobProducer, scratch, conf, startFlag, resolver);
    overloadMapTaskMapSlotRatio = conf.getFloat(
        CONF_OVERLOAD_MAPTASK_MAPSLOT_RATIO, OVERLOAD_MAPTASK_MAPSLOT_RATIO);
    overloadReduceTaskReduceSlotRatio = conf.getFloat(
        CONF_OVERLOAD_REDUCETASK_REDUCESLOT_RATIO, 
        OVERLOAD_REDUCETASK_REDUCESLOT_RATIO);
    maxMapSlotSharePerJob = conf.getFloat(
        CONF_MAX_MAPSLOT_SHARE_PER_JOB, MAX_MAPSLOT_SHARE_PER_JOB);
    maxReduceSlotSharePerJob = conf.getFloat(
        CONF_MAX_REDUCESLOT_SHARE_PER_JOB, MAX_REDUCESLOT_SHARE_PER_JOB);
    maxJobTrackerRatio = conf.getFloat(
        CONF_MAX_JOB_TRACKER_RATIO, MAX_JOB_TRACKER_RATIO);
  }

  public Thread createReaderThread() {
    return new StressReaderThread("StressJobFactory");
  }

  /*
  * Worker thread responsible for reading descriptions, assigning sequence
  * numbers, and normalizing time.
  */
  private class StressReaderThread extends Thread {

    public StressReaderThread(String name) {
      super(name);
    }

    /**
     * STRESS: Submits the job in STRESS mode.
     * while(JT is overloaded) {
     * wait();
     * }
     * If not overloaded , get number of slots available.
     * Keep submitting the jobs till ,total jobs  is sufficient to
     * load the JT.
     * That is submit  (Sigma(no of maps/Job)) > (2 * no of slots available)
     */
    public void run() {
      try {
        startFlag.await();
        if (Thread.currentThread().isInterrupted()) {
          LOG.warn("[STRESS] Interrupted before start!. Exiting..");
          return;
        }
        LOG.info("START STRESS @ " + System.currentTimeMillis());
        while (!Thread.currentThread().isInterrupted()) {
          try {
            while (loadStatus.overloaded()) {
              // update the overload status
              if (LOG.isDebugEnabled()) {
                LOG.debug("Updating the overload status.");
              }
              try {
                checkLoadAndGetSlotsToBackfill();
              } catch (IOException ioe) {
                LOG.warn("[STRESS] Check failed!", ioe);
                return;
              }
              
              // if the cluster is still overloaded, then sleep
              if (loadStatus.overloaded()) {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("[STRESS] Cluster overloaded in run! Sleeping...");
                }

                // sleep 
                try {
                  Thread.sleep(1000);
                } catch (InterruptedException ie) {
                  LOG.warn("[STRESS] Interrupted while sleeping! Exiting.", ie);
                  return;
                }
              }
            }

            while (!loadStatus.overloaded()) {
              if (LOG.isDebugEnabled()) {
                LOG.debug("[STRESS] Cluster underloaded in run! Stressing...");
              }
              try {
                //TODO This in-line read can block submission for large jobs.
                final JobStory job = getNextJobFiltered();
                if (null == job) {
                  LOG.warn("[STRESS] Finished consuming the input trace. " 
                           + "Exiting..");
                  return;
                }
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Job Selected: " + job.getJobID());
                }
                
                UserGroupInformation ugi = 
                  UserGroupInformation.createRemoteUser(job.getUser());
                UserGroupInformation tgtUgi = userResolver.getTargetUgi(ugi);
                GridmixJob tJob = 
                  jobCreator.createGridmixJob(conf, 0L, job, scratch, 
                               tgtUgi, sequence.getAndIncrement());
                
                // submit the job
                submitter.add(tJob);
                
                // TODO: We need to take care of scenario when one map/reduce
                // takes more than 1 slot.
                
                // Lock the loadjob as we are making updates
                int incompleteMapTasks = (int) calcEffectiveIncompleteMapTasks(
                                                 loadStatus.getMapCapacity(), 
                                                 job.getNumberMaps(), 0.0f);
                loadStatus.decrementMapLoad(incompleteMapTasks);
                
                int incompleteReduceTasks = 
                  (int) calcEffectiveIncompleteReduceTasks(
                          loadStatus.getReduceCapacity(), 
                          job.getNumberReduces(), 0.0f);
                loadStatus.decrementReduceLoad(incompleteReduceTasks);
                  
                loadStatus.decrementJobLoad(1);
              } catch (IOException e) {
                LOG.error("[STRESS] Error while submitting the job ", e);
                error = e;
                return;
              }

            }
          } finally {
            // do nothing
          }
        }
      } catch (InterruptedException e) {
        LOG.error("[STRESS] Interrupted in the main block!", e);
        return;
      } finally {
        IOUtils.cleanupWithLogger(null, jobProducer);
      }
    }
  }

  /**
   * STRESS Once you get the notification from StatsCollector.Collect the
   * clustermetrics. Update current loadStatus with new load status of JT.
   *
   * @param item
   */
  @Override
  public void update(Statistics.ClusterStats item) {
    ClusterStatus clusterStatus = item.getStatus();
    try {
      // update the max cluster map/reduce task capacity
      loadStatus.updateMapCapacity(clusterStatus.getMaxMapTasks());
      
      loadStatus.updateReduceCapacity(clusterStatus.getMaxReduceTasks());
      
      int numTrackers = clusterStatus.getTaskTrackers();
      int jobLoad = 
        (int) (maxJobTrackerRatio * numTrackers) - item.getNumRunningJob();
      loadStatus.updateJobLoad(jobLoad);
    } catch (Exception e) {
      LOG.error("Couldn't get the new Status",e);
    }
  }

  float calcEffectiveIncompleteMapTasks(int mapSlotCapacity,
      int numMaps, float mapProgress) {
    float maxEffIncompleteMapTasks = Math.max(1.0f, mapSlotCapacity
        * maxMapSlotSharePerJob);
    float mapProgressAdjusted = Math.max(Math.min(mapProgress, 1.0f), 0.0f);
    return Math.min(maxEffIncompleteMapTasks, 
                    numMaps * (1.0f - mapProgressAdjusted));
  }

  float calcEffectiveIncompleteReduceTasks(int reduceSlotCapacity,
      int numReduces, float reduceProgress) {
    float maxEffIncompleteReduceTasks = Math.max(1.0f, reduceSlotCapacity
        * maxReduceSlotSharePerJob);
    float reduceProgressAdjusted = 
      Math.max(Math.min(reduceProgress, 1.0f), 0.0f);
    return Math.min(maxEffIncompleteReduceTasks, 
                    numReduces * (1.0f - reduceProgressAdjusted));
  }

  /**
   * We try to use some light-weight mechanism to determine cluster load.
   *
   * @throws java.io.IOException
   */
  protected void checkLoadAndGetSlotsToBackfill() 
  throws IOException, InterruptedException {
    if (loadStatus.getJobLoad() <= 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(System.currentTimeMillis() + " [JobLoad] Overloaded is "
                  + Boolean.TRUE.toString() + " NumJobsBackfill is "
                  + loadStatus.getJobLoad());
      }
      return; // stop calculation because we know it is overloaded.
    }

    int mapCapacity = loadStatus.getMapCapacity();
    int reduceCapacity = loadStatus.getReduceCapacity();
    
    // return if the cluster status is not set
    if (mapCapacity < 0 || reduceCapacity < 0) {
      // note that, by default, the overload status is true
      // missing cluster status will result into blocking of job submission
      return;
    }
    
    // Determine the max permissible map & reduce task load
    int maxMapLoad = (int) (overloadMapTaskMapSlotRatio * mapCapacity);
    int maxReduceLoad = 
      (int) (overloadReduceTaskReduceSlotRatio * reduceCapacity);
    
    // compute the total number of map & reduce tasks submitted
    int totalMapTasks = ClusterStats.getSubmittedMapTasks();
    int totalReduceTasks = ClusterStats.getSubmittedReduceTasks();
    
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total submitted map tasks: " + totalMapTasks);
      LOG.debug("Total submitted reduce tasks: " + totalReduceTasks);
      LOG.debug("Max map load: " + maxMapLoad);
      LOG.debug("Max reduce load: " + maxReduceLoad);
    }
    
    // generate a pessimistic bound on the max running+pending map tasks
    // this check is to avoid the heavy-duty actual map load calculation
    int mapSlotsBackFill = (int) (maxMapLoad - totalMapTasks);
    
    // generate a pessimistic bound on the max running+pending reduce tasks
    // this check is to avoid the heavy-duty actual reduce load calculation
    int reduceSlotsBackFill = (int) (maxReduceLoad - totalReduceTasks);
    
    // maintain a list of seen job ids
    Set<JobID> seenJobIDs = new HashSet<JobID>();
    
    // check if the total number of submitted map/reduce tasks exceeds the 
    // permissible limit
    if (totalMapTasks > maxMapLoad || totalReduceTasks > maxReduceLoad) {
      // if yes, calculate the real load
      float incompleteMapTasks = 0; // include pending & running map tasks.
      float incompleteReduceTasks = 0; // include pending & running reduce tasks
      
      for (JobStats job : ClusterStats.getRunningJobStats()) {
        JobID id = job.getJob().getJobID();
        seenJobIDs.add(id);
        
        // Note that this is a hack! Ideally, ClusterStats.getRunningJobStats()
        // should be smart enough to take care of completed jobs.
        if (blacklistedJobs.contains(id)) {
          LOG.warn("Ignoring blacklisted job: " + id);
          continue;
        }
        
        int noOfMaps = job.getNoOfMaps();
        int noOfReduces = job.getNoOfReds();
        
        // consider polling for jobs where maps>0 and reds>0
        // TODO: What about setup/cleanup tasks for cases where m=0 and r=0
        //       What otherwise?
        if (noOfMaps > 0 || noOfReduces > 0) {
          // get the job's status
          JobStatus status = job.getJobStatus();
          
          // blacklist completed jobs and continue
          if (status != null && status.isJobComplete()) {
            LOG.warn("Blacklisting completed job: " + id);
            blacklistedJobs.add(id);
            continue;
          }
          
          // get the map and reduce tasks' progress
          float mapProgress = 0f;
          float reduceProgress = 0f;
          
          // check if the status is missing (this can happen for unpolled jobs)
          if (status != null) {
            mapProgress = status.getMapProgress();
            reduceProgress = status.getReduceProgress();
          }
          
          incompleteMapTasks += 
            calcEffectiveIncompleteMapTasks(mapCapacity, noOfMaps, mapProgress);

          // bail out early
          int currentMapSlotsBackFill = (int) (maxMapLoad - incompleteMapTasks);
          if (currentMapSlotsBackFill <= 0) {
            // reset the reduce task load since we are bailing out
            incompleteReduceTasks = totalReduceTasks;
            if (LOG.isDebugEnabled()) {
              LOG.debug("Terminating overload check due to high map load.");
            }
            break;
          }

          // compute the real reduce load
          if (noOfReduces > 0) {
            incompleteReduceTasks += 
              calcEffectiveIncompleteReduceTasks(reduceCapacity, noOfReduces, 
                  reduceProgress);
          }

          // bail out early
          int currentReduceSlotsBackFill = 
            (int) (maxReduceLoad - incompleteReduceTasks);
          if (currentReduceSlotsBackFill <= 0) {
            // reset the map task load since we are bailing out
            incompleteMapTasks = totalMapTasks;
            if (LOG.isDebugEnabled()) {
              LOG.debug("Terminating overload check due to high reduce load.");
            }
            break;
          }
        } else {
          LOG.warn("Blacklisting empty job: " + id);
          blacklistedJobs.add(id);
        }
      }

      // calculate the real map load on the cluster
      mapSlotsBackFill = (int) (maxMapLoad - incompleteMapTasks);
      
      // calculate the real reduce load on the cluster
      reduceSlotsBackFill = (int)(maxReduceLoad - incompleteReduceTasks);
      
      // clean up the backlisted set to keep the memory footprint minimal
      // retain only the jobs that are seen in this cycle
      blacklistedJobs.retainAll(seenJobIDs);
      if (LOG.isDebugEnabled() && blacklistedJobs.size() > 0) {
        LOG.debug("Blacklisted jobs count: " + blacklistedJobs.size());
      }
    }
    
    // update
    loadStatus.updateMapLoad(mapSlotsBackFill); 
    loadStatus.updateReduceLoad(reduceSlotsBackFill);
    
    if (loadStatus.getMapLoad() <= 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(System.currentTimeMillis() + " [MAP-LOAD] Overloaded is "
                  + Boolean.TRUE.toString() + " MapSlotsBackfill is "
                  + loadStatus.getMapLoad());
      }
      return; // stop calculation because we know it is overloaded.
    }
    
    if (loadStatus.getReduceLoad() <= 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(System.currentTimeMillis() + " [REDUCE-LOAD] Overloaded is "
                  + Boolean.TRUE.toString() + " ReduceSlotsBackfill is "
                  + loadStatus.getReduceLoad());
      }
      return; // stop calculation because we know it is overloaded.
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(System.currentTimeMillis() + " [OVERALL] Overloaded is "
                + Boolean.FALSE.toString() + "Current load Status is " 
                + loadStatus);
    }
  }

  static class LoadStatus {
    /**
     * Additional number of map slots that can be requested before
     * declaring (by Gridmix STRESS mode) the cluster as overloaded. 
     */
    private volatile int mapSlotsBackfill;
    
    /**
     * Determines the total map slot capacity of the cluster.
     */
    private volatile int mapSlotCapacity;
    
    /**
     * Additional number of reduce slots that can be requested before
     * declaring (by Gridmix STRESS mode) the cluster as overloaded.
     */
    private volatile int reduceSlotsBackfill;
    
    /**
     * Determines the total reduce slot capacity of the cluster.
     */
    private volatile int reduceSlotCapacity;

    /**
     * Determines the max count of running jobs in the cluster.
     */
    private volatile int numJobsBackfill;
    
    // set the default to true
    private AtomicBoolean overloaded = new AtomicBoolean(true);

    /**
     * Construct the LoadStatus in an unknown state - assuming the cluster is
     * overloaded by setting numSlotsBackfill=0.
     */
    LoadStatus() {
      mapSlotsBackfill = 0;
      reduceSlotsBackfill = 0;
      numJobsBackfill = 0;
      
      mapSlotCapacity = -1;
      reduceSlotCapacity = -1;
    }
    
    public synchronized int getMapLoad() {
      return mapSlotsBackfill;
    }
    
    public synchronized int getMapCapacity() {
      return mapSlotCapacity;
    }
    
    public synchronized int getReduceLoad() {
      return reduceSlotsBackfill;
    }
    
    public synchronized int getReduceCapacity() {
      return reduceSlotCapacity;
    }
    
    public synchronized int getJobLoad() {
      return numJobsBackfill;
    }
    
    public synchronized void decrementMapLoad(int mapSlotsConsumed) {
      this.mapSlotsBackfill -= mapSlotsConsumed;
      updateOverloadStatus();
    }
    
    public synchronized void decrementReduceLoad(int reduceSlotsConsumed) {
      this.reduceSlotsBackfill -= reduceSlotsConsumed;
      updateOverloadStatus();
    }

    public synchronized void decrementJobLoad(int numJobsConsumed) {
      this.numJobsBackfill -= numJobsConsumed;
      updateOverloadStatus();
    }
    
    public synchronized void updateMapCapacity(int mapSlotsCapacity) {
      this.mapSlotCapacity = mapSlotsCapacity;
      updateOverloadStatus();
    }
    
    public synchronized void updateReduceCapacity(int reduceSlotsCapacity) {
      this.reduceSlotCapacity = reduceSlotsCapacity;
      updateOverloadStatus();
    }
    
    public synchronized void updateMapLoad(int mapSlotsBackfill) {
      this.mapSlotsBackfill = mapSlotsBackfill;
      updateOverloadStatus();
    }
    
    public synchronized void updateReduceLoad(int reduceSlotsBackfill) {
      this.reduceSlotsBackfill = reduceSlotsBackfill;
      updateOverloadStatus();
    }
    
    public synchronized void updateJobLoad(int numJobsBackfill) {
      this.numJobsBackfill = numJobsBackfill;
      updateOverloadStatus();
    }
    
    private synchronized void updateOverloadStatus() {
      overloaded.set((mapSlotsBackfill <= 0) || (reduceSlotsBackfill <= 0)
                     || (numJobsBackfill <= 0));
    }
    
    public boolean overloaded() {
      return overloaded.get();
    }
    
    public synchronized String toString() {
    // TODO Use StringBuilder instead
      return " Overloaded = " + overloaded()
             + ", MapSlotBackfill = " + mapSlotsBackfill 
             + ", MapSlotCapacity = " + mapSlotCapacity
             + ", ReduceSlotBackfill = " + reduceSlotsBackfill 
             + ", ReduceSlotCapacity = " + reduceSlotCapacity
             + ", NumJobsBackfill = " + numJobsBackfill;
    }
  }

  /**
   * Start the reader thread, wait for latch if necessary.
   */
  @Override
  public void start() {
    LOG.info(" Starting Stress submission ");
    this.rThread.start();
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop AvgRecordFactory 源码

hadoop ClusterSummarizer 源码

hadoop CompressionEmulationUtil 源码

hadoop DistributedCacheEmulator 源码

hadoop EchoUserResolver 源码

hadoop ExecutionSummarizer 源码

hadoop FilePool 源码

hadoop FileQueue 源码

hadoop GenerateData 源码

hadoop GenerateDistCacheData 源码

0  赞