hadoop SchedulerHealth 源码

  • 2022-10-20
  • 浏览 (204)

hadoop SchedulerHealth 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerHealth.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * SchedulerHealth class holds the details of the scheduler's operations.
 *
 * <p><code>SchedulerHealth</code> provides clients with information such as:
 * <ol>
 *   <li>
 *   scheduler's latest timestamp
 *   </li>
 *   <li>
 *   resources allocated, reserved, released in the last scheduler run
 *   </li>
 *   <li>
 *   latest allocation, release, reservation, preemption details
 *   </li>
 *   <li>
 *   count of latest allocation, release, reservation, preemption
 *   </li>
 *   <li>
 *   aggregate count of latest allocation, release, reservation, preemption,
 *   fulfilled reservation
 *   </li>
 *</ol>
 *
 */

public class SchedulerHealth {

  /**
   * Snapshot of a single scheduler operation: when it happened, on which
   * node, for which container and in which queue.
   */
  public static class DetailedInformation {
    long timestamp;
    NodeId nodeId;
    ContainerId containerId;
    String queue;

    /**
     * Create a snapshot of one scheduler operation.
     *
     * @param timestamp timestamp of the operation
     * @param nodeId node the operation applied to
     * @param containerId container the operation applied to
     * @param queue name of the queue the operation applied to
     */
    public DetailedInformation(long timestamp, NodeId nodeId,
        ContainerId containerId, String queue) {
      this.timestamp = timestamp;
      this.nodeId = nodeId;
      this.containerId = containerId;
      this.queue = queue;
    }

    /**
     * @return timestamp of the operation
     */
    public long getTimestamp() {
      return timestamp;
    }

    /**
     * @return node the operation applied to
     */
    public NodeId getNodeId() {
      return nodeId;
    }

    /**
     * @return container the operation applied to
     */
    public ContainerId getContainerId() {
      return containerId;
    }

    /**
     * @return name of the queue the operation applied to
     */
    public String getQueue() {
      return queue;
    }
  }

  /** Kinds of scheduler operations that are tracked; used as map keys. */
  enum Operation {
    ALLOCATION, RELEASE, PREEMPTION, RESERVATION, FULFILLED_RESERVATION
  }

  // volatile so that a value written by one thread is visible to readers on
  // other threads, in line with the concurrent maps used for the other state
  private volatile long lastSchedulerRunTime;
  private final Map<Operation, Resource> lastSchedulerRunDetails;
  private final Map<Operation, DetailedInformation> lastSchedulerHealthDetails;
  private final Map<Operation, Long> schedulerOperationCounts;
  // this is for counts since the RM started, never reset
  private final Map<Operation, Long> schedulerOperationAggregateCounts;

  /**
   * Create a SchedulerHealth in which every {@link Operation} starts with a
   * zero resource, zero counts and an empty {@link DetailedInformation}
   * (timestamp 0, all other fields null), so the getters never return null.
   */
  SchedulerHealth() {
    lastSchedulerRunDetails = new ConcurrentHashMap<>();
    lastSchedulerHealthDetails = new ConcurrentHashMap<>();
    schedulerOperationCounts = new ConcurrentHashMap<>();
    schedulerOperationAggregateCounts = new ConcurrentHashMap<>();
    for (Operation op : Operation.values()) {
      lastSchedulerRunDetails.put(op, Resource.newInstance(0, 0));
      schedulerOperationCounts.put(op, 0L);
      lastSchedulerHealthDetails.put(op, new DetailedInformation(0, null, null,
        null));
      schedulerOperationAggregateCounts.put(op, 0L);
    }

  }

  /**
   * Replace the stored details of the latest occurrence of the given
   * operation.
   */
  private void recordDetails(Operation op, long timestamp, NodeId nodeId,
      ContainerId containerId, String queue) {
    lastSchedulerHealthDetails.put(op,
        new DetailedInformation(timestamp, nodeId, containerId, queue));
  }

  /**
   * Record the details of the latest allocation.
   *
   * @param timestamp timestamp of the allocation
   * @param nodeId node of the allocation
   * @param containerId container of the allocation
   * @param queue queue of the allocation
   */
  public void updateAllocation(long timestamp, NodeId nodeId,
      ContainerId containerId, String queue) {
    recordDetails(Operation.ALLOCATION, timestamp, nodeId, containerId, queue);
  }

