// Hadoop AbstractLeafQueue source code
// File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getACLsForFlexibleAutoCreatedLeafQueue;
public class AbstractLeafQueue extends AbstractCSQueue {
private static final Logger LOG =
    LoggerFactory.getLogger(AbstractLeafQueue.class);

// Fraction of the whole cluster's resource currently consumed by this queue.
private float absoluteUsedCapacity = 0.0f;

// TODO the max applications should consider label
// Upper bound on the total number of applications this queue accepts.
protected int maxApplications;
// Upper bound on the number of applications any single user may have here.
protected volatile int maxApplicationsPerUser;

// Fraction of the queue's resources usable by ApplicationMaster containers.
private float maxAMResourcePerQueuePercent;

// Locality scheduling knobs, refreshed from configuration on (re)init.
private volatile int nodeLocalityDelay;
private volatile int rackLocalityAdditionalDelay;
private volatile boolean rackLocalityFullReset;

// All application attempts currently known to this queue, keyed by attempt id.
Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
    new ConcurrentHashMap<>();

// Priority assigned to applications that do not specify one.
private Priority defaultAppPriorityPerQueue;

// Ordering of applications waiting to be activated. Created once in the
// constructor since the pending policy is static (FIFO).
private final OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy;

// (maxAllocation - minAllocation) / maxAllocation; re-derived on re-init
// because the maximum allocation can change.
private volatile float minimumAllocationFactor;

private final RecordFactory recordFactory =
    RecordFactoryProvider.getRecordFactory(null);

// Tracks per-user usage and computes user limits for this queue.
private final UsersManager usersManager;

// cache last cluster resource to compute actual capacity
private Resource lastClusterResource = Resources.none();

// Guarded by synchronized(queueResourceLimitsInfo) where read/written.
private final QueueResourceLimitsInfo queueResourceLimitsInfo =
    new QueueResourceLimitsInfo();

// Headroom limits cached from the last cluster-resource update.
private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;

// Ordering of the active (schedulable) applications.
private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;

// Map<Partition, Map<SchedulingMode, Map<User, CachedUserLimit>>>
// Not thread safe: only the last level is a ConcurrentMap
@VisibleForTesting
Map<String, Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>>
    userLimitsCache = new HashMap<>();

// Not thread safe
@VisibleForTesting
long currentUserLimitCacheVersion = 0;

// record all ignore partition exclusivityRMContainer, this will be used to do
// preemption, key is the partition of the RMContainer allocated on
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
    new ConcurrentHashMap<>();

// Per-priority-group ACLs controlling who may submit at which priority.
List<AppPriorityACLGroup> priorityAcls =
    new ArrayList<AppPriorityACLGroup>();

// Attempts split by whether they are within the parallel-apps limit.
private final List<FiCaSchedulerApp> runnableApps = new ArrayList<>();
private final List<FiCaSchedulerApp> nonRunnableApps = new ArrayList<>();
/**
 * Creates a static (configuration-declared) leaf queue.
 *
 * @param queueContext shared capacity-scheduler queue context
 * @param queueName short name of this queue
 * @param parent parent queue in the hierarchy
 * @param old previous incarnation of this queue on reinitialization, if any
 * @throws IOException if queue setup fails
 */
public AbstractLeafQueue(CapacitySchedulerQueueContext queueContext,
    String queueName, CSQueue parent, CSQueue old) throws IOException {
  this(queueContext, queueName, parent, old, false);
}

/**
 * Creates a leaf queue.
 *
 * @param isDynamic whether this queue was auto-created at runtime rather
 *                  than declared in the configuration
 * @throws IOException if queue setup fails
 */
public AbstractLeafQueue(CapacitySchedulerQueueContext queueContext,
    String queueName, CSQueue parent, CSQueue old, boolean isDynamic) throws
    IOException {
  super(queueContext, queueName, parent, old);
  setDynamicQueue(isDynamic);
  this.usersManager = new UsersManager(usageTracker.getMetrics(), this, labelManager,
      resourceCalculator);
  // One time initialization is enough since it is static ordering policy
  this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
}
/**
 * (Re)loads every leaf-queue level setting from the current configuration:
 * ordering policy, user limit / user limit factor, AM resource percentage,
 * application count limits, priority ACLs, locality delays and node-label
 * validation. Invoked under the write lock on queue creation and on
 * {@link #reinitialize}.
 *
 * @param clusterResource current total cluster resource, cached for later
 *        capacity and headroom computations
 * @throws IOException if the default node label expression references labels
 *         this queue is not allowed to access
 */
@SuppressWarnings("checkstyle:nowhitespaceafter")
protected void setupQueueConfigs(Resource clusterResource) throws
    IOException {
  writeLock.lock();
  try {
    CapacitySchedulerConfiguration configuration = queueContext.getConfiguration();
    super.setupQueueConfigs(clusterResource);
    this.lastClusterResource = clusterResource;
    this.cachedResourceLimitsForHeadroom = new ResourceLimits(
        clusterResource);
    // Initialize headroom info, also used for calculating application
    // master resource limits. Since this happens during queue initialization
    // and all queues may not be realized yet, we'll use (optimistic)
    // absoluteMaxCapacity (it will be replaced with the more accurate
    // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
    setQueueResourceLimitsInfo(clusterResource);
    setOrderingPolicy(
        configuration.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
    usersManager.setUserLimit(configuration.getUserLimit(getQueuePath()));
    usersManager.setUserLimitFactor(configuration.getUserLimitFactor(getQueuePath()));
    maxAMResourcePerQueuePercent =
        configuration.getMaximumApplicationMasterResourcePerQueuePercent(
            getQueuePath());
    // A negative per-queue value means "not configured"; fall back to the
    // global per-queue maximum when one is set.
    maxApplications = configuration.getMaximumApplicationsPerQueue(getQueuePath());
    if (maxApplications < 0) {
      int maxGlobalPerQueueApps =
          configuration.getGlobalMaximumApplicationsPerQueue();
      if (maxGlobalPerQueueApps > 0) {
        maxApplications = maxGlobalPerQueueApps;
      }
    }
    priorityAcls = configuration.getPriorityAcls(getQueuePath(),
        configuration.getClusterLevelApplicationMaxPriority());
    // The default label expression may only use labels the queue can access.
    Set<String> accessibleNodeLabels = this.queueNodeLabelsSettings.getAccessibleNodeLabels();
    if (!SchedulerUtils.checkQueueLabelExpression(accessibleNodeLabels,
        this.queueNodeLabelsSettings.getDefaultLabelExpression(), null)) {
      throw new IOException(
          "Invalid default label expression of " + " queue=" + getQueuePath()
              + " doesn't have permission to access all labels "
              + "in default label expression. labelExpression of resource request="
              + getDefaultNodeLabelExpressionStr() + ". Queue labels=" + (
              getAccessibleNodeLabels() == null ?
                  "" :
                  StringUtils
                      .join(getAccessibleNodeLabels().iterator(), ',')));
    }
    nodeLocalityDelay = configuration.getNodeLocalityDelay();
    rackLocalityAdditionalDelay = configuration
        .getRackLocalityAdditionalDelay();
    rackLocalityFullReset = configuration
        .getRackLocalityFullReset();
    // re-init this since max allocation could have changed
    this.minimumAllocationFactor = Resources.ratio(resourceCalculator,
        Resources.subtract(
            queueAllocationSettings.getMaximumAllocation(),
            queueAllocationSettings.getMinimumAllocation()),
        queueAllocationSettings.getMaximumAllocation());
    // Render ACLs and accessible labels once, for the init log below.
    StringBuilder aclsString = new StringBuilder();
    for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
      aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
    }
    StringBuilder labelStrBuilder = new StringBuilder();
    if (accessibleNodeLabels != null) {
      for (String nodeLabel : accessibleNodeLabels) {
        labelStrBuilder.append(nodeLabel).append(",");
      }
    }
    defaultAppPriorityPerQueue = Priority.newInstance(
        configuration.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
    // Validate leaf queue's user's weights.
    float queueUserLimit = Math.min(100.0f, configuration.getUserLimit(getQueuePath()));
    getUserWeights().validateForLeafQueue(queueUserLimit, getQueuePath());
    usersManager.updateUserWeights();
    // Single consolidated snapshot of the effective configuration.
    LOG.info(
        "Initializing " + getQueuePath() + "\n" +
            getExtendedCapacityOrWeightString() + "\n"
            + "absoluteCapacity = " + queueCapacities.getAbsoluteCapacity()
            + " [= parentAbsoluteCapacity * capacity ]" + "\n"
            + "maxCapacity = " + queueCapacities.getMaximumCapacity()
            + " [= configuredMaxCapacity ]" + "\n" + "absoluteMaxCapacity = "
            + queueCapacities.getAbsoluteMaximumCapacity()
            + " [= 1.0 maximumCapacity undefined, "
            + "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]"
            + "\n" + "effectiveMinResource=" +
            getEffectiveCapacity(CommonNodeLabelsManager.NO_LABEL) + "\n"
            + " , effectiveMaxResource=" +
            getEffectiveMaxCapacity(CommonNodeLabelsManager.NO_LABEL)
            + "\n" + "userLimit = " + usersManager.getUserLimit()
            + " [= configuredUserLimit ]" + "\n" + "userLimitFactor = "
            + usersManager.getUserLimitFactor()
            + " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = "
            + maxApplications
            + " [= configuredMaximumSystemApplicationsPerQueue or"
            + " (int)(configuredMaximumSystemApplications * absoluteCapacity)]"
            + "\n" + "maxApplicationsPerUser = " + maxApplicationsPerUser
            + " [= (int)(maxApplications * (userLimit / 100.0f) * "
            + "userLimitFactor) ]" + "\n"
            + "maxParallelApps = " + getMaxParallelApps() + "\n"
            + "usedCapacity = " +
            + queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / "
            + "(clusterResourceMemory * absoluteCapacity)]" + "\n"
            + "absoluteUsedCapacity = " + absoluteUsedCapacity
            + " [= usedResourcesMemory / clusterResourceMemory]" + "\n"
            + "maxAMResourcePerQueuePercent = " + maxAMResourcePerQueuePercent
            + " [= configuredMaximumAMResourcePercent ]" + "\n"
            + "minimumAllocationFactor = " + minimumAllocationFactor
            + " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / "
            + "maximumAllocationMemory ]" + "\n" + "maximumAllocation = "
            + queueAllocationSettings.getMaximumAllocation() +
            " [= configuredMaxAllocation ]" + "\n"
            + "numContainers = " + usageTracker.getNumContainers()
            + " [= currentNumContainers ]" + "\n" + "state = " + getState()
            + " [= configuredState ]" + "\n" + "acls = " + aclsString
            + " [= configuredAcls ]" + "\n"
            + "nodeLocalityDelay = " + nodeLocalityDelay + "\n"
            + "rackLocalityAdditionalDelay = "
            + rackLocalityAdditionalDelay + "\n"
            + "labels=" + labelStrBuilder.toString() + "\n"
            + "reservationsContinueLooking = "
            + reservationsContinueLooking + "\n" + "preemptionDisabled = "
            + getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = "
            + defaultAppPriorityPerQueue + "\npriority = " + priority
            + "\nmaxLifetime = " + getMaximumApplicationLifetime()
            + " seconds" + "\ndefaultLifetime = "
            + getDefaultApplicationLifetime() + " seconds");
  } finally {
    writeLock.unlock();
  }
}
/**
 * Returns this queue's configured default node label expression, or the
 * empty string when none is configured.
 */
private String getDefaultNodeLabelExpressionStr() {
  String expression = queueNodeLabelsSettings.getDefaultLabelExpression();
  if (expression == null) {
    return "";
  }
  return expression;
}
/**
 * Used only by tests.
 */
@Private
public float getMinimumAllocationFactor() {
  return minimumAllocationFactor;
}

/**
 * Used only by tests.
 */
@Private
public float getMaxAMResourcePerQueuePercent() {
  return maxAMResourcePerQueuePercent;
}

/**
 * @return the maximum number of applications this queue accepts in total.
 */
public int getMaxApplications() {
  return maxApplications;
}

/**
 * @return the maximum number of applications per user in this queue.
 */
public int getMaxApplicationsPerUser() {
  return maxApplicationsPerUser;
}

/**
 *
 * @return UsersManager instance.
 */
public UsersManager getUsersManager() {
  return usersManager;
}

@Override
public AbstractUsersManager getAbstractUsersManager() {
  return usersManager;
}

/**
 * A leaf queue has no children, so this always returns {@code null}.
 */
@Override
public List<CSQueue> getChildQueues() {
  return null;
}
/**
 * Set user limit and mark the cached user limits as stale so they are
 * recomputed on the next use.
 * @param userLimit new user limit
 */
@VisibleForTesting
void setUserLimit(float userLimit) {
  usersManager.setUserLimit(userLimit);
  usersManager.userLimitNeedsRecompute();
}

/**
 * Set user limit factor and mark the cached user limits as stale so they
 * are recomputed on the next use.
 * @param userLimitFactor new user limit factor
 */
@VisibleForTesting
void setUserLimitFactor(float userLimitFactor) {
  usersManager.setUserLimitFactor(userLimitFactor);
  usersManager.userLimitNeedsRecompute();
}
/**
 * Total number of applications in this queue: pending + active +
 * non-runnable (those held back by the parallel-apps limit).
 */
@Override
public int getNumApplications() {
  readLock.lock();
  try {
    return getNumPendingApplications() + getNumActiveApplications() +
        getNumNonRunnableApps();
  } finally {
    readLock.unlock();
  }
}

/** Number of applications waiting to be activated. */
public int getNumPendingApplications() {
  readLock.lock();
  try {
    return pendingOrderingPolicy.getNumSchedulableEntities();
  } finally {
    readLock.unlock();
  }
}

/** Number of activated (schedulable) applications. */
public int getNumActiveApplications() {
  readLock.lock();
  try {
    return orderingPolicy.getNumSchedulableEntities();
  } finally {
    readLock.unlock();
  }
}
/**
 * Number of pending applications submitted by {@code user}, or 0 when the
 * user is not known to this queue.
 */
@Private
public int getNumPendingApplications(String user) {
  readLock.lock();
  try {
    User u = getUser(user);
    if (null == u) {
      return 0;
    }
    return u.getPendingApplications();
  } finally {
    readLock.unlock();
  }
}

/**
 * Number of active applications submitted by {@code user}, or 0 when the
 * user is not known to this queue.
 */
@Private
public int getNumActiveApplications(String user) {
  readLock.lock();
  try {
    User u = getUser(user);
    if (null == u) {
      return 0;
    }
    return u.getActiveApplications();
  } finally {
    readLock.unlock();
  }
}

/** Configured user limit (percentage), delegated to the UsersManager. */
@Private
public float getUserLimit() {
  return usersManager.getUserLimit();
}

/** Configured user limit factor, delegated to the UsersManager. */
@Private
public float getUserLimitFactor() {
  return usersManager.getUserLimitFactor();
}
/**
 * {@inheritDoc}
 *
 * A leaf queue has no child queues, so both {@code includeChildQueues} and
 * {@code recursive} are irrelevant here and intentionally ignored.
 */
@Override
public QueueInfo getQueueInfo(
    boolean includeChildQueues, boolean recursive) {
  // The no-arg overload already builds the full info for a leaf queue;
  // no need for an intermediate local.
  return getQueueInfo();
}
/**
 * Builds the ACL information for {@code user} on this queue: the list of
 * queue operations the user is permitted to perform, computed under the
 * read lock.
 */
@Override
public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
  readLock.lock();
  try {
    // Collect every operation the user has access to.
    List<QueueACL> allowedOperations = new ArrayList<>();
    for (QueueACL acl : QueueACL.values()) {
      if (hasAccess(acl, user)) {
        allowedOperations.add(acl);
      }
    }
    QueueUserACLInfo aclInfo = recordFactory.newRecordInstance(
        QueueUserACLInfo.class);
    aclInfo.setQueueName(getQueuePath());
    aclInfo.setUserAcls(allowedOperations);
    return Collections.singletonList(aclInfo);
  } finally {
    readLock.unlock();
  }
}
/**
 * Human-readable snapshot of the queue's capacity and usage, taken under
 * the read lock so the reported values are mutually consistent.
 */
@Override // was missing: this overrides Object#toString
public String toString() {
  readLock.lock();
  try {
    return getQueuePath() + ": " + getCapacityOrWeightString()
        + ", " + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity()
        + ", " + "usedResources=" + usageTracker.getQueueUsage().getUsed() + ", "
        + "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity="
        + getAbsoluteUsedCapacity() + ", " + "numApps=" + getNumApplications()
        + ", " + "numContainers=" + getNumContainers() + ", "
        + "effectiveMinResource=" +
        getEffectiveCapacity(CommonNodeLabelsManager.NO_LABEL) +
        " , effectiveMaxResource=" +
        getEffectiveMaxCapacity(CommonNodeLabelsManager.NO_LABEL);
  } finally {
    readLock.unlock();
  }
}
/**
 * Describes either the weight-based or the percentage-based capacity
 * configuration of this queue; used by the initialization log.
 */
protected String getExtendedCapacityOrWeightString() {
  // A weight of -1 means the queue is configured with a percentage capacity.
  if (queueCapacities.getWeight() == -1) {
    return "capacity = " + queueCapacities.getCapacity()
        + " [= (float) configuredCapacity / 100 ]";
  }
  return "weight = " + queueCapacities.getWeight()
      + " [= (float) configuredCapacity (with w suffix)] " + "\n"
      + "normalizedWeight = " + queueCapacities.getNormalizedWeight()
      + " [= (float) configuredCapacity / sum(configuredCapacity of " +
      "all queues under the parent)]";
}
/**
 * Looks up the tracked state for {@code userName}; may return null when the
 * user is not currently known to this queue.
 */
@VisibleForTesting
public User getUser(String userName) {
  return usersManager.getUser(userName);
}

/**
 * Looks up the tracked state for {@code userName}, creating it if absent.
 */
@VisibleForTesting
public User getOrCreateUser(String userName) {
  return usersManager.getUserAndAddIfAbsent(userName);
}

/**
 * @return a defensive copy of the application-priority ACL groups, taken
 *         under the read lock.
 */
@Private
public List<AppPriorityACLGroup> getPriorityACLs() {
  readLock.lock();
  try {
    return new ArrayList<>(priorityAcls);
  } finally {
    readLock.unlock();
  }
}
/**
 * Re-applies configuration to this queue from {@code newlyParsedQueue},
 * validating that the queue identity is unchanged and that the maximum
 * allocation is not shrunk (running AMs were already told the old size).
 *
 * @param newlyParsedQueue freshly parsed queue carrying the new settings
 * @param clusterResource current total cluster resource
 * @throws IOException if the queue path/type mismatches or the maximum
 *         allocation would decrease
 */
@Override
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
    throws IOException {
  writeLock.lock();
  try {
    // We skip reinitialize for dynamic queues, when this is called, and
    // new queue is different from this queue, we will make this queue to be
    // static queue.
    if (newlyParsedQueue != this) {
      this.setDynamicQueue(false);
    }
    // Sanity check
    if (!(newlyParsedQueue instanceof AbstractLeafQueue) || !newlyParsedQueue
        .getQueuePath().equals(getQueuePath())) {
      throw new IOException(
          "Trying to reinitialize " + getQueuePath() + " from "
              + newlyParsedQueue.getQueuePath());
    }
    AbstractLeafQueue newlyParsedLeafQueue = (AbstractLeafQueue) newlyParsedQueue;
    // don't allow the maximum allocation to be decreased in size
    // since we have already told running AM's the size
    Resource oldMax = getMaximumAllocation();
    Resource newMax = newlyParsedLeafQueue.getMaximumAllocation();
    if (!Resources.fitsIn(oldMax, newMax)) {
      throw new IOException("Trying to reinitialize " + getQueuePath()
          + " the maximum allocation size can not be decreased!"
          + " Current setting: " + oldMax + ", trying to set it to: "
          + newMax);
    }
    setupQueueConfigs(clusterResource);
  } finally {
    writeLock.unlock();
  }
}
/** Submits an application attempt to this queue (not a move). */
@Override
public void submitApplicationAttempt(FiCaSchedulerApp application,
    String userName) {
  submitApplicationAttempt(application, userName, false);
}

/**
 * Registers an application attempt with this queue, updates submit metrics
 * for genuinely new (non-move) attempts, and notifies the parent queue.
 *
 * @param application the attempt being submitted
 * @param userName submitting user
 * @param isMoveApp true when the attempt arrives via a queue move, in which
 *        case submit metrics are not updated
 */
@Override
public void submitApplicationAttempt(FiCaSchedulerApp application,
    String userName, boolean isMoveApp) {
  // Careful! Locking order is important!
  // Check re-submission before taking the write lock.
  boolean isAppAlreadySubmitted = applicationAttemptMap.containsKey(
      application.getApplicationAttemptId());
  writeLock.lock();
  try {
    // TODO, should use getUser, use this method just to avoid UT failure
    // which is caused by wrong invoking order, will fix UT separately
    User user = usersManager.getUserAndAddIfAbsent(userName);
    // Add the attempt to our data-structures
    addApplicationAttempt(application, user);
  } finally {
    writeLock.unlock();
  }
  // We don't want to update metrics for move app
  if (!isMoveApp && !isAppAlreadySubmitted) {
    boolean unmanagedAM = application.getAppSchedulingInfo() != null &&
        application.getAppSchedulingInfo().isUnmanagedAM();
    usageTracker.getMetrics().submitAppAttempt(userName, unmanagedAM);
  }
  // Inform the parent outside our own lock.
  parent.submitApplicationAttempt(application, userName);
}
/**
 * Validates and submits an application to this queue, then propagates the
 * submission up to the parent queue.
 *
 * @throws AccessControlException if the queue is stopped, a submission
 *         limit is exceeded, or the parent rejects the submission
 */
@Override
public void submitApplication(ApplicationId applicationId, String userName,
    String queue) throws AccessControlException {
  // Careful! Locking order is important!
  validateSubmitApplication(applicationId, userName, queue);
  // Signal for expired auto deletion.
  updateLastSubmittedTimeStamp();
  // Inform the parent queue
  try {
    parent.submitApplication(applicationId, userName, queue);
  } catch (AccessControlException ace) {
    LOG.info("Failed to submit application to parent-queue: " +
        parent.getQueuePath(), ace);
    throw ace;
  }
}
/**
 * Checks whether a submission by {@code userName} can be accepted: queue
 * must be RUNNING and both queue-level and per-user application limits must
 * not be exceeded (AutoCreatedLeafQueues are exempt from the count checks).
 * Also delegates validation to the parent queue.
 *
 * @throws AccessControlException if any check fails
 */
public void validateSubmitApplication(ApplicationId applicationId,
    String userName, String queue) throws AccessControlException {
  writeLock.lock();
  try {
    // Check if the queue is accepting jobs
    if (getState() != QueueState.RUNNING) {
      String msg = "Queue " + getQueuePath()
          + " is STOPPED. Cannot accept submission of application: "
          + applicationId;
      LOG.info(msg);
      throw new AccessControlException(msg);
    }
    // Check submission limits for queues
    //TODO recalculate max applications because they can depend on capacity
    if (getNumApplications() >= getMaxApplications() &&
        !(this instanceof AutoCreatedLeafQueue)) {
      String msg =
          "Queue " + getQueuePath() + " already has " + getNumApplications()
              + " applications,"
              + " cannot accept submission of application: " + applicationId;
      LOG.info(msg);
      throw new AccessControlException(msg);
    }
    // Check submission limits for the user on this queue
    User user = usersManager.getUserAndAddIfAbsent(userName);
    //TODO recalculate max applications because they can depend on capacity
    if (user.getTotalApplications() >= getMaxApplicationsPerUser() &&
        !(this instanceof AutoCreatedLeafQueue)) {
      String msg = "Queue " + getQueuePath() + " already has " + user
          .getTotalApplications() + " applications from user " + userName
          + " cannot accept submission of application: " + applicationId;
      LOG.info(msg);
      throw new AccessControlException(msg);
    }
  } finally {
    writeLock.unlock();
  }
  // Parent validation happens outside our write lock (locking order).
  try {
    parent.validateSubmitApplication(applicationId, userName, queue);
  } catch (AccessControlException ace) {
    LOG.info("Failed to submit application to parent-queue: " +
        parent.getQueuePath(), ace);
    throw ace;
  }
}
/** AM resource limit for the default partition, from queue usage. */
public Resource getAMResourceLimit() {
  return usageTracker.getQueueUsage().getAMLimit();
}

/** AM resource limit for the given partition, from queue usage. */
public Resource getAMResourceLimitPerPartition(String nodePartition) {
  return usageTracker.getQueueUsage().getAMLimit(nodePartition);
}

/** Recomputes and returns the AM resource limit for the default partition. */
@VisibleForTesting
public Resource calculateAndGetAMResourceLimit() {
  return calculateAndGetAMResourceLimitPerPartition(
      RMNodeLabelsManager.NO_LABEL);
}

/** Per-user AM resource limit for the default partition, user-agnostic. */
@VisibleForTesting
public Resource getUserAMResourceLimit() {
  return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL,
      null);
}
/**
 * Computes the AM resource limit for one user in one partition. Two values
 * are produced: the weighted limit (scaled by the user's weight) which is
 * returned, and the pre-weighted limit which is stored back into queue
 * usage. Both are capped by the queue-level AM limit for the partition.
 *
 * @param nodePartition partition to compute the limit for
 * @param userName user whose weight applies; null means weight 1.0
 * @return the user's weighted AM resource limit for the partition
 */
