hadoop StageAllocatorGreedyRLE 源码

  • 2022-10-20
  • 浏览 (210)

hadoop StageAllocatorGreedyRLE 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;

import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
 * Computes the stage allocation according to the greedy allocation rule. The
 * greedy rule repeatedly allocates requested containers at the leftmost or
 * rightmost possible interval. This implementation leverages the
 * run-length-encoding of the time-series we operate on and proceeds more
 * quickly than the baseline.
 */

public class StageAllocatorGreedyRLE implements StageAllocator {

  /**
   * If {@code true}, gangs are placed starting from the stage's earliest start
   * and moving right; otherwise they are placed starting from the stage
   * deadline and moving left.
   */
  private final boolean allocateLeft;

  /**
   * Creates a greedy, RLE-based stage allocator.
   *
   * @param allocateLeft {@code true} to allocate left-to-right (as early as
   *          possible), {@code false} to allocate right-to-left (as late as
   *          possible)
   */
  public StageAllocatorGreedyRLE(boolean allocateLeft) {
    this.allocateLeft = allocateLeft;
  }

  /**
   * Greedily places the gangs of {@code rr} inside
   * {@code [stageEarliestStart, stageDeadline]}.
   *
   * <p>Each successful placement is also recorded in {@code planModifications}
   * so that later requests of the same reservation do not double-count
   * resources. If the request cannot be fully satisfied, those tentative
   * additions are removed again before returning.
   *
   * @param plan the plan providing capacity, step, resource calculator and
   *          availability
   * @param planLoads not used by this implementation
   * @param planModifications tentative allocations made so far; updated in
   *          place on success, restored on failure
   * @param rr the reservation request to place
   * @param stageEarliestStart earliest start time of the stage
   * @param stageDeadline deadline of the stage
   * @param period period passed through to
   *          {@code plan.getAvailableResourceOverTime}
   * @param user user passed through to
   *          {@code plan.getAvailableResourceOverTime}
   * @param oldId reservation id passed through to
   *          {@code plan.getAvailableResourceOverTime}
   * @return the allocated intervals and their resources, or {@code null} if
   *         the request cannot be fully placed
   * @throws PlanningException if the plan lookup fails
   */
  @Override
  public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
      RLESparseResourceAllocation planLoads,
      RLESparseResourceAllocation planModifications, ReservationRequest rr,
      long stageEarliestStart, long stageDeadline, long period, String user,
      ReservationId oldId) throws PlanningException {

    // abort early if the interval cannot possibly be satisfied
    if (stageEarliestStart + rr.getDuration() > stageDeadline) {
      return null;
    }

    Map<ReservationInterval, Resource> allocationRequests = new HashMap<>();

    Resource totalCapacity = plan.getTotalCapacity();

    // compute the gang as a resource and get the duration
    Resource sizeOfGang =
        Resources.multiply(rr.getCapability(), rr.getConcurrency());
    long dur = rr.getDuration();
    long step = plan.getStep();

    // ceil the duration to the next multiple of the plan step
    if (dur % step != 0) {
      dur += (step - (dur % step));
    }

    // we know for sure that this division has no remainder (part of the
    // contract with the user, validated beforehand)
    int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();

    // get available resources from plan
    RLESparseResourceAllocation netRLERes =
        plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
            stageDeadline, period);

    // remove plan modifications (tentative allocations made so far)
    netRLERes =
        RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
            totalCapacity, netRLERes, planModifications, RLEOperator.subtract,
            stageEarliestStart, stageDeadline);

    // loop trying to place until we are done, or we are considering
    // an invalid range of times
    while (gangsToPlace > 0 && stageEarliestStart + dur <= stageDeadline) {

      // as we run along we remember how many gangs we can fit, and what
      // was the most constraining moment in time (we will restart just
      // after that to place the next batch).
      // NOTE(review): if no time point is examined below, minPoint stays -1
      // and maxGang keeps this initial value, so all remaining gangs are
      // placed without a capacity check. Confirm that netRLERes always has a
      // defined run covering the start of the candidate window.
      int maxGang = gangsToPlace;
      long minPoint = -1;

      // focus our attention to the time-range under consideration
      NavigableMap<Long, Resource> partialMap =
          netRLERes.getRangeOverlapping(stageEarliestStart, stageDeadline)
              .getCumulative();

      // reverse the map for right-to-left allocation
      if (!allocateLeft) {
        partialMap = partialMap.descendingMap();
      }

      Iterator<Entry<Long, Resource>> netIt = partialMap.entrySet().iterator();

      long oldT = stageDeadline;

      // internal loop, tries to allocate as many gangs as possible starting
      // at a given point in time; if it fails we move to the next time
      // interval (with the outer loop)
      while (maxGang > 0 && netIt.hasNext()) {

        long t;
        Resource curAvailRes;

        Entry<Long, Resource> e = netIt.next();
        if (allocateLeft) {
          // t is the start of the current run, clipped to the window start
          t = Math.max(e.getKey(), stageEarliestStart);
          curAvailRes = e.getValue();
        } else {
          // t is the (exclusive) end of the run being examined
          t = oldT;
          oldT = e.getKey();
          // attention: "higher" means lower, because we reversed the map
          // direction; this is the run immediately before t
          Entry<Long, Resource> before = partialMap.higherEntry(t);
          curAvailRes = (before == null) ? null : before.getValue();
        }

        // check exit/skip conditions
        if (curAvailRes == null) {
          // skip undefined regions (should not happen beside borders)
          continue;
        }
        if (exitCondition(t, stageEarliestStart, stageDeadline, dur)) {
          break;
        }

        // compute maximum number of gangs we could fit
        int curMaxGang =
            (int) Math.floor(Resources.divide(plan.getResourceCalculator(),
                totalCapacity, curAvailRes, sizeOfGang));
        curMaxGang = Math.min(gangsToPlace, curMaxGang);

        // compare with previous max, and set it. also remember *where* we found
        // the minimum (useful for next attempts)
        if (curMaxGang <= maxGang) {
          maxGang = curMaxGang;
          minPoint = t;
        }
      }

      // update data structures that retain the progress made so far
      gangsToPlace =
          trackProgress(planModifications, rr, stageEarliestStart,
              stageDeadline, allocationRequests, dur, gangsToPlace, maxGang);

      // reset the next range of time-intervals to deal with; in the reversed
      // map "higher" is the next key in the direction of travel
      Long nextKey = partialMap.higherKey(minPoint);
      if (allocateLeft) {
        // set earliest start to the min of the constraining "range" or the
        // end of this allocation
        stageEarliestStart = (nextKey == null)
            ? stageEarliestStart + dur
            : Math.min(nextKey, stageEarliestStart + dur);
      } else {
        // same as above moving right-to-left
        stageDeadline = (nextKey == null)
            ? stageDeadline - dur
            : Math.max(nextKey, stageDeadline - dur);
      }
    }

    // if no gangs are left to place we succeed and return the allocation
    if (gangsToPlace == 0) {
      return allocationRequests;
    }

    // If we are here it is because we did not manage to satisfy this request.
    // So we need to remove unwanted side-effects from planModifications
    // (needed for ANY).
    for (Map.Entry<ReservationInterval, Resource> tempAllocation :
        allocationRequests.entrySet()) {
      planModifications.removeInterval(tempAllocation.getKey(),
          tempAllocation.getValue());
    }
    // and return null to signal failure in this allocation
    return null;
  }

  /**
   * Records the placement of {@code maxGang} gangs (if positive) in both
   * {@code planModifications} and {@code allocationRequests}.
   *
   * @return the number of gangs still to be placed
   */
  private int trackProgress(RLESparseResourceAllocation planModifications,
      ReservationRequest rr, long stageEarliestStart, long stageDeadline,
      Map<ReservationInterval, Resource> allocationRequests, long dur,
      int gangsToPlace, int maxGang) {
    // if we were able to place any gang, record this, and decrement
    // gangsToPlace
    if (maxGang > 0) {
      gangsToPlace -= maxGang;

      ReservationInterval reservationInt =
          computeReservationInterval(stageEarliestStart, stageDeadline, dur);
      Resource reservationRes =
          Resources.multiply(rr.getCapability(), rr.getConcurrency() * maxGang);
      // remember occupied space (plan is read-only till we find a plausible
      // allocation for the entire request). This is needed since we might be
      // placing other ReservationRequests within the same
      // ReservationDefinition, and we must avoid double-counting the
      // available resources
      planModifications.addInterval(reservationInt, reservationRes);
      allocationRequests.put(reservationInt, reservationRes);
    }
    return gangsToPlace;
  }

  /**
   * Returns the interval of length {@code dur} anchored at the stage's
   * earliest start (left-to-right) or at the stage deadline (right-to-left).
   */
  private ReservationInterval computeReservationInterval(
      long stageEarliestStart, long stageDeadline, long dur) {
    if (allocateLeft) {
      return new ReservationInterval(stageEarliestStart,
          stageEarliestStart + dur);
    }
    return new ReservationInterval(stageDeadline - dur, stageDeadline);
  }

  /**
   * Decides whether the inner scan has moved past the candidate window.
   *
   * <p>Left-to-right, {@code t} is the start of a run; the run lies outside
   * {@code [stageEarliestStart, stageEarliestStart + dur)} once
   * {@code t >= stageEarliestStart + dur}. Right-to-left, {@code t} is the
   * exclusive end of a run; the run lies outside
   * {@code [stageDeadline - dur, stageDeadline)} once
   * {@code t <= stageDeadline - dur}.
   */
  private boolean exitCondition(long t, long stageEarliestStart,
      long stageDeadline, long dur) {
    if (allocateLeft) {
      return t >= stageEarliestStart + dur;
    } else {
      // a run ending exactly at stageDeadline - dur does not overlap the
      // window, so it must not constrain the allocation; a strict '<' here
      // would wrongly include it
      return t <= stageDeadline - dur;
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AlignedPlannerWithGreedy 源码

hadoop GreedyReservationAgent 源码

hadoop IterativePlanner 源码

hadoop Planner 源码

hadoop PlanningAlgorithm 源码

hadoop ReservationAgent 源码

hadoop SimpleCapacityReplanner 源码

hadoop StageAllocator 源码

hadoop StageAllocatorGreedy 源码

hadoop StageAllocatorLowCostAligned 源码

0  赞