hadoop LpSolver 源码

  • 2022-10-20
  • 浏览 (187)

hadoop LpSolver 代码

文件路径:/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/LpSolver.java

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.hadoop.resourceestimator.solver.impl;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
import org.apache.hadoop.resourceestimator.common.config.ResourceEstimatorConfiguration;
import org.apache.hadoop.resourceestimator.skylinestore.api.PredictionSkylineStore;
import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException;
import org.apache.hadoop.resourceestimator.solver.api.Solver;
import org.apache.hadoop.resourceestimator.solver.exceptions.SolverException;
import org.apache.hadoop.resourceestimator.solver.preprocess.SolverPreprocessor;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.ojalgo.optimisation.Expression;
import org.ojalgo.optimisation.ExpressionsBasedModel;
import org.ojalgo.optimisation.Optimisation.Result;
import org.ojalgo.optimisation.Variable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A linear programming (LP) solution to predict a recurring pipeline's
 * {@link Resource} requirements, and generate Hadoop {@code RDL} requests which
 * will be used to make recurring resource reservations.
 *
 * <p>Each historical run is discretized into {@code timeInterval}-sized slots.
 * The model solves for one predicted container allocation {@code x_k} per slot
 * that minimizes a weighted sum of over-allocation (every slot) and cumulative
 * under-allocation (last slot of each run) across the history, plus a
 * regularization term {@code epsilon}.
 */
public class LpSolver extends BaseSolver implements Solver {
  private static final Logger LOGGER = LoggerFactory.getLogger(LpSolver.class);
  private final SolverPreprocessor preprocessor = new SolverPreprocessor();
  /**
   * Controls the balance between over-allocation and under-allocation:
   * over-allocation is weighted by {@code alpha} and under-allocation by
   * {@code 1 - alpha} in the objective.
   */
  private double alpha;
  /**
   * Controls the generalization of the solver: the objective weight of the
   * regularization variable {@code epsilon}.
   */
  private double beta;
  /**
   * The minimum number of job runs required to run the solver.
   */
  private int minJobRuns;
  /**
   * The time interval which is used to discretize job execution.
   */
  private int timeInterval;
  /**
   * The PredictionSkylineStore to store the predicted ResourceSkyline for new
   * run.
   */
  private PredictionSkylineStore predictionSkylineStore;

  /**
   * Reads solver parameters from the configuration.
   *
   * <p>The defaults are alpha = 0.1, beta = 0.1, min job runs = 1 and
   * time interval = 5.
   *
   * @param config       configuration holding the solver parameters.
   * @param skylineStore store that receives the predicted skylines.
   */
  @Override public final void init(final Configuration config,
      PredictionSkylineStore skylineStore) {
    this.alpha =
        config.getDouble(ResourceEstimatorConfiguration.SOLVER_ALPHA_KEY, 0.1);
    this.beta =
        config.getDouble(ResourceEstimatorConfiguration.SOLVER_BETA_KEY, 0.1);
    this.minJobRuns =
        config.getInt(ResourceEstimatorConfiguration.SOLVER_MIN_JOB_RUN_KEY, 1);
    this.timeInterval =
        config.getInt(ResourceEstimatorConfiguration.TIME_INTERVAL_KEY, 5);
    this.predictionSkylineStore = skylineStore;
  }

  /**
   * Creates {@code count} LP variables named {@code prefix + i}, each bounded
   * below by 0.
   *
   * @param prefix name prefix for the variables.
   * @param count  number of variables to create.
   * @return the new variables, in index order.
   */
  private static Variable[] createNonNegativeVariables(final String prefix,
      final int count) {
    final Variable[] variables = new Variable[count];
    for (int i = 0; i < count; i++) {
      variables[i] = new Variable(prefix + i).lower(BigDecimal.valueOf(0));
    }
    return variables;
  }

