hadoop InMemoryPlan 源码

  • 2022-10-20
  • 浏览 (193)

haddop InMemoryPlan 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.reservation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class represents an in memory representation of the state of our
 * reservation system, and provides accelerated access to both individual
 * reservations and aggregate utilization of resources over time.
 */
public class InMemoryPlan implements Plan {

  private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);

  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
  private final RMStateStore rmStateStore;

  private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> currentReservations =
      new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>();

  private RLESparseResourceAllocation rleSparseVector;

  private PeriodicRLESparseResourceAllocation periodicRle;

  private Map<String, RLESparseResourceAllocation> userResourceAlloc =
      new HashMap<String, RLESparseResourceAllocation>();

  private Map<String, RLESparseResourceAllocation> userPeriodicResourceAlloc =
      new HashMap<String, RLESparseResourceAllocation>();

  private Map<String, RLESparseResourceAllocation> userActiveReservationCount =
      new HashMap<String, RLESparseResourceAllocation>();

  private Map<ReservationId, InMemoryReservationAllocation> reservationTable =
      new HashMap<ReservationId, InMemoryReservationAllocation>();

  private final ReentrantReadWriteLock readWriteLock =
      new ReentrantReadWriteLock();
  private final Lock readLock = readWriteLock.readLock();
  private final Lock writeLock = readWriteLock.writeLock();
  private final SharingPolicy policy;
  private final ReservationAgent agent;
  private final long step;
  private final ResourceCalculator resCalc;
  private final Resource minAlloc, maxAlloc;
  private final String queueName;
  private final QueueMetrics queueMetrics;
  private final Planner replanner;
  private final boolean getMoveOnExpiry;
  private final Clock clock;
  private final long maxPeriodicity;

  private Resource totalCapacity;

  public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
      ReservationAgent agent, Resource totalCapacity, long step,
      ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
      String queueName, Planner replanner, boolean getMoveOnExpiry,
      RMContext rmContext) {
    this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
        maxAlloc, queueName, replanner, getMoveOnExpiry,
        YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY,
        rmContext);
  }

  public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
      ReservationAgent agent, Resource totalCapacity, long step,
      ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
      String queueName, Planner replanner, boolean getMoveOnExpiry,
      long maxPeriodicity, RMContext rmContext) {
    this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
        maxAlloc, queueName, replanner, getMoveOnExpiry, maxPeriodicity,
        rmContext, new UTCClock());
  }

  @SuppressWarnings("checkstyle:parameternumber")
  public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
      ReservationAgent agent, Resource totalCapacity, long step,
      ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
      String queueName, Planner replanner, boolean getMoveOnExpiry,
      long maxPeriodicty, RMContext rmContext, Clock clock) {
    this.queueMetrics = queueMetrics;
    this.policy = policy;
    this.agent = agent;
    this.step = step;
    this.totalCapacity = totalCapacity;
    this.resCalc = resCalc;
    this.minAlloc = minAlloc;
    this.maxAlloc = maxAlloc;
    this.rleSparseVector = new RLESparseResourceAllocation(resCalc);
    this.maxPeriodicity = maxPeriodicty;
    this.periodicRle =
        new PeriodicRLESparseResourceAllocation(resCalc, this.maxPeriodicity);
    this.queueName = queueName;
    this.replanner = replanner;
    this.getMoveOnExpiry = getMoveOnExpiry;
    this.clock = clock;
    this.rmStateStore = rmContext.getStateStore();
  }

  @Override
  public QueueMetrics getQueueMetrics() {
    return queueMetrics;
  }

  private RLESparseResourceAllocation getUserRLEResourceAllocation(String user,
      long period) {
    RLESparseResourceAllocation resAlloc = null;
    if (period > 0) {
      if (userPeriodicResourceAlloc.containsKey(user)) {
        resAlloc = userPeriodicResourceAlloc.get(user);
      } else {
        resAlloc = new PeriodicRLESparseResourceAllocation(resCalc,
            periodicRle.getTimePeriod());
        userPeriodicResourceAlloc.put(user, resAlloc);
      }
    } else {
      if (userResourceAlloc.containsKey(user)) {
        resAlloc = userResourceAlloc.get(user);
      } else {
        resAlloc = new RLESparseResourceAllocation(resCalc);
        userResourceAlloc.put(user, resAlloc);
      }
    }
    return resAlloc;
  }

  private void gcUserRLEResourceAllocation(String user, long period) {
    if (period > 0) {
      if (userPeriodicResourceAlloc.get(user).isEmpty()) {
        userPeriodicResourceAlloc.remove(user);
      }
    } else {
      if (userResourceAlloc.get(user).isEmpty()) {
        userResourceAlloc.remove(user);
      }
    }
  }

  private void incrementAllocation(ReservationAllocation reservation) {
    assert (readWriteLock.isWriteLockedByCurrentThread());
    Map<ReservationInterval, Resource> allocationRequests =
        reservation.getAllocationRequests();
    // check if we have encountered the user earlier and if not add an entry
    String user = reservation.getUser();
    long period = reservation.getPeriodicity();
    RLESparseResourceAllocation resAlloc =
        getUserRLEResourceAllocation(user, period);

    RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
    if (resCount == null) {
      resCount = new RLESparseResourceAllocation(resCalc);
      userActiveReservationCount.put(user, resCount);
    }

    long earliestActive = Long.MAX_VALUE;
    long latestActive = Long.MIN_VALUE;

    for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
        .entrySet()) {

      if (period > 0L) {
        for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) {

          long rStart = r.getKey().getStartTime() + i * period;
          long rEnd = r.getKey().getEndTime() + i * period;

          // handle wrap-around
          if (rEnd > periodicRle.getTimePeriod()) {
            long diff = rEnd - periodicRle.getTimePeriod();
            rEnd = periodicRle.getTimePeriod();
            ReservationInterval newInterval = new ReservationInterval(0, diff);
            periodicRle.addInterval(newInterval, r.getValue());
            resAlloc.addInterval(newInterval, r.getValue());
          }

          ReservationInterval newInterval =
              new ReservationInterval(rStart, rEnd);
          periodicRle.addInterval(newInterval, r.getValue());
          resAlloc.addInterval(newInterval, r.getValue());
        }

      } else {
        rleSparseVector.addInterval(r.getKey(), r.getValue());
        resAlloc.addInterval(r.getKey(), r.getValue());
        if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
            ZERO_RESOURCE)) {
          earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
          latestActive = Math.max(latestActive, r.getKey().getEndTime());
        }
      }
    }
    // periodic reservations are active from start time and good till cancelled
    if (period > 0L) {
      earliestActive = reservation.getStartTime();
      latestActive = Long.MAX_VALUE;
    }
    resCount.addInterval(new ReservationInterval(earliestActive, latestActive),
        Resource.newInstance(1, 1));
  }

  private void decrementAllocation(ReservationAllocation reservation) {
    assert (readWriteLock.isWriteLockedByCurrentThread());
    Map<ReservationInterval, Resource> allocationRequests =
        reservation.getAllocationRequests();
    String user = reservation.getUser();
    long period = reservation.getPeriodicity();
    RLESparseResourceAllocation resAlloc =
        getUserRLEResourceAllocation(user, period);

    long earliestActive = Long.MAX_VALUE;
    long latestActive = Long.MIN_VALUE;
    for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
        .entrySet()) {
      if (period > 0L) {
        for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) {

          long rStart = r.getKey().getStartTime() + i * period;
          long rEnd = r.getKey().getEndTime() + i * period;

          // handle wrap-around
          if (rEnd > periodicRle.getTimePeriod()) {
            long diff = rEnd - periodicRle.getTimePeriod();
            rEnd = periodicRle.getTimePeriod();
            ReservationInterval newInterval = new ReservationInterval(0, diff);
            periodicRle.removeInterval(newInterval, r.getValue());
            resAlloc.removeInterval(newInterval, r.getValue());
          }

          ReservationInterval newInterval =
              new ReservationInterval(rStart, rEnd);
          periodicRle.removeInterval(newInterval, r.getValue());
          resAlloc.removeInterval(newInterval, r.getValue());
        }
      } else {
        rleSparseVector.removeInterval(r.getKey(), r.getValue());
        resAlloc.removeInterval(r.getKey(), r.getValue());
        if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
            ZERO_RESOURCE)) {
          earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
          latestActive = Math.max(latestActive, r.getKey().getEndTime());
        }
      }
    }
    gcUserRLEResourceAllocation(user, period);

    RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
    // periodic reservations are active from start time and good till cancelled
    if (period > 0L) {
      earliestActive = reservation.getStartTime();
      latestActive = Long.MAX_VALUE;
    }
    resCount.removeInterval(
        new ReservationInterval(earliestActive, latestActive),
        Resource.newInstance(1, 1));
    if (resCount.isEmpty()) {
      userActiveReservationCount.remove(user);
    }
  }

  public Set<ReservationAllocation> getAllReservations() {
    readLock.lock();
    try {
      if (currentReservations != null) {
        Set<ReservationAllocation> flattenedReservations =
            new TreeSet<ReservationAllocation>();
        for (Set<InMemoryReservationAllocation> res : currentReservations
            .values()) {
          flattenedReservations.addAll(res);
        }
        return flattenedReservations;
      } else {
        return null;
      }
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public boolean addReservation(ReservationAllocation reservation,
      boolean isRecovering) throws PlanningException {
    // Verify the allocation is memory based otherwise it is not supported
    InMemoryReservationAllocation inMemReservation =
        (InMemoryReservationAllocation) reservation;
    if (inMemReservation.getUser() == null) {
      String errMsg = "The specified Reservation with ID "
          + inMemReservation.getReservationId() + " is not mapped to any user";
      LOG.error(errMsg);
      throw new IllegalArgumentException(errMsg);
    }
    writeLock.lock();
    try {
      if (reservationTable.containsKey(inMemReservation.getReservationId())) {
        String errMsg = "The specified Reservation with ID "
            + inMemReservation.getReservationId() + " already exists";
        LOG.error(errMsg);
        throw new IllegalArgumentException(errMsg);
      }
      // Validate if we can accept this reservation, throws exception if
      // validation fails
      if (!isRecovering) {
        policy.validate(this, inMemReservation);
        // we record here the time in which the allocation has been accepted
        reservation.setAcceptanceTimestamp(clock.getTime());
        if (rmStateStore != null) {
          rmStateStore.storeNewReservation(
              ReservationSystemUtil.buildStateProto(inMemReservation),
              getQueueName(), inMemReservation.getReservationId().toString());
        }
      }
      ReservationInterval searchInterval = new ReservationInterval(
          inMemReservation.getStartTime(), inMemReservation.getEndTime());
      Set<InMemoryReservationAllocation> reservations =
          currentReservations.get(searchInterval);
      if (reservations == null) {
        reservations = new HashSet<InMemoryReservationAllocation>();
      }
      if (!reservations.add(inMemReservation)) {
        LOG.error("Unable to add reservation: {} to plan.",
            inMemReservation.getReservationId());
        return false;
      }
      currentReservations.put(searchInterval, reservations);
      reservationTable.put(inMemReservation.getReservationId(),
          inMemReservation);
      incrementAllocation(inMemReservation);
      LOG.info("Successfully added reservation: {} to plan.",
          inMemReservation.getReservationId());
      return true;
    } finally {
      writeLock.unlock();
    }
  }

  @Override
  public boolean updateReservation(ReservationAllocation reservation)
      throws PlanningException {
    writeLock.lock();
    boolean result = false;
    try {
      ReservationId resId = reservation.getReservationId();
      ReservationAllocation currReservation = getReservationById(resId);
      if (currReservation == null) {
        String errMsg = "The specified Reservation with ID " + resId
            + " does not exist in the plan";
        LOG.error(errMsg);
        throw new IllegalArgumentException(errMsg);
      }
      // validate if we can accept this reservation, throws exception if
      // validation fails
      policy.validate(this, reservation);
      if (!removeReservation(currReservation)) {
        LOG.error("Unable to replace reservation: {} from plan.",
            reservation.getReservationId());
        return result;
      }
      try {
        result = addReservation(reservation, false);
      } catch (PlanningException e) {
        LOG.error("Unable to update reservation: {} from plan due to {}.",
            reservation.getReservationId(), e.getMessage());
      }
      if (result) {
        LOG.info("Successfully updated reservation: {} in plan.",
            reservation.getReservationId());
        return result;
      } else {
        // rollback delete
        addReservation(currReservation, false);
        LOG.info("Rollbacked update reservation: {} from plan.",
            reservation.getReservationId());
        return result;
      }
    } finally {
      writeLock.unlock();
    }
  }

  private boolean removeReservation(ReservationAllocation reservation) {
    assert (readWriteLock.isWriteLockedByCurrentThread());
    ReservationInterval searchInterval = new ReservationInterval(
        reservation.getStartTime(), reservation.getEndTime());
    Set<InMemoryReservationAllocation> reservations =
        currentReservations.get(searchInterval);
    if (reservations != null) {
      if (rmStateStore != null) {
        rmStateStore.removeReservation(getQueueName(),
            reservation.getReservationId().toString());
      }
      if (!reservations.remove(reservation)) {
        LOG.error("Unable to remove reservation: {} from plan.",
            reservation.getReservationId());
        return false;
      }
      if (reservations.isEmpty()) {
        currentReservations.remove(searchInterval);
      }
    } else {
      String errMsg = "The specified Reservation with ID "
          + reservation.getReservationId() + " does not exist in the plan";
      LOG.error(errMsg);
      throw new IllegalArgumentException(errMsg);
    }
    reservationTable.remove(reservation.getReservationId());
    decrementAllocation(reservation);
    LOG.info("Sucessfully deleted reservation: {} in plan.",
        reservation.getReservationId());
    return true;
  }

  @Override
  public boolean deleteReservation(ReservationId reservationID) {
    writeLock.lock();
    try {
      ReservationAllocation reservation = getReservationById(reservationID);
      if (reservation == null) {
        String errMsg = "The specified Reservation with ID " + reservationID
            + " does not exist in the plan";
        LOG.error(errMsg);
        throw new IllegalArgumentException(errMsg);
      }
      return removeReservation(reservation);
    } finally {
      writeLock.unlock();
    }
  }

  @Override
  public void archiveCompletedReservations(long tick) {
    // Since we are looking for old reservations, read lock is optimal
    LOG.debug("Running archival at time: {}", tick);
    List<InMemoryReservationAllocation> expiredReservations =
        new ArrayList<InMemoryReservationAllocation>();
    readLock.lock();
    // archive reservations and delete the ones which are beyond
    // the reservation policy "window"
    try {
      long archivalTime = tick - policy.getValidWindow();
      ReservationInterval searchInterval =
          new ReservationInterval(archivalTime, archivalTime);
      SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> reservations =
          currentReservations.headMap(searchInterval, true);
      if (!reservations.isEmpty()) {
        for (Set<InMemoryReservationAllocation> reservationEntries : reservations
            .values()) {
          for (InMemoryReservationAllocation reservation : reservationEntries) {
            if (reservation.getEndTime() <= archivalTime) {
              expiredReservations.add(reservation);
            }
          }
        }
      }
    } finally {
      readLock.unlock();
    }
    if (expiredReservations.isEmpty()) {
      return;
    }
    // Need write lock only if there are any reservations to be deleted
    writeLock.lock();
    try {
      for (InMemoryReservationAllocation expiredReservation : expiredReservations) {
        removeReservation(expiredReservation);
      }
    } finally {
      writeLock.unlock();
    }
  }

  @Override
  public Set<ReservationAllocation> getReservationsAtTime(long tick) {
    return getReservations(null, new ReservationInterval(tick, tick), "");
  }

  @Override
  public long getStep() {
    return step;
  }

  @Override
  public SharingPolicy getSharingPolicy() {
    return policy;
  }

  @Override
  public ReservationAgent getReservationAgent() {
    return agent;
  }

  @Override
  public RLESparseResourceAllocation getReservationCountForUserOverTime(
      String user, long start, long end) {
    readLock.lock();
    try {
      RLESparseResourceAllocation userResAlloc =
          userActiveReservationCount.get(user);

      if (userResAlloc != null) {
        return userResAlloc.getRangeOverlapping(start, end);
      } else {
        return new RLESparseResourceAllocation(resCalc);
      }
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
      long start, long end) {
    readLock.lock();
    try {
      // merge periodic and non-periodic allocations
      RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user);
      RLESparseResourceAllocation userPeriodicResAlloc =
          userPeriodicResourceAlloc.get(user);

      if (userResAlloc != null && userPeriodicResAlloc != null) {
        return RLESparseResourceAllocation.merge(resCalc, totalCapacity,
            userResAlloc, userPeriodicResAlloc, RLEOperator.add, start, end);
      }
      if (userResAlloc != null) {
        return userResAlloc.getRangeOverlapping(start, end);
      }
      if (userPeriodicResAlloc != null) {
        return userPeriodicResAlloc.getRangeOverlapping(start, end);
      }
    } catch (PlanningException e) {
      LOG.warn("Exception while trying to merge periodic"
          + " and non-periodic user allocations: {}", e.getMessage(), e);
    } finally {
      readLock.unlock();
    }
    return new RLESparseResourceAllocation(resCalc);
  }

  @Override
  public Resource getTotalCommittedResources(long t) {
    readLock.lock();
    try {
      return Resources.add(rleSparseVector.getCapacityAtTime(t),
          periodicRle.getCapacityAtTime(t));
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public Set<ReservationAllocation> getReservations(ReservationId reservationID,
      ReservationInterval interval) {
    return getReservations(reservationID, interval, null);
  }

  @Override
  public Set<ReservationAllocation> getReservations(ReservationId reservationID,
      ReservationInterval interval, String user) {
    if (reservationID != null) {
      ReservationAllocation allocation = getReservationById(reservationID);
      if (allocation == null) {
        return Collections.emptySet();
      }
      return Collections.singleton(allocation);
    }

    long startTime = interval == null ? 0 : interval.getStartTime();
    long endTime = interval == null ? Long.MAX_VALUE : interval.getEndTime();

    ReservationInterval searchInterval =
        new ReservationInterval(endTime, Long.MAX_VALUE);
    readLock.lock();
    try {
      SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> res =
          currentReservations.headMap(searchInterval, true);
      if (!res.isEmpty()) {
        Set<ReservationAllocation> flattenedReservations = new HashSet<>();
        for (Set<InMemoryReservationAllocation> resEntries : res.values()) {
          for (InMemoryReservationAllocation reservation : resEntries) {
            // validate user
            if (user != null && !user.isEmpty()
                && !reservation.getUser().equals(user)) {
              continue;
            }
            // handle periodic reservations
            long period = reservation.getPeriodicity();
            if (period > 0) {
              // The shift is used to remove the wrap around for the
              // reservation interval. The wrap around will still
              // exist for the search interval.
              long shift = reservation.getStartTime() % period;
              // This is the duration of the reservation since
              // duration < period.
              long periodicReservationEnd =
                  (reservation.getEndTime() -shift) % period;
              long periodicSearchStart = (startTime - shift) % period;
              long periodicSearchEnd = (endTime - shift) % period;
              long searchDuration = endTime - startTime;

              // 1. If the searchDuration is greater than the period, then
              // the reservation is within the interval. This will allow
              // us to ignore cases where search end > search start >
              // reservation end.
              // 2/3. If the search end is less than the reservation end, or if
              // the search start is less than the reservation end, then the
              // reservation will be in the reservation since
              // periodic reservation start is always zero. Note that neither
              // of those values will ever be negative.
              // 4. If the search end is less than the search start, then
              // there is a wrap around, and both values are implicitly
              // greater than the reservation end because of condition 2/3,
              // so the reservation is within the search interval.
              if (searchDuration > period
                  || periodicSearchEnd < periodicReservationEnd
                  || periodicSearchStart < periodicReservationEnd
                  || periodicSearchStart > periodicSearchEnd) {
                flattenedReservations.add(reservation);
              }
            } else {
              // check for non-periodic reservations
              if (reservation.getEndTime() > startTime) {
                flattenedReservations.add(reservation);
              }
            }
          }
        }
        return Collections.unmodifiableSet(flattenedReservations);
      } else {
        return Collections.emptySet();
      }
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public ReservationAllocation getReservationById(ReservationId reservationID) {
    if (reservationID == null) {
      return null;
    }
    readLock.lock();
    try {
      return reservationTable.get(reservationID);
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public Resource getTotalCapacity() {
    readLock.lock();
    try {
      return Resources.clone(totalCapacity);
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
      ReservationId oldId, long start, long end, long period)
      throws PlanningException {
    readLock.lock();
    try {

      // for non-periodic return simple available resources
      if (period == 0) {

        // create RLE of totCapacity
        TreeMap<Long, Resource> totAvailable = new TreeMap<Long, Resource>();
        totAvailable.put(start, Resources.clone(totalCapacity));
        RLESparseResourceAllocation totRLEAvail =
            new RLESparseResourceAllocation(totAvailable, resCalc);

        // subtract used from available
        RLESparseResourceAllocation netAvailable;

        netAvailable = RLESparseResourceAllocation.merge(resCalc,
            Resources.clone(totalCapacity), totRLEAvail, rleSparseVector,
            RLEOperator.subtractTestNonNegative, start, end);

        // remove periodic component
        netAvailable = RLESparseResourceAllocation.merge(resCalc,
            Resources.clone(totalCapacity), netAvailable, periodicRle,
            RLEOperator.subtractTestNonNegative, start, end);

        // add back in old reservation used resources if any
        ReservationAllocation old = reservationTable.get(oldId);
        if (old != null) {

          RLESparseResourceAllocation addBackPrevious =
              old.getResourcesOverTime(start, end);
          netAvailable = RLESparseResourceAllocation.merge(resCalc,
              Resources.clone(totalCapacity), netAvailable, addBackPrevious,
              RLEOperator.add, start, end);
        }
        // lower it if this is needed by the sharing policy
        netAvailable = getSharingPolicy().availableResources(netAvailable, this,
            user, oldId, start, end);
        return netAvailable;
      } else {

        if (periodicRle.getTimePeriod() % period != 0) {
          throw new PlanningException("The reservation periodicity (" + period
              + ") must be" + " an exact divider of the system maxPeriod ("
              + periodicRle.getTimePeriod() + ")");
        }

        if (period < (end - start)) {
          throw new PlanningException(
              "Invalid input: (end - start) = (" + end + " - " + start + ") = "
                  + (end - start) + " > period = " + period);
        }

        // find the minimum resources available among all the instances that fit
        // in the LCM
        long numInstInLCM = periodicRle.getTimePeriod() / period;

        RLESparseResourceAllocation minOverLCM =
            getAvailableResourceOverTime(user, oldId, start, end, 0);
        for (int i = 1; i < numInstInLCM; i++) {

          long rStart = start + i * period;
          long rEnd = end + i * period;

          // recursive invocation of non-periodic range (to pick raw-info)
          RLESparseResourceAllocation snapShot =
              getAvailableResourceOverTime(user, oldId, rStart, rEnd, 0);

          // time-align on start
          snapShot.shift(-(i * period));

          // pick the minimum amount of resources in each time interval
          minOverLCM =
              RLESparseResourceAllocation.merge(resCalc, getTotalCapacity(),
                  minOverLCM, snapShot, RLEOperator.min, start, end);

        }

        return minOverLCM;

      }
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public Resource getMinimumAllocation() {
    return Resources.clone(minAlloc);
  }

  @Override
  public void setTotalCapacity(Resource cap) {
    writeLock.lock();
    try {
      totalCapacity = Resources.clone(cap);
    } finally {
      writeLock.unlock();
    }
  }

  public long getEarliestStartTime() {
    readLock.lock();
    try {
      return rleSparseVector.getEarliestStartTime();
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public long getLastEndTime() {
    readLock.lock();
    try {
      return rleSparseVector.getLatestNonNullTime();
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public ResourceCalculator getResourceCalculator() {
    return resCalc;
  }

  @Override
  public String getQueueName() {
    return queueName;
  }

  @Override
  public Resource getMaximumAllocation() {
    return Resources.clone(maxAlloc);
  }

  @Override
  public long getMaximumPeriodicity() {
    return this.maxPeriodicity;
  }

  public String toCumulativeString() {
    readLock.lock();
    try {
      return rleSparseVector.toString() + "\n" + periodicRle.toString();
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public Planner getReplanner() {
    return replanner;
  }

  @Override
  public boolean getMoveOnExpiry() {
    return getMoveOnExpiry;
  }

  @Override
  public String toString() {
    readLock.lock();
    try {
      StringBuilder planStr = new StringBuilder("In-memory Plan: ");
      planStr.append("Parent Queue: ").append(queueName)
          .append(" Total Capacity: ").append(totalCapacity).append(" Step: ")
          .append(step);
      for (ReservationAllocation reservation : getAllReservations()) {
        planStr.append(reservation);
      }
      return planStr.toString();
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public Set<ReservationAllocation> getReservationByUserAtTime(String user,
      long t) {
    readLock.lock();
    try {
      Set<ReservationAllocation> resSet = new HashSet<ReservationAllocation>();
      for (ReservationAllocation ra : getReservationsAtTime(t)) {
        String resUser = ra.getUser();
        if (resUser != null && resUser.equals(user)) {
          resSet.add(ra);
        }
      }
      return resSet;
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public RLESparseResourceAllocation getCumulativeLoadOverTime(long start,
      long end) throws PlanningException {
    readLock.lock();
    try {

      RLESparseResourceAllocation ret =
          rleSparseVector.getRangeOverlapping(start, end);
      ret = RLESparseResourceAllocation.merge(resCalc, totalCapacity, ret,
          periodicRle.getRangeOverlapping(start, end), RLEOperator.add, start,
          end);

      return ret;
    } finally {
      readLock.unlock();
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractReservationSystem 源码

hadoop AbstractSchedulerPlanFollower 源码

hadoop CapacityOverTimePolicy 源码

hadoop CapacityReservationSystem 源码

hadoop CapacitySchedulerPlanFollower 源码

hadoop FairReservationSystem 源码

hadoop FairSchedulerPlanFollower 源码

hadoop InMemoryReservationAllocation 源码

hadoop NoOverCommitPolicy 源码

hadoop PeriodicRLESparseResourceAllocation 源码

0  赞