hadoop RMActiveServiceContext 源码

  • 2022-10-20
  • 浏览 (223)

hadoop RMActiveServiceContext 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.ProxyCAManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;

/**
 * Holder for the <b>Active</b> service context of the ResourceManager, i.e.
 * the references to services that need to run only on the Active RM. This
 * class is expected to be used only by RMContext.
 *
 * <p>The class is essentially a bag of getters and setters. The application,
 * node, inactive-node and system-credential maps are concurrent maps created
 * once per instance; every other reference is a plain field that stays
 * {@code null} until the matching setter is called.
 *
 * <p>NOTE(review): apart from the concurrent maps and the token sequence
 * counter, the fields are neither {@code volatile} nor guarded by a lock;
 * confirm the threading expectations with the callers.
 */
@Private
@Unstable
public class RMActiveServiceContext {

  private static final Logger LOG = LoggerFactory
      .getLogger(RMActiveServiceContext.class);

  private final ConcurrentMap<ApplicationId, RMApp> applications =
      new ConcurrentHashMap<ApplicationId, RMApp>();

  private final ConcurrentMap<NodeId, RMNode> nodes =
      new ConcurrentHashMap<NodeId, RMNode>();

  private final ConcurrentMap<NodeId, RMNode> inactiveNodes =
      new ConcurrentHashMap<NodeId, RMNode>();

  private final ConcurrentMap<ApplicationId, SystemCredentialsForAppsProto>
      systemCredentials =
      new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>();

  private boolean isWorkPreservingRecoveryEnabled;

  private AMLivelinessMonitor amLivelinessMonitor;
  private AMLivelinessMonitor amFinishingMonitor;
  private RMStateStore stateStore = null;
  private ContainerAllocationExpirer containerAllocationExpirer;
  private DelegationTokenRenewer delegationTokenRenewer;
  private AMRMTokenSecretManager amRMTokenSecretManager;
  private RMContainerTokenSecretManager containerTokenSecretManager;
  private NMTokenSecretManagerInRM nmTokenSecretManager;
  private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
  private ClientRMService clientRMService;
  private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
  private ResourceScheduler scheduler;
  private ReservationSystem reservationSystem;
  private NodesListManager nodesListManager;
  private ResourceTrackerService resourceTrackerService;
  private ApplicationMasterService applicationMasterService;

  private RMNodeLabelsManager nodeLabelManager;
  private NodeAttributesManager nodeAttributesManager;
  private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater;
  private long epoch;
  private Clock systemClock = SystemClock.getInstance();
  // Clock reading taken when scheduler recovery started, and how long to
  // wait after it before allocation may begin (same unit as Clock#getTime).
  private long schedulerRecoveryStartTime = 0;
  private long schedulerRecoveryWaitTime = 0;
  // Ensures the "waiting for recovery" message is logged only once.
  private boolean printLog = true;
  // Latched to true the first time the recovery wait has elapsed.
  private boolean isSchedulerReady = false;
  private PlacementManager queuePlacementManager = null;

  private RMAppLifetimeMonitor rmAppLifetimeMonitor;
  private QueueLimitCalculator queueLimitCalculator;
  private AllocationTagsManager allocationTagsManager;
  private PlacementConstraintManager placementConstraintManager;
  private ResourceProfilesManager resourceProfilesManager;
  private MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager;

  private ProxyCAManager proxyCAManager;
  private VolumeManager volumeManager;

  // Never reassigned; only its value is mutated, so the reference is final.
  private final AtomicLong tokenSequenceNo = new AtomicLong(1);

  /**
   * Creates an empty context whose only pre-populated service is a fresh
   * {@link PlacementManager}.
   */
  public RMActiveServiceContext() {
    queuePlacementManager = new PlacementManager();
  }

