hadoop ReservationInputValidator 源码

  • 2022-10-20
  • 浏览 (192)

hadoop ReservationInputValidator 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.reservation;

import java.util.List;

import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
 * Validates the inputs of reservation requests (submit, update, list and
 * delete) and resolves the {@link Plan} each request targets. Every
 * validation failure detected by this class is recorded through
 * {@link RMAuditLogger} and reported to the caller as a
 * {@link YarnException}.
 */
public class ReservationInputValidator {

  /** User recorded in the audit log; the caller is not known at this point. */
  private static final String AUDIT_USER = "UNKNOWN";

  /** Target component recorded in the audit log. */
  private static final String AUDIT_TARGET = "ClientRMService";

  private static final String PERM_VALIDATE_INPUT =
      "validate reservation input";

  private static final String PERM_VALIDATE_DEFINITION =
      "validate reservation input definition";

  private static final String PERM_VALIDATE_LIST =
      "validate list reservation input";

  private final Clock clock;

  /**
   * Creates a validator for reservation requests.
   *
   * @param clock the {@link Clock} used to decide whether a deadline is
   *          already in the past
   */
  public ReservationInputValidator(Clock clock) {
    this.clock = clock;
  }

  /**
   * Records a validation failure in the audit log and builds the exception
   * that reports it. The caller is expected to throw the returned exception.
   *
   * @param auditConstant the audited operation (one of the
   *          {@link AuditConstants} request names)
   * @param perm the validation step that failed
   * @param message the human readable failure description
   * @return the exception carrying {@code message}
   */
  private static YarnException auditedFailure(String auditConstant,
      String perm, String message) {
    RMAuditLogger.logFailure(AUDIT_USER, auditConstant, perm, AUDIT_TARGET,
        message);
    return RPCUtil.getRemoteException(message);
  }

  /**
   * Resolves the {@link Plan} that holds an existing reservation.
   *
   * @param reservationSystem the {@link ReservationSystem} to look up in
   * @param reservationId the id of the reservation, may be {@code null}
   * @param auditConstant the audited operation name
   * @return the plan associated with the queue of the reservation
   * @throws YarnException if the id is missing, the reservation has no queue,
   *           or the queue has no plan
   */
  private Plan validateReservation(ReservationSystem reservationSystem,
      ReservationId reservationId, String auditConstant) throws YarnException {
    // check if the reservation id is valid
    if (reservationId == null) {
      throw auditedFailure(auditConstant, PERM_VALIDATE_INPUT,
          "Missing reservation id."
              + " Please try again by specifying a reservation id.");
    }
    String queue = reservationSystem.getQueueForReservation(reservationId);
    String nullQueueErrorMessage =
        "The specified reservation with ID: " + reservationId
            + " is unknown. Please try again with a valid reservation.";
    String nullPlanErrorMessage = "The specified reservation: " + reservationId
        + " is not associated with any valid plan."
        + " Please try again with a valid reservation.";
    return getPlanFromQueue(reservationSystem, queue, auditConstant,
        nullQueueErrorMessage, nullPlanErrorMessage);
  }