public Resource getUserAMResourceLimitPerPartition(
    String nodePartition, String userName) {
  // Resolve the user's weight before taking the read lock.
  float userWeight = 1.0f;
  if (userName != null && getUser(userName) != null) {
    userWeight = getUser(userName).getWeight();
  }
  readLock.lock();
  try {
    /*
     * The user am resource limit is based on the same approach as the user
     * limit (as it should represent a subset of that). This means that it uses
     * the absolute queue capacity (per partition) instead of the max and is
     * modified by the userlimit and the userlimit factor as is the userlimit
     */
    // At minimum each active user gets an equal 1/numActiveUsers share.
    float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f,
        1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
    float preWeightedUserLimit = effectiveUserLimit;
    effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
    Resource queuePartitionResource = getEffectiveCapacity(nodePartition);
    Resource minimumAllocation = queueAllocationSettings.getMinimumAllocation();
    Resource userAMLimit = Resources.multiplyAndNormalizeUp(
        resourceCalculator, queuePartitionResource,
        queueCapacities.getMaxAMResourcePercentage(nodePartition)
            * effectiveUserLimit * usersManager.getUserLimitFactor(),
        minimumAllocation);
    // A user limit factor of -1 disables the user-limit scaling entirely.
    if (getUserLimitFactor() == -1) {
      userAMLimit = Resources.multiplyAndNormalizeUp(
          resourceCalculator, queuePartitionResource,
          queueCapacities.getMaxAMResourcePercentage(nodePartition),
          minimumAllocation);
    }
    // Never exceed the queue-level AM limit for this partition.
    userAMLimit =
        Resources.min(resourceCalculator, lastClusterResource,
            userAMLimit,
            Resources.clone(getAMResourceLimitPerPartition(nodePartition)));
    // Same computation without the user's weight applied; this value is
    // what gets recorded in queue usage.
    Resource preWeighteduserAMLimit =
        Resources.multiplyAndNormalizeUp(
            resourceCalculator, queuePartitionResource,
            queueCapacities.getMaxAMResourcePercentage(nodePartition)
                * preWeightedUserLimit * usersManager.getUserLimitFactor(),
            minimumAllocation);
    if (getUserLimitFactor() == -1) {
      preWeighteduserAMLimit = Resources.multiplyAndNormalizeUp(
          resourceCalculator, queuePartitionResource,
          queueCapacities.getMaxAMResourcePercentage(nodePartition),
          minimumAllocation);
    }
    preWeighteduserAMLimit =
        Resources.min(resourceCalculator, lastClusterResource,
            preWeighteduserAMLimit,
            Resources.clone(getAMResourceLimitPerPartition(nodePartition)));
    usageTracker.getQueueUsage().setUserAMLimit(nodePartition, preWeighteduserAMLimit);
    LOG.debug("Effective user AM limit for \"{}\":{}. Effective weighted"
        + " user AM limit: {}. User weight: {}", userName,
        preWeighteduserAMLimit, userAMLimit, userWeight);
    return userAMLimit;
  } finally {
    readLock.unlock();
  }
}
/**
 * Recomputes the queue-level AM resource limit for {@code nodePartition}
 * and records it in both metrics and queue usage.
 *
 * @return the newly computed AM resource limit for the partition
 */
