hadoop DefaultOOMHandler source code

  • 2022-10-20

hadoop DefaultOOMHandler code

File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;

import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PROCS_FILE;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_USAGE_BYTES;

/**
 * A very basic OOM handler implementation.
 * See the javadoc on the run() method for details.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DefaultOOMHandler implements Runnable {
  protected static final Logger LOG = LoggerFactory
      .getLogger(DefaultOOMHandler.class);
  private final Context context;
  private final String memoryStatFile;
  private final CGroupsHandler cgroups;

  /**
   * Create an OOM handler.
   * This has to be public to be able to construct through reflection.
   * @param context node manager context to work with
   * @param enforceVirtualMemory true if virtual memory needs to be checked,
   *                   false if physical memory needs to be checked instead
   */
  public DefaultOOMHandler(Context context, boolean enforceVirtualMemory) {
    this.context = context;
    this.memoryStatFile = enforceVirtualMemory ?
        CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES :
        CGROUP_PARAM_MEMORY_USAGE_BYTES;
    this.cgroups = getCGroupsHandler();
  }

  @VisibleForTesting
  protected CGroupsHandler getCGroupsHandler() {
    return ResourceHandlerModule.getCGroupsHandler();
  }
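
  // Illustrative sketch, not part of the upstream file: the constructor above
  // is public so that alternative handlers can be loaded by class name. A
  // caller could instantiate a handler reflectively roughly like this (the
  // method and variable names here are hypothetical, for illustration only):
  private static Runnable newHandlerByReflection(String className,
      Context ctx, boolean virtual) throws ReflectiveOperationException {
    Class<? extends Runnable> clazz =
        Class.forName(className).asSubclass(Runnable.class);
    // Matches the public (Context, boolean) constructor defined above.
    return clazz.getConstructor(Context.class, boolean.class)
        .newInstance(ctx, virtual);
  }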

  /**
   * Check if a given container exceeds its limits.
   */
  private boolean isContainerOutOfLimit(Container container) {
    boolean outOfLimit = false;

    String value = null;
    try {
      value = cgroups.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
          container.getContainerId().toString(), memoryStatFile);
      long usage = Long.parseLong(value);
      long request = container.getResource().getMemorySize() * 1024 * 1024;

      // Check if the container has exceeded its limits.
      if (usage > request) {
        outOfLimit = true;
        String message = String.format(
            "Container %s is out of its limits, using %d " +
                "when requested only %d",
            container.getContainerId(), usage, request);
        LOG.warn(message);
      }
    } catch (ResourceHandlerException ex) {
      LOG.warn(String.format("Could not access memory resource for %s",
          container.getContainerId()), ex);
    } catch (NumberFormatException ex) {
      LOG.warn(String.format("Could not parse %s in %s", value,
          container.getContainerId()));
    }
    return outOfLimit;
  }
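
  // Illustrative sketch, not part of the upstream file: the check above boils
  // down to this pure comparison. Containers request memory in megabytes,
  // while the cgroup usage files report bytes, hence the MB-to-byte scaling.
  private static boolean exceedsLimit(long usageBytes, long requestMB) {
    return usageBytes > requestMB * 1024L * 1024L;
  }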

  /**
   * SIGKILL the specified container. We deliberately bypass the standard
   * container termination logic: the processes are frozen by the cgroups
   * OOM handler, so they cannot respond to SIGTERM, and we have to act as
   * fast as possible. We walk through the list of active processes in the
   * container, because frozen parents cannot signal their children, and
   * kill each process, retrying until the whole cgroup is cleaned up.
   * This logic avoids leaking processes in a cgroup.
   * Currently the killing only succeeds for PGIDs.
   *
   * @param container Container to clean up
   * @return true if the container is killed successfully, false otherwise
   */
  private boolean sigKill(Container container) {
    boolean containerKilled = false;
    boolean finished = false;
    try {
      while (!finished) {
        String[] pids =
            cgroups.getCGroupParam(
                CGroupsHandler.CGroupController.MEMORY,
                container.getContainerId().toString(),
                CGROUP_PROCS_FILE)
                .split("\n");
        finished = true;
        for (String pid : pids) {
          // Note: this kills only PGIDs currently
          if (pid != null && !pid.isEmpty()) {
            LOG.debug(String.format(
                "Terminating container %s Sending SIGKILL to -%s",
                container.getContainerId().toString(),
                pid));
            finished = false;
            try {
              context.getContainerExecutor().signalContainer(
                  new ContainerSignalContext.Builder().setContainer(container)
                      .setUser(container.getUser())
                      .setPid(pid).setSignal(ContainerExecutor.Signal.KILL)
                      .build());
            } catch (IOException ex) {
              LOG.warn(String.format("Cannot kill container %s pid -%s.",
                  container.getContainerId(), pid), ex);
            }
          }
        }
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while waiting for processes to disappear");
        }
      }
      containerKilled = true;
    } catch (ResourceHandlerException ex) {
      // The cgroup.procs file of the container may not be available yet,
      // because the container may not have been launched by the time the
      // root cgroup went under OOM.
      LOG.warn(String.format(
          "Cannot list more tasks in container %s to kill.",
          container.getContainerId()));
    }

    return containerKilled;
  }
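
  // Illustrative sketch, not part of the upstream file: the executor call in
  // sigKill() ultimately delivers SIGKILL to a whole process group by
  // signalling the negative pid. Outside YARN, the same effect can be had
  // with the standard kill(1) utility (assumes a POSIX system with kill on
  // the PATH):
  private static void killProcessGroup(String pgid) throws IOException {
    // "--" ends option parsing so the negative pid is not read as a flag;
    // kill(2) with a negative pid signals every member of the process group.
    new ProcessBuilder("kill", "-9", "--", "-" + pgid).inheritIO().start();
  }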

  /**
   * Called when the node is under an OOM condition. All processes in all
   * sub-cgroups are suspended, so we need to act fast in order not to
   * affect overall system utilization. In general we try to find a newly
   * launched container that exceeded its limits; it has probably
   * accumulated the least amount of uncommitted data so far, so it is the
   * cheapest to kill. OPPORTUNISTIC containers are always killed before
   * any GUARANTEED containers are considered. We continue the process
   * until the OOM condition is resolved.
   */
  @Override
  public void run() {
    try {
      // We kill containers until the kernel reports the OOM situation resolved
      // Note: If the kernel has a delay this may kill more than necessary
      while (true) {
        String status = cgroups.getCGroupParam(
            CGroupsHandler.CGroupController.MEMORY,
            "",
            CGROUP_PARAM_MEMORY_OOM_CONTROL);
        if (!status.contains(CGroupsHandler.UNDER_OOM)) {
          break;
        }

        boolean containerKilled = killContainer();

        if (!containerKilled) {
          // This can happen if SIGKILL did not clean up non-PGID processes,
          // if containers were launched by other users, or if a process was
          // put into the root YARN cgroup.
          throw new YarnRuntimeException(
              "Could not find any containers but CGroups " +
                  "reserved for containers ran out of memory. " +
                  "I am giving up");
        }
      }
    } catch (ResourceHandlerException ex) {
      LOG.warn("Could not fetch OOM status. " +
          "This is expected at shutdown. Exiting.", ex);
    }
  }
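
  // Illustrative sketch, not part of the upstream file: what the status poll
  // in run() reads at the cgroup v1 level. memory.oom_control contains a
  // line "under_oom 1" while the kernel keeps the cgroup's tasks frozen.
  // The caller passes the YARN memory cgroup directory, for example
  // /sys/fs/cgroup/memory/hadoop-yarn (the mount point is an assumption).
  private static boolean isUnderOOM(java.nio.file.Path yarnMemoryCGroup)
      throws IOException {
    byte[] raw = java.nio.file.Files.readAllBytes(
        yarnMemoryCGroup.resolve("memory.oom_control"));
    return new String(raw, java.nio.charset.StandardCharsets.UTF_8)
        .contains("under_oom 1");
  }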

  /**
   * Choose and kill a container in case of OOM. We first try the most
   * recently launched OPPORTUNISTIC container that exceeds its limit,
   * then fall back to the most recently launched OPPORTUNISTIC container.
   * If no such container is found, we choose a GUARANTEED container in
   * the same way.
   * @return true if a container is killed, false otherwise
   */
  protected boolean killContainer() {
    boolean containerKilled = false;

    ArrayList<ContainerCandidate> candidates = new ArrayList<>(0);
    for (Container container : context.getContainers().values()) {
      if (!container.isRunning()) {
        // Skip containers that are not running yet, because killing them
        // won't release any memory to get us out of OOM.
        // Note: even if container.isRunning() reports true, the container
        // process might not be running yet; from the NM's perspective, a
        // container is running as soon as its launch is handed over to the
        // container executor.
        continue;
      }
      candidates.add(
          new ContainerCandidate(container, isContainerOutOfLimit(container)));
    }
    Collections.sort(candidates);
    if (candidates.isEmpty()) {
      LOG.warn(
          "Found no running containers to kill in order to release memory");
    }

    // make sure one container is killed successfully to release memory
    for(int i = 0; !containerKilled && i < candidates.size(); i++) {
      ContainerCandidate candidate = candidates.get(i);
      if (sigKill(candidate.container)) {
        String message = String.format(
            "container %s killed by elastic cgroups OOM handler.",
            candidate.container.getContainerId());
        LOG.warn(message);
        containerKilled = true;
      }
    }
    return containerKilled;
  }
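
  // Illustrative note, not part of the upstream file: after the sort in
  // killContainer(), candidates.get(0) is the preferred victim, so the
  // overall kill order is:
  //   1. OPPORTUNISTIC containers over their limit, youngest first,
  //   2. OPPORTUNISTIC containers within their limit, youngest first,
  //   3. GUARANTEED containers over their limit, youngest first,
  //   4. GUARANTEED containers within their limit, youngest first.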

  /**
   * Note: this class has a natural ordering that is inconsistent with equals.
   */
  private static class ContainerCandidate
      implements Comparable<ContainerCandidate> {
    private final boolean outOfLimit;
    final Container container;

    ContainerCandidate(Container container, boolean outOfLimit) {
      this.outOfLimit = outOfLimit;
      this.container = container;
    }

    /**
     * Order two containers by their execution type, followed by
     * their out-of-limit status and then launch time. Opportunistic
     * containers are ordered before Guaranteed containers. If two
     * containers are of the same execution type, the one that is
     * out of its limits is ordered before the one that isn't. If
     * two containers have the same execution type and out-of-limit
     * status, the one that's launched later is ordered before the
     * other one.
     */
    @Override
    public int compareTo(ContainerCandidate o) {
      boolean isThisOpportunistic = isOpportunistic(container);
      boolean isOtherOpportunistic = isOpportunistic(o.container);
      int ret = Boolean.compare(isOtherOpportunistic, isThisOpportunistic);
      if (ret == 0) {
        // the two containers are of the same execution type, order them
        // by their out-of-limit status.
        int outOfLimitRet = Boolean.compare(o.outOfLimit, outOfLimit);
        if (outOfLimitRet == 0) {
          // the two containers are also of the same out-of-limit status,
          // order them by their launch time
          ret = Long.compare(o.container.getContainerLaunchTime(),
              this.container.getContainerLaunchTime());
        } else {
          ret = outOfLimitRet;
        }
      }
      return ret;
    }
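
    // Illustrative note, not part of the upstream file: the flipped argument
    // order in the Boolean.compare calls above is what sorts "true" first.
    // For example, Boolean.compare(isOtherOpportunistic, isThisOpportunistic)
    // is negative when this candidate is OPPORTUNISTIC and the other is not,
    // which orders this candidate ahead of the other.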

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null) {
        return false;
      }
      if (this.getClass() != obj.getClass()) {
        return false;
      }
      ContainerCandidate other = (ContainerCandidate) obj;
      if (this.outOfLimit != other.outOfLimit) {
        return false;
      }
      if (this.container == null) {
        return other.container == null;
      } else {
        return this.container.equals(other.container);
      }
    }

    @Override
    public int hashCode() {
      return new HashCodeBuilder().append(container).append(outOfLimit)
          .toHashCode();
    }

    /**
     * Check if a container is OPPORTUNISTIC or not. A container is
     * considered OPPORTUNISTIC only if its execution type is not
     * null and is OPPORTUNISTIC.
     */
    private static boolean isOpportunistic(Container container) {
      return container.getContainerTokenIdentifier() != null &&
          ExecutionType.OPPORTUNISTIC.equals(
              container.getContainerTokenIdentifier().getExecutionType());
    }
  }
}

Related information

hadoop source code directory

Related articles

hadoop CGroupElasticMemoryController source code

hadoop CGroupsBlkioResourceHandlerImpl source code

hadoop CGroupsCpuResourceHandlerImpl source code

hadoop CGroupsHandler source code

hadoop CGroupsHandlerImpl source code

hadoop CGroupsMemoryResourceHandlerImpl source code

hadoop CGroupsMountConfig source code

hadoop CGroupsResourceCalculator source code

hadoop CombinedResourceCalculator source code

hadoop CpuResourceHandler source code
