hadoop CGroupElasticMemoryController 源码
hadoop CGroupElasticMemoryController 代码
文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import java.io.File;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_PMEM_CHECK_ENABLED;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_VMEM_CHECK_ENABLED;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_NO_LIMIT;
/**
 * This thread controls memory usage using cgroups. It listens to out of memory
 * events of all the containers together, and if we go over the limit picks
 * a container to kill. The algorithm that picks the container is a plugin.
 *
 * <p>On start it disables the kernel OOM killer on the YARN memory cgroup and
 * sets the configured limit, then launches an external {@code oom-listener}
 * process that forwards 8-byte event notifications on its stdout. Every
 * notification runs the configured OOM handler while a watchdog polls the
 * cgroup to check that the OOM is resolved within the configured timeout.
 * When the thread exits the cgroup limits are removed and the kernel OOM
 * killer is re-enabled.
 */
public class CGroupElasticMemoryController extends Thread {
  protected static final Logger LOG = LoggerFactory
      .getLogger(CGroupElasticMemoryController.class);
  private final Clock clock = new MonotonicClock();
  private final String yarnCGroupPath;
  private final String oomListenerPath;
  private final Runnable oomHandler;
  private final CGroupsHandler cgroups;
  private final boolean controlPhysicalMemory;
  private final boolean controlVirtualMemory;
  private final long limit;
  /** The running oom-listener process, if any. Guarded by {@code this}. */
  private Process process = null;
  /** Whether listening has been (or must be) stopped. Guarded by {@code this}. */
  private boolean stopped = false;
  /** How long the watchdog waits for an OOM to be resolved, in ms. */
  private final long timeoutMS;

  /**
   * Constructor that allows overriding the OOM handler (used by tests).
   * @param conf Yarn configuration to use
   * @param context Node manager context to out of memory handler
   * @param cgroups Cgroups handler configured
   * @param controlPhysicalMemory Whether to listen to physical memory OOM
   * @param controlVirtualMemory Whether to listen to virtual memory OOM
   * @param limit memory limit in bytes
   * @param oomHandlerOverride optional OOM handler; when null the handler
   *                           class from the configuration is instantiated
   * @exception YarnException Could not instantiate the OOM handler class, or
   *                          neither physical nor virtual memory control was
   *                          requested
   */
  @VisibleForTesting
  CGroupElasticMemoryController(Configuration conf,
                                Context context,
                                CGroupsHandler cgroups,
                                boolean controlPhysicalMemory,
                                boolean controlVirtualMemory,
                                long limit,
                                Runnable oomHandlerOverride)
      throws YarnException {
    super("CGroupElasticMemoryController");
    // Virtual memory is enforced only when it is the sole setting requested;
    // if both are requested, physical memory wins.
    boolean controlVirtual = controlVirtualMemory && !controlPhysicalMemory;
    Runnable oomHandlerTemp =
        getDefaultOOMHandler(conf, context, oomHandlerOverride, controlVirtual);
    if (controlPhysicalMemory && controlVirtualMemory) {
      LOG.warn(
          NM_ELASTIC_MEMORY_CONTROL_ENABLED + " is on. " +
          "We cannot control both virtual and physical " +
          "memory at the same time. Enforcing physical memory. " +
          "If swapping is enabled set " +
          "only " + NM_PMEM_CHECK_ENABLED + " to true otherwise set " +
          "only " + NM_VMEM_CHECK_ENABLED + " to true.");
    }
    if (!controlPhysicalMemory && !controlVirtualMemory) {
      throw new YarnException(
          NM_ELASTIC_MEMORY_CONTROL_ENABLED + " is on. " +
          "We need either virtual or physical memory check requested. " +
          "If swapping is enabled set " +
          "only " + NM_PMEM_CHECK_ENABLED + " to true otherwise set " +
          "only " + NM_VMEM_CHECK_ENABLED + " to true.");
    }
    // We are safe at this point that no more exceptions can be thrown.
    // Multiply as long so a large configured timeout cannot overflow.
    this.timeoutMS =
        1000L * conf.getInt(NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC,
            DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC);
    this.oomListenerPath = getOOMListenerExecutablePath(conf);
    this.oomHandler = oomHandlerTemp;
    this.cgroups = cgroups;
    this.controlPhysicalMemory = !controlVirtual;
    this.controlVirtualMemory = controlVirtual;
    this.yarnCGroupPath = this.cgroups
        .getPathForCGroup(CGroupsHandler.CGroupController.MEMORY, "");
    this.limit = limit;
  }

