hadoop Log4jWarningErrorMetricsAppender 源码

  • 2022-10-20
  • 浏览 (183)

hadoop Log4jWarningErrorMetricsAppender 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Log4jWarningErrorMetricsAppender.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.util;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;

import java.util.*;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class Log4jWarningErrorMetricsAppender extends AppenderSkeleton {

  /** Name given to this appender by its constructor (via {@code setName}). */
  public static final String LOG_METRICS_APPENDER = "RM_LOG_METRICS_APPENDER";
  /**
   * Maximum number of characters kept for a message that has a stack trace
   * appended to it; see {@code append}.
   */
  static final int MAX_MESSAGE_SIZE = 2048;

  /**
   * Occurrence summary reported for a single message: how often it was seen
   * in the requested window and when it was last seen.
   */
  public static class Element {
    /** Number of occurrences of the message. */
    public Long count;
    /** Timestamp, in seconds, of the latest occurrence that was counted. */
    public Long timestampSeconds;

    Element(Long count, Long timestampSeconds) {
      this.timestampSeconds = timestampSeconds;
      this.count = count;
    }
  }

  /**
   * Pairs a message with a timestamp so that messages can be kept in a
   * sorted set ordered by timestamp first and message text second.
   */
  static class PurgeElement implements Comparable<PurgeElement> {
    String message;
    Long timestamp;

    PurgeElement(String message, Long timestamp) {
      this.timestamp = timestamp;
      this.message = message;
    }

    /**
     * Orders by timestamp, breaking ties on the message text.
     *
     * @throws NullPointerException if {@code e} is null
     */
    @Override
    public int compareTo(PurgeElement e) {
      if (e == null) {
        throw new NullPointerException("Null element passed to compareTo");
      }
      int byTimestamp = timestamp.compareTo(e.timestamp);
      return byTimestamp != 0 ? byTimestamp : message.compareTo(e.message);
    }

    /** Two elements are equal when both message and timestamp are equal. */
    @Override
    public boolean equals(Object e) {
      if (this == e) {
        return true;
      }
      if (!(e instanceof PurgeElement)) {
        return false;
      }
      PurgeElement other = (PurgeElement) e;
      if (!message.equals(other.message)) {
        return false;
      }
      return timestamp.equals(other.timestamp);
    }

    /** Hashes on the timestamp only; equal elements share a timestamp. */
    @Override
    public int hashCode() {
      return timestamp.hashCode();
    }
  }

  // Per-message stores: message text -> (event time in seconds -> number of
  // occurrences of that message during that second).
  Map<String, SortedMap<Long, Integer>> errors;
  Map<String, SortedMap<Long, Integer>> warnings;
  // Overall stores: event time in seconds -> number of events of that level
  // logged during that second, regardless of message.
  SortedMap<Long, Integer> errorsTimestampCount;
  SortedMap<Long, Integer> warningsTimestampCount;
  // One entry per stored message, keyed by the time of the most recently
  // recorded occurrence; sorted so the oldest messages can be purged first.
  SortedSet<PurgeElement> errorsPurgeInformation;
  SortedSet<PurgeElement> warningsPurgeInformation;

  // Timer running ErrorAndWarningsCleanup; replaced when an immediate cleanup
  // is forced.
  Timer cleanupTimer;
  // Delay between cleanup runs, in milliseconds (passed to Timer.schedule).
  long cleanupInterval;
  // Messages and counts older than this many seconds are purged.
  long messageAgeLimitSeconds;
  // Number of unique messages of each level kept after a cleanup.
  int maxUniqueMessages;

  // Guards the message, count and purge-information collections above.
  final Object lock = new Object();

  /**
   * Create an appender to keep track of the errors and warnings logged by the
   * system, using the default settings: the cleanup task runs every 5
   * minutes, messages older than 1 day (24 * 60 * 60 seconds) are purged, and
   * each cleanup trims the stores to the 250 most recent unique errors and
   * the 250 most recent unique warnings. An immediate cleanup is scheduled as
   * soon as a new message pushes a store past twice that limit (500 unique
   * messages).
   */
  public Log4jWarningErrorMetricsAppender() {
    this(5 * 60, 24 * 60 * 60, 250);
  }

  /**
   * Create an appender to keep track of the errors and warnings logged by the
   * system. The appender names itself {@link #LOG_METRICS_APPENDER}, sets its
   * threshold to WARN and starts a timer that periodically purges old data.
   *
   * @param cleanupIntervalSeconds
   *          the interval at which old messages are purged to prevent the
   *          message stores from growing unbounded
   * @param messageAgeLimitSeconds
   *          the maximum age of a message in seconds before it is purged from
   *          the store
   * @param maxUniqueMessages
   *          the maximum number of unique messages of each type we keep before
   *          we start purging
   */
  public Log4jWarningErrorMetricsAppender(int cleanupIntervalSeconds,
      long messageAgeLimitSeconds, int maxUniqueMessages) {
    super();
    errors = new HashMap<>();
    warnings = new HashMap<>();
    errorsTimestampCount = new TreeMap<>();
    warningsTimestampCount = new TreeMap<>();
    errorsPurgeInformation = new TreeSet<>();
    warningsPurgeInformation = new TreeSet<>();

    this.messageAgeLimitSeconds = messageAgeLimitSeconds;
    this.maxUniqueMessages = maxUniqueMessages;
    // Multiply as long: int arithmetic would overflow for intervals above
    // Integer.MAX_VALUE / 1000 seconds and produce a bogus (negative) delay.
    cleanupInterval = cleanupIntervalSeconds * 1000L;
    this.setName(LOG_METRICS_APPENDER);
    this.setThreshold(Level.WARN);

    // Start the timer last so the cleanup task can never observe the age
    // limit or size limit before they have been assigned.
    cleanupTimer = new Timer();
    cleanupTimer.schedule(new ErrorAndWarningsCleanup(), cleanupInterval);
  }

  /**
   * {@inheritDoc}
   *
   * <p>Records WARN and ERROR events in the corresponding stores, keyed by
   * the rendered message (plus the stack trace, when there is one, truncated
   * to {@link #MAX_MESSAGE_SIZE} characters) and by the event time in
   * seconds. Events of any other level are ignored.
   */
  @Override
  protected void append(LoggingEvent event) {
    int level = event.getLevel().toInt();
    if (level != Level.WARN_INT && level != Level.ERROR_INT) {
      return;
    }

    String message = event.getRenderedMessage();
    if (message == null) {
      // A null message would end up in a PurgeElement whose compareTo
      // dereferences it, throwing from inside the logging call. Use the same
      // text that string concatenation below produces for null.
      message = "null";
    }
    String[] throwableStr = event.getThrowableStrRep();
    if (throwableStr != null) {
      message = message + "\n" + StringUtils.join("\n", throwableStr);
      message =
          org.apache.commons.lang3.StringUtils.left(message, MAX_MESSAGE_SIZE);
    }

    // store second level information
    Long eventTimeSeconds = event.getTimeStamp() / 1000;
    boolean isWarning = level == Level.WARN_INT;
    updateMessageDetails(message, eventTimeSeconds,
        isWarning ? warnings : errors,
        isWarning ? warningsTimestampCount : errorsTimestampCount,
        isWarning ? warningsPurgeInformation : errorsPurgeInformation);
  }

  /**
   * Record one occurrence of {@code message} at {@code eventTimeSeconds} in
   * the given per-message store, overall count store and purge set. When a
   * new message makes the per-message store exceed twice
   * {@code maxUniqueMessages}, the cleanup timer is replaced by one that
   * runs a cleanup immediately.
   */
  private void updateMessageDetails(String message, Long eventTimeSeconds,
      Map<String, SortedMap<Long, Integer>> map,
      SortedMap<Long, Integer> timestampsCount,
      SortedSet<PurgeElement> purgeInformation) {
    synchronized (lock) {
      SortedMap<Long, Integer> perSecond = map.get(message);
      if (perSecond == null) {
        // first time this message is seen
        perSecond = new TreeMap<>();
        perSecond.put(eventTimeSeconds, 1);
        map.put(message, perSecond);
        if (map.size() > maxUniqueMessages * 2) {
          cleanupTimer.cancel();
          cleanupTimer = new Timer();
          cleanupTimer.schedule(new ErrorAndWarningsCleanup(), 0);
        }
      } else {
        // the purge set holds the message under its previous latest time;
        // read that time before adding the new occurrence
        Long previousLatest = perSecond.lastKey();
        Integer seen = perSecond.get(eventTimeSeconds);
        perSecond.put(eventTimeSeconds, seen == null ? 1 : seen + 1);
        purgeInformation.remove(new PurgeElement(message, previousLatest));
      }
      purgeInformation.add(new PurgeElement(message, eventTimeSeconds));

      Integer total = timestampsCount.get(eventTimeSeconds);
      timestampsCount.put(eventTimeSeconds, total == null ? 1 : total + 1);
    }
  }

  /**
   * {@inheritDoc}
   *
   * <p>Cancels the cleanup timer currently referenced by this appender; the
   * stored messages and counts are left untouched.
   * NOTE(review): the timer field is read here without holding {@code lock},
   * while {@code updateMessageDetails} may replace it under the lock —
   * confirm that close cannot race with logging.
   */
  @Override
  public void close() {
    cleanupTimer.cancel();
  }

  /**
   * {@inheritDoc}
   *
   * <p>Always returns {@code false}: this appender stores the rendered
   * message itself and does not format events.
   */
  @Override
  public boolean requiresLayout() {
    return false;
  }

  /**
   * Get the number of errors logged at or after each of the given cutoffs.
   * The numbers may differ from those reported by getErrorMessagesAndCounts:
   * the message store is additionally trimmed to a maximum number of unique
   * messages, whereas the per-second count store is only purged by age.
   *
   * @param cutoffs
   *          list of timestamp cutoffs (in seconds) for which the counts are
   *          desired
   * @return list of error counts, one per cutoff and in the same order
   */
  public List<Integer> getErrorCounts(List<Long> cutoffs) {
    return getCounts(errorsTimestampCount, cutoffs);
  }

  /**
   * Get the number of warnings logged at or after each of the given cutoffs.
   * The numbers may differ from those reported by
   * getWarningMessagesAndCounts: the message store is additionally trimmed
   * to a maximum number of unique messages, whereas the per-second count
   * store is only purged by age.
   *
   * @param cutoffs
   *          list of timestamp cutoffs (in seconds) for which the counts are
   *          desired
   * @return list of warning counts, one per cutoff and in the same order
   */
  public List<Integer> getWarningCounts(List<Long> cutoffs) {
    return getCounts(warningsTimestampCount, cutoffs);
  }

  /**
   * Sum, for every cutoff, the per-second counts whose timestamp is at or
   * after that cutoff.
   *
   * @param map
   *          per-second event counts keyed by timestamp in seconds
   * @param cutoffs
   *          timestamp cutoffs in seconds; may be empty
   * @return one total per cutoff, in the same order as {@code cutoffs}; an
   *         empty list when no cutoffs are given
   */
  private List<Integer> getCounts(SortedMap<Long, Integer> map,
      List<Long> cutoffs) {
    List<Integer> ret = new ArrayList<>(cutoffs.size());
    if (cutoffs.isEmpty()) {
      // Collections.min would throw NoSuchElementException on an empty list
      return ret;
    }
    int[] totals = new int[cutoffs.size()];
    // Entries older than the earliest cutoff cannot contribute to any total.
    Long earliestCutoff = Collections.min(cutoffs);
    synchronized (lock) {
      for (Map.Entry<Long, Integer> entry
          : map.tailMap(earliestCutoff).entrySet()) {
        for (int i = 0; i < totals.length; ++i) {
          if (entry.getKey() >= cutoffs.get(i)) {
            totals[i] += entry.getValue();
          }
        }
      }
    }
    for (int total : totals) {
      ret.add(total);
    }
    return ret;
  }

  /**
   * Get the distinct errors logged at or after each of the given cutoffs,
   * together with how often each one occurred. The totals may differ from
   * those reported by getErrorCounts: the message store is additionally
   * trimmed to a maximum number of unique messages, whereas the per-second
   * count store is only purged by age.
   *
   * @param cutoffs
   *          list of timestamp cutoffs (in seconds) for which the counts are
   *          desired
   * @return one map per cutoff, in the same order; each map holds the errors
   *         seen in that period and the number of times each occurred
   */
  public List<Map<String, Element>> getErrorMessagesAndCounts(
      List<Long> cutoffs) {
    return getElementsAndCounts(errors, cutoffs, errorsPurgeInformation);
  }

  /**
   * Get the distinct warnings logged at or after each of the given cutoffs,
   * together with how often each one occurred. The totals may differ from
   * those reported by getWarningCounts: the message store is additionally
   * trimmed to a maximum number of unique messages, whereas the per-second
   * count store is only purged by age.
   *
   * @param cutoffs
   *          list of timestamp cutoffs (in seconds) for which the counts are
   *          desired
   * @return one map per cutoff, in the same order; each map holds the
   *         warnings seen in that period and the number of times each
   *         occurred
   */
  public List<Map<String, Element>> getWarningMessagesAndCounts(
      List<Long> cutoffs) {
    return getElementsAndCounts(warnings, cutoffs,
        warningsPurgeInformation);
  }

  /**
   * Build, for every cutoff, a map from message to its occurrence count and
   * latest timestamp among the occurrences at or after that cutoff. If the
   * store currently tracks more than {@code maxUniqueMessages} messages it
   * is purged first.
   *
   * @param map
   *          per-message store to read (and possibly purge)
   * @param cutoffs
   *          timestamp cutoffs in seconds
   * @param purgeInformation
   *          purge set belonging to {@code map}
   * @return one map per cutoff, in the same order as {@code cutoffs};
   *         messages with no occurrence in a period are left out of that
   *         period's map
   */
  private List<Map<String, Element>> getElementsAndCounts(
      Map<String, SortedMap<Long, Integer>> map, List<Long> cutoffs,
      SortedSet<PurgeElement> purgeInformation) {
    List<Map<String, Element>> ret = new ArrayList<>(cutoffs.size());
    for (int i = 0; i < cutoffs.size(); ++i) {
      ret.add(new HashMap<String, Element>());
    }
    synchronized (lock) {
      // The size check is done under the lock like every other access to the
      // purge set; cleanupMessages re-acquires the same (reentrant) monitor.
      if (purgeInformation.size() > maxUniqueMessages) {
        long cutoff = (Time.now() - (messageAgeLimitSeconds * 1000)) / 1000;
        new ErrorAndWarningsCleanup().cleanupMessages(map, purgeInformation,
            cutoff, maxUniqueMessages);
      }
      for (Map.Entry<String, SortedMap<Long, Integer>> element
          : map.entrySet()) {
        for (int i = 0; i < cutoffs.size(); ++i) {
          SortedMap<Long, Integer> qualifyingTimes =
              element.getValue().tailMap(cutoffs.get(i));
          if (qualifyingTimes.isEmpty()) {
            continue;
          }
          long count = 0;
          for (Integer occurrences : qualifyingTimes.values()) {
            count += occurrences;
          }
          ret.get(i).put(element.getKey(),
              new Element(count, qualifyingTimes.lastKey()));
        }
      }
    }
    return ret;
  }

  // getters and setters for log4j

  /**
   * @return the delay between two cleanup runs, in milliseconds
   */
  public long getCleanupInterval() {
    return this.cleanupInterval;
  }

  /**
   * Set the delay between two cleanup runs. The value is stored as given and
   * is used, in milliseconds, the next time the cleanup task re-schedules
   * itself; an already scheduled run is not moved.
   *
   * @param cleanupInterval
   *          the new delay in milliseconds
   */
  public void setCleanupInterval(long cleanupInterval) {
    this.cleanupInterval = cleanupInterval;
  }

  /**
   * @return the age, in seconds, beyond which messages and counts are purged
   */
  public long getMessageAgeLimitSeconds() {
    return this.messageAgeLimitSeconds;
  }

  /**
   * Set the age beyond which messages and counts are purged.
   *
   * @param messageAgeLimitSeconds
   *          the new age limit in seconds
   */
  public void setMessageAgeLimitSeconds(long messageAgeLimitSeconds) {
    this.messageAgeLimitSeconds = messageAgeLimitSeconds;
  }

  /**
   * @return the number of unique messages of each level kept after a cleanup
   */
  public int getMaxUniqueMessages() {
    return this.maxUniqueMessages;
  }

  /**
   * Set the number of unique messages of each level kept after a cleanup.
   *
   * @param maxUniqueMessages
   *          the new limit
   */
  public void setMaxUniqueMessages(int maxUniqueMessages) {
    this.maxUniqueMessages = maxUniqueMessages;
  }

  /**
   * Timer task that purges expired messages and counts, trims the message
   * stores to {@code maxUniqueMessages} entries and then schedules the next
   * run on the current cleanup timer.
   */
  class ErrorAndWarningsCleanup extends TimerTask {

    @Override
    public void run() {
      // oldest timestamp (in seconds) that is still within the age limit
      long cutoff = (Time.now() - (messageAgeLimitSeconds * 1000)) / 1000;
      cleanupMessages(errors, errorsPurgeInformation, cutoff,
          maxUniqueMessages);
      cleanupMessages(warnings, warningsPurgeInformation, cutoff,
          maxUniqueMessages);
      cleanupCounts(errorsTimestampCount, cutoff);
      cleanupCounts(warningsTimestampCount, cutoff);
      try {
        cleanupTimer.schedule(new ErrorAndWarningsCleanup(), cleanupInterval);
      } catch (IllegalStateException ignored) {
        // don't do anything since new timer is already scheduled
      }
    }

    /**
     * Remove every message last seen before {@code cutoff}, then remove the
     * oldest remaining messages until at most {@code mapTargetSize} are left.
     * Both the message store and its purge set are updated.
     */
    void cleanupMessages(Map<String, SortedMap<Long, Integer>> map,
        SortedSet<PurgeElement> purgeInformation, long cutoff,
        int mapTargetSize) {
      // the empty message sorts first, so everything strictly below this
      // element has a timestamp earlier than the cutoff
      PurgeElement boundary = new PurgeElement("", cutoff);
      synchronized (lock) {
        for (Iterator<PurgeElement> expired =
            purgeInformation.headSet(boundary).iterator();
            expired.hasNext();) {
          map.remove(expired.next().message);
          expired.remove();
        }

        // don't keep more mapTargetSize keys
        int surplus = purgeInformation.size() - mapTargetSize;
        Iterator<PurgeElement> oldestFirst = purgeInformation.iterator();
        while (surplus > 0 && oldestFirst.hasNext()) {
          map.remove(oldestFirst.next().message);
          oldestFirst.remove();
          surplus--;
        }
      }
    }

    /** Remove every per-second count whose timestamp is before the cutoff. */
    void cleanupCounts(SortedMap<Long, Integer> map, long cutoff) {
      synchronized (lock) {
        map.headMap(cutoff).clear();
      }
    }
  }

  /**
   * Helper that looks for an appender of this type among the appenders
   * attached to the root logger.
   *
   * @return the first such appender, or {@code null} if there is none
   */
  public static Log4jWarningErrorMetricsAppender findAppender() {
    for (Enumeration<?> attached = Logger.getRootLogger().getAllAppenders();
        attached.hasMoreElements();) {
      Object candidate = attached.nextElement();
      if (candidate instanceof Log4jWarningErrorMetricsAppender) {
        return (Log4jWarningErrorMetricsAppender) candidate;
      }
    }
    return null;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractLivelinessMonitor 源码

hadoop AdHocLogDumper 源码

hadoop ApplicationClassLoader 源码

hadoop Apps 源码

hadoop AsyncCallback 源码

hadoop AuxiliaryServiceHelper 源码

hadoop BoundedAppender 源码

hadoop Clock 源码

hadoop ConverterUtils 源码

hadoop DockerClientConfigHandler 源码

0  赞