hadoop CGroupsResourceCalculator 源码

  • 2022-10-20
  • 浏览 (194)

hadoop CGroupsResourceCalculator 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsResourceCalculator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;

import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.util.CpuTimeTracker;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.SysInfoLinux;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.SystemClock;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * A cgroups file-system based Resource calculator without the process tree
 * features.
 *
 * CGroups has its limitations. It can only be enabled, if both CPU and memory
 * cgroups are enabled with yarn.nodemanager.resource.cpu.enabled and
 * yarn.nodemanager.resource.memory.enabled respectively. This means that
 * memory limits are enforced by default. You can turn this off and keep
 * memory reporting only with yarn.nodemanager.resource.memory.enforced.
 *
 * Another limitation is virtual memory measurement. CGroups does not have the
 * ability to measure virtual memory usage. This includes memory reserved but
 * not used. CGroups measures used memory as a sum of
 * physical memory and swap usage. This will be returned in the virtual
 * memory counters.
 * If the real virtual memory is required please use the legacy procfs based
 * resource calculator or CombinedResourceCalculator.
 */
public class CGroupsResourceCalculator extends ResourceCalculatorProcessTree {
  /** Tells {@link #processFile} whether to keep reading lines or stop. */
  enum Result {
    Continue,
    Exit
  }
  protected static final Logger LOG = LoggerFactory
      .getLogger(CGroupsResourceCalculator.class);
  private static final String PROCFS = "/proc";
  static final String CGROUP = "cgroup";
  static final String CPU_STAT = "cpuacct.stat";
  static final String MEM_STAT = "memory.usage_in_bytes";
  static final String MEMSW_STAT = "memory.memsw.usage_in_bytes";
  // Line prefixes in cpuacct.stat; the trailing space separates key and value.
  private static final String USER = "user ";
  private static final String SYSTEM = "system ";

  // One line of /proc/<pid>/cgroup: "<hierarchy-id>:<controllers>:/<path>".
  private static final Pattern CGROUP_FILE_FORMAT = Pattern.compile(
      "^(\\d+):([^:]+):/(.*)$");
  private final String procfsDir;
  private CGroupsHandler cGroupsHandler;

  private String pid;
  private File cpuStat;
  private File memStat;
  private File memswStat;

  private BigInteger processTotalJiffies;
  private long processPhysicalMemory;
  private long processVirtualMemory;

  private final long jiffyLengthMs;
  private final CpuTimeTracker cpuTimeTracker;
  private Clock clock;

  /**
   * Create resource calculator for all Yarn containers.
   */
  public CGroupsResourceCalculator()
      throws YarnException {
    this(null, PROCFS, ResourceHandlerModule.getCGroupsHandler(),
        SystemClock.getInstance(), SysInfoLinux.JIFFY_LENGTH_IN_MILLIS);
  }

  /**
   * Create resource calculator for the container that has the specified pid.
   * @param pid A pid from the cgroup or null for all containers
   */
  public CGroupsResourceCalculator(String pid) {
    this(pid, PROCFS, ResourceHandlerModule.getCGroupsHandler(),
        SystemClock.getInstance(), SysInfoLinux.JIFFY_LENGTH_IN_MILLIS);
  }

  /**
   * Create resource calculator for testing.
   * @param pid A pid from the cgroup or null for all containers
   * @param procfsDir Path to /proc or a mock /proc directory
   * @param cGroupsHandler Initialized cgroups handler object
   * @param clock A clock object
   * @param jiffyLengthMs0 Jiffy length in milliseconds
   */
  @VisibleForTesting
  CGroupsResourceCalculator(String pid, String procfsDir,
                            CGroupsHandler cGroupsHandler,
                            Clock clock,
                            long jiffyLengthMs0) {
    super(pid);
    this.procfsDir = procfsDir;
    this.cGroupsHandler = cGroupsHandler;
    // A pid of "0" is replaced with "1" before any /proc lookup is made.
    this.pid = pid != null && pid.equals("0") ? "1" : pid;
    this.jiffyLengthMs = jiffyLengthMs0;
    this.cpuTimeTracker =
        new CpuTimeTracker(this.jiffyLengthMs);
    this.clock = clock;
    this.processTotalJiffies = BigInteger.ZERO;
    this.processPhysicalMemory = UNAVAILABLE;
    this.processVirtualMemory = UNAVAILABLE;
  }

  /**
   * Verifies that cgroups-based calculation is usable and resolves the
   * cgroup files that will be read.
   *
   * @throws YarnException if the calculator is not available on this system
   *     or the cgroup paths cannot be resolved
   */
  @Override
  public void initialize() throws YarnException {
    if (!CGroupsResourceCalculator.isAvailable()) {
      throw new YarnException("CGroupsResourceCalculator is not available");
    }
    setCGroupFilePaths();
  }

  @Override
  public float getCpuUsagePercent() {
    LOG.debug("Process {} jiffies:{}", pid, processTotalJiffies);
    return cpuTimeTracker.getCpuTrackerUsagePercent();
  }