  /**
   * Creates a context pre-populated with the given services and a
   * {@link NullRMStateStore} as its state store.
   *
   * <p>The null state store is wired to {@code rmDispatcher} and initialized
   * with a default {@link YarnConfiguration}. If that initialization fails,
   * the failure is logged and the state store is left unset ({@code null});
   * when JVM assertions are enabled an {@link AssertionError} is raised as
   * well.
   *
   * @param rmDispatcher dispatcher handed to the null state store
   * @param containerAllocationExpirer container allocation expirer
   * @param amLivelinessMonitor AM liveliness monitor
   * @param amFinishingMonitor monitor for finishing AMs
   * @param delegationTokenRenewer delegation token renewer
   * @param appTokenSecretManager AMRM token secret manager
   * @param containerTokenSecretManager container token secret manager
   * @param nmTokenSecretManager NM token secret manager
   * @param clientToAMTokenSecretManager client-to-AM token secret manager
   * @param scheduler resource scheduler
   */
  @Private
  @Unstable
  public RMActiveServiceContext(Dispatcher rmDispatcher,
      ContainerAllocationExpirer containerAllocationExpirer,
      AMLivelinessMonitor amLivelinessMonitor,
      AMLivelinessMonitor amFinishingMonitor,
      DelegationTokenRenewer delegationTokenRenewer,
      AMRMTokenSecretManager appTokenSecretManager,
      RMContainerTokenSecretManager containerTokenSecretManager,
      NMTokenSecretManagerInRM nmTokenSecretManager,
      ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
      ResourceScheduler scheduler) {
    this();
    this.setContainerAllocationExpirer(containerAllocationExpirer);
    this.setAMLivelinessMonitor(amLivelinessMonitor);
    this.setAMFinishingMonitor(amFinishingMonitor);
    this.setDelegationTokenRenewer(delegationTokenRenewer);
    this.setAMRMTokenSecretManager(appTokenSecretManager);
    this.setContainerTokenSecretManager(containerTokenSecretManager);
    this.setNMTokenSecretManager(nmTokenSecretManager);
    this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
    this.setScheduler(scheduler);

    RMStateStore nullStore = new NullRMStateStore();
    nullStore.setRMDispatcher(rmDispatcher);
    try {
      nullStore.init(new YarnConfiguration());
      setStateStore(nullStore);
    } catch (Exception e) {
      // Assertions are disabled by default, so a bare "assert false" would
      // silently swallow this failure in production and leave the state
      // store null with no trace. Always log it; keep the assert so that
      // runs with -ea still fail fast as before.
      LOG.error("Failed to initialize NullRMStateStore;"
          + " the state store has not been set.", e);
      assert false : "NullRMStateStore initialization failed: " + e;
    }
  }

  /**
   * Sets the state store.
   *
   * @param store the state store to use
   */
  @Private
  @Unstable
  public void setStateStore(RMStateStore store) {
    stateStore = store;
  }

  /** @return the client RM service, or {@code null} if not set */
  @Private
  @Unstable
  public ClientRMService getClientRMService() {
    return clientRMService;
  }

  /** @return the application master service, or {@code null} if not set */
  @Private
  @Unstable
  public ApplicationMasterService getApplicationMasterService() {
    return applicationMasterService;
  }

  /** @return the resource tracker service, or {@code null} if not set */
  @Private
  @Unstable
  public ResourceTrackerService getResourceTrackerService() {
    return resourceTrackerService;
  }

  /** @return the state store, or {@code null} if not set */
  @Private
  @Unstable
  public RMStateStore getStateStore() {
    return stateStore;
  }

