hadoop StageAllocatorGreedy 源码

  • 2022-10-20
  • 浏览 (225)

haddop StageAllocatorGreedy 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
 * Computes the stage allocation according to the greedy allocation rule. The
 * greedy rule repeatedly allocates requested containers at the rightmost
 * (latest) free interval.
 */

public class StageAllocatorGreedy implements StageAllocator {

  @Override
  public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
      RLESparseResourceAllocation planLoads,
      RLESparseResourceAllocation planModifications, ReservationRequest rr,
      long stageEarliestStart, long stageDeadline, long period, String user,
      ReservationId oldId) throws PlanningException {

    Resource totalCapacity = plan.getTotalCapacity();

    Map<ReservationInterval, Resource> allocationRequests =
        new HashMap<ReservationInterval, Resource>();

    // compute the gang as a resource and get the duration
    Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
    long dur = rr.getDuration();
    long step = plan.getStep();

    // ceil the duration to the next multiple of the plan step
    if (dur % step != 0) {
      dur += (step - (dur % step));
    }

    // we know for sure that this division has no remainder (part of contract
    // with user, validate before
    int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();

    int maxGang = 0;

    RLESparseResourceAllocation netAvailable =
        plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
            stageDeadline, period);

    netAvailable =
        RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
            plan.getTotalCapacity(), netAvailable, planModifications,
            RLEOperator.subtract, stageEarliestStart, stageDeadline);

    // loop trying to place until we are done, or we are considering
    // an invalid range of times
    while (gangsToPlace > 0 && stageDeadline - dur >= stageEarliestStart) {

      // as we run along we remember how many gangs we can fit, and what
      // was the most constraining moment in time (we will restart just
      // after that to place the next batch)
      maxGang = gangsToPlace;
      long minPoint = stageDeadline;
      int curMaxGang = maxGang;

      // start placing at deadline (excluded due to [,) interval semantics and
      // move backward
      for (long t = stageDeadline - plan.getStep(); t >= stageDeadline - dur
          && maxGang > 0; t = t - plan.getStep()) {

        Resource netAvailableRes = netAvailable.getCapacityAtTime(t);

        // compute maximum number of gangs we could fit
        curMaxGang =
            (int) Math.floor(Resources.divide(plan.getResourceCalculator(),
                totalCapacity, netAvailableRes, gang));

        // pick the minimum between available resources in this instant, and how
        // many gangs we have to place
        curMaxGang = Math.min(gangsToPlace, curMaxGang);

        // compare with previous max, and set it. also remember *where* we found
        // the minimum (useful for next attempts)
        if (curMaxGang <= maxGang) {
          maxGang = curMaxGang;
          minPoint = t;
        }
      }

      // if we were able to place any gang, record this, and decrement
      // gangsToPlace
      if (maxGang > 0) {
        gangsToPlace -= maxGang;

        ReservationInterval reservationInt =
            new ReservationInterval(stageDeadline - dur, stageDeadline);
        Resource reservationRes =
            Resources.multiply(rr.getCapability(), rr.getConcurrency()
                * maxGang);
        // remember occupied space (plan is read-only till we find a plausible
        // allocation for the entire request). This is needed since we might be
        // placing other ReservationRequest within the same
        // ReservationDefinition,
        // and we must avoid double-counting the available resources
        planModifications.addInterval(reservationInt, reservationRes);
        allocationRequests.put(reservationInt, reservationRes);

      }

      // reset our new starting point (curDeadline) to the most constraining
      // point so far, we will look "left" of that to find more places where
      // to schedule gangs (for sure nothing on the "right" of this point can
      // fit a full gang.
      stageDeadline = minPoint;
    }

    // if no gangs are left to place we succeed and return the allocation
    if (gangsToPlace == 0) {
      return allocationRequests;
    } else {
      // If we are here is becasue we did not manage to satisfy this request.
      // So we need to remove unwanted side-effect from tempAssigned (needed
      // for ANY).
      for (Map.Entry<ReservationInterval, Resource> tempAllocation
          : allocationRequests.entrySet()) {
        planModifications.removeInterval(tempAllocation.getKey(),
            tempAllocation.getValue());
      }
      // and return null to signal failure in this allocation
      return null;
    }

  }

}

相关信息

hadoop 源码目录

相关文章

hadoop AlignedPlannerWithGreedy 源码

hadoop GreedyReservationAgent 源码

hadoop IterativePlanner 源码

hadoop Planner 源码

hadoop PlanningAlgorithm 源码

hadoop ReservationAgent 源码

hadoop SimpleCapacityReplanner 源码

hadoop StageAllocator 源码

hadoop StageAllocatorGreedyRLE 源码

hadoop StageAllocatorLowCostAligned 源码

0  赞