hadoop SchedulerNode 源码

  • 2022-10-20
  • 浏览 (162)

hadoop SchedulerNode 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.lang3.builder.CompareToBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;


/**
 * Represents a YARN Cluster Node from the viewpoint of the scheduler.
 * <p>
 * Resource accounting, the reserved container and the map of launched
 * containers are accessed through {@code synchronized} methods. Utilization,
 * labels, node attributes, the container count and the last heartbeat time
 * are held in {@code volatile} fields and read without taking the monitor.
 */
@Private
@Unstable
public abstract class SchedulerNode {

  private static final Logger LOG =
      LoggerFactory.getLogger(SchedulerNode.class);

  private Resource unallocatedResource = Resource.newInstance(0, 0);
  private Resource allocatedResource = Resource.newInstance(0, 0);
  private Resource totalResource;
  private RMContainer reservedContainer;
  private volatile int numContainers;
  private volatile ResourceUtilization containersUtilization =
      ResourceUtilization.newInstance(0, 0, 0f);
  private volatile ResourceUtilization nodeUtilization =
      ResourceUtilization.newInstance(0, 0, 0f);
  /** Time stamp for overcommitted resources to time out; -1 if not set. */
  private long overcommitTimeout = -1;

  /* Containers allocated on this node, keyed by container ID. */
  private final Map<ContainerId, ContainerInfo> launchedContainers =
      new HashMap<>();

  private final RMNode rmNode;
  private final String nodeName;
  private final RMContext rmContext;

  private volatile Set<String> labels = null;

  private volatile Set<NodeAttribute> nodeAttributes = null;

  // Last updated time
  private volatile long lastHeartbeatMonotonicTime;

  /**
   * Create a scheduler view of the given node.
   * @param node The node as tracked by the ResourceManager.
   * @param usePortForNodeName If true the node name is 'hostname:port',
   *          otherwise it is just the hostname.
   * @param labels Labels of the node; an immutable copy is kept.
   */
  public SchedulerNode(RMNode node, boolean usePortForNodeName,
      Set<String> labels) {
    this.rmNode = node;
    this.rmContext = node.getRMContext();
    this.unallocatedResource = Resources.clone(node.getTotalCapability());
    this.totalResource = Resources.clone(node.getTotalCapability());
    if (usePortForNodeName) {
      nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
    } else {
      nodeName = rmNode.getHostName();
    }
    this.labels = ImmutableSet.copyOf(labels);
    this.lastHeartbeatMonotonicTime = Time.monotonicNow();
  }

