hadoop TopMetrics source code

  • 2022-10-20

Hadoop TopMetrics code

File path: /hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode.top.metrics;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Time;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;

/**
 * The interface to the top metrics.
 * <p>
 * Metrics are collected by a custom audit logger, {@link org.apache.hadoop
 * .hdfs.server.namenode.top.TopAuditLogger}, which calls TopMetrics to
 * increment per-operation, per-user counts on every audit log call. These
 * counts are used to show the top users by NameNode operation as well as
 * across all operations.
 * <p>
 * TopMetrics maintains these counts for a configurable number of time
 * intervals, e.g. 1min, 5min, 25min. Each interval is tracked by a
 * RollingWindowManager.
 * <p>
 * These metrics are published as a JSON string via {@link org.apache.hadoop
 * .hdfs.server .namenode.metrics.FSNamesystemMBean#getTopWindows}. This is
 * done by calling {@link org.apache.hadoop.hdfs.server.namenode.top.window
 * .RollingWindowManager#snapshot} on each RollingWindowManager.
 * <p>
 * Thread-safe: relies on thread-safety of RollingWindowManager
 */
@InterfaceAudience.Private
public class TopMetrics implements MetricsSource {
  public static final Logger LOG = LoggerFactory.getLogger(TopMetrics.class);
  public static final String TOPMETRICS_METRICS_SOURCE_NAME =
      "NNTopUserOpCounts";
  private final boolean isMetricsSourceEnabled;

  private static void logConf(Configuration conf) {
    LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY +
        " = " +  conf.get(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY));
    LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_NUM_USERS_KEY +
        " = " +  conf.get(DFSConfigKeys.NNTOP_NUM_USERS_KEY));
    LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY +
        " = " +  conf.get(DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY));
  }

  /**
   * A map from reporting periods to WindowManager. Thread-safety is provided by
   * the fact that the mapping is not changed after construction.
   */
  final Map<Integer, RollingWindowManager> rollingWindowManagers =
      new HashMap<Integer, RollingWindowManager>();

  public TopMetrics(Configuration conf, int[] reportingPeriods) {
    logConf(conf);
    for (int i = 0; i < reportingPeriods.length; i++) {
      rollingWindowManagers.put(reportingPeriods[i], new RollingWindowManager(
          conf, reportingPeriods[i]));
    }
    isMetricsSourceEnabled = conf.getBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY,
        DFSConfigKeys.NNTOP_ENABLED_DEFAULT);
  }

  /**
   * Get a list of the current TopWindow statistics, one TopWindow per tracked
   * time interval.
   */
  public List<TopWindow> getTopWindows() {
    long monoTime = Time.monotonicNow();
    List<TopWindow> windows = Lists.newArrayListWithCapacity
        (rollingWindowManagers.size());
    for (Entry<Integer, RollingWindowManager> entry : rollingWindowManagers
        .entrySet()) {
      TopWindow window = entry.getValue().snapshot(monoTime);
      windows.add(window);
    }
    return windows;
  }

  /**
   * Pick the same information that DefaultAuditLogger does before writing to a
   * log file. This is to be consistent when {@link TopMetrics} is charged with
   * data read back from log files instead of being invoked directly by the
   * FsNamesystem
   * @param succeeded
   * @param userName
   * @param addr
   * @param cmd
   * @param src
   * @param dst
   * @param status
   */
  public void report(boolean succeeded, String userName, InetAddress addr,
      String cmd, String src, String dst, FileStatus status) {
    // currently nntop only makes use of the username and the command
    report(userName, cmd);
  }

  public void report(String userName, String cmd) {
    long currTime = Time.monotonicNow();
    report(currTime, userName, cmd);
  }

  public void report(long currTime, String userName, String cmd) {
    LOG.debug("a metric is reported: cmd: {} user: {}", cmd, userName);
    userName = UserGroupInformation.trimLoginMethod(userName);
    for (RollingWindowManager rollingWindowManager : rollingWindowManagers
        .values()) {
      rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
    }
  }

  /**
   * Flatten out the top window metrics into
   * {@link org.apache.hadoop.metrics2.MetricsRecord}s for consumption by
   * external metrics systems. Each metrics record added corresponds to the
   * reporting period a.k.a window length of the configured rolling windows.
   * @param collector
   * @param all
   */
  @Override
  public void getMetrics(MetricsCollector collector, boolean all) {
    if (!isMetricsSourceEnabled) {
      return;
    }

    for (final TopWindow window : getTopWindows()) {
      MetricsRecordBuilder rb = collector.addRecord(buildOpRecordName(window))
          .setContext("dfs");
      for (final Op op: window.getOps()) {
        rb.addCounter(buildOpTotalCountMetricsInfo(op), op.getTotalCount());
        for (User user : op.getTopUsers()) {
          rb.addCounter(buildOpRecordMetricsInfo(op, user), user.getCount());
        }
      }
    }
  }

  private String buildOpRecordName(TopWindow window) {
    return TOPMETRICS_METRICS_SOURCE_NAME + ".windowMs="
      + window.getWindowLenMs();
  }

  private MetricsInfo buildOpTotalCountMetricsInfo(Op op) {
    return Interns.info("op=" + StringUtils.deleteWhitespace(op.getOpType())
      + ".TotalCount", "Total operation count");
  }

  private MetricsInfo buildOpRecordMetricsInfo(Op op, User user) {
    return Interns.info("op=" + StringUtils.deleteWhitespace(op.getOpType())
      + ".user=" + user.getUser()
      + ".count", "Total operations performed by user");
  }
}
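
In the NameNode, TopMetrics is constructed by FSNamesystem and fed by the top audit logger, but the class can also be exercised on its own. Below is a minimal, hypothetical sketch (not part of the file above); it assumes the reporting periods are passed in milliseconds, which is how the NameNode's TopConf derives them from dfs.namenode.top.windows.minutes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;

public class TopMetricsExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumption: two rolling windows of 1 and 5 minutes, expressed in ms.
    int[] reportingPeriodsMs = {60_000, 300_000};
    TopMetrics topMetrics = new TopMetrics(conf, reportingPeriodsMs);

    // Simulate a few audit-log calls; nntop only uses the user name and command.
    topMetrics.report("alice", "create");
    topMetrics.report("alice", "listStatus");
    topMetrics.report("bob", "create");

    // Snapshot each tracked window, as FSNamesystemMBean#getTopWindows does.
    for (TopWindow window : topMetrics.getTopWindows()) {
      System.out.println("windowLenMs=" + window.getWindowLenMs());
      for (Op op : window.getOps()) {
        System.out.println("  op=" + op.getOpType()
            + " totalCount=" + op.getTotalCount());
      }
    }
  }
}

When the metrics source is enabled (dfs.namenode.top.enabled), getMetrics publishes the same data as counters: each window becomes a record named like NNTopUserOpCounts.windowMs=60000, containing entries such as op=create.TotalCount and op=create.user=alice.count, as built by buildOpRecordName, buildOpTotalCountMetricsInfo, and buildOpRecordMetricsInfo in the listing above.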

Related information

hadoop source code directory

Related articles

hadoop BufferPool source code

hadoop ByteBufferInputStream source code

hadoop ByteBufferOutputStream source code

hadoop ByteBufferWrapper source code

hadoop Constants source code

hadoop CosN source code

hadoop CosNConfigKeys source code

hadoop CosNCopyFileContext source code

hadoop CosNCopyFileTask source code

hadoop CosNFileReadTask source code