  /**
   * Get the configured OOM handler.
   * The configured handler class is looked up even when an override is given,
   * so a misconfigured class name is reported in either case.
   * @param conf configuration
   * @param context context to pass to constructor
   * @param oomHandlerLocal Default override; returned as-is when non-null
   * @param controlVirtual Control physical or virtual memory
   * @return The configured or overridden OOM handler.
   * @throws YarnException in case the constructor failed
   */
  private static Runnable getDefaultOOMHandler(
      Configuration conf, Context context, Runnable oomHandlerLocal,
      boolean controlVirtual)
      throws YarnException {
    Class<?> oomHandlerClass =
        conf.getClass(
            YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER,
            DefaultOOMHandler.class);
    if (oomHandlerLocal != null) {
      return oomHandlerLocal;
    }
    try {
      // The handler must expose a (Context, boolean) constructor and
      // implement Runnable; any failure is wrapped in a YarnException.
      Constructor<?> constr = oomHandlerClass.getConstructor(
          Context.class, boolean.class);
      return (Runnable) constr.newInstance(context, controlVirtual);
    } catch (Exception ex) {
      throw new YarnException(ex);
    }
  }

  /**
   * Creates a controller that uses the OOM handler class from the
   * configuration.
   * @param conf Yarn configuration to use
   * @param context Node manager context to out of memory handler
   * @param cgroups Cgroups handler configured
   * @param controlPhysicalMemory Whether to listen to physical memory OOM
   * @param controlVirtualMemory Whether to listen to virtual memory OOM
   * @param limit memory limit in bytes
   * @exception YarnException Could not instantiate class
   */
  public CGroupElasticMemoryController(Configuration conf,
                                       Context context,
                                       CGroupsHandler cgroups,
                                       boolean controlPhysicalMemory,
                                       boolean controlVirtualMemory,
                                       long limit)
      throws YarnException {
    this(conf,
        context,
        cgroups,
        controlPhysicalMemory,
        controlVirtualMemory,
        limit,
        null);
  }

  /**
   * Exception thrown if the OOM situation is not resolved.
   */
  private static final class OOMNotResolvedException
      extends YarnRuntimeException {
    OOMNotResolvedException(String message, Exception parent) {
      super(message, parent);
    }
  }

  /**
   * Stop listening to the cgroup: marks the controller as stopped and
   * forcibly destroys the listener process if one is running.
   * Called from other threads, including the OOM watchdog.
   */
  public synchronized void stopListening() {
    stopped = true;
    if (process != null) {
      process.destroyForcibly();
    } else {
      LOG.warn("Trying to stop listening, when listening is not running");
    }
  }

  /**
   * Checks if the CGroupElasticMemoryController is available on this system.
   * This assumes that Linux container executor is already initialized.
   * We need to have CGroups enabled.
   *
   * @return True if CGroupElasticMemoryController is available.
   * False otherwise.
   */
  public static boolean isAvailable() {
    try {
      if (!Shell.LINUX) {
        LOG.info("CGroupElasticMemoryController currently is supported only "
            + "on Linux.");
        return false;
      }
      if (ResourceHandlerModule.getCGroupsHandler() == null ||
          ResourceHandlerModule.getMemoryResourceHandler() == null) {
        LOG.info("CGroupElasticMemoryController requires enabling " +
            "memory CGroups with " +
            YarnConfiguration.NM_MEMORY_RESOURCE_ENABLED);
        return false;
      }
    } catch (SecurityException se) {
      LOG.info("Failed to get Operating System name. " + se);
      return false;
    }
    return true;
  }