public Resource calculateAndGetAMResourceLimitPerPartition(
    String nodePartition) {
  writeLock.lock();
  try {
    /*
     * For non-labeled partition, get the max value from resources currently
     * available to the queue and the absolute resources guaranteed for the
     * partition in the queue. For labeled partition, consider only the absolute
     * resources guaranteed. Multiply this value (based on labeled/
     * non-labeled), * with per-partition am-resource-percent to get the max am
     * resource limit for this queue and partition.
     */
    Resource queuePartitionResource = getEffectiveCapacity(nodePartition);
    Resource queueCurrentLimit = Resources.none();
    // For non-labeled partition, we need to consider the current queue
    // usage limit.
    if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
      synchronized (queueResourceLimitsInfo){
        queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
      }
    }
    float amResourcePercent = queueCapacities.getMaxAMResourcePercentage(
        nodePartition);
    // Current usable resource for this queue and partition is the max of
    // queueCurrentLimit and queuePartitionResource.
    // If any of the resources available to this queue are less than queue's
    // guarantee, use the guarantee as the queuePartitionUsableResource
    // because nothing less than the queue's guarantee should be used when
    // calculating the AM limit.
    Resource queuePartitionUsableResource = (Resources.fitsIn(
        resourceCalculator, queuePartitionResource, queueCurrentLimit)) ?
        queueCurrentLimit : queuePartitionResource;
    Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
        resourceCalculator, queuePartitionUsableResource, amResourcePercent,
        queueAllocationSettings.getMinimumAllocation());
    usageTracker.getMetrics().setAMResouceLimit(nodePartition, amResouceLimit);
    usageTracker.getQueueUsage().setAMLimit(nodePartition, amResouceLimit);
    LOG.debug("Queue: {}, node label : {}, queue partition resource : {},"
        + " queue current limit : {}, queue partition usable resource : {},"
        + " amResourceLimit : {}", getQueuePath(), nodePartition,
        queuePartitionResource, queueCurrentLimit,
        queuePartitionUsableResource, amResouceLimit);
    return amResouceLimit;
  } finally {
    writeLock.unlock();
  }
}
/**
 * Walks the pending applications in order and activates every one whose AM
 * resource fits within both the queue-level and per-user AM limits for its
 * AM node partition. Activated applications are moved from the pending
 * ordering policy into the active one, and AM usage/metrics are updated.
 * When no application is active yet, the AM limit check is deliberately
 * skipped so at least one application can always start.
 */