  /**
   * Generate over-allocation constraints.
   *
   * @param lpModel        the LP model.
   * @param cJobITimeK     actual container allocation for job i in time
   *                       interval k.
   * @param oa             container over-allocation.
   * @param x              predicted container allocation.
   * @param indexJobITimeK index for job i at time interval k.
   * @param timeK          index for time interval k.
   */
  private void generateOverAllocationConstraints(
      final ExpressionsBasedModel lpModel, final double cJobITimeK,
      final Variable[] oa, final Variable[] x, final int indexJobITimeK,
      final int timeK) {
    // oa_job_i_timeK >= x_timeK - cJobITimeK
    // rewritten as: oa_job_i_timeK - x_timeK >= -cJobITimeK
    final Expression overAllocExpression =
        lpModel.addExpression("over_alloc_" + indexJobITimeK);
    overAllocExpression.set(oa[indexJobITimeK], 1);
    overAllocExpression.set(x[timeK], -1);
    overAllocExpression.lower(-cJobITimeK); // >=
  }

  /**
   * Generate under-allocation constraints.
   *
   * @param lpModel        the LP model.
   * @param cJobITimeK     actual container allocation for job i in time
   *                       interval k.
   * @param uaPredict      absolute container under-allocation.
   * @param ua             recursive container under-allocation.
   * @param x              predicted container allocation.
   * @param indexJobITimeK index for job i at time interval k.
   * @param timeK          index for time interval k.
   */
  private void generateUnderAllocationConstraints(
      final ExpressionsBasedModel lpModel, final double cJobITimeK,
      final Variable[] uaPredict, final Variable[] ua, final Variable[] x,
      final int indexJobITimeK, final int timeK) {
    // uaPredict_job_i_timeK + x_timeK >= cJobITimeK
    final Expression underAllocPredictExpression =
        lpModel.addExpression("under_alloc_predict_" + indexJobITimeK);
    underAllocPredictExpression.set(uaPredict[indexJobITimeK], 1);
    underAllocPredictExpression.set(x[timeK], 1);
    underAllocPredictExpression.lower(cJobITimeK); // >=

    // Recursive (cumulative) container under-allocation:
    //   k >= 1: ua_job_i_timeK >= ua_job_i_time_(k-1) + cJobITimeK - x_timeK
    //   k == 0: ua_job_i_time_0 >= cJobI_time_0 - x_time_0
    final Expression underAllocExpression =
        lpModel.addExpression("under_alloc_" + indexJobITimeK);
    underAllocExpression.set(ua[indexJobITimeK], 1);
    if (timeK >= 1) {
      // Carry over the previous interval's under-allocation of the same job.
      underAllocExpression.set(ua[indexJobITimeK - 1], -1);
    }
    underAllocExpression.set(x[timeK], 1);
    underAllocExpression.lower(cJobITimeK); // >=
  }

  /**
   * Generate solver objective.
   *
   * <p>The objective is the sum of three terms:
   * <ul>
   *   <li>{@code alpha / numJobs} times each over-allocation variable;</li>
   *   <li>{@code (1 - alpha) / numJobs} times each job's cumulative
   *   under-allocation at its last interval;</li>
   *   <li>{@code beta} times {@code eps}.</li>
   * </ul>
   *
   * @param objective LP solver objective.
   * @param numJobs   number of history runs of the recurring pipeline.
   * @param jobLen    (maximum) job length of the recurring pipeline.
   * @param oa        container over-allocation.
   * @param ua        recursive container under-allocation.
   * @param eps       regularization parameter.
   */
  private void generateObjective(final Expression objective, final int numJobs,
      final int jobLen, final Variable[] oa, final Variable[] ua,
      final Variable eps) {
    // sum Over_Allocation
    for (int indexJobI = 0; indexJobI < numJobs; indexJobI++) {
      for (int timeK = 0; timeK < jobLen; timeK++) {
        final int indexJobITimeK = indexJobI * jobLen + timeK;
        objective.set(oa[indexJobITimeK], alpha / numJobs);
      }
    }
    // sum Under_Allocation: only the last interval, since ua is cumulative
    for (int indexJobI = 0; indexJobI < numJobs; indexJobI++) {
      final int indexJobITimeN = indexJobI * jobLen + jobLen - 1;
      objective.set(ua[indexJobITimeN], (1 - alpha) / numJobs);
    }
    objective.set(eps, beta);
    objective.weight(BigDecimal.valueOf(1));
  }