  /**
   * Main OOM listening thread. It uses an external process to listen to
   * Linux events. The external process does not need to run as root, so
   * it is not related to container-executor. We do not use JNI for security
   * reasons.
   */
  @Override
  public void run() {
    ExecutorService executor = null;
    // Local handle to the listener; the shared field is only touched under
    // the monitor so stopListening() never races with this thread.
    Process listener = null;
    try {
      // Disable OOM killer and set a limit.
      // This has to be set first, so that we get notified about valid events.
      // We will be notified about events even, if they happened before
      // oom-listener started
      setCGroupParameters();

      // Start a listener process
      ProcessBuilder oomListener = new ProcessBuilder();
      oomListener.command(oomListenerPath, yarnCGroupPath);
      synchronized (this) {
        if (!stopped) {
          process = oomListener.start();
          listener = process;
        } else {
          resetCGroupParameters();
          LOG.info("Listener stopped before starting");
          return;
        }
      }
      LOG.info(String.format("Listening on %s with %s",
          yarnCGroupPath,
          oomListenerPath));

      // We need 1 thread for the error stream and a few others
      // as a watchdog for the OOM killer
      executor = Executors.newFixedThreadPool(2);

      // Listen to any errors in the background. We do not expect this to
      // be large in size, so it will fit into a string.
      final InputStream errorStream = listener.getErrorStream();
      Future<String> errorListener = executor.submit(
          () -> IOUtils.toString(errorStream, StandardCharsets.UTF_8));

      // We get Linux event increments (8 bytes) forwarded from the event stream
      // The events cannot be split, so it is safe to read them as a whole
      // There is no race condition with the cgroup
      // running out of memory. If oom is 1 at startup
      // oom_listener will send an initial notification
      InputStream events = listener.getInputStream();
      byte[] event = new byte[8];
      int read;
      // This loop can be exited by terminating the process
      // with stopListening()
      while ((read = events.read(event)) == event.length) {
        // An OOM event has occurred
        resolveOOM(executor);
      }

      if (read != -1) {
        LOG.warn(String.format("Characters returned from event handler: %d",
            read));
      }

      // If the input stream is closed, we wait for exit or process terminated.
      int exitCode = listener.waitFor();
      String error = errorListener.get();
      synchronized (this) {
        process = null;
      }
      LOG.info(String.format("OOM listener exited %d %s", exitCode, error));
    } catch (OOMNotResolvedException ex) {
      // We could mark the node unhealthy but it shuts down the node anyways.
      // Let's just bring down the node manager all containers are frozen.
      throw new YarnRuntimeException("Could not resolve OOM", ex);
    } catch (Exception ex) {
      synchronized (this) {
        if (!stopped) {
          LOG.warn("OOM Listener exiting.", ex);
        }
      }
    } finally {
      // Make sure we do not leak the child process,
      // especially if waitFor() did not finish.
      if (listener != null && listener.isAlive()) {
        listener.destroyForcibly();
      }
      boolean interrupted = false;
      if (executor != null) {
        // shutdown() must precede awaitTermination(): an executor that has
        // not been shut down never terminates, so the wait would always
        // last the full timeout instead of ending when pending tasks finish.
        executor.shutdown();
        try {
          executor.awaitTermination(6, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
          LOG.warn("Exiting without processing all OOM events.");
          interrupted = true;
        }
      }
      resetCGroupParameters();
      if (interrupted) {
        // Preserve the interrupt status for whoever owns this thread.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Resolve an OOM event.
   * Listen to the handler timeouts.
   * @param executor Executor to create watchdog with.
   * @throws InterruptedException interrupted
   * @throws java.util.concurrent.ExecutionException cannot launch watchdog
   */
  private void resolveOOM(ExecutorService executor)
      throws InterruptedException, java.util.concurrent.ExecutionException {
    // Just log, when we are still in OOM after a couple of seconds
    final long start = clock.getTime();
    Future<Boolean> watchdog =
        executor.submit(() -> watchAndLogOOMState(start));
    // Kill something to resolve the issue
    try {
      oomHandler.run();
    } catch (RuntimeException ex) {
      watchdog.cancel(true);
      throw new OOMNotResolvedException("OOM handler failed", ex);
    }
    if (!watchdog.get()) {
      // If we are still in OOM,
      // the watchdog will trigger stop
      // listening to exit this loop
      throw new OOMNotResolvedException("OOM handler timed out", null);
    }
  }

  /**
   * Just watch until we are in OOM and log. Send an update log every second.
   * If the OOM is not resolved within the configured timeout (or the
   * watchdog is interrupted or fails), listening is stopped.
   * @param start clock time at which the OOM event was received, in ms
   * @return if the OOM was resolved successfully
   */
  private boolean watchAndLogOOMState(long start) {
    long lastLog = start;
    try {
      long end = start;
      // Give up if we are still in OOM after timeoutMS
      while (end - start < timeoutMS) {
        end = clock.getTime();
        String underOOM = cgroups.getCGroupParam(
            CGroupsHandler.CGroupController.MEMORY,
            "",
            CGROUP_PARAM_MEMORY_OOM_CONTROL);
        if (underOOM.contains(CGroupsHandler.UNDER_OOM)) {
          if (end - lastLog > 1000) {
            LOG.warn(String.format(
                "OOM not resolved in %d ms", end - start));
            lastLog = end;
          }
        } else {
          LOG.info(String.format(
              "Resolved OOM in %d ms", end - start));
          return true;
        }
        // We do not want to saturate the CPU
        // leaving the resources to the actual OOM killer
        // but we want to be fast, too.
        Thread.sleep(10);
      }
    } catch (InterruptedException ex) {
      LOG.debug("Watchdog interrupted");
    } catch (Exception e) {
      LOG.warn("Exception running logging thread", e);
    }
    LOG.warn(String.format("OOM was not resolved in %d ms",
        clock.getTime() - start));
    stopListening();
    return false;
  }

  /**
   * Update root memory cgroup. This contains all containers.
   * The physical limit has to be set first then the virtual limit.
   * @throws ResourceHandlerException if a required cgroup parameter cannot
   *                                  be written
   */
  private void setCGroupParameters() throws ResourceHandlerException {
    // Disable the OOM killer
    cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
        CGROUP_PARAM_MEMORY_OOM_CONTROL, "1");
    if (controlPhysicalMemory && !controlVirtualMemory) {
      try {
        // Ignore virtual memory limits, since we do not know what it is set to
        cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
            CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
      } catch (ResourceHandlerException ex) {
        LOG.debug("Swap monitoring is turned off in the kernel");
      }
      // Set physical memory limits
      cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
          CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, Long.toString(limit));
    } else if (controlVirtualMemory && !controlPhysicalMemory) {
      // Ignore virtual memory limits, since we do not know what it is set to
      cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
          CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
      // Set physical limits to no more than virtual limits
      cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
          CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, Long.toString(limit));
      // Set virtual memory limits
      // Important: it has to be set after physical limit is set
      cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
          CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, Long.toString(limit));
    } else {
      throw new ResourceHandlerException(
          String.format("Unsupported scenario physical:%b virtual:%b",
              controlPhysicalMemory, controlVirtualMemory));
    }
  }