  /**
   * @return the user+system jiffies from the last update multiplied by the
   *     jiffy length, or {@code UNAVAILABLE} if the jiffy length is negative
   */
  @Override
  public long getCumulativeCpuTime() {
    if (jiffyLengthMs < 0) {
      return UNAVAILABLE;
    }
    return processTotalJiffies.longValue() * jiffyLengthMs;
  }

  /**
   * @return the value last read from memory.usage_in_bytes, or
   *     {@code UNAVAILABLE} when {@code olderThanAge > 1}
   */
  @Override
  public long getRssMemorySize(int olderThanAge) {
    if (olderThanAge > 1) {
      return UNAVAILABLE;
    }
    return processPhysicalMemory;
  }

  /**
   * @return the value last read from memory.memsw.usage_in_bytes (memory
   *     plus swap, not true virtual memory), or {@code UNAVAILABLE} when
   *     {@code olderThanAge > 1}
   */
  @Override
  public long getVirtualMemorySize(int olderThanAge) {
    if (olderThanAge > 1) {
      return UNAVAILABLE;
    }
    return processVirtualMemory;
  }

  /**
   * Re-reads the cgroup CPU and memory accounting files.
   * CPU read failures are logged and leave the previous CPU value in place.
   * Memory read failures are logged and yield {@code UNAVAILABLE}.
   */
  @Override
  public void updateProcessTree() {
    try {
      this.processTotalJiffies = readTotalProcessJiffies();
      cpuTimeTracker.updateElapsedJiffies(processTotalJiffies,
          clock.getTime());
    } catch (YarnException e) {
      LOG.warn("Failed to parse {}", pid, e);
    }
    processPhysicalMemory = getMemorySize(memStat);
    if (memswStat.exists()) {
      processVirtualMemory = getMemorySize(memswStat);
    } else {
      LOG.debug("Swap cgroups monitoring is not compiled into the kernel {}",
          memswStat.getAbsolutePath());
    }
  }

  @Override
  public String getProcessTreeDump() {
    // We do not have a process tree in cgroups return just the pid for tracking
    return pid;
  }

  @Override
  public boolean checkPidPgrpidForMatch() {
    // We do not have a process tree in cgroups returning default ok
    return true;
  }

  /**
   * Checks if the CGroupsResourceCalculator is available on this system.
   * This assumes that Linux container executor is already initialized.
   *
   * @return true if CGroupsResourceCalculator is available. False otherwise.
   */
  public static boolean isAvailable() {
    try {
      if (!Shell.LINUX) {
        LOG.info("CGroupsResourceCalculator currently is supported only on "
            + "Linux.");
        return false;
      }
      if (ResourceHandlerModule.getCGroupsHandler() == null ||
          ResourceHandlerModule.getCpuResourceHandler() == null ||
          ResourceHandlerModule.getMemoryResourceHandler() == null) {
        LOG.info("CGroupsResourceCalculator requires enabling CGroups " +
            "cpu and memory");
        return false;
      }
    } catch (SecurityException se) {
      LOG.warn("Failed to get Operating System name.", se);
      return false;
    }
    return true;
  }

  /**
   * Reads a single numeric value from the first line of a cgroup usage file.
   *
   * @param cgroupUsageFile the file to read (e.g. memory.usage_in_bytes)
   * @return the parsed value, or {@code UNAVAILABLE} if the file cannot be
   *     read or its content is not a valid long
   */
  private long getMemorySize(File cgroupUsageFile) {
    long[] mem = new long[1];
    try {
      processFile(cgroupUsageFile, (String line) -> {
        mem[0] = Long.parseLong(line.trim());
        return Result.Exit;
      });
      return mem[0];
    } catch (YarnException | NumberFormatException e) {
      // Report the file that actually failed, not a fixed one.
      LOG.warn("Failed to parse cgroups {}", cgroupUsageFile, e);
    }
    return UNAVAILABLE;
  }

  /**
   * Sums the "user" and "system" entries of cpuacct.stat.
   *
   * @return the total jiffies consumed by the cgroup
   * @throws YarnException if the file cannot be read or contains a
   *     non-numeric value
   */
  private BigInteger readTotalProcessJiffies() throws YarnException {
    final BigInteger[] totalCPUTimeJiffies = new BigInteger[1];
    totalCPUTimeJiffies[0] = BigInteger.ZERO;
    try {
      processFile(cpuStat, (String line) -> {
        if (line.startsWith(USER)) {
          totalCPUTimeJiffies[0] = totalCPUTimeJiffies[0].add(
              new BigInteger(line.substring(USER.length()).trim()));
        }
        if (line.startsWith(SYSTEM)) {
          totalCPUTimeJiffies[0] = totalCPUTimeJiffies[0].add(
              new BigInteger(line.substring(SYSTEM.length()).trim()));
        }
        return Result.Continue;
      });
    } catch (NumberFormatException e) {
      // Surface malformed content as the checked exception callers handle.
      throw new YarnException("Failed to parse " + cpuStat, e);
    }
    return totalCPUTimeJiffies[0];
  }