  /**
   * Get the job length of recurring pipeline.
   *
   * @param resourceSkylines the history ResourceSkylines allocated to the
   *                         recurring pipeline.
   * @param numJobs          number of history runs of the recurring pipeline.
   * @return length of (discretized time intervals of) the recurring pipeline,
   *     i.e. the maximum over all runs of the run's duration divided by
   *     {@code timeInterval}, rounded up.
   */
  private int getJobLen(final List<ResourceSkyline> resourceSkylines,
      final int numJobs) {
    int jobLen = 0;
    for (int indexJobI = 0; indexJobI < numJobs; indexJobI++) {
      final ResourceSkyline skyline = resourceSkylines.get(indexJobI);
      final long duration = skyline.getSkylineList().getLatestNonNullTime()
          - skyline.getSkylineList().getEarliestStartTime();
      // Round up to whole intervals. Divide in long arithmetic before
      // narrowing, so a long duration cannot overflow the int cast.
      final int curLen = (int) ((duration + timeInterval - 1) / timeInterval);
      jobLen = Math.max(jobLen, curLen);
    }
    return jobLen;
  }

  /**
   * Predicts the per-interval resource allocation for the recurring pipeline
   * in {@code jobHistory}, stores the prediction in the
   * {@link PredictionSkylineStore} and returns it.
   *
   * @param jobHistory history runs of the recurring pipeline.
   * @return predicted allocation: one {@code timeInterval}-long interval per
   *     slot, starting at time 0.
   * @throws SolverException       if the preprocessor rejects the history.
   * @throws SkylineStoreException if storing the prediction fails.
   */
  @Override public final RLESparseResourceAllocation solve(
      final Map<RecurrenceId, List<ResourceSkyline>> jobHistory)
      throws SolverException, SkylineStoreException {
    // TODO: addHistory timeout support for this function, and ideally we should
    // return the confidence level associated with the predicted resource.
    preprocessor.validate(jobHistory, timeInterval);
    final List<ResourceSkyline> resourceSkylines =
        preprocessor.aggregateSkylines(jobHistory, minJobRuns);
    final int numJobs = resourceSkylines.size();
    final int jobLen = getJobLen(resourceSkylines, numJobs);

    /* Create variables. */
    final ExpressionsBasedModel lpModel = new ExpressionsBasedModel();
    final Variable[] x = createNonNegativeVariables("x", jobLen);
    final Variable[] oa = createNonNegativeVariables("oa", jobLen * numJobs);
    final Variable[] ua = createNonNegativeVariables("ua", jobLen * numJobs);
    final Variable[] uaPredict =
        createNonNegativeVariables("uaPredict", jobLen * numJobs);
    // x is added first: below, lpResult.doubleValue(k) is read as the value
    // of x[k], which relies on this ordering.
    lpModel.addVariables(x);
    lpModel.addVariables(oa);
    lpModel.addVariables(ua);
    lpModel.addVariables(uaPredict);
    final Variable eps = new Variable("epsilon").lower(BigDecimal.valueOf(0));
    lpModel.addVariable(eps);

    /* Set constraints. */
    // 1. sum(job_i){sum(timeK){1/cJobI * uaPredict_job_i_timeK}}
    //    <= numJobs * eps
    final Expression regularizationConstraint =
        lpModel.addExpression("regularization");
    regularizationConstraint.set(eps, -numJobs);
    regularizationConstraint.upper(BigDecimal.valueOf(0)); // <= 0
    for (int indexJobI = 0; indexJobI < numJobs; indexJobI++) {
      final ResourceSkyline resourceSkyline = resourceSkylines.get(indexJobI);
      // the # of containers consumed by job i in discretized time intervals
      final int[] containerNums = preprocessor
          .getDiscreteSkyline(resourceSkyline.getSkylineList(), timeInterval,
              resourceSkyline.getContainerSpec().getMemorySize(), jobLen);
      // the aggregated # of containers consumed by job i during its lifespan
      double cJobI = 0;
      for (final int containerNum : containerNums) {
        cJobI += containerNum;
      }
      for (int timeK = 0; timeK < jobLen; timeK++) {
        final int indexJobITimeK = indexJobI * jobLen + timeK;
        // the # of containers consumed by job i in the k-th time interval
        final double cJobITimeK = containerNums[timeK];
        // A job with an aggregated container count of 0 would make
        // 1 / cJobI infinite, which is not a valid LP coefficient. Its
        // uaPredict terms are left out of the regularization constraint
        // instead (assuming non-negative counts, they are satisfiable at 0).
        if (cJobI != 0) {
          regularizationConstraint
              .set(uaPredict[indexJobITimeK], 1 / cJobI);
        }
        generateOverAllocationConstraints(lpModel, cJobITimeK, oa, x,
            indexJobITimeK, timeK);
        generateUnderAllocationConstraints(lpModel, cJobITimeK, uaPredict,
            ua, x, indexJobITimeK, timeK);
      }
    }

    /* Set objective. */
    final Expression objective = lpModel.addExpression("objective");
    generateObjective(objective, numJobs, jobLen, oa, ua, eps);

    /* Solve the model. */
    final Result lpResult = lpModel.minimise();
    final TreeMap<Long, Resource> treeMap = new TreeMap<>();
    final RLESparseResourceAllocation result =
        new RLESparseResourceAllocation(treeMap,
            new DefaultResourceCalculator());
    final Resource containerSpec = resourceSkylines.get(0).getContainerSpec();
    final String pipelineId =
        jobHistory.keySet().iterator().next().getPipelineId();
    for (int indexTimeK = 0; indexTimeK < jobLen; indexTimeK++) {
      // long arithmetic: consistent with getJobLen's overflow-safe sizing.
      final ReservationInterval riAdd = new ReservationInterval(
          (long) indexTimeK * timeInterval,
          (long) (indexTimeK + 1) * timeInterval);
      final double containerValue = lpResult.doubleValue(indexTimeK);
      // NOTE(review): the (int) cast truncates, so a solver value such as
      // 2.9999999 yields 2 containers; consider rounding.
      final int containers = (int) containerValue;
      final Resource resource = Resource.newInstance(
          containerSpec.getMemorySize() * containers,
          containerSpec.getVirtualCores() * containers);
      result.addInterval(riAdd, resource);
      LOGGER.debug("time interval: {}, container: {}.", indexTimeK,
          containerValue);
    }

    predictionSkylineStore.addEstimation(pipelineId, result);

    /*
     * TODO: 1. We can calculate the estimated error (over-allocation,
     * under-allocation) of our prediction which could be used to generate
     * confidence level for our prediction; 2. Also, we can modify our model to
     * take job input data size (and maybe stage info) into consideration; 3. We
     * can also try to generate such conclusion: our prediction under-allocates
     * X amount of resources from time 0 to time 100 compared with 95% of
     * history runs; 4. We can build framework-specific versions of estimator
     * (such as scope/spark/hive, etc.) and provides more specific suggestions.
     * For example, we may say: for spark job i, its task size is X GB while the
     * container memory allocation is Y GB; as a result, its shuffling stage is
     * 20% slower than ideal case due to the disk spilling operations, etc. 5.
     * If we have more information of jobs (other than ResourceSkyline), we may
     * have such conclusion: job i is 20% slower than 90% of history runs, and
     * it is because part of its tasks are running together with job j's tasks.
     * In this case, we not only predict the amount of resource needed for job
     * i, but also how to place the resource requirements to clusters; 6. We may
     * monitor job progress, and dynamically increase/decrease container
     * allocations to satisfy job deadline while minimizing the cost; 7. We may
     * allow users to specify a budget (say $100 per job run), and optimize the
     * resource allocation under the budget constraints. 8. ...
     */
    return result;
  }

  /**
   * Releases solver resources. Currently a no-op.
   */
  @Override public final void close() {
    // TODO: currently place holder
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BaseSolver 源码

hadoop package-info 源码

0  赞