  /**
   * Record the details of the latest release.
   *
   * @param timestamp timestamp of the release
   * @param nodeId node of the release
   * @param containerId container of the release
   * @param queue queue of the release
   */
  public void updateRelease(long timestamp, NodeId nodeId,
      ContainerId containerId, String queue) {
    recordDetails(Operation.RELEASE, timestamp, nodeId, containerId, queue);
  }

  /**
   * Record the details of the latest preemption.
   *
   * @param timestamp timestamp of the preemption
   * @param nodeId node of the preemption
   * @param containerId container of the preemption
   * @param queue queue of the preemption
   */
  public void updatePreemption(long timestamp, NodeId nodeId,
      ContainerId containerId, String queue) {
    recordDetails(Operation.PREEMPTION, timestamp, nodeId, containerId, queue);
  }

  /**
   * Record the details of the latest reservation.
   *
   * @param timestamp timestamp of the reservation
   * @param nodeId node of the reservation
   * @param containerId container of the reservation
   * @param queue queue of the reservation
   */
  public void updateReservation(long timestamp, NodeId nodeId,
      ContainerId containerId, String queue) {
    recordDetails(Operation.RESERVATION, timestamp, nodeId, containerId, queue);
  }

  /**
   * Record the time of the latest scheduler run together with the resources
   * it allocated and reserved.
   *
   * @param timestamp timestamp of the scheduler run
   * @param allocated resources allocated in the run
   * @param reserved resources reserved in the run
   */
  public void updateSchedulerRunDetails(long timestamp, Resource allocated,
      Resource reserved) {
    lastSchedulerRunTime = timestamp;
    lastSchedulerRunDetails.put(Operation.ALLOCATION, allocated);
    lastSchedulerRunDetails.put(Operation.RESERVATION, reserved);
  }

  /**
   * Record the time of the latest scheduler run together with the resources
   * it released.
   *
   * @param timestamp timestamp of the scheduler run
   * @param released resources released in the run
   */
  public void updateSchedulerReleaseDetails(long timestamp, Resource released) {
    lastSchedulerRunTime = timestamp;
    lastSchedulerRunDetails.put(Operation.RELEASE, released);
  }

  /**
   * Set the latest release count and add it to the aggregate release count.
   *
   * @param count number of releases to record
   */
  public void updateSchedulerReleaseCounts(long count) {
    updateCounts(Operation.RELEASE, count);
  }

  /**
   * Set the latest allocation count and add it to the aggregate allocation
   * count.
   *
   * @param count number of allocations to record
   */
  public void updateSchedulerAllocationCounts(long count) {
    updateCounts(Operation.ALLOCATION, count);
  }

  /**
   * Set the latest reservation count and add it to the aggregate reservation
   * count.
   *
   * @param count number of reservations to record
   */
  public void updateSchedulerReservationCounts(long count) {
    updateCounts(Operation.RESERVATION, count);
  }

  /**
   * Set the latest fulfilled reservation count and add it to the aggregate
   * fulfilled reservation count.
   *
   * @param count number of fulfilled reservations to record
   */
  public void updateSchedulerFulfilledReservationCounts(long count) {
    updateCounts(Operation.FULFILLED_RESERVATION, count);
  }

  /**
   * Set the latest preemption count and add it to the aggregate preemption
   * count.
   *
   * @param count number of preemptions to record
   */
  public void updateSchedulerPreemptionCounts(long count) {
    updateCounts(Operation.PREEMPTION, count);
  }

  /**
   * Overwrite the latest count for the operation and add the same amount to
   * its aggregate count.
   */
  private void updateCounts(Operation op, long count) {
    schedulerOperationCounts.put(op, count);
    // merge performs the read-add-write as one atomic step; a separate
    // get followed by put could lose an increment when two threads update
    // the same operation at the same time
    schedulerOperationAggregateCounts.merge(op, count, Long::sum);
  }

  /**
   * Get the timestamp of the latest scheduler operation.
   *
   * @return the scheduler's latest timestamp
   */
  public long getLastSchedulerRunTime() {
    return lastSchedulerRunTime;
  }

  /** Look up the resource recorded for the operation in the last run. */
  private Resource getResourceDetails(Operation op) {
    return lastSchedulerRunDetails.get(op);
  }