  /**
   * Reset root memory cgroup to OS defaults. This controls all containers.
   * Failures are logged, not thrown, since this runs during cleanup.
   */
  private void resetCGroupParameters() {
    try {
      try {
        // Disable memory limits
        cgroups.updateCGroupParam(
            CGroupsHandler.CGroupController.MEMORY, "",
            CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
      } catch (ResourceHandlerException ex) {
        LOG.debug("Swap monitoring is turned off in the kernel");
      }
      cgroups.updateCGroupParam(
          CGroupsHandler.CGroupController.MEMORY, "",
          CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
      // Enable the OOM killer
      cgroups.updateCGroupParam(
          CGroupsHandler.CGroupController.MEMORY, "",
          CGROUP_PARAM_MEMORY_OOM_CONTROL, "0");
    } catch (ResourceHandlerException ex) {
      LOG.warn("Error in cleanup", ex);
    }
  }

  /**
   * Resolve the oom-listener executable path: the configured value if set,
   * otherwise {@code $HADOOP_YARN_HOME/bin/oom-listener} (falling back to
   * the current directory when the environment variable is unset).
   * @param conf configuration to read the override from
   * @return path of the oom-listener executable
   */
  private static String getOOMListenerExecutablePath(Configuration conf) {
    String yarnHomeEnvVar =
        System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
    if (yarnHomeEnvVar == null) {
      yarnHomeEnvVar = ".";
    }
    File hadoopBin = new File(yarnHomeEnvVar, "bin");
    String defaultPath =
        new File(hadoopBin, "oom-listener").getAbsolutePath();
    final String path = conf.get(
        YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
        defaultPath);
    LOG.debug("oom-listener path: {} {}", path, defaultPath);
    return path;
  }
}
相关信息
相关文章
hadoop CGroupsBlkioResourceHandlerImpl 源码
hadoop CGroupsCpuResourceHandlerImpl 源码
hadoop CGroupsMemoryResourceHandlerImpl 源码
hadoop CGroupsResourceCalculator 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