protected void activateApplications() {
  writeLock.lock();
  try {
    // limit of allowed resource usage for application masters
    Map<String, Resource> userAmPartitionLimit =
        new HashMap<String, Resource>();
    // AM Resource Limit for accessible labels can be pre-calculated.
    // This will help in updating AMResourceLimit for all labels when queue
    // is initialized for the first time (when no applications are present).
    for (String nodePartition : getNodeLabelsForQueue()) {
      calculateAndGetAMResourceLimitPerPartition(nodePartition);
    }
    for (Iterator<FiCaSchedulerApp> fsApp = getPendingAppsOrderingPolicy()
        .getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
        fsApp.hasNext();) {
      FiCaSchedulerApp application = fsApp.next();
      ApplicationId applicationId = application.getApplicationId();
      // Get the am-node-partition associated with each application
      // and calculate max-am resource limit for this partition.
      String partitionName = application.getAppAMNodePartitionName();
      Resource amLimit = getAMResourceLimitPerPartition(partitionName);
      // Verify whether we already calculated am-limit for this label.
      if (amLimit == null) {
        amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName);
      }
      // Check am resource limit.
      Resource amIfStarted = Resources.add(
          application.getAMResource(partitionName),
          usageTracker.getQueueUsage().getAMUsed(partitionName));
      if (LOG.isDebugEnabled()) {
        LOG.debug("application " + application.getId() + " AMResource "
            + application.getAMResource(partitionName)
            + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent
            + " amLimit " + amLimit + " lastClusterResource "
            + lastClusterResource + " amIfStarted " + amIfStarted
            + " AM node-partition name " + partitionName);
      }
      if (!resourceCalculator.fitsIn(amIfStarted, amLimit)) {
        // Skip enforcement when nothing is active yet (or nothing is using
        // AM resource) so the queue can always start one application.
        if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
            resourceCalculator, lastClusterResource,
            usageTracker.getQueueUsage().getAMUsed(partitionName), Resources.none()))) {
          LOG.warn("maximum-am-resource-percent is insufficient to start a"
              + " single application in queue, it is likely set too low."
              + " skipping enforcement to allow at least one application"
              + " to start");
        } else{
          application.updateAMContainerDiagnostics(
              SchedulerApplicationAttempt.AMState.INACTIVATED,
              CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
          LOG.debug("Not activating application {} as amIfStarted: {}"
              + " exceeds amLimit: {}", applicationId, amIfStarted, amLimit);
          continue;
        }
      }
      // Check user am resource limit
      User user = usersManager.getUserAndAddIfAbsent(application.getUser());
      Resource userAMLimit = userAmPartitionLimit.get(partitionName);
      // Verify whether we already calculated user-am-limit for this label.
      if (userAMLimit == null) {
        userAMLimit = getUserAMResourceLimitPerPartition(partitionName,
            application.getUser());
        userAmPartitionLimit.put(partitionName, userAMLimit);
      }
      Resource userAmIfStarted = Resources.add(
          application.getAMResource(partitionName),
          user.getConsumedAMResources(partitionName));
      if (!resourceCalculator.fitsIn(userAmIfStarted, userAMLimit)) {
        // Same leniency as above, applied at the per-user level.
        if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
            resourceCalculator, lastClusterResource,
            usageTracker.getQueueUsage().getAMUsed(partitionName), Resources.none()))) {
          LOG.warn("maximum-am-resource-percent is insufficient to start a"
              + " single application in queue for user, it is likely set too"
              + " low. skipping enforcement to allow at least one application"
              + " to start");
        } else{
          application.updateAMContainerDiagnostics(AMState.INACTIVATED,
              CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
          LOG.debug("Not activating application {} for user: {} as"
              + " userAmIfStarted: {} exceeds userAmLimit: {}",
              applicationId, user, userAmIfStarted, userAMLimit);
          continue;
        }
      }
      // Activation: move the app into the active policy and account its
      // AM resource against queue, user and metrics.
      user.activateApplication();
      orderingPolicy.addSchedulableEntity(application);
      application.updateAMContainerDiagnostics(AMState.ACTIVATED, null);
      usageTracker.getQueueUsage().incAMUsed(partitionName,
          application.getAMResource(partitionName));
      user.getResourceUsage().incAMUsed(partitionName,
          application.getAMResource(partitionName));
      user.getResourceUsage().setAMLimit(partitionName, userAMLimit);
      usageTracker.getMetrics().incAMUsed(partitionName, application.getUser(),
          application.getAMResource(partitionName));
      usageTracker.getMetrics().setAMResouceLimitForUser(partitionName,
          application.getUser(), userAMLimit);
      // Remove from the pending policy via the iterator (safe removal).
      fsApp.remove();
      LOG.info("Application " + applicationId + " from user: " + application
          .getUser() + " activated in queue: " + getQueuePath());
    }
  } finally {
    writeLock.unlock();
  }
}
private void addApplicationAttempt(FiCaSchedulerApp application,
User user) {
writeLock.lock();
try {
applicationAttemptMap.put(application.getApplicationAttemptId(),
application);
if (application.isRunnable()) {
runnableApps.add(application);
LOG.debug("Adding runnable application: {}",
application.getApplicationAttemptId());
} else {
nonRunnableApps.add(application);
LOG.info("Application attempt {} is not runnable,"
+ " parallel limit reached", application.getApplicationAttemptId());
return;
}
// Accept
user.submitApplication();
getPendingAppsOrderingPolicy().addSchedulableEntity(application);
// Activate applications
if (Resources.greaterThan(resourceCalculator, lastClusterResource,
lastClusterResource, Resources.none())) {
activateApplications();
} else {
application.updateAMContainerDiagnostics(AMState.INACTIVATED,
CSAMContainerLaunchDiagnosticsConstants.CLUSTER_RESOURCE_EMPTY);
LOG.info("Skipping activateApplications for "
+ application.getApplicationAttemptId()
+ " since cluster resource is " + Resources.none());
}
LOG.info(
"Application added -" + " appId: " + application.getApplicationId()
+ " user: " + application.getUser() + "," + " leaf-queue: "
+ getQueuePath() + " #user-pending-applications: " + user
.getPendingApplications() + " #user-active-applications: " + user
.getActiveApplications() + " #queue-pending-applications: "
+ getNumPendingApplications() + " #queue-active-applications: "
+ getNumActiveApplications()
+ " #queue-nonrunnable-applications: "
+ getNumNonRunnableApps());
} finally {
writeLock.unlock();
}
}
  /**
   * Handles completion of an application in this queue: deactivates the user
   * in the users-manager, updates finished-application bookkeeping, then
   * propagates the event up to the parent queue (call order is intentional).
   *
   * @param application id of the finished application
   * @param user name of the submitting user
   */
  @Override
  public void finishApplication(ApplicationId application, String user) {
    // Inform the activeUsersManager
    usersManager.deactivateApplication(user, application);

    appFinished();

    // Inform the parent queue
    parent.finishApplication(application, user);
  }
  /**
   * Handles completion of an application attempt: removes it from this leaf
   * queue first, then notifies the parent. The leaf-before-parent order is
   * required to keep the locking order consistent (leaf lock is never taken
   * while holding the parent's).
   *
   * @param application the finished attempt
   * @param queue queue name the attempt was submitted to
   */
  @Override
  public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
    // Careful! Locking order is important!
    removeApplicationAttempt(application, application.getUser());
    parent.finishApplicationAttempt(application, queue);
  }
  /**
   * Removes an application attempt from all queue-side bookkeeping (runnable /
   * non-runnable sets, ordering policies, AM-resource accounting, user stats)
   * and then tries to activate pending applications with the freed capacity.
   *
   * @param application the attempt being removed
   * @param userName name of the submitting user
   */
  private void removeApplicationAttempt(
      FiCaSchedulerApp application, String userName) {
    writeLock.lock();
    try {
      // TODO, should use getUser, use this method just to avoid UT failure
      // which is caused by wrong invoking order, will fix UT separately
      User user = usersManager.getUserAndAddIfAbsent(userName);

      // The attempt is tracked either as runnable or non-runnable; try the
      // runnable set first, then fall back to the non-runnable set.
      boolean runnable = runnableApps.remove(application);
      if (!runnable) {
        // removeNonRunnableApp acquires the write lock again, which is fine
        if (!removeNonRunnableApp(application)) {
          LOG.error("Given app to remove " + application +
              " does not exist in queue " + getQueuePath());
        }
      }

      String partitionName = application.getAppAMNodePartitionName();
      // An activated attempt lives in orderingPolicy; a pending one lives in
      // pendingOrderingPolicy. Only activated attempts hold AM resources.
      boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
      if (!wasActive) {
        pendingOrderingPolicy.removeSchedulableEntity(application);
      } else{
        // Return the AM resource that was charged when the app activated.
        usageTracker.getQueueUsage().decAMUsed(partitionName,
            application.getAMResource(partitionName));
        user.getResourceUsage().decAMUsed(partitionName,
            application.getAMResource(partitionName));
        usageTracker.getMetrics().decAMUsed(partitionName, application.getUser(),
            application.getAMResource(partitionName));
      }
      applicationAttemptMap.remove(application.getApplicationAttemptId());

      user.finishApplication(wasActive);
      // Drop the per-user record once the user has no applications left.
      if (user.getTotalApplications() == 0) {
        usersManager.removeUser(application.getUser());
      }

      // Check if we can activate more applications
      activateApplications();

      LOG.info(
          "Application removed -" + " appId: " + application.getApplicationId()
              + " user: " + application.getUser() + " queue: " + getQueuePath()
              + " #user-pending-applications: " + user.getPendingApplications()
              + " #user-active-applications: " + user.getActiveApplications()
              + " #queue-pending-applications: " + getNumPendingApplications()
              + " #queue-active-applications: " + getNumActiveApplications());
    } finally {
      writeLock.unlock();
    }
  }
  /**
   * Looks up the scheduler-side view of an application attempt registered with
   * this queue.
   *
   * @param applicationAttemptId id of the attempt
   * @return the attempt, or {@code null} if it is not tracked by this queue
   */
  private FiCaSchedulerApp getApplication(
      ApplicationAttemptId applicationAttemptId) {
    return applicationAttemptMap.get(applicationAttemptId);
  }
private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
// Set preemption-allowed:
// For leaf queue, only under-utilized queue is allowed to preempt resources from other queues
if (!usageTracker.getQueueResourceQuotas().getEffectiveMinResource(nodePartition)
.equals(Resources.none())) {
limits.setIsAllowPreemption(Resources.lessThan(resourceCalculator,
queueContext.getClusterResource(), usageTracker.getQueueUsage().getUsed(nodePartition),
usageTracker.getQueueResourceQuotas().getEffectiveMinResource(nodePartition)));
return;
}
float usedCapacity = queueCapacities.getAbsoluteUsedCapacity(nodePartition);
float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition);
limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
}
private CSAssignment allocateFromReservedContainer(Resource clusterResource,
CandidateNodeSet<FiCaSchedulerNode> candidates,
ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
// Irrespective of Single / Multi Node Placement, the allocate from
// Reserved Container has to happen only for the single node which
// CapacityScheduler#allocateFromReservedContainer invokes with.
// Else In Multi Node Placement, there won't be any Allocation or
// Reserve of new containers when there is a RESERVED container on
// a node which is full.
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
if (node != null) {
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
FiCaSchedulerApp application = getApplication(
reservedContainer.getApplicationAttemptId());
if (null != application) {
ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
node, SystemClock.getInstance().getTime(), application);
CSAssignment assignment = application.assignContainers(
clusterResource, candidates, currentResourceLimits,
schedulingMode, reservedContainer);
return assignment;
}
}
}
return null;
}
private ConcurrentMap<String, CachedUserLimit> getUserLimitCache(
String partition,
SchedulingMode schedulingMode) {
synchronized (userLimitsCache) {
long latestVersion = usersManager.getLatestVersionOfUsersState();
if (latestVersion != this.currentUserLimitCacheVersion) {
// User limits cache needs invalidating
this.currentUserLimitCacheVersion = latestVersion;
userLimitsCache.clear();
Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
uLCByPartition = new HashMap<>();
userLimitsCache.put(partition, uLCByPartition);
ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
new ConcurrentHashMap<>();
uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
return uLCBySchedulingMode;
}
// User limits cache does not need invalidating
Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
uLCByPartition = userLimitsCache.get(partition);
if (uLCByPartition == null) {
uLCByPartition = new HashMap<>();
userLimitsCache.put(partition, uLCByPartition);
}
ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
uLCByPartition.get(schedulingMode);
if (uLCBySchedulingMode == null) {
uLCBySchedulingMode = new ConcurrentHashMap<>();
uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
}
return uLCBySchedulingMode;
}
}
  /**
   * Tries to allocate or reserve containers on the candidate node(s) for the
   * applications of this leaf queue, iterating applications in the order
   * given by the configured ordering policy.
   *
   * High-level flow: allocate from an existing reserved container if any;
   * reject if the queue cannot access the partition; skip if the queue has no
   * pending request for the partition; otherwise walk applications, enforcing
   * queue max-capacity and per-user limits (with a per-partition/mode user
   * limit cache) before delegating to the application itself.
   *
   * @return the resulting assignment, or {@link CSAssignment#NULL_ASSIGNMENT}
   *         when nothing was allocated or reserved
   */
  @Override
  public CSAssignment assignContainers(Resource clusterResource,
      CandidateNodeSet<FiCaSchedulerNode> candidates,
      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);

    if (LOG.isDebugEnabled()) {
      LOG.debug("assignContainers: partition=" + candidates.getPartition()
          + " #applications=" + orderingPolicy.getNumSchedulableEntities());
    }

    setPreemptionAllowed(currentResourceLimits, candidates.getPartition());

    // Check for reserved resources, try to allocate reserved container first.
    CSAssignment assignment = allocateFromReservedContainer(clusterResource,
        candidates, currentResourceLimits, schedulingMode);
    if (null != assignment) {
      return assignment;
    }

    // if our queue cannot access this node, just return
    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
        && !queueNodeLabelsSettings.isAccessibleToPartition(candidates.getPartition())) {
      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
          parent.getQueuePath(), getQueuePath(), ActivityState.REJECTED,
          ActivityDiagnosticConstant.QUEUE_NOT_ABLE_TO_ACCESS_PARTITION);
      return CSAssignment.NULL_ASSIGNMENT;
    }

    // Check if this queue need more resource, simply skip allocation if this
    // queue doesn't need more resources.
    if (!hasPendingResourceRequest(candidates.getPartition(), clusterResource,
        schedulingMode)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Skip this queue=" + getQueuePath()
            + ", because it doesn't need more resource, schedulingMode="
            + schedulingMode.name() + " node-partition=" + candidates
            .getPartition());
      }
      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
          parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
          ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
      return CSAssignment.NULL_ASSIGNMENT;
    }

    // Per-partition/mode cache so user limits are computed once per pass.
    ConcurrentMap<String, CachedUserLimit> userLimits =
        this.getUserLimitCache(candidates.getPartition(), schedulingMode);
    boolean needAssignToQueueCheck = true;
    IteratorSelector sel = new IteratorSelector();
    sel.setPartition(candidates.getPartition());
    for (Iterator<FiCaSchedulerApp> assignmentIterator = orderingPolicy.getAssignmentIterator(sel);
         assignmentIterator.hasNext();) {
      FiCaSchedulerApp application = assignmentIterator.next();

      ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
          node, SystemClock.getInstance().getTime(), application);

      // Check queue max-capacity limit
      Resource appReserved = application.getCurrentReservation();
      if (needAssignToQueueCheck) {
        if (!super.canAssignToThisQueue(clusterResource,
            candidates.getPartition(), currentResourceLimits, appReserved,
            schedulingMode)) {
          ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
              activitiesManager, node, application, application.getPriority(),
              ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
              parent.getQueuePath(), getQueuePath(),
              ActivityState.REJECTED,
              ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
          return CSAssignment.NULL_ASSIGNMENT;
        }
        // If there was no reservation and canAssignToThisQueue returned
        // true, there is no reason to check further.
        if (!this.reservationsContinueLooking
            || appReserved.equals(Resources.none())) {
          needAssignToQueueCheck = false;
        }
      }

      CachedUserLimit cul = userLimits.get(application.getUser());
      Resource cachedUserLimit = null;
      if (cul != null) {
        cachedUserLimit = cul.userLimit;
      }
      Resource userLimit = computeUserLimitAndSetHeadroom(application,
          clusterResource, candidates.getPartition(), schedulingMode,
          cachedUserLimit);
      if (cul == null) {
        cul = new CachedUserLimit(userLimit);
        CachedUserLimit retVal =
            userLimits.putIfAbsent(application.getUser(), cul);
        if (retVal != null) {
          // another thread updated the user limit cache before us
          cul = retVal;
          userLimit = cul.userLimit;
        }
      }
      // Check user limit: reuse a cached negative answer when this app's
      // reservation fits within the reservation that produced it.
      boolean userAssignable = true;
      if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
        userAssignable = false;
      } else {
        userAssignable = canAssignToUser(clusterResource, application.getUser(),
            userLimit, application, candidates.getPartition(),
            currentResourceLimits);
        if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
          // Cache the negative result together with the reservation size so
          // later apps with smaller reservations can reuse it.
          cul.canAssign = false;
          cul.reservation = appReserved;
        }
      }
      if (!userAssignable) {
        application.updateAMContainerDiagnostics(AMState.ACTIVATED,
            "User capacity has reached its maximum limit.");
        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
            activitiesManager, node, application, application.getPriority(),
            ActivityDiagnosticConstant.QUEUE_HIT_USER_MAX_CAPACITY_LIMIT);
        continue;
      }

      // Try to schedule
      assignment = application.assignContainers(clusterResource,
          candidates, currentResourceLimits, schedulingMode, null);

      if (LOG.isDebugEnabled()) {
        LOG.debug("post-assignContainers for application " + application
            .getApplicationId());
        application.showRequests();
      }

      // Did we schedule or reserve a container?
      Resource assigned = assignment.getResource();

      if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
          Resources.none())) {
        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
            parent.getQueuePath(), getQueuePath(),
            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
        return assignment;
      } else if (assignment.getSkippedType()
          == CSAssignment.SkippedType.OTHER) {
        // App skipped for app-specific reasons: move on to the next app.
        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
            activitiesManager, application.getApplicationId(),
            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
        application.updateNodeInfoForAMDiagnostics(node);
      } else if (assignment.getSkippedType()
          == CSAssignment.SkippedType.QUEUE_LIMIT) {
        // Queue-level headroom exhausted: no later app can do better.
        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
            parent.getQueuePath(), getQueuePath(), ActivityState.REJECTED,
            () -> ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM
                + " from " + application.getApplicationId());
        return assignment;
      } else{
        // If we don't allocate anything, and it is not skipped by application,
        // we will return to respect FIFO of applications
        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
            parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
            ActivityDiagnosticConstant.QUEUE_SKIPPED_TO_RESPECT_FIFO);
        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
            activitiesManager, application.getApplicationId(),
            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
        return CSAssignment.NULL_ASSIGNMENT;
      }
    }
    ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
        parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
        ActivityDiagnosticConstant.EMPTY);

    return CSAssignment.NULL_ASSIGNMENT;
  }
  /**
   * Validates a resource commit request against this queue's user limit
   * before accepting it (then delegates to the parent class for the rest of
   * the checks). Requests that allocate from an already-reserved container
   * skip the limit check — those resources were already accounted for when
   * the reservation was made.
   *
   * @return {@code true} when the request may be committed
   */
  @Override
  public boolean accept(Resource cluster,
      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
    ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation =
        request.getFirstAllocatedOrReservedContainer();
    SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer =
        allocation.getAllocatedOrReservedContainer();

    // Do not check limits when allocation from a reserved container
    if (allocation.getAllocateFromReservedContainer() == null) {
      readLock.lock();
      try {
        FiCaSchedulerApp app =
            schedulerContainer.getSchedulerApplicationAttempt();
        String username = app.getUser();
        String p = schedulerContainer.getNodePartition();

        // check user-limit
        Resource userLimit = computeUserLimitAndSetHeadroom(app, cluster, p,
            allocation.getSchedulingMode(), null);

        // Deduct resources that we can release; user may already have been
        // removed if all of their applications finished.
        User user = getUser(username);
        if (user == null) {
          LOG.debug("User {} has been removed!", username);
          return false;
        }
        Resource usedResource = Resources.clone(user.getUsed(p));
        Resources.subtractFrom(usedResource,
            request.getTotalReleasedResource());

        if (Resources.greaterThan(resourceCalculator, cluster, usedResource,
            userLimit)) {
          LOG.debug("Used resource={} exceeded user-limit={}",
              usedResource, userLimit);
          return false;
        }
      } finally {
        readLock.unlock();
      }
    }

    return super.accept(cluster, request);
  }