  /**
   * Checks that a reservation definition is well formed with respect to the
   * given plan: it must exist, have a deadline in the future, request at
   * least one resource, fit (in time and in gang size) unless the interpreter
   * is {@code R_ANY}, and carry a valid recurrence expression.
   *
   * @param reservationId the id of the reservation (currently not inspected)
   * @param contract the definition to validate, may be {@code null}
   * @param plan the plan the reservation is validated against
   * @param auditConstant the audited operation name
   * @throws YarnException if any of the checks fails
   */
  private void validateReservationDefinition(ReservationId reservationId,
      ReservationDefinition contract, Plan plan, String auditConstant)
      throws YarnException {
    // check that a definition has been supplied at all
    if (contract == null) {
      throw auditedFailure(auditConstant, PERM_VALIDATE_DEFINITION,
          "Missing reservation definition."
              + " Please try again by specifying a reservation definition.");
    }
    // check if deadline is in the past
    if (contract.getDeadline() <= clock.getTime()) {
      throw auditedFailure(auditConstant, PERM_VALIDATE_DEFINITION,
          "The specified deadline: " + contract.getDeadline()
              + " is the past. Please try again with deadline in the future.");
    }
    // Check if at least one RR has been specified
    ReservationRequests resReqs = contract.getReservationRequests();
    List<ReservationRequest> resReq =
        (resReqs == null) ? null : resReqs.getReservationResources();
    if (resReq == null || resReq.isEmpty()) {
      throw auditedFailure(auditConstant, PERM_VALIDATE_DEFINITION,
          "No resources have been specified to reserve."
              + " Please try again by specifying the resources to reserve.");
    }
    // compute minimum duration and max gang size
    long minDuration = 0;
    Resource maxGangSize = Resource.newInstance(0, 0);
    ReservationRequestInterpreter type = resReqs.getInterpreter();
    // R_ALL and R_ANY only need room for the longest stage; every other
    // interpreter needs room for the sum of all stage durations.
    boolean longestStageOnly = type == ReservationRequestInterpreter.R_ALL
        || type == ReservationRequestInterpreter.R_ANY;
    for (ReservationRequest rr : resReq) {
      if (longestStageOnly) {
        minDuration = Math.max(minDuration, rr.getDuration());
      } else {
        minDuration += rr.getDuration();
      }
      maxGangSize = Resources.max(plan.getResourceCalculator(),
          plan.getTotalCapacity(), maxGangSize,
          Resources.multiply(rr.getCapability(), rr.getConcurrency()));
    }
    boolean skipFitChecks = type == ReservationRequestInterpreter.R_ANY;
    // verify the allocation is possible (skip for ANY)
    long duration = contract.getDeadline() - contract.getArrival();
    if (!skipFitChecks && duration < minDuration) {
      throw auditedFailure(auditConstant, PERM_VALIDATE_DEFINITION,
          "The time difference (" + (duration) + ") between arrival ("
              + contract.getArrival() + ") " + "and deadline ("
              + contract.getDeadline() + ") must "
              + " be greater or equal to the minimum resource duration ("
              + minDuration + ")");
    }
    // check that the largest gang does not exceed the inventory available
    // capacity (skip for ANY)
    if (!skipFitChecks && Resources.greaterThan(plan.getResourceCalculator(),
        plan.getTotalCapacity(), maxGangSize, plan.getTotalCapacity())) {
      throw auditedFailure(auditConstant, PERM_VALIDATE_DEFINITION,
          "The size of the largest gang in the reservation definition ("
              + maxGangSize + ") exceed the capacity available ("
              + plan.getTotalCapacity() + " )");
    }
    // check that the recurrence is a non-negative long value.
    String recurrenceExpression = contract.getRecurrenceExpression();
    long recurrence;
    try {
      recurrence = Long.parseLong(recurrenceExpression);
    } catch (NumberFormatException e) {
      // also reached for a null expression; the message names the bad value
      throw auditedFailure(auditConstant, PERM_VALIDATE_DEFINITION,
          "Invalid period " + recurrenceExpression + ". Please try"
              + " again with a non-negative long value as period.");
    }
    if (recurrence < 0) {
      throw auditedFailure(auditConstant, PERM_VALIDATE_DEFINITION,
          "Negative Period : " + recurrenceExpression + ". Please try"
              + " again with a non-negative long value as period.");
    }
    // a recurrence of zero means the reservation is not periodic
    if (recurrence > 0) {
      // verify duration is less than recurrence for periodic reservations
      if (duration > recurrence) {
        throw auditedFailure(auditConstant, PERM_VALIDATE_DEFINITION,
            "Duration of the requested reservation: " + duration
                + " is greater than the recurrence: " + recurrence
                + ". Please try again with a smaller duration.");
      }
      // verify maximum period is divisible by recurrence expression.
      if (plan.getMaximumPeriodicity() % recurrence != 0) {
        throw auditedFailure(auditConstant, PERM_VALIDATE_DEFINITION,
            "The maximum periodicity: " + plan.getMaximumPeriodicity() +
                " must be divisible by the recurrence expression provided: " +
                recurrence + ". Please try again with a recurrence expression" +
                " that satisfies this requirement.");
      }
    }
  }

  /**
   * Resolves the {@link Plan} of a queue named directly in a request, using
   * the default error messages for a missing queue or an unmanaged queue.
   *
   * @param reservationSystem the {@link ReservationSystem} to look up in
   * @param queue the queue name, may be {@code null} or empty
   * @param auditConstant the audited operation name
   * @return the plan of the queue
   * @throws YarnException if the queue is missing or has no plan
   */
  private Plan getPlanFromQueue(ReservationSystem reservationSystem,
      String queue, String auditConstant) throws YarnException {
    String nullQueueErrorMessage = "The queue is not specified."
        + " Please try again with a valid reservable queue.";
    String nullPlanErrorMessage = "The specified queue: " + queue
        + " is not managed by reservation system."
        + " Please try again with a valid reservable queue.";
    return getPlanFromQueue(reservationSystem, queue, auditConstant,
        nullQueueErrorMessage, nullPlanErrorMessage);
  }

  /**
   * Resolves the {@link Plan} of a queue, reporting failures with the
   * supplied messages.
   *
   * @param reservationSystem the {@link ReservationSystem} to look up in
   * @param queue the queue name, may be {@code null} or empty
   * @param auditConstant the audited operation name
   * @param nullQueueErrorMessage message used when the queue is null or empty
   * @param nullPlanErrorMessage message used when the queue has no plan
   * @return the plan of the queue, never {@code null}
   * @throws YarnException if the queue is missing or has no plan
   */
  private Plan getPlanFromQueue(ReservationSystem reservationSystem,
      String queue, String auditConstant, String nullQueueErrorMessage,
      String nullPlanErrorMessage) throws YarnException {
    if (queue == null || queue.isEmpty()) {
      throw auditedFailure(auditConstant, PERM_VALIDATE_INPUT,
          nullQueueErrorMessage);
    }
    // check if the associated plan is valid
    Plan plan = reservationSystem.getPlan(queue);
    if (plan == null) {
      throw auditedFailure(auditConstant, PERM_VALIDATE_INPUT,
          nullPlanErrorMessage);
    }
    return plan;
  }

