hadoop DefaultOOMHandler 源码
haddop DefaultOOMHandler 代码
文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PROCS_FILE;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_USAGE_BYTES;
/**
* A very basic OOM handler implementation.
* See the javadoc on the run() method for details.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DefaultOOMHandler implements Runnable {
protected static final Logger LOG = LoggerFactory
.getLogger(DefaultOOMHandler.class);
private final Context context;
private final String memoryStatFile;
private final CGroupsHandler cgroups;
/**
* Create an OOM handler.
* This has to be public to be able to construct through reflection.
* @param context node manager context to work with
* @param enforceVirtualMemory true if virtual memory needs to be checked,
* false if physical memory needs to be checked instead
*/
public DefaultOOMHandler(Context context, boolean enforceVirtualMemory) {
this.context = context;
this.memoryStatFile = enforceVirtualMemory ?
CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES :
CGROUP_PARAM_MEMORY_USAGE_BYTES;
this.cgroups = getCGroupsHandler();
}
@VisibleForTesting
protected CGroupsHandler getCGroupsHandler() {
return ResourceHandlerModule.getCGroupsHandler();
}
/**
* Check if a given container exceeds its limits.
*/
private boolean isContainerOutOfLimit(Container container) {
boolean outOfLimit = false;
String value = null;
try {
value = cgroups.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
container.getContainerId().toString(), memoryStatFile);
long usage = Long.parseLong(value);
long request = container.getResource().getMemorySize() * 1024 * 1024;
// Check if the container has exceeded its limits.
if (usage > request) {
outOfLimit = true;
String message = String.format(
"Container %s is out of its limits, using %d " +
"when requested only %d",
container.getContainerId(), usage, request);
LOG.warn(message);
}
} catch (ResourceHandlerException ex) {
LOG.warn(String.format("Could not access memory resource for %s",
container.getContainerId()), ex);
} catch (NumberFormatException ex) {
LOG.warn(String.format("Could not parse %s in %s", value,
container.getContainerId()));
}
return outOfLimit;
}
/**
* SIGKILL the specified container. We do this not using the standard
* container logic. The reason is that the processes are frozen by
* the cgroups OOM handler, so they cannot respond to SIGTERM.
* On the other hand we have to be as fast as possible.
* We walk through the list of active processes in the container.
* This is needed because frozen parents cannot signal their children.
* We kill each process and then try again until the whole cgroup
* is cleaned up. This logic avoids leaking processes in a cgroup.
* Currently the killing only succeeds for PGIDS.
*
* @param container Container to clean up
* @return true if the container is killed successfully, false otherwise
*/
private boolean sigKill(Container container) {
boolean containerKilled = false;
boolean finished = false;
try {
while (!finished) {
String[] pids =
cgroups.getCGroupParam(
CGroupsHandler.CGroupController.MEMORY,
container.getContainerId().toString(),
CGROUP_PROCS_FILE)
.split("\n");
finished = true;
for (String pid : pids) {
// Note: this kills only PGIDs currently
if (pid != null && !pid.isEmpty()) {
LOG.debug(String.format(
"Terminating container %s Sending SIGKILL to -%s",
container.getContainerId().toString(),
pid));
finished = false;
try {
context.getContainerExecutor().signalContainer(
new ContainerSignalContext.Builder().setContainer(container)
.setUser(container.getUser())
.setPid(pid).setSignal(ContainerExecutor.Signal.KILL)
.build());
} catch (IOException ex) {
LOG.warn(String.format("Cannot kill container %s pid -%s.",
container.getContainerId(), pid), ex);
}
}
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
LOG.debug("Interrupted while waiting for processes to disappear");
}
}
containerKilled = true;
} catch (ResourceHandlerException ex) {
// the tasks file of the container may not be available because the
// container may not have been launched at this point when the root
// cgroup is under oom
LOG.warn(String.format(
"Cannot list more tasks in container %s to kill.",
container.getContainerId()));
}
return containerKilled;
}
/**
* It is called when the node is under an OOM condition. All processes in
* all sub-cgroups are suspended. We need to act fast, so that we do not
* affect the overall system utilization. In general we try to find a
* newly launched container that exceeded its limits. The justification is
* cost, since probably this is the one that has accumulated the least
* amount of uncommitted data so far. OPPORTUNISTIC containers are always
* killed before any GUARANTEED containers are considered. We continue the
* process until the OOM is resolved.
*/
@Override
public void run() {
try {
// We kill containers until the kernel reports the OOM situation resolved
// Note: If the kernel has a delay this may kill more than necessary
while (true) {
String status = cgroups.getCGroupParam(
CGroupsHandler.CGroupController.MEMORY,
"",
CGROUP_PARAM_MEMORY_OOM_CONTROL);
if (!status.contains(CGroupsHandler.UNDER_OOM)) {
break;
}
boolean containerKilled = killContainer();
if (!containerKilled) {
// This can happen, if SIGKILL did not clean up
// non-PGID or containers or containers launched by other users
// or if a process was put to the root YARN cgroup.
throw new YarnRuntimeException(
"Could not find any containers but CGroups " +
"reserved for containers ran out of memory. " +
"I am giving up");
}
}
} catch (ResourceHandlerException ex) {
LOG.warn("Could not fetch OOM status. " +
"This is expected at shutdown. Exiting.", ex);
}
}
/**
* Choose and kill a container in case of OOM. We try to find the most
* recently launched OPPORTUNISTIC container that exceeds its limit
* and fall back to the most recently launched OPPORTUNISTIC container
* If there is no such container found, we choose to kill a GUARANTEED
* container in the same way.
* @return true if a container is killed, false otherwise
*/
protected boolean killContainer() {
boolean containerKilled = false;
ArrayList<ContainerCandidate> candidates = new ArrayList<>(0);
for (Container container : context.getContainers().values()) {
if (!container.isRunning()) {
// skip containers that are not running yet because killing them
// won't release any memory to get us out of OOM.
continue;
// note even if it is indicated that the container is running from
// container.isRunning(), the container process might not have been
// running yet. From NM's perspective, a container is running as
// soon as the container launch is handed over the container executor
}
candidates.add(
new ContainerCandidate(container, isContainerOutOfLimit(container)));
}
Collections.sort(candidates);
if (candidates.isEmpty()) {
LOG.warn(
"Found no running containers to kill in order to release memory");
}
// make sure one container is killed successfully to release memory
for(int i = 0; !containerKilled && i < candidates.size(); i++) {
ContainerCandidate candidate = candidates.get(i);
if (sigKill(candidate.container)) {
String message = String.format(
"container %s killed by elastic cgroups OOM handler.",
candidate.container.getContainerId());
LOG.warn(message);
containerKilled = true;
}
}
return containerKilled;
}
/**
* Note: this class has a natural ordering that is inconsistent with equals.
*/
private static class ContainerCandidate
implements Comparable<ContainerCandidate> {
private final boolean outOfLimit;
final Container container;
ContainerCandidate(Container container, boolean outOfLimit) {
this.outOfLimit = outOfLimit;
this.container = container;
}
/**
* Order two containers by their execution type, followed by
* their out-of-limit status and then launch time. Opportunistic
* containers are ordered before Guaranteed containers. If two
* containers are of the same execution type, the one that is
* out of its limits is ordered before the one that isn't. If
* two containers have the same execution type and out-of-limit
* status, the one that's launched later is ordered before the
* other one.
*/
@Override
public int compareTo(ContainerCandidate o) {
boolean isThisOpportunistic = isOpportunistic(container);
boolean isOtherOpportunistic = isOpportunistic(o.container);
int ret = Boolean.compare(isOtherOpportunistic, isThisOpportunistic);
if (ret == 0) {
// the two containers are of the same execution type, order them
// by their out-of-limit status.
int outOfLimitRet = Boolean.compare(o.outOfLimit, outOfLimit);
if (outOfLimitRet == 0) {
// the two containers are also of the same out-of-limit status,
// order them by their launch time
ret = Long.compare(o.container.getContainerLaunchTime(),
this.container.getContainerLaunchTime());
} else {
ret = outOfLimitRet;
}
}
return ret;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (this.getClass() != obj.getClass()) {
return false;
}
ContainerCandidate other = (ContainerCandidate) obj;
if (this.outOfLimit != other.outOfLimit) {
return false;
}
if (this.container == null) {
return other.container == null;
} else {
return this.container.equals(other.container);
}
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(container).append(outOfLimit)
.toHashCode();
}
/**
* Check if a container is OPPORTUNISTIC or not. A container is
* considered OPPORTUNISTIC only if its execution type is not
* null and is OPPORTUNISTIC.
*/
private static boolean isOpportunistic(Container container) {
return container.getContainerTokenIdentifier() != null &&
ExecutionType.OPPORTUNISTIC.equals(
container.getContainerTokenIdentifier().getExecutionType());
}
}
}
相关信息
相关文章
hadoop CGroupElasticMemoryController 源码
hadoop CGroupsBlkioResourceHandlerImpl 源码
hadoop CGroupsCpuResourceHandlerImpl 源码
hadoop CGroupsMemoryResourceHandlerImpl 源码
hadoop CGroupsResourceCalculator 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