private void internalReleaseContainer(Resource clusterResource,
SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
RMContainer rmContainer = schedulerContainer.getRmContainer();
AbstractLeafQueue targetLeafQueue =
schedulerContainer.getSchedulerApplicationAttempt().getCSLeafQueue();
if (targetLeafQueue == this) {
// When trying to preempt containers from the same queue
if (rmContainer.getState() == RMContainerState.RESERVED) {
// For other reserved containers
// This is a reservation exchange, complete previous reserved container
completedContainer(clusterResource,
schedulerContainer.getSchedulerApplicationAttempt(),
schedulerContainer.getSchedulerNode(), rmContainer, SchedulerUtils
.createAbnormalContainerStatus(rmContainer.getContainerId(),
SchedulerUtils.UNRESERVED_CONTAINER),
RMContainerEventType.RELEASED, null, false);
}
} else{
// When trying to preempt containers from different queue -- this
// is for lazy preemption feature (kill preemption candidate in scheduling
// cycle).
targetLeafQueue.completedContainer(clusterResource,
schedulerContainer.getSchedulerApplicationAttempt(),
schedulerContainer.getSchedulerNode(),
schedulerContainer.getRmContainer(), SchedulerUtils
.createPreemptedContainerStatus(rmContainer.getContainerId(),
SchedulerUtils.PREEMPTED_CONTAINER),
RMContainerEventType.KILL, null, false);
}
}
private void releaseContainers(Resource clusterResource,
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : request
.getContainersToRelease()) {
internalReleaseContainer(clusterResource, c);
}
// Handle container reservation looking, or lazy preemption case:
if (null != request.getContainersToAllocate() && !request
.getContainersToAllocate().isEmpty()) {
for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> context : request
.getContainersToAllocate()) {
if (null != context.getToRelease()) {
for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : context
.getToRelease()) {
internalReleaseContainer(clusterResource, c);
}
}
}
}
}
  /**
   * Applies an accepted commit request: releases the requested containers,
   * books the new allocation / reservation against this queue under the write
   * lock, and finally — outside the lock — propagates the allocation to the
   * parent queue when a genuinely new allocation or reservation happened.
   *
   * @param cluster total cluster resource
   * @param request the accepted commit request to apply
   */
  public void apply(Resource cluster,
      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
    // Do we need to call parent queue's apply?
    boolean applyToParentQueue = false;

    releaseContainers(cluster, request);

    writeLock.lock();
    try {
      if (request.anythingAllocatedOrReserved()) {
        ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
            allocation = request.getFirstAllocatedOrReservedContainer();
        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
            schedulerContainer = allocation.getAllocatedOrReservedContainer();

        // Do not modify queue when allocation from reserved container
        if (allocation.getAllocateFromReservedContainer() == null) {
          // Only invoke apply() of ParentQueue when new allocation /
          // reservation happen.
          applyToParentQueue = true;

          // Book-keeping
          // Note: Update headroom to account for current allocation too...
          allocateResource(cluster,
              schedulerContainer.getSchedulerApplicationAttempt(),
              allocation.getAllocatedOrReservedResource(),
              schedulerContainer.getNodePartition(),
              schedulerContainer.getRmContainer());
          orderingPolicy.containerAllocated(
              schedulerContainer.getSchedulerApplicationAttempt(),
              schedulerContainer.getRmContainer());
        }

        // Update reserved resource
        if (Resources.greaterThan(resourceCalculator, cluster,
            request.getTotalReservedResource(), Resources.none())) {
          incReservedResource(schedulerContainer.getNodePartition(),
              request.getTotalReservedResource());
        }
      }
    } finally {
      writeLock.unlock();
    }

    // Parent is notified outside the leaf-queue lock to respect lock order.
    if (parent != null && applyToParentQueue) {
      parent.apply(cluster, request);
    }
  }
  /**
   * Computes headroom for the application's user in the default (no-label)
   * partition. Delegates to the partition-aware overload with
   * {@link RMNodeLabelsManager#NO_LABEL}.
   */
  protected Resource getHeadroom(User user, Resource queueCurrentLimit,
      Resource clusterResource, FiCaSchedulerApp application) {
    return getHeadroom(user, queueCurrentLimit, clusterResource, application,
        RMNodeLabelsManager.NO_LABEL);
  }
  /**
   * Computes headroom for the application's user in the given partition,
   * first deriving the active-users resource limit for that user
   * (RESPECT_PARTITION_EXCLUSIVITY mode) and then delegating to the core
   * headroom calculation.
   */
  protected Resource getHeadroom(User user, Resource queueCurrentLimit,
      Resource clusterResource, FiCaSchedulerApp application,
      String partition) {
    return getHeadroom(user, queueCurrentLimit, clusterResource,
        getResourceLimitForActiveUsers(application.getUser(), clusterResource,
            partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
        partition);
  }
private Resource getHeadroom(User user,
Resource currentPartitionResourceLimit, Resource clusterResource,
Resource userLimitResource, String partition) {
/**
* Headroom is:
* min(
* min(userLimit, queueMaxCap) - userConsumed,
* queueMaxCap - queueUsedResources
* )
*
* ( which can be expressed as,
* min (userLimit - userConsumed, queuMaxCap - userConsumed,
* queueMaxCap - queueUsedResources)
* )
*
* given that queueUsedResources >= userConsumed, this simplifies to
*
* >> min (userlimit - userConsumed, queueMaxCap - queueUsedResources) <<
*
* sum of queue max capacities of multiple queue's will be greater than the
* actual capacity of a given partition, hence we need to ensure that the
* headroom is not greater than the available resource for a given partition
*
* headroom = min (unused resourcelimit of a label, calculated headroom )
*/
currentPartitionResourceLimit =
partition.equals(RMNodeLabelsManager.NO_LABEL)
? currentPartitionResourceLimit
: getQueueMaxResource(partition);
Resource headroom = Resources.componentwiseMin(
Resources.subtractNonNegative(userLimitResource,
user.getUsed(partition)),
Resources.subtractNonNegative(currentPartitionResourceLimit,
usageTracker.getQueueUsage().getUsed(partition)));
// Normalize it before return
headroom =
Resources.roundDown(resourceCalculator, headroom,
queueAllocationSettings.getMinimumAllocation());
//headroom = min (unused resourcelimit of a label, calculated headroom )
Resource clusterPartitionResource =
labelManager.getResourceByLabel(partition, clusterResource);
Resource clusterFreePartitionResource =
Resources.subtract(clusterPartitionResource,
queueContext.getClusterResourceUsage().getUsed(partition));
headroom = Resources.min(resourceCalculator, clusterPartitionResource,
clusterFreePartitionResource, headroom);
return headroom;
}
private void setQueueResourceLimitsInfo(
Resource clusterResource) {
synchronized (queueResourceLimitsInfo) {
queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom
.getLimit());
queueResourceLimitsInfo.setClusterResource(clusterResource);
}
}
  // It doesn't necessarily to hold application's lock here.
  /**
   * Computes (or reuses a cached) user limit for the application's user,
   * refreshes the shared queue-limits info, recomputes the user's headroom
   * and installs a {@link CapacityHeadroomProvider} on the application.
   *
   * @param userLimit pre-computed user limit to reuse, or {@code null} to
   *                  compute a fresh one
   * @return the user limit that was used (possibly the cached one), or
   *         {@code Resources.none()} when the user no longer exists
   */
  @Lock({AbstractLeafQueue.class})
  Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
      Resource clusterResource, String nodePartition,
      SchedulingMode schedulingMode, Resource userLimit) {
    String user = application.getUser();
    User queueUser = getUser(user);
    if (queueUser == null) {
      LOG.debug("User {} has been removed!", user);
      return Resources.none();
    }

    // Compute user limit respect requested labels,
    // TODO, need consider headroom respect labels also
    if (userLimit == null) {
      userLimit = getResourceLimitForActiveUsers(application.getUser(),
          clusterResource, nodePartition, schedulingMode);
    }
    setQueueResourceLimitsInfo(clusterResource);

    // No user metrics registered means no headroom to report for this user.
    Resource headroom =
        usageTracker.getMetrics().getUserMetrics(user) == null ? Resources.none() :
            getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
                clusterResource, userLimit, nodePartition);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Headroom calculation for user " + user + ": " + " userLimit="
          + userLimit + " queueMaxAvailRes="
          + cachedResourceLimitsForHeadroom.getLimit() + " consumed="
          + queueUser.getUsed() + " partition="
          + nodePartition);
    }

    CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
        queueUser, this, application, queueResourceLimitsInfo);

    application.setHeadroomProvider(headroomProvider);

    usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition, user, headroom);

    return userLimit;
  }
  // Configured node-locality delay (read-only, no lock needed).
  @Lock(NoLock.class)
  public int getNodeLocalityDelay() {
    return nodeLocalityDelay;
  }

  // Configured additional rack-locality delay (read-only, no lock needed).
  @Lock(NoLock.class)
  public int getRackLocalityAdditionalDelay() {
    return rackLocalityAdditionalDelay;
  }

  // Whether rack-locality counters are fully reset (read-only, no lock
  // needed).
  @Lock(NoLock.class)
  public boolean getRackLocalityFullReset() {
    return rackLocalityFullReset;
  }
  /**
   * Computes the resource limit for an <em>active</em> user of this queue by
   * delegating to the users-manager.
   *
   * @param userName
   *          Name of user who has submitted one/more app to given queue.
   * @param clusterResource
   *          total cluster resource
   * @param nodePartition
   *          partition name
   * @param schedulingMode
   *          scheduling mode
   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
   * @return Computed User Limit
   */
  public Resource getResourceLimitForActiveUsers(String userName,
      Resource clusterResource, String nodePartition,
      SchedulingMode schedulingMode) {
    return usersManager.getComputedResourceLimitForActiveUsers(userName,
        clusterResource, nodePartition, schedulingMode);
  }
  /**
   * Computes the resource limit considering <em>all</em> users of this queue
   * (not just active ones) by delegating to the users-manager.
   *
   * @param userName
   *          Name of user who has submitted one/more app to given queue.
   * @param clusterResource
   *          total cluster resource
   * @param nodePartition
   *          partition name
   * @param schedulingMode
   *          scheduling mode
   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
   * @return Computed User Limit
   */
  public Resource getResourceLimitForAllUsers(String userName,
      Resource clusterResource, String nodePartition,
      SchedulingMode schedulingMode) {
    return usersManager.getComputedResourceLimitForAllUsers(userName,
        clusterResource, nodePartition, schedulingMode);
  }
  /**
   * Decides whether the given user may receive another container in the given
   * partition without exceeding their limit. When reservations-continue-
   * looking is enabled and the user would fit after releasing the app's
   * current reservation, returns {@code true} and records how much must be
   * unreserved first in {@code currentResourceLimits}.
   *
   * @return {@code true} if the user can be assigned to (possibly after
   *         unreserving), {@code false} otherwise or if the user is gone
   */
  @Private
  protected boolean canAssignToUser(Resource clusterResource,
      String userName, Resource limit, FiCaSchedulerApp application,
      String nodePartition, ResourceLimits currentResourceLimits) {
    readLock.lock();
    try {
      User user = getUser(userName);
      if (user == null) {
        LOG.debug("User {} has been removed!", userName);
        return false;
      }

      currentResourceLimits.setAmountNeededUnreserve(Resources.none());

      // Note: We aren't considering the current request since there is a fixed
      // overhead of the AM, but it's a > check, not a >= check, so...
      if (Resources.greaterThan(resourceCalculator, clusterResource,
          user.getUsed(nodePartition), limit)) {

        // if enabled, check to see if could we potentially use this node instead
        // of a reserved node if the application has reserved containers
        if (this.reservationsContinueLooking) {
          // NOTE(review): this compares the user's usage across ALL
          // partitions (user.getUsed()) against a per-partition limit —
          // verify against UsersManager semantics before changing.
          if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
              Resources.subtract(user.getUsed(),
                  application.getCurrentReservation()), limit)) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("User " + userName + " in queue " + getQueuePath()
                  + " will exceed limit based on reservations - "
                  + " consumed: " + user.getUsed() + " reserved: " + application
                  .getCurrentReservation() + " limit: " + limit);
            }
            Resource amountNeededToUnreserve = Resources.subtract(
                user.getUsed(nodePartition), limit);
            // we can only acquire a new container if we unreserve first to
            // respect user-limit
            currentResourceLimits.setAmountNeededUnreserve(
                amountNeededToUnreserve);
            return true;
          }
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("User " + userName + " in queue " + getQueuePath()
              + " will exceed limit - " + " consumed: " + user
              .getUsed(nodePartition) + " limit: " + limit);
        }
        return false;
      }
      return true;
    } finally {
      readLock.unlock();
    }
  }
  /**
   * Applies configuration overrides for a dynamically created leaf queue:
   * disables the user-limit factor and raises the max AM-resource percentage
   * to 100% before delegating to the superclass.
   */
  @Override
  protected void setDynamicQueueProperties() {
    // set to -1, to disable it
    queueContext.getConfiguration().setUserLimitFactor(getQueuePath(), -1);
    // Set Max AM percentage to a higher value
    queueContext.getConfiguration().setMaximumApplicationMasterResourcePerQueuePercent(
        getQueuePath(), 1f);
    super.setDynamicQueueProperties();
  }
  /**
   * Installs ACLs for a dynamically created leaf queue, choosing between
   * legacy (managed-parent) and flexible auto-created queue templates.
   * The instanceof order matters: AbstractManagedParentQueue must be tested
   * before the more general ParentQueue.
   */
  @Override
  protected void setDynamicQueueACLProperties() {
    super.setDynamicQueueACLProperties();

    if (parent instanceof AbstractManagedParentQueue) {
      acls.putAll(queueContext.getConfiguration().getACLsForLegacyAutoCreatedLeafQueue(
          parent.getQueuePath()));
    } else if (parent instanceof ParentQueue) {
      acls.putAll(getACLsForFlexibleAutoCreatedLeafQueue(
          ((ParentQueue) parent).getAutoCreatedQueueTemplate()));
    }
  }
