hadoop RollingLevelDB 源码

  • 2022-10-20
  • 浏览 (120)

haddop RollingLevelDB 代码


 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.

package org.apache.hadoop.yarn.server.timeline;

import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import java.util.TreeMap;
import java.util.Map.Entry;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

 * Contains the logic to lookup a leveldb by timestamp so that multiple smaller
 * databases can roll according to the configured period and evicted efficiently
 * via operating system directory removal.
class RollingLevelDB {

  /** Logger for this class. */
  private static final Logger LOG = LoggerFactory.
  /** Factory to open and create new leveldb instances. */
  private static JniDBFactory factory = new JniDBFactory();
  /** Thread safe date formatter. */
  private FastDateFormat fdf;
  /** Date parser. */
  private SimpleDateFormat sdf;
  /** Calendar to calculate the current and next rolling period. */
  private GregorianCalendar cal = new GregorianCalendar(
  /** Collection of all active rolling leveldb instances. */
  private final TreeMap<Long, DB> rollingdbs;
  /** Collection of all rolling leveldb instances to evict. */
  private final TreeMap<Long, DB> rollingdbsToEvict;
  /** Name of this rolling level db. */
  private final String name;
  /** Calculated timestamp of when to roll a new leveldb instance. */
  private volatile long nextRollingCheckMillis = 0;
  /** File system instance to find and create new leveldb instances. */
  private FileSystem lfs = null;
  /** Directory to store rolling leveldb instances. */
  private Path rollingDBPath;
  /** Configuration for this object. */
  private Configuration conf;
  /** Rolling period. */
  private RollingPeriod rollingPeriod;
   * Rolling leveldb instances are evicted when their endtime is earlier than
   * the current time minus the time to live value.
  private long ttl;
  /** Whether time to live is enabled. */
  private boolean ttlEnabled;

  /** Encapsulates the rolling period to date format lookup. */
  enum RollingPeriod {
    DAILY {
      public String dateFormat() {
        return "yyyy-MM-dd";
      public String dateFormat() {
        return "yyyy-MM-dd-HH";
      public String dateFormat() {
        return "yyyy-MM-dd-HH";
    HOURLY {
      public String dateFormat() {
        return "yyyy-MM-dd-HH";
      public String dateFormat() {
        return "yyyy-MM-dd-HH-mm";
    public abstract String dateFormat();

   * Convenience class for associating a write batch with its rolling leveldb
   * instance.
  public static class RollingWriteBatch {
    /** Leveldb object. */
    private final DB db;
    /** Write batch for the db object. */
    private final WriteBatch writeBatch;

    public RollingWriteBatch(final DB db, final WriteBatch writeBatch) {
      this.db = db;
      this.writeBatch = writeBatch;

    public DB getDB() {
      return db;

    public WriteBatch getWriteBatch() {
      return writeBatch;

    public void write() {

    public void close() {
      IOUtils.cleanupWithLogger(LOG, writeBatch);

  RollingLevelDB(String name) {
    this.name = name;
    this.rollingdbs = new TreeMap<Long, DB>();
    this.rollingdbsToEvict = new TreeMap<Long, DB>();

  protected String getName() {
    return name;

  protected long currentTimeMillis() {
    return System.currentTimeMillis();

  public long getNextRollingTimeMillis() {
    return nextRollingCheckMillis;

  public long getTimeToLive() {
    return ttl;

  public boolean getTimeToLiveEnabled() {
    return ttlEnabled;

  protected void setNextRollingTimeMillis(final long timestamp) {
    this.nextRollingCheckMillis = timestamp;
    LOG.info("Next rolling time for " + getName() + " is "
        + fdf.format(nextRollingCheckMillis));

  public void init(final Configuration config) throws Exception {
    LOG.info("Initializing RollingLevelDB for " + getName());
    this.conf = config;
    this.ttl = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
    this.ttlEnabled = conf.getBoolean(
        YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true);
    this.rollingDBPath = new Path(

  protected void initFileSystem() throws IOException {
    lfs = FileSystem.getLocal(conf);
    boolean success = lfs.mkdirs(rollingDBPath,
    if (!success) {
      throw new IOException("Failed to create leveldb root directory "
          + rollingDBPath);

  protected synchronized void initRollingPeriod() {
    final String lcRollingPeriod = conf.get(
    this.rollingPeriod = RollingPeriod.valueOf(lcRollingPeriod
    fdf = FastDateFormat.getInstance(rollingPeriod.dateFormat(),
    sdf = new SimpleDateFormat(rollingPeriod.dateFormat());

  protected synchronized void initHistoricalDBs() throws IOException {
    Path rollingDBGlobPath = new Path(rollingDBPath, getName() + ".*");
    FileStatus[] statuses = lfs.globStatus(rollingDBGlobPath);
    for (FileStatus status : statuses) {
      String dbName = FilenameUtils.getExtension(status.getPath().toString());
      try {
        Long dbStartTime = sdf.parse(dbName).getTime();
        initRollingLevelDB(dbStartTime, status.getPath());
      } catch (ParseException pe) {
        LOG.warn("Failed to initialize rolling leveldb " + dbName + " for "
            + getName());

  private void initRollingLevelDB(Long dbStartTime,
      Path rollingInstanceDBPath) {
    if (rollingdbs.containsKey(dbStartTime)) {
    Options options = new Options();
    LOG.info("Initializing rolling leveldb instance :" + rollingInstanceDBPath
        + " for start time: " + dbStartTime);
    DB db = null;
    try {
      db = factory.open(
          new File(rollingInstanceDBPath.toUri().getPath()), options);
      rollingdbs.put(dbStartTime, db);
      String dbName = fdf.format(dbStartTime);
      LOG.info("Added rolling leveldb instance " + dbName + " to " + getName());
    } catch (IOException ioe) {
      LOG.warn("Failed to open rolling leveldb instance :"
          + new File(rollingInstanceDBPath.toUri().getPath()), ioe);

  synchronized DB getPreviousDB(DB db) {
    Iterator<DB> iterator = rollingdbs.values().iterator();
    DB prev = null;
    while (iterator.hasNext()) {
      DB cur = iterator.next();
      if (cur == db) {
      prev = cur;
    return prev;

  synchronized long getStartTimeFor(DB db) {
    long startTime = -1;
    for (Map.Entry<Long, DB> entry : rollingdbs.entrySet()) {
      if (entry.getValue() == db) {
        startTime = entry.getKey();
    return startTime;

  public synchronized DB getDBForStartTime(long startTime) {
    // make sure we sanitize this input
    startTime = Math.min(startTime, currentTimeMillis());

    if (startTime >= getNextRollingTimeMillis()) {
    Entry<Long, DB> entry = rollingdbs.floorEntry(startTime);
    if (entry == null) {
      return null;
    return entry.getValue();

  private void roll(long startTime) {
    LOG.info("Rolling new DB instance for " + getName());
    long currentStartTime = computeCurrentCheckMillis(startTime);
    String currentRollingDBInstance = fdf.format(currentStartTime);
    String currentRollingDBName = getName() + "." + currentRollingDBInstance;
    Path currentRollingDBPath = new Path(rollingDBPath, currentRollingDBName);
    if (getTimeToLiveEnabled()) {
    initRollingLevelDB(currentStartTime, currentRollingDBPath);

  private synchronized void scheduleOldDBsForEviction() {
    // keep at least time to live amount of data
    long evictionThreshold = computeCurrentCheckMillis(currentTimeMillis()
        - getTimeToLive());

    LOG.info("Scheduling " + getName() + " DBs older than "
        + fdf.format(evictionThreshold) + " for eviction");
    Iterator<Entry<Long, DB>> iterator = rollingdbs.entrySet().iterator();
    while (iterator.hasNext()) {
      Entry<Long, DB> entry = iterator.next();
      // parse this in gmt time
      if (entry.getKey() < evictionThreshold) {
        LOG.info("Scheduling " + getName() + " eviction for "
            + fdf.format(entry.getKey()));
        rollingdbsToEvict.put(entry.getKey(), entry.getValue());

  public synchronized void evictOldDBs() {
    LOG.info("Evicting " + getName() + " DBs scheduled for eviction");
    Iterator<Entry<Long, DB>> iterator = rollingdbsToEvict.entrySet()
    while (iterator.hasNext()) {
      Entry<Long, DB> entry = iterator.next();
      IOUtils.cleanupWithLogger(LOG, entry.getValue());
      String dbName = fdf.format(entry.getKey());
      Path path = new Path(rollingDBPath, getName() + "." + dbName);
      try {
        LOG.info("Removing old db directory contents in " + path);
        lfs.delete(path, true);
      } catch (IOException ioe) {
        LOG.warn("Failed to evict old db " + path, ioe);

  public void stop() throws Exception {
    for (DB db : rollingdbs.values()) {
      IOUtils.cleanupWithLogger(LOG, db);
    IOUtils.cleanupWithLogger(LOG, lfs);

  private long computeNextCheckMillis(long now) {
    return computeCheckMillis(now, true);

  public long computeCurrentCheckMillis(long now) {
    return computeCheckMillis(now, false);

  private synchronized long computeCheckMillis(long now, boolean next) {
    // needs to be called synchronously due to shared Calendar
    cal.set(Calendar.SECOND, 0);
    cal.set(Calendar.MILLISECOND, 0);

    if (rollingPeriod == RollingPeriod.DAILY) {
      cal.set(Calendar.HOUR_OF_DAY, 0);
      cal.set(Calendar.MINUTE, 0);
      if (next) {
        cal.add(Calendar.DATE, 1);
    } else if (rollingPeriod == RollingPeriod.HALF_DAILY) {
      // round down to 12 hour interval
      int hour = (cal.get(Calendar.HOUR) / 12) * 12;
      cal.set(Calendar.HOUR, hour);
      cal.set(Calendar.MINUTE, 0);
      if (next) {
        cal.add(Calendar.HOUR_OF_DAY, 12);
    } else if (rollingPeriod == RollingPeriod.QUARTER_DAILY) {
      // round down to 6 hour interval
      int hour = (cal.get(Calendar.HOUR) / 6) * 6;
      cal.set(Calendar.HOUR, hour);
      cal.set(Calendar.MINUTE, 0);
      if (next) {
        cal.add(Calendar.HOUR_OF_DAY, 6);
    } else if (rollingPeriod == RollingPeriod.HOURLY) {
      cal.set(Calendar.MINUTE, 0);
      if (next) {
        cal.add(Calendar.HOUR_OF_DAY, 1);
    } else if (rollingPeriod == RollingPeriod.MINUTELY) {
      // round down to 5 minute interval
      int minute = (cal.get(Calendar.MINUTE) / 5) * 5;
      cal.set(Calendar.MINUTE, minute);
      if (next) {
        cal.add(Calendar.MINUTE, 5);
    return cal.getTimeInMillis();


hadoop 源码目录


hadoop EntityIdentifier 源码

hadoop GenericObjectMapper 源码

hadoop KeyValueBasedTimelineStore 源码

hadoop LeveldbTimelineStore 源码

hadoop MemoryTimelineStore 源码

hadoop NameValuePair 源码

hadoop RollingLevelDBTimelineStore 源码

hadoop TimelineDataManager 源码

hadoop TimelineDataManagerMetrics 源码

hadoop TimelineReader 源码

0  赞