  /**
   * Get the resources allocated in the last scheduler run.
   *
   * @return resources allocated
   */
  public Resource getResourcesAllocated() {
    return getResourceDetails(Operation.ALLOCATION);
  }

  /**
   * Get the resources reserved in the last scheduler run.
   *
   * @return resources reserved
   */
  public Resource getResourcesReserved() {
    return getResourceDetails(Operation.RESERVATION);
  }

  /**
   * Get the resources released in the last scheduler run.
   *
   * @return resources released
   */
  public Resource getResourcesReleased() {
    return getResourceDetails(Operation.RELEASE);
  }

  /** Look up the details recorded for the latest occurrence of the operation. */
  private DetailedInformation getDetailedInformation(Operation op) {
    return lastSchedulerHealthDetails.get(op);
  }

  /**
   * Get the details of last allocation.
   *
   * @return last allocation details
   */
  public DetailedInformation getLastAllocationDetails() {
    return getDetailedInformation(Operation.ALLOCATION);
  }

  /**
   * Get the details of last release.
   *
   * @return last release details
   */
  public DetailedInformation getLastReleaseDetails() {
    return getDetailedInformation(Operation.RELEASE);
  }

  /**
   * Get the details of last reservation.
   *
   * @return last reservation details
   */
  public DetailedInformation getLastReservationDetails() {
    return getDetailedInformation(Operation.RESERVATION);
  }

  /**
   * Get the details of last preemption.
   *
   * @return last preemption details
   */
  public DetailedInformation getLastPreemptionDetails() {
    return getDetailedInformation(Operation.PREEMPTION);
  }

  /** Look up the latest recorded count for the operation. */
  private Long getOperationCount(Operation op) {
    return schedulerOperationCounts.get(op);
  }

  /**
   * Get the count of allocation from the latest scheduler health report.
   *
   * @return allocation count
   */
  public Long getAllocationCount() {
    return getOperationCount(Operation.ALLOCATION);
  }

  /**
   * Get the count of release from the latest scheduler health report.
   *
   * @return release count
   */
  public Long getReleaseCount() {
    return getOperationCount(Operation.RELEASE);
  }

  /**
   * Get the count of reservation from the latest scheduler health report.
   *
   * @return reservation count
   */
  public Long getReservationCount() {
    return getOperationCount(Operation.RESERVATION);
  }

  /**
   * Get the count of preemption from the latest scheduler health report.
   *
   * @return preemption count
   */
  public Long getPreemptionCount() {
    return getOperationCount(Operation.PREEMPTION);
  }

  /** Look up the running total recorded for the operation. */
  private Long getAggregateOperationCount(Operation op) {
    return schedulerOperationAggregateCounts.get(op);
  }

  /**
   * Get the aggregate of all the allocations count.
   *
   * @return aggregate allocation count
   */
  public Long getAggregateAllocationCount() {
    return getAggregateOperationCount(Operation.ALLOCATION);
  }

  /**
   * Get the aggregate of all the release count.
   *
   * @return aggregate release count
   */
  public Long getAggregateReleaseCount() {
    return getAggregateOperationCount(Operation.RELEASE);
  }

  /**
   * Get the aggregate of all the reservations count.
   *
   * @return aggregate reservation count
   */
  public Long getAggregateReservationCount() {
    return getAggregateOperationCount(Operation.RESERVATION);
  }

  /**
   * Get the aggregate of all the preemption count.
   *
   * @return aggregate preemption count
   */
  public Long getAggregatePreemptionCount() {
    return getAggregateOperationCount(Operation.PREEMPTION);
  }

  /**
   * Get the aggregate of all the fulfilled reservations count.
   *
   * @return aggregate fulfilled reservations count
   */
  public Long getAggregateFulFilledReservationsCount() {
    return getAggregateOperationCount(Operation.FULFILLED_RESERVATION);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractResourceUsage 源码

hadoop AbstractUsersManager 源码

hadoop AbstractYarnScheduler 源码

hadoop ActiveUsersManager 源码

hadoop Allocation 源码

hadoop AppSchedulingInfo 源码

hadoop ApplicationPlacementAllocatorFactory 源码

hadoop CSQueueMetricsForCustomResources 源码

hadoop ClusterNodeTracker 源码

hadoop ConfigurationMutationACLPolicy 源码

0  赞