private void updateSchedulerHealthForCompletedContainer(
RMContainer rmContainer, ContainerStatus containerStatus) {
// Update SchedulerHealth for released / preempted container
SchedulerHealth schedulerHealth = queueContext.getSchedulerHealth();
if (null == schedulerHealth) {
// Only do update if we have schedulerHealth
return;
}
if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) {
schedulerHealth.updatePreemption(Time.now(), rmContainer.getAllocatedNode(),
rmContainer.getContainerId(), getQueuePath());
schedulerHealth.updateSchedulerPreemptionCounts(1);
} else {
schedulerHealth.updateRelease(queueContext.getLastNodeUpdateTime(),
rmContainer.getAllocatedNode(), rmContainer.getContainerId(),
getQueuePath());
}
}
/**
* Recalculate QueueUsage Ratio.
*
* @param clusterResource
* Total Cluster Resource
* @param nodePartition
* Partition
*/
public void recalculateQueueUsageRatio(Resource clusterResource,
String nodePartition) {
writeLock.lock();
try {
ResourceUsage queueResourceUsage = getQueueResourceUsage();
if (nodePartition == null) {
for (String partition : Sets.union(
getQueueCapacities().getExistingNodeLabels(),
queueResourceUsage.getExistingNodeLabels())) {
usersManager.updateUsageRatio(partition, clusterResource);
}
} else {
usersManager.updateUsageRatio(nodePartition, clusterResource);
}
} finally {
writeLock.unlock();
}
}
  /**
   * Handles a completed (released, preempted or unreserved) container:
   * updates scheduler health, informs the application and node under the
   * write lock, releases the queue's resources, then — outside the lock —
   * notifies the parent queue and the preemption manager.
   *
   * @param childQueue unused at leaf level (leaf queues have no children)
   * @param sortQueues forwarded to the parent for re-sorting its children
   */
  @Override
  public void completedContainer(Resource clusterResource,
      FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
      ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
      boolean sortQueues) {
    // Update SchedulerHealth for released / preempted container
    updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus);

    if (application != null) {
      boolean removed = false;

      // Careful! Locking order is important!
      writeLock.lock();
      try {
        Container container = rmContainer.getContainer();

        // Inform the application & the node
        // Note: It's safe to assume that all state changes to RMContainer
        // happen under scheduler's lock...
        // So, this is, in effect, a transaction across application & node
        if (rmContainer.getState() == RMContainerState.RESERVED) {
          // Reserved containers are unreserved rather than completed.
          removed = application.unreserve(rmContainer.getReservedSchedulerKey(),
              node, rmContainer);
        } else{
          removed = application.containerCompleted(rmContainer, containerStatus,
              event, node.getPartition());

          node.releaseContainer(rmContainer.getContainerId(), false);
        }

        // Book-keeping
        if (removed) {

          // Inform the ordering policy
          orderingPolicy.containerReleased(application, rmContainer);

          releaseResource(clusterResource, application, container.getResource(),
              node.getPartition(), rmContainer);
        }
      } finally {
        writeLock.unlock();
      }

      if (removed) {
        // Inform the parent queue _outside_ of the leaf-queue lock
        parent.completedContainer(clusterResource, application, node,
            rmContainer, null, event, this, sortQueues);
      }
    }

    // Notify PreemptionManager
    // NOTE(review): node is dereferenced here unconditionally while the block
    // above tolerates application == null — confirm node can never be null
    // on this path.
    queueContext.getPreemptionManager().removeKillableContainer(
        new KillableContainer(
            rmContainer,
            node.getPartition(),
            getQueuePath()));

    // Update preemption metrics if exit status is PREEMPTED
    if (containerStatus != null
        && ContainerExitStatus.PREEMPTED == containerStatus.getExitStatus()) {
      updateQueuePreemptionMetrics(rmContainer);
    }
  }
  /**
   * Books a newly allocated (or reserved) container's resources against this
   * queue and the submitting user, tracks ignore-partition-exclusivity
   * containers, and refreshes the user's headroom metric.
   *
   * @param rmContainer the allocated container; may be {@code null}
   */
  void allocateResource(Resource clusterResource,
      SchedulerApplicationAttempt application, Resource resource,
      String nodePartition, RMContainer rmContainer) {
    writeLock.lock();
    try {
      super.allocateResource(clusterResource, resource, nodePartition);

      // handle ignore exclusivity container: a NO_LABEL container placed on a
      // labeled partition is remembered so it can be preempted first.
      if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
          RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
          RMNodeLabelsManager.NO_LABEL)) {
        TreeSet<RMContainer> rmContainers = ignorePartitionExclusivityRMContainers.computeIfAbsent(
            nodePartition, k -> new TreeSet<>());
        rmContainers.add(rmContainer);
      }

      // Update user metrics
      String userName = application.getUser();

      // Increment user's resource usage.
      User user = usersManager.updateUserResourceUsage(userName, resource,
          queueContext.getClusterResource(), nodePartition, true);

      Resource partitionHeadroom = Resources.createResource(0, 0);
      if (usageTracker.getMetrics().getUserMetrics(userName) != null) {
        partitionHeadroom = getHeadroom(user,
            cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
            getResourceLimitForActiveUsers(userName, clusterResource,
                nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
            nodePartition);
      }
      usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition, userName,
          partitionHeadroom);

      if (LOG.isDebugEnabled()) {
        LOG.debug(getQueuePath() + " user=" + userName + " used="
            + usageTracker.getQueueUsage().getUsed(nodePartition) + " numContainers="
            + usageTracker.getNumContainers() + " headroom = " + application.getHeadroom()
            + " user-resources=" + user.getUsed());
      }
    } finally {
      writeLock.unlock();
    }
  }
/**
 * Mirror of {@code allocateResource}: releases a container's resource from
 * this queue and the owning user, and refreshes the user's headroom metric.
 *
 * @param clusterResource total cluster resource (used for headroom math)
 * @param application the attempt that owned the released container
 * @param resource the container's resource
 * @param nodePartition partition of the node the container ran on
 * @param rmContainer the released container; may be null
 */
