hadoop DistributedScheduler source code

  • 2022-10-20

hadoop DistributedScheduler code

File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
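
The DistributedScheduler only takes effect when the NodeManager runs the AMRMProxy with distributed scheduling switched on and the ResourceManager accepts OPPORTUNISTIC containers. As orientation before the listing, here is a minimal sketch of those switches; the YarnConfiguration constant names are written from memory and should be verified against your Hadoop version.

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class EnableDistributedScheduling {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // The AMRMProxy must run on the NodeManager for any request interceptor,
    // including DistributedScheduler, to be active.
    conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
    // Adds DistributedScheduler to the AMRMProxy interceptor chain.
    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
    // RM side: allow OPPORTUNISTIC container allocation at all.
    conf.setBoolean(
        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
    System.out.println("distributed scheduling on: "
        + conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, false));
  }
}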

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.scheduler;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;

import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * <p>The DistributedScheduler runs on the NodeManager and is modeled as an
 * <code>AMRMProxy</code> request interceptor. It is responsible for the
 * following:</p>
 * <ul>
 *   <li>Intercept <code>ApplicationMasterProtocol</code> calls and unwrap the
 *   response objects to extract instructions from the
 *   <code>ClusterMonitor</code> running on the ResourceManager to aid in making
 *   distributed scheduling decisions.</li>
 *   <li>Call the <code>OpportunisticContainerAllocator</code> to allocate
 *   containers for the outstanding OPPORTUNISTIC container requests.</li>
 * </ul>
 */
public final class DistributedScheduler extends AbstractRequestInterceptor {

  private static final Logger LOG = LoggerFactory
      .getLogger(DistributedScheduler.class);

  private final static RecordFactory RECORD_FACTORY =
      RecordFactoryProvider.getRecordFactory(null);

  private OpportunisticContainerContext oppContainerContext =
      new OpportunisticContainerContext();

  // Mapping of NodeId to NodeTokens. Populated either from RM response or
  // generated locally if required.
  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
  private ApplicationAttemptId applicationAttemptId;
  private OpportunisticContainerAllocator containerAllocator;
  private NMTokenSecretManagerInNM nmSecretManager;
  private String appSubmitter;
  private long rmIdentifier;

  @Override
  public void init(AMRMProxyApplicationContext applicationContext) {
    super.init(applicationContext);
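    // Note: "getNMCotext" below is the method name as it is actually spelled
    // in the AMRMProxyApplicationContext interface in the Hadoop source.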
    initLocal(applicationContext.getNMCotext().getNodeStatusUpdater()
        .getRMIdentifier(),
        applicationContext.getApplicationAttemptId(),
        applicationContext.getNMCotext().getContainerAllocator(),
        applicationContext.getNMCotext().getNMTokenSecretManager(),
        applicationContext.getUser());
  }

  @VisibleForTesting
  void initLocal(long rmId, ApplicationAttemptId appAttemptId,
      OpportunisticContainerAllocator oppContainerAllocator,
      NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) {
    this.rmIdentifier = rmId;
    this.applicationAttemptId = appAttemptId;
    this.containerAllocator = oppContainerAllocator;
    this.nmSecretManager = nmSecretManager;
    this.appSubmitter = appSubmitter;

    // Overrides the Generator to decrement container id.
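    // The counter's start value is supplied by the RM at registration (see
    // getContainerIdStart() in updateParameters() below); counting down is
    // presumably to keep locally generated OPPORTUNISTIC container ids out of
    // the range of the RM's own incrementing ids.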
    this.oppContainerContext.setContainerIdGenerator(
        new OpportunisticContainerAllocator.ContainerIdGenerator() {
          @Override
          public long generateContainerId() {
            return this.containerIdCounter.decrementAndGet();
          }
        });
  }

  /**
   * Route register call to the corresponding distributed scheduling method viz.
   * registerApplicationMasterForDistributedScheduling, and return response to
   * the caller after stripping away Distributed Scheduling information.
   *
   * @param request
   *          registration request
   * @return Registration Response
   * @throws YarnException YarnException
   * @throws IOException IOException
   */
  @Override
  public RegisterApplicationMasterResponse registerApplicationMaster
      (RegisterApplicationMasterRequest request) throws YarnException,
      IOException {
    return registerApplicationMasterForDistributedScheduling(request)
        .getRegisterResponse();
  }

  /**
   * Route allocate call to the allocateForDistributedScheduling method and
   * return response to the caller after stripping away Distributed Scheduling
   * information.
   *
   * @param request
   *          allocation request
   * @return Allocate Response
   * @throws YarnException YarnException
   * @throws IOException IOException
   */
  @Override
  public AllocateResponse allocate(AllocateRequest request) throws
      YarnException, IOException {
    DistributedSchedulingAllocateRequest distRequest = RECORD_FACTORY
        .newRecordInstance(DistributedSchedulingAllocateRequest.class);
    distRequest.setAllocateRequest(request);
    return allocateForDistributedScheduling(distRequest).getAllocateResponse();
  }

  @Override
  public FinishApplicationMasterResponse finishApplicationMaster
      (FinishApplicationMasterRequest request) throws YarnException,
      IOException {
    return getNextInterceptor().finishApplicationMaster(request);
  }

  /**
   * Adds all the newly allocated Containers to the allocate Response.
   * Additionally, in case the NMToken for one of the nodes does not exist, it
   * generates one and adds it to the response.
   */
  private void updateAllocateResponse(AllocateResponse response,
      List<NMToken> nmTokens, List<Container> allocatedContainers) {
    List<NMToken> newTokens = new ArrayList<>();
    if (allocatedContainers.size() > 0) {
      response.getAllocatedContainers().addAll(allocatedContainers);
      for (Container alloc : allocatedContainers) {
        if (!nodeTokens.containsKey(alloc.getNodeId())) {
          newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
        }
      }
      List<NMToken> retTokens = new ArrayList<>(nmTokens);
      retTokens.addAll(newTokens);
      response.setNMTokens(retTokens);
    }
  }

  private void updateParameters(
      RegisterDistributedSchedulingAMResponse registerResponse) {
    Resource incrementResource = registerResponse.getIncrContainerResource();
    if (incrementResource == null) {
      incrementResource = registerResponse.getMinContainerResource();
    }
    oppContainerContext.updateAllocationParams(
        registerResponse.getMinContainerResource(),
        registerResponse.getMaxContainerResource(),
        incrementResource,
        registerResponse.getContainerTokenExpiryInterval());

    oppContainerContext.getContainerIdGenerator()
        .resetContainerIdCounter(registerResponse.getContainerIdStart());
    setNodeList(registerResponse.getNodesForScheduling());
  }

  private void setNodeList(List<RemoteNode> nodeList) {
    oppContainerContext.updateNodeList(nodeList);
  }

  @Override
  public RegisterDistributedSchedulingAMResponse
      registerApplicationMasterForDistributedScheduling(
          RegisterApplicationMasterRequest request)
      throws YarnException, IOException {
    LOG.info("Forwarding registration request to the" +
        "Distributed Scheduler Service on YARN RM");
    RegisterDistributedSchedulingAMResponse dsResp = getNextInterceptor()
        .registerApplicationMasterForDistributedScheduling(request);
    updateParameters(dsResp);
    return dsResp;
  }

  @Override
  public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
      DistributedSchedulingAllocateRequest request)
      throws YarnException, IOException {

    // Partition requests to GUARANTEED and OPPORTUNISTIC.
    OpportunisticContainerAllocator.PartitionedResourceRequests
        partitionedAsks = containerAllocator
        .partitionAskList(request.getAllocateRequest().getAskList());

    // Allocate OPPORTUNISTIC containers.
    List<Container> allocatedContainers =
        containerAllocator.allocateContainers(
            request.getAllocateRequest().getResourceBlacklistRequest(),
            partitionedAsks.getOpportunistic(), applicationAttemptId,
            oppContainerContext, rmIdentifier, appSubmitter);

    // Prepare request for sending to RM for scheduling GUARANTEED containers.
    request.setAllocatedContainers(allocatedContainers);
    request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());

    LOG.debug("Forwarding allocate request to the" +
          "Distributed Scheduler Service on YARN RM");

    DistributedSchedulingAllocateResponse dsResp =
        getNextInterceptor().allocateForDistributedScheduling(request);

    // Update host to nodeId mapping
    setNodeList(dsResp.getNodesForScheduling());
    List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
    for (NMToken nmToken : nmTokens) {
      nodeTokens.put(nmToken.getNodeId(), nmToken);
    }

    // Check if we have NM tokens for all the allocated containers. If not
    // generate one and update the response.
    updateAllocateResponse(
        dsResp.getAllocateResponse(), nmTokens, allocatedContainers);

    return dsResp;
  }
}
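
The heart of allocateForDistributedScheduling() above is the split of the ask list into GUARANTEED and OPPORTUNISTIC requests: GUARANTEED asks are forwarded to the RM unchanged, while OPPORTUNISTIC asks are satisfied locally on the NodeManager. The sketch below is a hypothetical re-implementation of that partition step, for illustration only; the real logic lives in OpportunisticContainerAllocator.partitionAskList(), and only the standard ResourceRequest / ExecutionType records are assumed.

import java.util.List;

import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public class PartitionSketch {
  // Splits asks by requested execution type; requests without an explicit
  // ExecutionTypeRequest are treated as GUARANTEED, the YARN default.
  public static void partition(List<ResourceRequest> asks,
      List<ResourceRequest> guaranteed, List<ResourceRequest> opportunistic) {
    for (ResourceRequest rr : asks) {
      if (rr.getExecutionTypeRequest() != null
          && rr.getExecutionTypeRequest().getExecutionType()
              == ExecutionType.OPPORTUNISTIC) {
        opportunistic.add(rr);
      } else {
        guaranteed.add(rr);
      }
    }
  }
}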