  /**
   * Performs a quick (fail fast) validation of a reservation submission and
   * returns the {@link Plan} associated with the requested {@link Queue}, or
   * throws an exception describing the first validation check that failed.
   *
   * @param reservationSystem the {@link ReservationSystem} to validate against
   * @param request the {@link ReservationSubmissionRequest} defining the
   *          resources required over time for the request
   * @param reservationId the {@link ReservationId} associated with the current
   *          request
   * @return the {@link Plan} to submit the request to
   * @throws YarnException if validation fails
   */
  public Plan validateReservationSubmissionRequest(
      ReservationSystem reservationSystem, ReservationSubmissionRequest request,
      ReservationId reservationId) throws YarnException {
    if (reservationId == null) {
      throw auditedFailure(AuditConstants.SUBMIT_RESERVATION_REQUEST,
          PERM_VALIDATE_INPUT,
          "Reservation id cannot be null. Please try again specifying "
              + " a valid reservation id by creating a new reservation id.");
    }
    // Check if it is a managed queue
    String queue = request.getQueue();
    Plan plan = getPlanFromQueue(reservationSystem, queue,
        AuditConstants.SUBMIT_RESERVATION_REQUEST);

    validateReservationDefinition(reservationId,
        request.getReservationDefinition(), plan,
        AuditConstants.SUBMIT_RESERVATION_REQUEST);
    return plan;
  }

  /**
   * Performs a quick (fail fast) validation of a reservation update and
   * returns the {@link Plan} associated with the {@link Queue} of the
   * reservation being updated, or throws an exception describing the first
   * validation check that failed.
   *
   * @param reservationSystem the {@link ReservationSystem} to validate against
   * @param request the {@link ReservationUpdateRequest} defining the resources
   *          required over time for the request
   * @return the {@link Plan} to submit the request to
   * @throws YarnException if validation fails
   */
  public Plan validateReservationUpdateRequest(
      ReservationSystem reservationSystem, ReservationUpdateRequest request)
      throws YarnException {
    ReservationId reservationId = request.getReservationId();
    Plan plan = validateReservation(reservationSystem, reservationId,
        AuditConstants.UPDATE_RESERVATION_REQUEST);
    validateReservationDefinition(reservationId,
        request.getReservationDefinition(), plan,
        AuditConstants.UPDATE_RESERVATION_REQUEST);
    return plan;
  }

  /**
   * Performs a quick (fail fast) validation of a reservation listing request
   * and returns the {@link Plan} associated with the requested {@link Queue},
   * or throws an exception describing the first validation check that failed.
   * A request whose end time is earlier than its start time is rejected.
   *
   * @param reservationSystem the {@link ReservationSystem} to validate against
   * @param request the {@link ReservationListRequest} defining search
   *          parameters for reservations in the {@link ReservationSystem} that
   *          is being validated against.
   * @return the {@link Plan} to list reservations of.
   * @throws YarnException if validation fails
   */
  public Plan validateReservationListRequest(
      ReservationSystem reservationSystem, ReservationListRequest request)
      throws YarnException {
    String queue = request.getQueue();
    if (request.getEndTime() < request.getStartTime()) {
      throw auditedFailure(AuditConstants.LIST_RESERVATION_REQUEST,
          PERM_VALIDATE_LIST,
          "The specified end time must be greater than "
              + "the specified start time.");
    }
    // Check if it is a managed queue
    return getPlanFromQueue(reservationSystem, queue,
        AuditConstants.LIST_RESERVATION_REQUEST);
  }

  /**
   * Performs a quick (fail fast) validation of a reservation deletion and
   * returns the {@link Plan} associated with the {@link Queue} of the
   * reservation being deleted, or throws an exception describing the first
   * validation check that failed.
   *
   * @param reservationSystem the {@link ReservationSystem} to validate against
   * @param request the {@link ReservationDeleteRequest} identifying the
   *          reservation to delete
   * @return the {@link Plan} that the reservation is associated with
   * @throws YarnException if validation fails
   */
  public Plan validateReservationDeleteRequest(
      ReservationSystem reservationSystem, ReservationDeleteRequest request)
      throws YarnException {
    return validateReservation(reservationSystem, request.getReservationId(),
        AuditConstants.DELETE_RESERVATION_REQUEST);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractReservationSystem 源码

hadoop AbstractSchedulerPlanFollower 源码

hadoop CapacityOverTimePolicy 源码

hadoop CapacityReservationSystem 源码

hadoop CapacitySchedulerPlanFollower 源码

hadoop FairReservationSystem 源码

hadoop FairSchedulerPlanFollower 源码

hadoop InMemoryPlan 源码

hadoop InMemoryReservationAllocation 源码

hadoop NoOverCommitPolicy 源码

0  赞