void releaseResource(Resource clusterResource,
    FiCaSchedulerApp application, Resource resource, String nodePartition,
    RMContainer rmContainer) {
  writeLock.lock();
  try {
    super.releaseResource(clusterResource, resource, nodePartition);
    // Untrack a NO_LABEL container that ran on a labeled partition by
    // ignoring exclusivity; drop the per-partition set once empty.
    if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
        RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
        RMNodeLabelsManager.NO_LABEL)) {
      if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) {
        Set<RMContainer> rmContainers =
            ignorePartitionExclusivityRMContainers.get(nodePartition);
        rmContainers.remove(rmContainer);
        if (rmContainers.isEmpty()) {
          ignorePartitionExclusivityRMContainers.remove(nodePartition);
        }
      }
    }
    // Update user metrics
    String userName = application.getUser();
    // Decrement (last arg false) the user's resource usage.
    User user = usersManager.updateUserResourceUsage(userName, resource,
        queueContext.getClusterResource(), nodePartition, false);
    // Recompute the user's headroom only when user metrics exist; otherwise
    // publish a zero headroom.
    Resource partitionHeadroom = Resources.createResource(0, 0);
    if (usageTracker.getMetrics().getUserMetrics(userName) != null) {
      partitionHeadroom = getHeadroom(user,
          cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
          getResourceLimitForActiveUsers(userName, clusterResource,
              nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
          nodePartition);
    }
    usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition, userName,
        partitionHeadroom);
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          getQueuePath() + " used=" + usageTracker.getQueueUsage().getUsed() + " numContainers="
              + usageTracker.getNumContainers() + " user=" + userName + " user-resources="
              + user.getUsed());
    }
  } finally {
    writeLock.unlock();
  }
}
/**
 * Refreshes the cached resource limits used for headroom computation.
 *
 * ParentQueue caps limits by the child's max capacity during normal
 * allocation, but the reserved-container allocation path does not, so the
 * limit is re-capped by this queue's effective max capacity here.
 */
private void updateCurrentResourceLimits(
    ResourceLimits currentResourceLimits, Resource clusterResource) {
  // TODO: consider non-empty node labels once resource limits support labels.
  ResourceLimits headroomLimits =
      new ResourceLimits(currentResourceLimits.getLimit());
  Resource effectiveMax = getEffectiveMaxCapacityDown(
      RMNodeLabelsManager.NO_LABEL,
      queueAllocationSettings.getMinimumAllocation());
  headroomLimits.setLimit(Resources.min(resourceCalculator, clusterResource,
      effectiveMax, currentResourceLimits.getLimit()));
  this.cachedResourceLimitsForHeadroom = headroomLimits;
}
/**
 * Reacts to a cluster resource change: refreshes capacities, limits,
 * metrics and per-user accounting, then tries to activate pending apps.
 * The individual steps below are order-sensitive.
 */
@Override
public void updateClusterResource(Resource clusterResource,
    ResourceLimits currentResourceLimits) {
  writeLock.lock();
  try {
    lastClusterResource = clusterResource;
    updateAbsoluteCapacities();
    super.updateEffectiveResources(clusterResource);
    // Update maximum applications for the queue and for users
    updateMaximumApplications();
    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
    // Update headroom info based on new cluster resource value
    // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
    // during allocation
    setQueueResourceLimitsInfo(clusterResource);
    // Update user consumedRatios
    recalculateQueueUsageRatio(clusterResource, null);
    // Update metrics
    CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
        this, labelManager, null);
    // Update configured capacity/max-capacity for default partition only
    CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
        labelManager.getResourceByLabel(null, clusterResource),
        RMNodeLabelsManager.NO_LABEL, this);
    // queue metrics are updated, more resource may be available
    // activate the pending applications if possible
    activateApplications();
    // In case of any resource change, invalidate recalculateULCount to clear
    // the computed user-limit.
    usersManager.userLimitNeedsRecompute();
    // Recompute user limit and headroom for every active application so they
    // see the new cluster resource immediately.
    for (FiCaSchedulerApp application : orderingPolicy
        .getSchedulableEntities()) {
      computeUserLimitAndSetHeadroom(application, clusterResource,
          RMNodeLabelsManager.NO_LABEL,
          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
    }
  } finally {
    writeLock.unlock();
  }
}
/**
 * Increments used resource for the given label, charging both the user
 * (via UsersManager) and the queue hierarchy (via super).
 */
@Override
public void incUsedResource(String nodeLabel, Resource resourceToInc,
    SchedulerApplicationAttempt application) {
  usersManager.updateUserResourceUsage(application.getUser(), resourceToInc,
      queueContext.getClusterResource(), nodeLabel, true);
  super.incUsedResource(nodeLabel, resourceToInc, application);
}
/**
 * Decrements used resource for the given label, un-charging both the user
 * (via UsersManager) and the queue hierarchy (via super).
 */
@Override
public void decUsedResource(String nodeLabel, Resource resourceToDec,
    SchedulerApplicationAttempt application) {
  usersManager.updateUserResourceUsage(application.getUser(), resourceToDec,
      queueContext.getClusterResource(), nodeLabel, false);
  super.decUsedResource(nodeLabel, resourceToDec, application);
}
/**
 * Adds AM-used resource for the given label to both the user's and the
 * queue's usage. No-op when the user is unknown to this queue.
 */
public void incAMUsedResource(String nodeLabel, Resource resourceToInc,
    SchedulerApplicationAttempt application) {
  User user = getUser(application.getUser());
  if (user != null) {
    // ResourceUsage is internally synchronized, so no extra lock is taken.
    user.getResourceUsage().incAMUsed(nodeLabel, resourceToInc);
    usageTracker.getQueueUsage().incAMUsed(nodeLabel, resourceToInc);
  }
}
/**
 * Subtracts AM-used resource for the given label from both the user's and
 * the queue's usage. No-op when the user is unknown to this queue.
 */
public void decAMUsedResource(String nodeLabel, Resource resourceToDec,
    SchedulerApplicationAttempt application) {
  User user = getUser(application.getUser());
  if (user != null) {
    // ResourceUsage is internally synchronized, so no extra lock is taken.
    user.getResourceUsage().decAMUsed(nodeLabel, resourceToDec);
    usageTracker.getQueueUsage().decAMUsed(nodeLabel, resourceToDec);
  }
}
/**
 * Re-accounts a recovered container after RM restart/failover. Completed
 * and non-GUARANTEED (opportunistic) containers are ignored.
 */
@Override
public void recoverContainer(Resource clusterResource,
    SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
  if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
    return;
  }
  if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) {
    return;
  }
  // Careful! Locking order is important!
  writeLock.lock();
  try {
    FiCaSchedulerNode node = queueContext.getNode(
        rmContainer.getContainer().getNodeId());
    allocateResource(clusterResource, attempt,
        rmContainer.getContainer().getResource(), node.getPartition(),
        rmContainer);
  } finally {
    writeLock.unlock();
  }
  // Propagate recovery up the hierarchy outside our own lock.
  parent.recoverContainer(clusterResource, attempt, rmContainer);
}
/**
 * Returns a read-only view over the pending (not yet activated)
 * applications of this queue.
 */
public Collection<FiCaSchedulerApp> getPendingApplications() {
  Collection<FiCaSchedulerApp> pending =
      pendingOrderingPolicy.getSchedulableEntities();
  return Collections.unmodifiableCollection(pending);
}
/**
 * Returns a read-only view over the active (schedulable) applications of
 * this queue.
 */
public Collection<FiCaSchedulerApp> getApplications() {
  Collection<FiCaSchedulerApp> active = orderingPolicy.getSchedulableEntities();
  return Collections.unmodifiableCollection(active);
}
/**
 * Returns a read-only collection of all applications in this queue, i.e.
 * the union of pending and active applications.
 */
public Collection<FiCaSchedulerApp> getAllApplications() {
  // Diamond operator instead of the redundant explicit type argument.
  Collection<FiCaSchedulerApp> apps =
      new HashSet<>(pendingOrderingPolicy.getSchedulableEntities());
  apps.addAll(orderingPolicy.getSchedulableEntities());
  return Collections.unmodifiableCollection(apps);
}
/**
 * Get total pending resource considering user limit for the leaf queue. This
 * will be used for calculating pending resources in the preemption monitor.
 *
 * Consider the headroom for each user in the queue.
 * Total pending for the queue =
 * sum(for each user(min((user's headroom), sum(user's pending requests))))
 * NOTE:
 * @param clusterResources clusterResource
 * @param partition node partition
 * @param deductReservedFromPending When a container is reserved in CS,
 *                                  pending resource will not be deducted.
 *                                  This could lead to double accounting when
 *                                  doing preemption:
 *                                  In normal cases, we should deduct reserved
 *                                  resource from pending to avoid
 *                                  excessive preemption.
 * @return Total pending resource considering user limit
 */
public Resource getTotalPendingResourcesConsideringUserLimit(
    Resource clusterResources, String partition,
    boolean deductReservedFromPending) {
  readLock.lock();
  try {
    Map<String, Resource> userNameToHeadroom = new HashMap<>();
    Resource totalPendingConsideringUserLimit = Resource.newInstance(0, 0);
    for (FiCaSchedulerApp app : getApplications()) {
      String userName = app.getUser();
      // Compute the user's remaining headroom once per user; the cached
      // Resource object is mutated below as apps consume headroom.
      // (computeIfAbsent replaces the former containsKey + double get.)
      Resource headroom = userNameToHeadroom.computeIfAbsent(userName,
          name -> {
            User user = getUsersManager().getUserAndAddIfAbsent(name);
            Resource h = Resources.subtract(
                getResourceLimitForActiveUsers(name, clusterResources,
                    partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
                user.getUsed(partition));
            // Make sure headroom is not negative.
            return Resources.componentwiseMax(h, Resources.none());
          });
      // Check if we need to deduct reserved from pending
      Resource pending = app.getAppAttemptResourceUsage().getPending(
          partition);
      if (deductReservedFromPending) {
        pending = Resources.subtract(pending,
            app.getAppAttemptResourceUsage().getReserved(partition));
      }
      pending = Resources.componentwiseMax(pending, Resources.none());
      // Only the part of pending that fits in the user's remaining headroom
      // counts; also shrink the remaining headroom accordingly.
      Resource minPendingConsideringUserLimit = Resources.componentwiseMin(
          headroom, pending);
      Resources.addTo(totalPendingConsideringUserLimit,
          minPendingConsideringUserLimit);
      Resources.subtractFrom(headroom, minPendingConsideringUserLimit);
    }
    return totalPendingConsideringUserLimit;
  } finally {
    readLock.unlock();
  }
}
/**
 * Appends the attempt ids of every application in this queue — pending
 * first, then active — to the supplied collection.
 */
@Override
public void collectSchedulerApplications(
    Collection<ApplicationAttemptId> apps) {
  readLock.lock();
  try {
    pendingOrderingPolicy.getSchedulableEntities()
        .forEach(app -> apps.add(app.getApplicationAttemptId()));
    orderingPolicy.getSchedulableEntities()
        .forEach(app -> apps.add(app.getApplicationAttemptId()));
  } finally {
    readLock.unlock();
  }
}
/**
 * Attaches a container moved into this queue (e.g. via move-application):
 * charges its resource to this queue/user and informs the parent. Only
 * GUARANTEED containers are accounted.
 */
@Override
public void attachContainer(Resource clusterResource,
    FiCaSchedulerApp application, RMContainer rmContainer) {
  if (application != null && rmContainer != null
      && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
    FiCaSchedulerNode node =
        queueContext.getNode(rmContainer.getContainer().getNodeId());
    allocateResource(clusterResource, application, rmContainer.getContainer()
        .getResource(), node.getPartition(), rmContainer);
    LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
        + " containerState="+ rmContainer.getState()
        + " resource=" + rmContainer.getContainer().getResource()
        + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
        + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
        + usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource);
    // Inform the parent queue
    parent.attachContainer(clusterResource, application, rmContainer);
  }
}
/**
 * Detaches a container moved out of this queue (e.g. via move-application):
 * releases its resource from this queue/user and informs the parent. Only
 * GUARANTEED containers are accounted.
 */