  /**
   * Resolves the cgroup path, relative to the controller mount, to monitor.
   * When no pid was given, this is the path for the empty cgroup name.
   */
  private String getCGroupRelativePath(
      CGroupsHandler.CGroupController controller)
      throws YarnException {
    if (pid == null) {
      return cGroupsHandler.getRelativePathForCGroup("");
    } else {
      return getCGroupRelativePathForPid(controller);
    }
  }

  /**
   * Finds the cgroup of {@code pid} for the given controller by scanning
   * {@code procfsDir/<pid>/cgroup}.
   *
   * @throws YarnException if the file cannot be read or has no entry whose
   *     controller list contains the controller name
   */
  private String getCGroupRelativePathForPid(
      CGroupsHandler.CGroupController controller)
      throws YarnException {
    File pidCgroupFile = new File(new File(procfsDir, pid), CGROUP);
    String[] result = new String[1];
    processFile(pidCgroupFile, (String line)->{
      Matcher m = CGROUP_FILE_FORMAT.matcher(line);
      boolean mat = m.find();
      if (mat) {
        if (m.group(2).contains(controller.getName())) {
          // Instead of returning the full path we compose it
          // based on the last item as the container id
          // This helps to avoid confusion within a privileged Docker container
          // where the path is referred in /proc/<pid>/cgroup as
          // /docker/<dcontainerid>/hadoop-yarn/<containerid>
          // but it is /hadoop-yarn/<containerid> in the cgroups hierarchy
          String cgroupPath = m.group(3);

          if (cgroupPath != null) {
            String cgroup =
                new File(cgroupPath).toPath().getFileName().toString();
            result[0] = cGroupsHandler.getRelativePathForCGroup(cgroup);
          } else {
            LOG.warn("Invalid cgroup path for {}", pidCgroupFile);
          }
          return Result.Exit;
        }
      } else {
        LOG.warn(
            "Unexpected: cgroup file is not in the expected format"
                + " for process with pid {}", pid);
      }
      return Result.Continue;
    });
    if (result[0] == null) {
      throw new YarnException(controller.getName() + " CGroup for pid " + pid +
          " not found " + pidCgroupFile);
    }
    return result[0];
  }

  /**
   * Feeds each line of {@code file}, decoded as UTF-8, to
   * {@code processLine}. Reading stops at end of file or when the callback
   * returns {@link Result#Exit}.
   * Unchecked exceptions thrown by the callback are not caught here.
   *
   * @throws YarnException if the file cannot be opened or read
   */
  private void processFile(File file, Function<String, Result> processLine)
      throws YarnException {
    // Read a cgroup accounting file or procfsDir/<pid>/cgroup.
    try (InputStreamReader fReader = new InputStreamReader(
        new FileInputStream(file), Charset.forName("UTF-8"))) {
      try (BufferedReader in = new BufferedReader(fReader)) {
        try {
          String str;
          while ((str = in.readLine()) != null) {
            Result result = processLine.apply(str);
            if (result == Result.Exit) {
              return;
            }
          }
        } catch (IOException io) {
          throw new YarnException("Error reading the stream " + io, io);
        }
      }
    } catch (IOException f) {
      // Opening failed (or closing failed); for a per-pid file this
      // typically means the process is gone.
      throw new YarnException("The process vanished in the interim " + pid, f);
    }
  }

  /**
   * Resolves the cpuacct and memory cgroup directories and the accounting
   * files read by {@link #updateProcessTree()}.
   *
   * @throws YarnException if the cgroups handler is missing or a cgroup
   *     path cannot be resolved
   */
  void setCGroupFilePaths() throws YarnException {
    if (cGroupsHandler == null) {
      throw new YarnException("CGroups handler is not initialized");
    }
    File cpuDir = new File(
        cGroupsHandler.getControllerPath(
            CGroupsHandler.CGroupController.CPUACCT),
        getCGroupRelativePath(CGroupsHandler.CGroupController.CPUACCT));
    File memDir = new File(
        cGroupsHandler.getControllerPath(
            CGroupsHandler.CGroupController.MEMORY),
        getCGroupRelativePath(CGroupsHandler.CGroupController.MEMORY));
    cpuStat = new File(cpuDir, CPU_STAT);
    memStat = new File(memDir, MEM_STAT);
    memswStat = new File(memDir, MEMSW_STAT);
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop CGroupElasticMemoryController 源码

hadoop CGroupsBlkioResourceHandlerImpl 源码

hadoop CGroupsCpuResourceHandlerImpl 源码

hadoop CGroupsHandler 源码

hadoop CGroupsHandlerImpl 源码

hadoop CGroupsMemoryResourceHandlerImpl 源码

hadoop CGroupsMountConfig 源码

hadoop CombinedResourceCalculator 源码

hadoop CpuResourceHandler 源码

hadoop DefaultOOMHandler 源码

0  赞