hadoop ContainerUpdateContext 源码

  • 2022-10-20
  • 浏览 (216)

hadoop ContainerUpdateContext 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
    .RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Tracks all outstanding container update requests for an application:
 * resource increases, execution-type promotions, and resource decreases.
 *
 * <p>Increases and promotions are indexed by scheduler request key, then by
 * the resource being asked for on top of the container, then by the node
 * hosting the container being updated. Decreases are indexed by container id
 * and hold the target resource for a decrease, or the container's current
 * resource for other non-increase update types.
 *
 * <p>NOTE(review): the mutating entry points are {@code synchronized}, but
 * {@link #matchContainerToOutstandingIncreaseReq} and {@link #swapContainer}
 * read the same maps without holding this object's monitor; callers
 * presumably serialize access externally — confirm.
 */
public class ContainerUpdateContext {

  /**
   * Sentinel container id (all components -1). It is the id of the
   * placeholder container used to cancel a previous pending ask, and it is
   * returned by {@link #matchContainerToOutstandingIncreaseReq} to signal
   * that a newly allocated container did not match and has to be released.
   */
  public static final ContainerId UNDEFINED =
      ContainerId.newContainerId(ApplicationAttemptId.newInstance(
              ApplicationId.newInstance(-1, -1), -1), -1);
  protected static final RecordFactory RECORD_FACTORY =
      RecordFactoryProvider.getRecordFactory(null);

  // Containers undergoing a resource increase or a promotion:
  // scheduler key -> requested resource -> node -> container ids.
  private final Map<SchedulerRequestKey, Map<Resource,
      Map<NodeId, Set<ContainerId>>>> outstandingIncreases = new HashMap<>();

  // Containers undergoing a decrease (or another non-increase update),
  // mapped to the resource recorded when the update was registered.
  private final Map<ContainerId, Resource> outstandingDecreases =
      new HashMap<>();
  private final AppSchedulingInfo appSchedulingInfo;

  ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo) {
    this.appSchedulingInfo = appSchedulingInfo;
  }

  /**
   * Add the container to outstanding decreases.
   *
   * <p>For a DECREASE_RESOURCE request the pending ask previously registered
   * under the container's update key is cancelled first, and the requested
   * capability is recorded as the target resource. For any other update type
   * the container's current resource is recorded.
   *
   * @param updateReq UpdateContainerRequest.
   * @param schedulerNode SchedulerNode.
   * @param container Container.
   * @return true if the container was registered; false if it already had
   *     an outstanding decrease.
   */
  public synchronized boolean checkAndAddToOutstandingDecreases(
      UpdateContainerRequest updateReq, SchedulerNode schedulerNode,
      Container container) {
    if (outstandingDecreases.containsKey(container.getId())) {
      return false;
    }
    if (ContainerUpdateType.DECREASE_RESOURCE ==
        updateReq.getContainerUpdateType()) {
      SchedulerRequestKey updateKey = new SchedulerRequestKey
          (container.getPriority(),
              container.getAllocationRequestId(), container.getId());
      cancelPreviousRequest(schedulerNode, updateKey);
      outstandingDecreases.put(container.getId(), updateReq.getCapability());
    } else {
      outstandingDecreases.put(container.getId(), container.getResource());
    }
    return true;
  }

  /**
   * Add the container to outstanding increases (resource increase or
   * promotion).
   *
   * <p>The request is rejected, without modifying any state, when the
   * container already has an outstanding decrease, or when an update is
   * already outstanding for the same scheduler key and this request is not
   * an INCREASE_RESOURCE. A repeated INCREASE_RESOURCE cancels the previously
   * registered pending ask before registering the new one. When the resource
   * to acquire is not none, host, rack, and ANY resource requests for it are
   * pushed to the application's scheduling info.
   *
   * @param rmContainer RMContainer.
   * @param schedulerNode SchedulerNode.
   * @param updateRequest UpdateContainerRequest.
   * @return true if the container was added to outstanding increases.
   */
  public synchronized boolean checkAndAddToOutstandingIncreases(
      RMContainer rmContainer, SchedulerNode schedulerNode,
      UpdateContainerRequest updateRequest) {
    Container container = rmContainer.getContainer();
    // Reject before touching any state. Previously this check ran after the
    // nested maps were created (and after a possible cancellation), so a
    // rejected request left empty entries behind that changed the outcome
    // of later requests for the same key.
    if (outstandingDecreases.containsKey(container.getId())) {
      return false;
    }
    SchedulerRequestKey schedulerKey =
        SchedulerRequestKey.create(updateRequest,
            rmContainer.getAllocatedSchedulerKey());
    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
        outstandingIncreases.get(schedulerKey);
    if (resourceMap == null) {
      resourceMap = new HashMap<>();
      outstandingIncreases.put(schedulerKey, resourceMap);
    } else if (ContainerUpdateType.INCREASE_RESOURCE ==
        updateRequest.getContainerUpdateType()) {
      // Updating the resource of an existing increase request: withdraw
      // the previously registered pending ask first.
      cancelPreviousRequest(schedulerNode, schedulerKey);
    } else {
      return false;
    }

    Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
    resourceMap.computeIfAbsent(resToIncrease, res -> new HashMap<>())
        .computeIfAbsent(container.getNodeId(), node -> new HashSet<>())
        .add(container.getId());

    if (!Resources.isNone(resToIncrease)) {
      Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
          new HashMap<>();
      updateResReqs.put(schedulerKey, createResourceRequests(rmContainer,
          schedulerNode, schedulerKey, resToIncrease));
      appSchedulingInfo.updateResourceRequests(updateResReqs, false);
    }
    return true;
  }

  /**
   * Withdraws one unit of the pending ANY ask registered under the given key.
   * It does this by allocating, at OFF_SWITCH locality, a placeholder
   * container (id {@link #UNDEFINED}, resource equal to the ask's
   * per-allocation resource) against it. Does nothing if there is no
   * placement allocator for the key, or no positive pending ANY ask.
   */
  private void cancelPreviousRequest(SchedulerNode schedulerNode,
      SchedulerRequestKey schedulerKey) {
    AppPlacementAllocator<SchedulerNode> appPlacementAllocator =
        appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
    if (appPlacementAllocator != null) {
      PendingAsk pendingAsk = appPlacementAllocator.getPendingAsk(
          ResourceRequest.ANY);
      // Decrement the pending using a dummy RR with
      // resource = prev update req capability
      if (pendingAsk != null && pendingAsk.getCount() > 0) {
        Container container = Container.newInstance(UNDEFINED,
            schedulerNode.getNodeID(), "host:port",
            pendingAsk.getPerAllocationResource(),
            schedulerKey.getPriority(), null);
        appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
            schedulerKey,
            new RMContainerImpl(container, schedulerKey,
                appSchedulingInfo.getApplicationAttemptId(),
                schedulerNode.getNodeID(), appSchedulingInfo.getUser(),
                appSchedulingInfo.getRMContext(),
                appPlacementAllocator.getPrimaryRequestedNodePartition()));
      }
    }
  }

  /**
   * Builds single-container, non-relaxed-locality requests for
   * {@code resToIncrease} at the container's host, the node's rack, and ANY.
   */
  private Map<String, ResourceRequest> createResourceRequests(
      RMContainer rmContainer, SchedulerNode schedulerNode,
      SchedulerRequestKey schedulerKey, Resource resToIncrease) {
    String host = rmContainer.getContainer().getNodeId().getHost();
    String rack = schedulerNode.getRackName();
    Map<String, ResourceRequest> resMap = new HashMap<>();
    resMap.put(host,
        createResourceReqForIncrease(schedulerKey, resToIncrease,
            RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
            rmContainer, host));
    resMap.put(rack,
        createResourceReqForIncrease(schedulerKey, resToIncrease,
            RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
            rmContainer, rack));
    resMap.put(ResourceRequest.ANY,
        createResourceReqForIncrease(schedulerKey, resToIncrease,
            RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
            rmContainer, ResourceRequest.ANY));
    return resMap;
  }

  /**
   * Returns the resource that must be acquired to satisfy the update.
   * <ul>
   *   <li>For a promotion, this is the container's total resource.</li>
   *   <li>For a resource increase, this is the per-component excess of the
   *       requested capability over the current resource.</li>
   *   <li>For any other update type, this is {@code null}.</li>
   * </ul>
   */
  private Resource getResourceToIncrease(UpdateContainerRequest updateReq,
      RMContainer rmContainer) {
    if (updateReq.getContainerUpdateType() ==
        ContainerUpdateType.PROMOTE_EXECUTION_TYPE) {
      return rmContainer.getContainer().getResource();
    }
    if (updateReq.getContainerUpdateType() ==
        ContainerUpdateType.INCREASE_RESOURCE) {
      // Only the amount in excess of the current allocation is requested;
      // componentwiseMax keeps components that are not being increased
      // from going negative.
      Resource maxCap = Resources.componentwiseMax(updateReq.getCapability(),
          rmContainer.getContainer().getResource());
      return Resources.add(maxCap,
          Resources.negate(rmContainer.getContainer().getResource()));
    }
    return null;
  }

  /**
   * Fills {@code rr} as a single-container, non-relaxed-locality GUARANTEED
   * request for {@code resToIncrease} at {@code resourceName}. The priority
   * and node label expression come from the container, and the allocation
   * request id comes from the scheduler key.
   */
  private static ResourceRequest createResourceReqForIncrease(
      SchedulerRequestKey schedulerRequestKey, Resource resToIncrease,
      ResourceRequest rr, RMContainer rmContainer, String resourceName) {
    rr.setResourceName(resourceName);
    rr.setNumContainers(1);
    rr.setRelaxLocality(false);
    rr.setPriority(rmContainer.getContainer().getPriority());
    rr.setAllocationRequestId(schedulerRequestKey.getAllocationRequestId());
    rr.setCapability(resToIncrease);
    rr.setNodeLabelExpression(rmContainer.getNodeLabelExpression());
    rr.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(
        ExecutionType.GUARANTEED, true));
    return rr;
  }

  /**
   * Remove Container from outstanding increases / decreases. Calling this
   * method essentially completes the update process. Any map level left
   * empty by the removal is pruned.
   * @param schedulerKey SchedulerRequestKey.
   * @param container Container.
   */
  public synchronized void removeFromOutstandingUpdate(
      SchedulerRequestKey schedulerKey, Container container) {
    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
        outstandingIncreases.get(schedulerKey);
    if (resourceMap != null) {
      Map<NodeId, Set<ContainerId>> locationMap =
          resourceMap.get(container.getResource());
      if (locationMap != null) {
        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
        if (containerIds != null) {
          containerIds.remove(container.getId());
          // Prune the set even if it was already empty, so no empty entry
          // keeps the parent maps alive.
          if (containerIds.isEmpty()) {
            locationMap.remove(container.getNodeId());
          }
        }
        if (locationMap.isEmpty()) {
          resourceMap.remove(container.getResource());
        }
      }
      if (resourceMap.isEmpty()) {
        outstandingIncreases.remove(schedulerKey);
      }
    }
    outstandingDecreases.remove(container.getId());
  }

  /**
   * Check if a new container is to be matched up against an outstanding
   * Container increase request. A match is an outstanding entry under the
   * same scheduler key, with the same resource, on the same node. The
   * matched id is not removed here.
   *
   * <p>If increases are outstanding for the key but none match, the new
   * container's resource is re-requested (host, rack, ANY) and
   * {@link #UNDEFINED} is returned so the caller releases the container.
   *
   * @param node SchedulerNode.
   * @param schedulerKey SchedulerRequestKey.
   * @param rmContainer RMContainer.
   * @return the matched ContainerId, {@link #UNDEFINED} if the allocation
   *     must be released, or null if nothing is outstanding for the key.
   */
  public ContainerId matchContainerToOutstandingIncreaseReq(
      SchedulerNode node, SchedulerRequestKey schedulerKey,
      RMContainer rmContainer) {
    ContainerId retVal = null;
    Container container = rmContainer.getContainer();
    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
        outstandingIncreases.get(schedulerKey);
    if (resourceMap != null) {
      Map<NodeId, Set<ContainerId>> locationMap =
          resourceMap.get(container.getResource());
      if (locationMap != null) {
        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
        if (containerIds != null && !containerIds.isEmpty()) {
          retVal = containerIds.iterator().next();
        }
      }
    }
    // Allocation happened on NM on the same host, but not on the NM
    // we need.. We need to signal that this container has to be released.
    // We also need to add these requests back.. to be reallocated.
    if (resourceMap != null && retVal == null) {
      Map<SchedulerRequestKey, Map<String, ResourceRequest>> reqsToUpdate =
          new HashMap<>();
      Map<String, ResourceRequest> resMap = createResourceRequests(
          rmContainer, node, schedulerKey, container.getResource());
      reqsToUpdate.put(schedulerKey, resMap);
      appSchedulingInfo.updateResourceRequests(reqsToUpdate, true);
      return UNDEFINED;
    }
    return retVal;
  }

  /**
   * Swaps the existing RMContainer's and the temp RMContainers internal
   * container references after adjusting the resources in each.
   *
   * <p>The existing RMContainer receives a new Container with the existing
   * id, node, address, priority, ports, allocation request id, and version.
   * It takes the updated resource and the temp container's execution type.
   * The temp container is given the resource to release and the existing
   * execution type.
   *
   * @param tempRMContainer Temp RMContainer.
   * @param existingRMContainer Existing RMContainer.
   * @param updateType Update Type.
   * @return Existing RMContainer after swapping the container references.
   */
  public RMContainer swapContainer(RMContainer tempRMContainer,
      RMContainer existingRMContainer, ContainerUpdateType updateType) {
    ContainerId matchedContainerId = existingRMContainer.getContainerId();
    Container tempContainer = tempRMContainer.getContainer();
    Container existingContainer = existingRMContainer.getContainer();

    Resource updatedResource =
        createUpdatedResource(tempContainer, existingContainer, updateType);
    Resource resourceToRelease =
        createResourceToRelease(existingContainer, updateType);
    // Built before the temp container's execution type is overwritten below.
    Container newContainer = Container.newInstance(matchedContainerId,
        existingContainer.getNodeId(),
        existingContainer.getNodeHttpAddress(),
        updatedResource,
        existingContainer.getPriority(), null,
        tempContainer.getExecutionType());
    newContainer.setExposedPorts(existingContainer.getExposedPorts());
    newContainer.setAllocationRequestId(
        existingContainer.getAllocationRequestId());
    newContainer.setVersion(existingContainer.getVersion());

    tempContainer.setResource(resourceToRelease);
    tempContainer.setExecutionType(existingContainer.getExecutionType());

    ((RMContainerImpl) existingRMContainer).setContainer(newContainer);
    return existingRMContainer;
  }

  /**
   * Returns the resource that the container will finally be assigned with
   * at the end of the update operation.
   * <ul>
   *   <li>Increase: the existing resource plus the temp container's
   *       resource.</li>
   *   <li>Decrease: the recorded target resource.</li>
   *   <li>Otherwise: the existing resource.</li>
   * </ul>
   * @param tempContainer Temporary Container created for the operation.
   * @param existingContainer Existing Container.
   * @param updateType Update Type.
   * @return Final Resource.
   */
  private Resource createUpdatedResource(Container tempContainer,
      Container existingContainer, ContainerUpdateType updateType) {
    if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
      return Resources.add(existingContainer.getResource(),
          tempContainer.getResource());
    } else if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
      return outstandingDecreases.get(existingContainer.getId());
    } else {
      return existingContainer.getResource();
    }
  }

  /**
   * Returns the resources that need to be released at the end of the update
   * operation.
   * <ul>
   *   <li>Increase: none.</li>
   *   <li>Decrease: the existing resource minus the recorded target.</li>
   *   <li>Otherwise: the existing resource.</li>
   * </ul>
   * @param existingContainer Existing Container.
   * @param updateType Updated type.
   * @return Resources to be released.
   */
  private Resource createResourceToRelease(Container existingContainer,
      ContainerUpdateType updateType) {
    if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
      return Resources.none();
    } else if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
      return Resources.add(existingContainer.getResource(),
          Resources.negate(
              outstandingDecreases.get(existingContainer.getId())));
    } else {
      return existingContainer.getResource();
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractResourceUsage 源码

hadoop AbstractUsersManager 源码

hadoop AbstractYarnScheduler 源码

hadoop ActiveUsersManager 源码

hadoop Allocation 源码

hadoop AppSchedulingInfo 源码

hadoop ApplicationPlacementAllocatorFactory 源码

hadoop CSQueueMetricsForCustomResources 源码

hadoop ClusterNodeTracker 源码

hadoop ConfigurationMutationACLPolicy 源码

0  赞