  /**
   * Create a scheduler view of the given node with an empty label set.
   * @param node The node as tracked by the ResourceManager.
   * @param usePortForNodeName If true the node name is 'hostname:port',
   *          otherwise it is just the hostname.
   */
  public SchedulerNode(RMNode node, boolean usePortForNodeName) {
    this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET);
  }

  /**
   * Get the ResourceManager node backing this scheduler node.
   * @return The underlying {@link RMNode}.
   */
  public RMNode getRMNode() {
    return this.rmNode;
  }

  /**
   * Set total resources on the node. The unallocated resources are
   * recomputed as the new total minus what is currently allocated.
   * @param resource Total resources on the node.
   */
  public synchronized void updateTotalResource(Resource resource){
    // Keep a private copy, as the constructor does, so that later changes to
    // the caller's object cannot alter the total behind our back. Cloning
    // before assigning also means a bad argument leaves the state untouched.
    Resource newTotal = Resources.clone(resource);
    this.totalResource = newTotal;
    this.unallocatedResource = Resources.subtract(newTotal,
        this.allocatedResource);
  }

  /**
   * Set the timeout for the node to stop overcommitting the resources. After
   * this time the scheduler will start killing containers until the resources
   * are not overcommitted anymore. This may reset a previous timeout.
   * Negative values are ignored.
   * @param timeOut Time out in milliseconds.
   */
  public synchronized void setOvercommitTimeOut(long timeOut) {
    if (timeOut < 0) {
      return;
    }
    if (this.overcommitTimeout >= 0) {
      LOG.debug("The overcommit timeout for {} was already set to {}",
          getNodeID(), this.overcommitTimeout);
    }
    long now = Time.now();
    // Saturate instead of overflowing: a wrapped (negative) deadline would be
    // indistinguishable from "no timeout set".
    if (timeOut > Long.MAX_VALUE - now) {
      this.overcommitTimeout = Long.MAX_VALUE;
    } else {
      this.overcommitTimeout = now + timeOut;
    }
  }

  /**
   * Check if the time out has passed.
   * @return True if an overcommit time out is set and the current time has
   *         reached it.
   */
  public synchronized boolean isOvercommitTimedOut() {
    return this.overcommitTimeout >= 0 && Time.now() >= this.overcommitTimeout;
  }

  /**
   * Check if the node has a time out for overcommit resources.
   * @return If the node has a time out for overcommit resources.
   */
  public synchronized boolean isOvercommitTimeOutSet() {
    return this.overcommitTimeout >= 0;
  }

  /**
   * Get the ID of the node which contains both its hostname and port.
   * @return The ID of the node.
   */
  public NodeId getNodeID() {
    return this.rmNode.getNodeID();
  }

  /**
   * Get HTTP address for the node.
   * @return HTTP address for the node.
   */
  public String getHttpAddress() {
    return this.rmNode.getHttpAddress();
  }

  /**
   * Get the name of the node for scheduling matching decisions.
   * <p>
   * Typically this is the 'hostname' reported by the node, but it could be
   * configured to be 'hostname:port' reported by the node via the
   * {@link YarnConfiguration#RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} constant.
   * The main usecase of this is YARN minicluster to be able to differentiate
   * node manager instances by their port number.
   * @return Name of the node for scheduling matching decisions.
   */
  public String getNodeName() {
    return nodeName;
  }

  /**
   * Get rackname.
   * @return rackname
   */
  public String getRackName() {
    return this.rmNode.getRackName();
  }

  /**
   * The Scheduler has allocated containers on this node to the given
   * application. The container is recorded as not yet launched on the node.
   * @param rmContainer Allocated container
   */
  public void allocateContainer(RMContainer rmContainer) {
    allocateContainer(rmContainer, false);
  }

  /**
   * The Scheduler has allocated containers on this node to the given
   * application. Only GUARANTEED containers are charged against the node's
   * resources and counted; every container is tracked as launched.
   * @param rmContainer Allocated container
   * @param launchedOnNode True if the container has been launched
   */
  protected synchronized void allocateContainer(RMContainer rmContainer,
      boolean launchedOnNode) {
    Container container = rmContainer.getContainer();
    if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
      deductUnallocatedResource(container.getResource());
      ++numContainers;
    }

    launchedContainers.put(container.getId(),
        new ContainerInfo(rmContainer, launchedOnNode));
  }

  /**
   * Get unallocated resources on the node.
   * @return Unallocated resources on the node
   */
  public synchronized Resource getUnallocatedResource() {
    return this.unallocatedResource;
  }

  /**
   * Get allocated resources on the node.
   * @return Allocated resources on the node
   */
  public synchronized Resource getAllocatedResource() {
    return this.allocatedResource;
  }

  /**
   * Get total resources on the node.
   * @return Total resources on the node.
   */
  public synchronized Resource getTotalResource() {
    return this.totalResource;
  }

  /**
   * Check if a container is tracked as allocated on this node.
   * @param containerId ID of the container to look up.
   * @return If the container is launched by the node.
   */
  public synchronized boolean isValidContainer(ContainerId containerId) {
    return launchedContainers.containsKey(containerId);
  }

  /**
   * Update the resources of the node when releasing a container. Only
   * GUARANTEED containers give resources back and decrement the count,
   * mirroring what is charged on allocation.
   * @param container Container to release.
   */
  protected synchronized void updateResourceForReleasedContainer(
      Container container) {
    if (container.getExecutionType() == ExecutionType.GUARANTEED) {
      addUnallocatedResource(container.getResource());
      --numContainers;
    }
  }

  /**
   * Release an allocated container on this node. Unknown container IDs are
   * ignored. A container that has already been launched on the node is only
   * released when the release is reported by the node itself.
   * @param containerId ID of container to be released.
   * @param releasedByNode whether the release originates from a node update.
   */
  public synchronized void releaseContainer(ContainerId containerId,
      boolean releasedByNode) {
    ContainerInfo info = launchedContainers.get(containerId);
    if (info == null) {
      return;
    }
    if (!releasedByNode && info.launchedOnNode) {
      // wait until node reports container has completed
      return;
    }

    launchedContainers.remove(containerId);
    Container container = info.container.getContainer();

    // We remove allocation tags when a container is actually
    // released on NM. This is to avoid running into situation
    // when AM releases a container and NM has some delay to
    // actually release it, then the tag can still be visible
    // at RM so that RM can respect it during scheduling new containers.
    if (rmContext != null && rmContext.getAllocationTagsManager() != null) {
      rmContext.getAllocationTagsManager()
          .removeContainer(container.getNodeId(),
              container.getId(), container.getAllocationTags());
    }

    updateResourceForReleasedContainer(container);

    LOG.debug("Released container {} of capacity {} on host {}, which"
        + " currently has {} containers, {} used and {} available,"
        + " release resources=true",
        container.getId(), container.getResource(),
        rmNode.getNodeAddress(), numContainers, getAllocatedResource(),
        getUnallocatedResource());
  }

  /**
   * Inform the node that a container has launched. Unknown container IDs are
   * ignored.
   * @param containerId ID of the launched container
   */
  public synchronized void containerStarted(ContainerId containerId) {
    ContainerInfo info = launchedContainers.get(containerId);
    if (info != null) {
      info.launchedOnNode = true;
    }
  }

  /**
   * Add unallocated resources to the node. This is used when unallocating a
   * container. A null resource is logged and ignored.
   * @param resource Resources to add.
   */
  private synchronized void addUnallocatedResource(Resource resource) {
    if (resource == null) {
      LOG.error("Invalid resource addition of null resource for {}",
          rmNode.getNodeAddress());
      return;
    }
    Resources.addTo(unallocatedResource, resource);
    Resources.subtractFrom(allocatedResource, resource);
  }

  /**
   * Deduct unallocated resources from the node. This is used when allocating a
   * container. A null resource is logged and ignored.
   * @param resource Resources to deduct.
   */
  @VisibleForTesting
  public synchronized void deductUnallocatedResource(Resource resource) {
    if (resource == null) {
      LOG.error("Invalid deduction of null resource for {}",
          rmNode.getNodeAddress());
      return;
    }
    Resources.subtractFrom(unallocatedResource, resource);
    Resources.addTo(allocatedResource, resource);
  }

  /**
   * Reserve container for the attempt on this node.
   * @param attempt Application attempt asking for the reservation.
   * @param schedulerKey Priority of the reservation.
   * @param container Container reserving resources for.
   */
  public abstract void reserveResource(SchedulerApplicationAttempt attempt,
      SchedulerRequestKey schedulerKey, RMContainer container);

  /**
   * Unreserve resources on this node.
   * @param attempt Application attempt that had done the reservation.
   */
  public abstract void unreserveResource(SchedulerApplicationAttempt attempt);

  @Override
  public String toString() {
    return "host: " + rmNode.getNodeAddress() + " #containers="
        + getNumContainers() + " available=" + getUnallocatedResource()
        + " used=" + getAllocatedResource();
  }

  /**
   * Get number of active containers on the node.
   * @return Number of active containers on the node.
   */
  public int getNumContainers() {
    return numContainers;
  }

  /**
   * Get the containers running on the node.
   * @return A copy of containers running on the node.
   */
  public synchronized List<RMContainer> getCopiedListOfRunningContainers() {
    return snapshotLaunchedContainers();
  }

  /**
   * Get the containers running on the node with AM containers at the end.
   * @return A copy of running containers with AM containers at the end.
   */
  public synchronized List<RMContainer> getRunningContainersWithAMsAtTheEnd() {
    LinkedList<RMContainer> result = new LinkedList<>();
    for (ContainerInfo info : launchedContainers.values()) {
      if (info.container.isAMContainer()) {
        result.addLast(info.container);
      } else {
        result.addFirst(info.container);
      }
    }
    return result;
  }

  /**
   * Get the containers running on the node ordered by which to kill first. It
   * tries to kill AMs last, then GUARANTEED containers, and it kills
   * OPPORTUNISTIC first. Containers that tie on both criteria are ordered by
   * creation time, largest value first.
   * @return A copy of the running containers ordered by which to kill first.
   */
  public List<RMContainer> getContainersToKill() {
    List<RMContainer> result = getLaunchedContainers();
    Collections.sort(result, (c1, c2) -> {
      return new CompareToBuilder()
          .append(c1.isAMContainer(), c2.isAMContainer())
          .append(c2.getExecutionType(), c1.getExecutionType()) // reversed
          .append(c2.getCreationTime(), c1.getCreationTime()) // reversed
          .toComparison();
    });
    return result;
  }

  /**
   * Get the launched containers in the node.
   * @return List of launched containers.
   */
  protected synchronized List<RMContainer> getLaunchedContainers() {
    return snapshotLaunchedContainers();
  }

  /**
   * Copy the tracked containers into a new list. Callers must hold the
   * instance monitor because the backing map is a plain {@link HashMap}.
   * @return A new, mutable list with every tracked container.
   */
  private List<RMContainer> snapshotLaunchedContainers() {
    List<RMContainer> result = new ArrayList<>(launchedContainers.size());
    for (ContainerInfo info : launchedContainers.values()) {
      result.add(info.container);
    }
    return result;
  }

  /**
   * Get the container for the specified container ID.
   * @param containerId The container ID
   * @return The container for the specified container ID, or null if the
   *         ID is not tracked on this node.
   */
  protected synchronized RMContainer getContainer(ContainerId containerId) {
    ContainerInfo info = launchedContainers.get(containerId);
    return info == null ? null : info.container;
  }

  /**
   * Get the reserved container in the node.
   * @return Reserved container in the node.
   */
  public synchronized RMContainer getReservedContainer() {
    return reservedContainer;
  }

  /**
   * Set the reserved container in the node.
   * @param reservedContainer Reserved container in the node.
   */
  public synchronized void
  setReservedContainer(RMContainer reservedContainer) {
    this.reservedContainer = reservedContainer;
  }

  /**
   * Recover a container. Containers already in the COMPLETED state are
   * skipped; any other container is re-registered as launched on the node.
   * @param rmContainer Container to recover.
   */
  public synchronized void recoverContainer(RMContainer rmContainer) {
    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
      return;
    }
    allocateContainer(rmContainer, true);
  }

  /**
   * Get the labels for the node.
   * @return Set of labels for the node.
   */
  public Set<String> getLabels() {
    return labels;
  }

  /**
   * Update the labels for the node. An immutable copy is stored, as in the
   * constructor, so that later changes to the caller's set cannot affect
   * concurrent readers of the labels. A null argument is stored as null.
   * @param labels Set of labels for the node.
   */
  public void updateLabels(Set<String> labels) {
    this.labels = (labels == null) ? null : ImmutableSet.copyOf(labels);
  }

  /**
   * Get partition of which the node belongs to, if node-labels of this node is
   * empty or null, it belongs to NO_LABEL partition. And since we only support
   * one partition for each node (YARN-2694), first label will be its partition.
   * @return Partition for the node.
   */
  public String getPartition() {
    // Read the volatile field once so the null/empty check and the iteration
    // are guaranteed to see the same set.
    Set<String> current = this.labels;
    if (current == null || current.isEmpty()) {
      return RMNodeLabelsManager.NO_LABEL;
    }
    return current.iterator().next();
  }

  /**
   * Set the resource utilization of the containers in the node.
   * @param containersUtilization Resource utilization of the containers.
   */
  public void setAggregatedContainersUtilization(
      ResourceUtilization containersUtilization) {
    this.containersUtilization = containersUtilization;
  }

  /**
   * Get the resource utilization of the containers in the node.
   * @return Resource utilization of the containers.
   */
  public ResourceUtilization getAggregatedContainersUtilization() {
    return this.containersUtilization;
  }

  /**
   * Set the resource utilization of the node. This includes the containers.
   * @param nodeUtilization Resource utilization of the node.
   */
  public void setNodeUtilization(ResourceUtilization nodeUtilization) {
    this.nodeUtilization = nodeUtilization;
  }

  /**
   * Get the resource utilization of the node.
   * @return Resource utilization of the node.
   */
  public ResourceUtilization getNodeUtilization() {
    return this.nodeUtilization;
  }

  /**
   * Get the monotonic time recorded at construction or at the most recent
   * call to {@link #notifyNodeUpdate()}.
   * @return Monotonic time of the last recorded heartbeat.
   */
  public long getLastHeartbeatMonotonicTime() {
    return lastHeartbeatMonotonicTime;
  }

  /**
   * This will be called for each node heartbeat.
   */
  public void notifyNodeUpdate() {
    this.lastHeartbeatMonotonicTime = Time.monotonicNow();
  }

  /**
   * Two scheduler nodes are equal when their node IDs are equal.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SchedulerNode)) {
      return false;
    }

    SchedulerNode that = (SchedulerNode) o;

    return getNodeID().equals(that.getNodeID());
  }

  @Override
  public int hashCode() {
    return getNodeID().hashCode();
  }

  /**
   * Get the attributes of the node.
   * @return Set of node attributes, or null if none have been set.
   */
  public Set<NodeAttribute> getNodeAttributes() {
    return nodeAttributes;
  }

  /**
   * Replace the attributes of the node.
   * @param attributes New set of node attributes.
   */
  public void updateNodeAttributes(Set<NodeAttribute> attributes) {
    this.nodeAttributes = attributes;
  }

  /**
   * Book-keeping entry for a container tracked on this node: the container
   * itself plus whether it has been reported as launched on the node.
   */
  private static class ContainerInfo {
    private final RMContainer container;
    private boolean launchedOnNode;

    public ContainerInfo(RMContainer container, boolean launchedOnNode) {
      this.container = container;
      this.launchedOnNode = launchedOnNode;
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractResourceUsage 源码

hadoop AbstractUsersManager 源码

hadoop AbstractYarnScheduler 源码

hadoop ActiveUsersManager 源码

hadoop Allocation 源码

hadoop AppSchedulingInfo 源码

hadoop ApplicationPlacementAllocatorFactory 源码

hadoop CSQueueMetricsForCustomResources 源码

hadoop ClusterNodeTracker 源码

hadoop ConfigurationMutationACLPolicy 源码

0  赞