  /** @return the live (not copied) map of applications by id */
  @Private
  @Unstable
  public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
    return this.applications;
  }

  /** @return the live (not copied) map of nodes by id */
  @Private
  @Unstable
  public ConcurrentMap<NodeId, RMNode> getRMNodes() {
    return this.nodes;
  }

  /** @return the live (not copied) map of inactive nodes by id */
  @Private
  @Unstable
  public ConcurrentMap<NodeId, RMNode> getInactiveRMNodes() {
    return this.inactiveNodes;
  }

  /** @return the container allocation expirer, or {@code null} if not set */
  @Private
  @Unstable
  public ContainerAllocationExpirer getContainerAllocationExpirer() {
    return this.containerAllocationExpirer;
  }

  /** @return the AM liveliness monitor, or {@code null} if not set */
  @Private
  @Unstable
  public AMLivelinessMonitor getAMLivelinessMonitor() {
    return this.amLivelinessMonitor;
  }

  /** @return the AM finishing monitor, or {@code null} if not set */
  @Private
  @Unstable
  public AMLivelinessMonitor getAMFinishingMonitor() {
    return this.amFinishingMonitor;
  }

  /** @return the delegation token renewer, or {@code null} if not set */
  @Private
  @Unstable
  public DelegationTokenRenewer getDelegationTokenRenewer() {
    return delegationTokenRenewer;
  }

  /** @return the AMRM token secret manager, or {@code null} if not set */
  @Private
  @Unstable
  public AMRMTokenSecretManager getAMRMTokenSecretManager() {
    return this.amRMTokenSecretManager;
  }

  /** @return the container token secret manager, or {@code null} if not set */
  @Private
  @Unstable
  public RMContainerTokenSecretManager getContainerTokenSecretManager() {
    return this.containerTokenSecretManager;
  }

  /** @return the NM token secret manager, or {@code null} if not set */
  @Private
  @Unstable
  public NMTokenSecretManagerInRM getNMTokenSecretManager() {
    return this.nmTokenSecretManager;
  }

  /** @return the resource scheduler, or {@code null} if not set */
  @Private
  @Unstable
  public ResourceScheduler getScheduler() {
    return this.scheduler;
  }

  /** @return the reservation system, or {@code null} if not set */
  @Private
  @Unstable
  public ReservationSystem getReservationSystem() {
    return this.reservationSystem;
  }

  /** @return the nodes list manager, or {@code null} if not set */
  @Private
  @Unstable
  public NodesListManager getNodesListManager() {
    return this.nodesListManager;
  }

  /** @return the client-to-AM token secret manager, or {@code null} */
  @Private
  @Unstable
  public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
    return this.clientToAMTokenSecretManager;
  }

  /**
   * Sets the client RM service.
   *
   * @param clientRMService the client RM service
   */
  @Private
  @Unstable
  public void setClientRMService(ClientRMService clientRMService) {
    this.clientRMService = clientRMService;
  }

  /** @return the RM delegation token secret manager, or {@code null} */
  @Private
  @Unstable
  public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
    return this.rmDelegationTokenSecretManager;
  }

  /**
   * Sets the RM delegation token secret manager.
   *
   * @param delegationTokenSecretManager the secret manager
   */
  @Private
  @Unstable
  public void setRMDelegationTokenSecretManager(
      RMDelegationTokenSecretManager delegationTokenSecretManager) {
    this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
  }

  /**
   * Sets the container allocation expirer.
   *
   * @param containerAllocationExpirer the expirer
   */
  @Private
  @Unstable
  void setContainerAllocationExpirer(
      ContainerAllocationExpirer containerAllocationExpirer) {
    this.containerAllocationExpirer = containerAllocationExpirer;
  }

  /**
   * Sets the AM liveliness monitor.
   *
   * @param amLivelinessMonitor the monitor
   */
  @Private
  @Unstable
  void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) {
    this.amLivelinessMonitor = amLivelinessMonitor;
  }

  /**
   * Sets the AM finishing monitor.
   *
   * @param amFinishingMonitor the monitor
   */
  @Private
  @Unstable
  void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
    this.amFinishingMonitor = amFinishingMonitor;
  }

  /**
   * Sets the container token secret manager.
   *
   * @param containerTokenSecretManager the secret manager
   */
  @Private
  @Unstable
  void setContainerTokenSecretManager(
      RMContainerTokenSecretManager containerTokenSecretManager) {
    this.containerTokenSecretManager = containerTokenSecretManager;
  }

  /**
   * Sets the NM token secret manager.
   *
   * @param nmTokenSecretManager the secret manager
   */
  @Private
  @Unstable
  void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) {
    this.nmTokenSecretManager = nmTokenSecretManager;
  }

  /**
   * Sets the resource scheduler.
   *
   * @param scheduler the scheduler
   */
  @Private
  @Unstable
  void setScheduler(ResourceScheduler scheduler) {
    this.scheduler = scheduler;
  }

  /**
   * Sets the reservation system.
   *
   * @param reservationSystem the reservation system
   */
  @Private
  @Unstable
  void setReservationSystem(ReservationSystem reservationSystem) {
    this.reservationSystem = reservationSystem;
  }

  /**
   * Sets the delegation token renewer.
   *
   * @param delegationTokenRenewer the renewer
   */
  @Private
  @Unstable
  void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer) {
    this.delegationTokenRenewer = delegationTokenRenewer;
  }

  /**
   * Sets the client-to-AM token secret manager.
   *
   * @param clientToAMTokenSecretManager the secret manager
   */
  @Private
  @Unstable
  void setClientToAMTokenSecretManager(
      ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
    this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
  }

  /**
   * Sets the AMRM token secret manager.
   *
   * @param amRMTokenSecretManager the secret manager
   */
  @Private
  @Unstable
  void setAMRMTokenSecretManager(AMRMTokenSecretManager amRMTokenSecretManager) {
    this.amRMTokenSecretManager = amRMTokenSecretManager;
  }

  /**
   * Sets the nodes list manager.
   *
   * @param nodesListManager the nodes list manager
   */
  @Private
  @Unstable
  void setNodesListManager(NodesListManager nodesListManager) {
    this.nodesListManager = nodesListManager;
  }

  /**
   * Sets the application master service.
   *
   * @param applicationMasterService the application master service
   */
  @Private
  @Unstable
  void setApplicationMasterService(
      ApplicationMasterService applicationMasterService) {
    this.applicationMasterService = applicationMasterService;
  }

  /**
   * Sets the resource tracker service.
   *
   * @param resourceTrackerService the resource tracker service
   */
  @Private
  @Unstable
  void setResourceTrackerService(ResourceTrackerService resourceTrackerService) {
    this.resourceTrackerService = resourceTrackerService;
  }

  /**
   * Records whether work-preserving recovery is enabled.
   *
   * @param enabled the new value of the flag
   */
  @Private
  @Unstable
  public void setWorkPreservingRecoveryEnabled(boolean enabled) {
    this.isWorkPreservingRecoveryEnabled = enabled;
  }

  /** @return the work-preserving recovery flag ({@code false} by default) */
  @Private
  @Unstable
  public boolean isWorkPreservingRecoveryEnabled() {
    return this.isWorkPreservingRecoveryEnabled;
  }

  /** @return the stored epoch ({@code 0} until set) */
  @Private
  @Unstable
  public long getEpoch() {
    return this.epoch;
  }

  /**
   * Sets the epoch.
   *
   * @param epoch the epoch value to store
   */
  @Private
  @Unstable
  void setEpoch(long epoch) {
    this.epoch = epoch;
  }

  /** @return the node labels manager, or {@code null} if not set */
  @Private
  @Unstable
  public RMNodeLabelsManager getNodeLabelManager() {
    return nodeLabelManager;
  }

  /**
   * Sets the node labels manager.
   *
   * @param mgr the node labels manager
   */
  @Private
  @Unstable
  public void setNodeLabelManager(RMNodeLabelsManager mgr) {
    nodeLabelManager = mgr;
  }

  /** @return the node attributes manager, or {@code null} if not set */
  @Private
  @Unstable
  public NodeAttributesManager getNodeAttributesManager() {
    return nodeAttributesManager;
  }

  /**
   * Sets the node attributes manager.
   *
   * @param mgr the node attributes manager
   */
  @Private
  @Unstable
  public void setNodeAttributesManager(NodeAttributesManager mgr) {
    nodeAttributesManager = mgr;
  }

  /** @return the allocation tags manager, or {@code null} if not set */
  @Private
  @Unstable
  public AllocationTagsManager getAllocationTagsManager() {
    return allocationTagsManager;
  }

  /**
   * Sets the allocation tags manager.
   *
   * @param allocationTagsManager the allocation tags manager
   */
  @Private
  @Unstable
  public void setAllocationTagsManager(
      AllocationTagsManager allocationTagsManager) {
    this.allocationTagsManager = allocationTagsManager;
  }

  /** @return the placement constraint manager, or {@code null} if not set */
  @Private
  @Unstable
  public PlacementConstraintManager getPlacementConstraintManager() {
    return placementConstraintManager;
  }

  /**
   * Sets the placement constraint manager.
   *
   * @param placementConstraintManager the placement constraint manager
   */
  @Private
  @Unstable
  public void setPlacementConstraintManager(
      PlacementConstraintManager placementConstraintManager) {
    this.placementConstraintManager = placementConstraintManager;
  }

  /** @return the delegated node labels updater, or {@code null} if not set */
  @Private
  @Unstable
  public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
    return rmDelegatedNodeLabelsUpdater;
  }

  /**
   * Sets the delegated node labels updater.
   *
   * @param nodeLablesUpdater the updater
   */
  @Private
  @Unstable
  public void setRMDelegatedNodeLabelsUpdater(
      RMDelegatedNodeLabelsUpdater nodeLablesUpdater) {
    rmDelegatedNodeLabelsUpdater = nodeLablesUpdater;
  }

  /** @return the multi-node sorting manager, or {@code null} if not set */
  @Private
  @Unstable
  public MultiNodeSortingManager<SchedulerNode> getMultiNodeSortingManager() {
    return multiNodeSortingManager;
  }

  /**
   * Sets the multi-node sorting manager.
   *
   * @param multiNodeSortingManager the multi-node sorting manager
   */
  @Private
  @Unstable
  public void setMultiNodeSortingManager(
      MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager) {
    this.multiNodeSortingManager = multiNodeSortingManager;
  }

  /**
   * Marks the start of scheduler recovery: stores the current reading of the
   * configured clock as the start time together with the given wait time.
   *
   * @param waitTime how long after the start time the scheduler must wait,
   *          in the unit returned by {@link Clock#getTime()}
   */
  @Private
  @Unstable
  public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
    this.schedulerRecoveryStartTime = systemClock.getTime();
    this.schedulerRecoveryWaitTime = waitTime;
  }

  /**
   * Tells whether the scheduler recovery wait has elapsed.
   *
   * <p>The scheduler is considered ready once the time elapsed on the
   * configured clock since the recorded recovery start is strictly greater
   * than the recorded wait time. Once the answer has been {@code true} it is
   * latched and later calls return {@code true} without consulting the
   * clock. While not ready, a single informational message is logged on the
   * first call only; another one is logged when readiness is first reached.
   *
   * @return {@code true} if new containers may be allocated
   */
  @Private
  @Unstable
  public boolean isSchedulerReadyForAllocatingContainers() {
    if (isSchedulerReady) {
      // Latched: no need to look at the clock again.
      return true;
    }
    isSchedulerReady =
        (systemClock.getTime() - schedulerRecoveryStartTime)
            > schedulerRecoveryWaitTime;
    if (!isSchedulerReady && printLog) {
      LOG.info("Skip allocating containers. Scheduler is waiting for recovery.");
      printLog = false;
    }
    if (isSchedulerReady) {
      LOG.info("Scheduler recovery is done. Start allocating new containers.");
    }
    return isSchedulerReady;
  }

  /**
   * Replaces the clock used for the scheduler recovery timing.
   *
   * @param clock the clock to use from now on
   */
  @Private
  @Unstable
  public void setSystemClock(Clock clock) {
    this.systemClock = clock;
  }

  /** @return the live (not copied) map of system credentials by app id */
  @Private
  @Unstable
  public ConcurrentMap<ApplicationId, SystemCredentialsForAppsProto>
      getSystemCredentialsForApps() {
    return systemCredentials;
  }

  /** @return the queue placement manager */
  @Private
  @Unstable
  public PlacementManager getQueuePlacementManager() {
    return queuePlacementManager;
  }

  /**
   * Replaces the queue placement manager created by the constructor.
   *
   * @param placementMgr the placement manager to use
   */
  @Private
  @Unstable
  public void setQueuePlacementManager(PlacementManager placementMgr) {
    this.queuePlacementManager = placementMgr;
  }

  /**
   * Sets the application lifetime monitor.
   *
   * @param lifetimeMonitor the lifetime monitor
   */
  @Private
  @Unstable
  public void setRMAppLifetimeMonitor(
      RMAppLifetimeMonitor lifetimeMonitor) {
    this.rmAppLifetimeMonitor = lifetimeMonitor;
  }

  /** @return the application lifetime monitor, or {@code null} if not set */
  @Private
  @Unstable
  public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
    return this.rmAppLifetimeMonitor;
  }

  /**
   * Returns the queue limit calculator stored by
   * {@link #setContainerQueueLimitCalculator(QueueLimitCalculator)}.
   *
   * @return the queue limit calculator, or {@code null} if not set
   */
  @Private
  @Unstable
  public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
    return this.queueLimitCalculator;
  }

  /**
   * Sets the queue limit calculator returned by
   * {@link #getNodeManagerQueueLimitCalculator()}.
   *
   * @param limitCalculator the queue limit calculator
   */
  @Private
  @Unstable
  public void setContainerQueueLimitCalculator(
      QueueLimitCalculator limitCalculator) {
    this.queueLimitCalculator = limitCalculator;
  }

  /** @return the resource profiles manager, or {@code null} if not set */
  public ResourceProfilesManager getResourceProfilesManager() {
    return resourceProfilesManager;
  }

  /**
   * Sets the resource profiles manager.
   *
   * @param resourceProfilesManager the resource profiles manager
   */
  public void setResourceProfilesManager(
      ResourceProfilesManager resourceProfilesManager) {
    this.resourceProfilesManager = resourceProfilesManager;
  }

  /** @return the proxy CA manager, or {@code null} if not set */
  @Private
  @Unstable
  public ProxyCAManager getProxyCAManager() {
    return proxyCAManager;
  }

  /**
   * Sets the proxy CA manager.
   *
   * @param proxyCAManager the proxy CA manager
   */
  @Private
  @Unstable
  public void setProxyCAManager(ProxyCAManager proxyCAManager) {
    this.proxyCAManager = proxyCAManager;
  }

  /** @return the volume manager, or {@code null} if not set */
  @Private
  @Unstable
  public VolumeManager getVolumeManager() {
    return this.volumeManager;
  }

  /**
   * Sets the volume manager.
   *
   * @param volumeManager the volume manager
   */
  @Private
  @Unstable
  public void setVolumeManager(VolumeManager volumeManager) {
    this.volumeManager = volumeManager;
  }

  /**
   * Get token sequence no. The counter starts at 1.
   *
   * @return the current value of the token sequence counter
   */
  public Long getTokenSequenceNo() {
    return tokenSequenceNo.get();
  }

  /**
   * Atomically increment the token sequence no. by one.
   */
  public void incrTokenSequenceNo() {
    this.tokenSequenceNo.incrementAndGet();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AMSProcessingChain 源码

hadoop ActiveStandbyElectorBasedElectorService 源码

hadoop AdminService 源码

hadoop ApplicationMasterService 源码

hadoop ClientRMService 源码

hadoop ClusterMetrics 源码

hadoop ClusterMonitor 源码

hadoop CuratorBasedElectorService 源码

hadoop DBManager 源码

hadoop DecommissioningNodesWatcher 源码

0  赞