@Override
public void detachContainer(Resource clusterResource,
    FiCaSchedulerApp application, RMContainer rmContainer) {
  if (application != null && rmContainer != null
      && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
    FiCaSchedulerNode node =
        queueContext.getNode(rmContainer.getContainer().getNodeId());
    releaseResource(clusterResource, application, rmContainer.getContainer()
        .getResource(), node.getPartition(), rmContainer);
    LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
        + " containerState="+ rmContainer.getState()
        + " resource=" + rmContainer.getContainer().getResource()
        + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
        + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
        + usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource);
    // Inform the parent queue
    parent.detachContainer(clusterResource, application, rmContainer);
  }
}
/**
 * @return a deep-copied snapshot of all RMContainers in this LeafQueue that
 *         ran by ignoring partition exclusivity, keyed by partition; used
 *         by the preemption policy.
 */
public Map<String, TreeSet<RMContainer>> getIgnoreExclusivityRMContainers() {
  readLock.lock();
  try {
    Map<String, TreeSet<RMContainer>> snapshot = new HashMap<>();
    // Copy each per-partition set so callers cannot see later mutations.
    ignorePartitionExclusivityRMContainers.forEach(
        (queuePartition, containers) ->
            snapshot.put(queuePartition, new TreeSet<>(containers)));
    return snapshot;
  } finally {
    readLock.unlock();
  }
}
/** Sets the configured capacity fraction for the default partition. */
public void setCapacity(float capacity) {
  queueCapacities.setCapacity(capacity);
}
/** Sets the configured capacity fraction for the given node label. */
public void setCapacity(String nodeLabel, float capacity) {
  queueCapacities.setCapacity(nodeLabel, capacity);
}
/** Sets the absolute capacity fraction for the default partition. */
public void setAbsoluteCapacity(float absoluteCapacity) {
  queueCapacities.setAbsoluteCapacity(absoluteCapacity);
}
/** Sets the absolute capacity fraction for the given node label. */
public void setAbsoluteCapacity(String nodeLabel, float absoluteCapacity) {
  queueCapacities.setAbsoluteCapacity(nodeLabel, absoluteCapacity);
}
/** Sets the per-user cap on concurrently tracked applications. */
public void setMaxApplicationsPerUser(int maxApplicationsPerUser) {
  this.maxApplicationsPerUser = maxApplicationsPerUser;
}
/** Sets the queue-wide cap on concurrently tracked applications. */
public void setMaxApplications(int maxApplications) {
  this.maxApplications = maxApplications;
}
/** Sets the max fraction of queue resource usable by application masters. */
public void setMaxAMResourcePerQueuePercent(
    float maxAMResourcePerQueuePercent) {
  this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent;
}
/** @return the ordering policy governing active applications. */
public OrderingPolicy<FiCaSchedulerApp> getOrderingPolicy() {
  return orderingPolicy;
}
/**
 * Replaces the active-app ordering policy, migrating all currently
 * schedulable entities from the old policy into the new one first.
 */
void setOrderingPolicy(
    OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
  writeLock.lock();
  try {
    if (null != this.orderingPolicy) {
      // Move existing apps into the incoming policy before swapping.
      orderingPolicy.addAllSchedulableEntities(
          this.orderingPolicy.getSchedulableEntities());
    }
    this.orderingPolicy = orderingPolicy;
  } finally {
    writeLock.unlock();
  }
}
/** @return the default priority assigned to apps submitted to this queue. */
@Override
public Priority getDefaultApplicationPriority() {
  return defaultAppPriorityPerQueue;
}
/**
 * Changes an application's priority. The attempt must be removed from its
 * ordering policy before the priority is mutated and re-added afterwards,
 * because the policies order entities by priority.
 */
public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app,
    Priority newAppPriority) {
  writeLock.lock();
  try {
    FiCaSchedulerApp attempt = app.getCurrentAppAttempt();
    // Figure out which policy currently holds the attempt and detach it.
    OrderingPolicy<FiCaSchedulerApp> holdingPolicy;
    if (orderingPolicy.removeSchedulableEntity(attempt)) {
      holdingPolicy = orderingPolicy;
    } else {
      pendingOrderingPolicy.removeSchedulableEntity(attempt);
      holdingPolicy = pendingOrderingPolicy;
    }
    // Update new priority in SchedulerApplication, then reinsert so the
    // policy re-sorts with the new priority.
    attempt.setPriority(newAppPriority);
    holdingPolicy.addSchedulableEntity(attempt);
  } finally {
    writeLock.unlock();
  }
}
/** @return the ordering policy governing pending (not yet active) apps. */
public OrderingPolicy<FiCaSchedulerApp> getPendingAppsOrderingPolicy() {
  return pendingOrderingPolicy;
}
/*
 * Holds shared values used by all applications in
 * the queue to calculate headroom on demand
 */
static class QueueResourceLimitsInfo {
  // Current resource limit of the queue, shared with all apps for
  // on-demand headroom computation.
  private Resource queueCurrentLimit;
  // Last known total cluster resource.
  private Resource clusterResource;
  public void setQueueCurrentLimit(Resource currentLimit) {
    this.queueCurrentLimit = currentLimit;
  }
  public Resource getQueueCurrentLimit() {
    return queueCurrentLimit;
  }
  public void setClusterResource(Resource clusterResource) {
    this.clusterResource = clusterResource;
  }
  public Resource getClusterResource() {
    return clusterResource;
  }
}
/**
 * Stops the queue: if applications are still present the queue drains
 * (DRAINING) until they finish, otherwise it stops immediately (STOPPED).
 */
@Override
public void stopQueue() {
  writeLock.lock();
  try {
    updateQueueState(getNumApplications() > 0
        ? QueueState.DRAINING
        : QueueState.STOPPED);
  } finally {
    writeLock.unlock();
  }
}
/**
 * Recomputes this queue's max-applications limit from configuration.
 * Precedence: an explicit per-queue setting wins; otherwise the global
 * per-queue default (capped by the system max) is used for percentage-mode
 * queues; otherwise the limit is derived from the queue's absolute capacity
 * per node label, taking the largest value across labels.
 */
void updateMaximumApplications() {
  CapacitySchedulerConfiguration configuration = queueContext.getConfiguration();
  int maxAppsForQueue = configuration.getMaximumApplicationsPerQueue(getQueuePath());
  int maxDefaultPerQueueApps = configuration.getGlobalMaximumApplicationsPerQueue();
  int maxSystemApps = configuration.getMaximumSystemApplications();
  // Base value: global per-queue default capped by system max when set,
  // otherwise the system max alone.
  int baseMaxApplications = maxDefaultPerQueueApps > 0 ?
      Math.min(maxDefaultPerQueueApps, maxSystemApps)
      : maxSystemApps;
  // Tracks which label produced the winning derived limit (for logging).
  String maxLabel = RMNodeLabelsManager.NO_LABEL;
  if (maxAppsForQueue < 0) {
    // No explicit per-queue config; derive a value.
    if (maxDefaultPerQueueApps > 0 && this.capacityConfigType
        != CapacityConfigType.ABSOLUTE_RESOURCE) {
      maxAppsForQueue = baseMaxApplications;
    } else {
      // Scale by absolute capacity per label and keep the largest result.
      for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
        int maxApplicationsByLabel = (int) (baseMaxApplications
            * queueCapacities.getAbsoluteCapacity(label));
        if (maxApplicationsByLabel > maxAppsForQueue) {
          maxAppsForQueue = maxApplicationsByLabel;
          maxLabel = label;
        }
      }
    }
  }
  setMaxApplications(maxAppsForQueue);
  updateMaxAppsPerUser();
  LOG.info("LeafQueue:" + getQueuePath() +
      "update max app related, maxApplications="
      + maxAppsForQueue + ", maxApplicationsPerUser="
      + maxApplicationsPerUser + ", Abs Cap:" + queueCapacities
      .getAbsoluteCapacity(maxLabel) + ", Cap: " + queueCapacities
      .getCapacity(maxLabel) + ", MaxCap : " + queueCapacities
      .getMaximumCapacity(maxLabel));
}
/**
 * Derives the per-user application cap from the queue cap, scaled by the
 * configured user limit and user-limit factor. A factor of -1 disables the
 * scaling and every user may use the full queue cap.
 */
private void updateMaxAppsPerUser() {
  int maxAppsPerUser;
  if (getUsersManager().getUserLimitFactor() == -1) {
    // Factor disabled: users share the queue-wide limit.
    maxAppsPerUser = maxApplications;
  } else {
    int maxApplicationsWithUserLimits = (int) (maxApplications
        * (getUsersManager().getUserLimit() / 100.0f)
        * getUsersManager().getUserLimitFactor());
    maxAppsPerUser = Math.min(maxApplications,
        maxApplicationsWithUserLimits);
  }
  setMaxApplicationsPerUser(maxAppsPerUser);
}
/**
 * Get all valid users in this queue.
 * @return user list
 */
public Set<String> getAllUsers() {
  return getUsersManager().getUsers().keySet();
}
// Per-user cache of a computed user limit plus allocation bookkeeping,
// shared across allocation attempts within one scheduling pass.
static class CachedUserLimit {
  // Immutable snapshot of the computed user limit.
  final Resource userLimit;
  // Whether this user may still be assigned containers in this pass.
  volatile boolean canAssign = true;
  // Resource currently reserved on behalf of this user.
  volatile Resource reservation = Resources.none();
  CachedUserLimit(Resource userLimit) {
    this.userLimit = userLimit;
  }
}
/**
 * Records preemption metrics for a preempted container: preemption count,
 * memory/vcore-seconds lost (lifetime * allocation), and custom resources.
 */
private void updateQueuePreemptionMetrics(RMContainer rmc) {
  // Container lifetime in milliseconds and whole seconds.
  final long usedMillis = rmc.getFinishTime() - rmc.getCreationTime();
  final long usedSeconds = usedMillis / DateUtils.MILLIS_PER_SECOND;
  CSQueueMetrics metrics = usageTracker.getMetrics();
  Resource containerResource = rmc.getAllocatedResource();
  metrics.preemptContainer();
  // Multiply by millis before dividing to keep sub-second precision.
  long mbSeconds = (containerResource.getMemorySize() * usedMillis)
      / DateUtils.MILLIS_PER_SECOND;
  long vcSeconds = (containerResource.getVirtualCores() * usedMillis)
      / DateUtils.MILLIS_PER_SECOND;
  metrics.updatePreemptedMemoryMBSeconds(mbSeconds);
  metrics.updatePreemptedVcoreSeconds(vcSeconds);
  metrics.updatePreemptedResources(containerResource);
  metrics.updatePreemptedSecondsForCustomResources(containerResource,
      usedSeconds);
  metrics.updatePreemptedForCustomResources(containerResource);
}
/** @return the number of currently runnable apps, read under the lock. */
@Override
int getNumRunnableApps() {
  readLock.lock();
  try {
    return runnableApps.size();
  } finally {
    readLock.unlock();
  }
}
/** @return the number of non-runnable apps, read under the lock. */
int getNumNonRunnableApps() {
  readLock.lock();
  try {
    return nonRunnableApps.size();
  } finally {
    readLock.unlock();
  }
}
/**
 * Removes an app from the non-runnable set.
 * @return true if the app was present and removed
 */
boolean removeNonRunnableApp(FiCaSchedulerApp app) {
  writeLock.lock();
  try {
    return nonRunnableApps.remove(app);
  } finally {
    writeLock.unlock();
  }
}
/**
 * Returns a mutable snapshot of the non-runnable apps, copied under the
 * read lock so it is safe to iterate without further locking.
 */
List<FiCaSchedulerApp> getCopyOfNonRunnableAppSchedulables() {
  readLock.lock();
  try {
    return new ArrayList<>(nonRunnableApps);
  } finally {
    readLock.unlock();
  }
}
/**
 * A dynamic queue with no applications is eligible for automatic deletion
 * when auto-expired deletion is enabled for its path.
 */
@Override
public boolean isEligibleForAutoDeletion() {
  if (!isDynamicQueue() || getNumApplications() != 0) {
    return false;
  }
  return queueContext.getConfiguration()
      .isAutoExpiredDeletionEnabled(this.getQueuePath());
}
}
相关信息
相关文章
hadoop AbstractAutoCreatedLeafQueue 源码
hadoop AbstractManagedParentQueue 源码
hadoop AppPriorityACLConfigurationParser 源码
hadoop AutoCreatedLeafQueue 源码
hadoop AutoCreatedLeafQueueConfig 源码
hadoop AutoCreatedQueueDeletionPolicy 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