Hadoop RollingLevelDBTimelineStore source code

  • 2022-10-20

Hadoop RollingLevelDBTimelineStore code

File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java
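
Before the listing, a minimal usage sketch (illustrative, not part of the upstream file): it runs the store through its AbstractService lifecycle, using configuration keys from YarnConfiguration; the leveldb path value is an assumption.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.RollingLevelDBTimelineStore;

public class TimelineStoreDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();
    // Root directory for the leveldb files (illustrative path).
    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, "/tmp/timeline-store");
    RollingLevelDBTimelineStore store = new RollingLevelDBTimelineStore();
    store.init(conf);  // opens the domain, entity, index, starttime and owner dbs
    store.start();     // starts the TTL deletion thread when ttl-enable is true
    try {
      // ... put and query timeline entities here ...
    } finally {
      store.stop();    // interrupts the deletion thread and closes every db
    }
  }
}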

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timeline;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;

import org.apache.commons.collections.map.LRUMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.timeline.RollingLevelDB.RollingWriteBatch;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;

import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.ReadOptions;
import org.iq80.leveldb.WriteBatch;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTClazzNameRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.nio.charset.StandardCharsets.UTF_8;

import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_MS;

import static org.fusesource.leveldbjni.JniDBFactory.bytes;
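
// Worked key example (annotation for this article, not upstream code):
// an event for entity ("application_0001", "YARN_APPLICATION") with start
// time t and event timestamp ts lands in the entity db under
//   "YARN_APPLICATION" + writeReverseOrderedLong(t) + "application_0001"
//       + "e" + writeReverseOrderedLong(ts) + eventtype
// writeReverseOrderedLong makes larger longs sort first, so iteration
// returns newer entities and events before older ones.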

/**
 * <p>
 * An implementation of an application timeline store backed by leveldb.
 * </p>
 *
 * <p>
 * There are three sections of the db, the start time section, the entity
 * section, and the indexed entity section.
 * </p>
 *
 * <p>
 * The start time section is used to retrieve the unique start time for a given
 * entity. Its values each contain a start time while its keys are of the form:
 * </p>
 *
 * <pre>
 *   START_TIME_LOOKUP_PREFIX + entity type + entity id
 * </pre>
 *
 * <p>
 * The entity section is ordered by entity type, then entity start time
 * descending, then entity ID. There are four sub-sections of the entity
 * section: events, primary filters, related entities, and other info. The event
 * entries have event info serialized into their values. The other info entries
 * have values corresponding to the values of the other info name/value map for
 * the entry (note the names are contained in the key). All other entries have
 * empty values. The key structure is as follows:
 * </p>
 *
 * <pre>
 *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id
 *
 *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
 *     DOMAIN_ID_COLUMN
 *
 *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
 *     EVENTS_COLUMN + reveventtimestamp + eventtype
 *
 *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
 *     PRIMARY_FILTERS_COLUMN + name + value
 *
 *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
 *     OTHER_INFO_COLUMN + name
 *
 *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
 *     RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id
 * </pre>
 *
 * <p>
 * The indexed entity section contains a primary filter name and primary filter
 * value as the prefix. Within a given name/value, entire entity entries are
 * stored in the same format as described in the entity section above (below,
 * "key" represents any one of the possible entity entry keys described above).
 * </p>
 *
 * <pre>
 *   INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value +
 *     key
 * </pre>
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RollingLevelDBTimelineStore extends AbstractService implements
    TimelineStore {
  private static final Logger LOG = LoggerFactory
      .getLogger(RollingLevelDBTimelineStore.class);
  private static FSTConfiguration fstConf =
      FSTConfiguration.createDefaultConfiguration();
  // Fall back to 2.24 parsing if 2.50 parsing fails
  private static FSTConfiguration fstConf224 =
      FSTConfiguration.createDefaultConfiguration();
  // Static class code for 2.24
  private static final int LINKED_HASH_MAP_224_CODE = 83;

  static {
    fstConf.setShareReferences(false);
    fstConf224.setShareReferences(false);
    // YARN-6654 unable to find class for code 83 (LinkedHashMap)
    // The linked hash map was changed between 2.24 and 2.50 so that
    // the static code for LinkedHashMap (83) was changed to a dynamic
    // code.
    FSTClazzNameRegistry registry = fstConf224.getClassRegistry();
    registry.registerClass(
        LinkedHashMap.class, LINKED_HASH_MAP_224_CODE, fstConf224);
  }
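
  // Note on the two FST configurations: reads first try fstConf (the
  // current format) and fall back to fstConf224 on failure, so values
  // serialized by a 2.24-era server remain readable (see getEntityEvent,
  // addPrimaryFilter and the other-info parsing in getEntity below).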

  @Private
  @VisibleForTesting
  static final String FILENAME = "leveldb-timeline-store";
  static final String DOMAIN = "domain-ldb";
  static final String ENTITY = "entity-ldb";
  static final String INDEX = "indexes-ldb";
  static final String STARTTIME = "starttime-ldb";
  static final String OWNER = "owner-ldb";

  @VisibleForTesting
  // Extension to FILENAME where a backup will be stored in case we need
  // to call LevelDb recovery
  static final String BACKUP_EXT = ".backup-";

  private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(UTF_8);
  private static final byte[] EVENTS_COLUMN = "e".getBytes(UTF_8);
  private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(UTF_8);
  private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(UTF_8);
  private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes(UTF_8);

  private static final byte[] DESCRIPTION_COLUMN = "d".getBytes(UTF_8);
  private static final byte[] OWNER_COLUMN = "o".getBytes(UTF_8);
  private static final byte[] READER_COLUMN = "r".getBytes(UTF_8);
  private static final byte[] WRITER_COLUMN = "w".getBytes(UTF_8);
  private static final byte[] TIMESTAMP_COLUMN = "t".getBytes(UTF_8);

  private static final byte[] EMPTY_BYTES = new byte[0];

  private static final String TIMELINE_STORE_VERSION_KEY =
      "timeline-store-version";

  private static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 0);

  private static long writeBatchSize = 10000;

  @Private
  @VisibleForTesting
  static final FsPermission LEVELDB_DIR_UMASK = FsPermission
      .createImmutable((short) 0700);

  private Map<EntityIdentifier, Long> startTimeWriteCache;
  private Map<EntityIdentifier, Long> startTimeReadCache;

  private DB domaindb;
  private RollingLevelDB entitydb;
  private RollingLevelDB indexdb;
  private DB starttimedb;
  private DB ownerdb;

  private Thread deletionThread;

  public RollingLevelDBTimelineStore() {
    super(RollingLevelDBTimelineStore.class.getName());
  }

  private JniDBFactory factory;
  @VisibleForTesting
  void setFactory(JniDBFactory fact) {
    this.factory = fact;
  }

  @Override
  @SuppressWarnings("unchecked")
  protected void serviceInit(Configuration conf) throws Exception {
    Preconditions
        .checkArgument(conf.getLong(TIMELINE_SERVICE_TTL_MS,
            DEFAULT_TIMELINE_SERVICE_TTL_MS) > 0,
            "%s property value should be greater than zero",
            TIMELINE_SERVICE_TTL_MS);
    Preconditions.checkArgument(conf.getLong(
        TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS) > 0,
        "%s property value should be greater than zero",
        TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
    Preconditions.checkArgument(conf.getLong(
        TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE) >= 0,
        "%s property value should be greater than or equal to zero",
        TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE);
    Preconditions.checkArgument(conf.getLong(
        TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE) > 0,
        "%s property value should be greater than zero",
        TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
    Preconditions.checkArgument(conf.getLong(
        TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE) > 0,
        "%s property value should be greater than zero",
        TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
    Preconditions.checkArgument(conf.getLong(
        TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES) > 0,
        "%s property value should be greater than zero",
        TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES);
    Preconditions.checkArgument(conf.getLong(
        TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE) > 0,
        "%s property value should be greater than zero",
        TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE);

    Options options = new Options();
    options.createIfMissing(true);
    options.cacheSize(conf.getLong(
        TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
    if (factory == null) {
      factory = new JniDBFactory();
    }
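    // Each store section lives in its own subdirectory under the
    // configured leveldb path: domain, starttime and owner are plain
    // LevelDB instances, while entity and index roll over time.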
    Path dbPath = new Path(
        conf.get(TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
    Path domainDBPath = new Path(dbPath, DOMAIN);
    Path starttimeDBPath = new Path(dbPath, STARTTIME);
    Path ownerDBPath = new Path(dbPath, OWNER);
    try (FileSystem localFS = FileSystem.getLocal(conf)) {
      if (!localFS.exists(dbPath)) {
        if (!localFS.mkdirs(dbPath)) {
          throw new IOException("Couldn't create directory for leveldb "
              + "timeline store " + dbPath);
        }
        localFS.setPermission(dbPath, LEVELDB_DIR_UMASK);
      }
      if (!localFS.exists(domainDBPath)) {
        if (!localFS.mkdirs(domainDBPath)) {
          throw new IOException("Couldn't create directory for leveldb "
              + "timeline store " + domainDBPath);
        }
        localFS.setPermission(domainDBPath, LEVELDB_DIR_UMASK);
      }
      if (!localFS.exists(starttimeDBPath)) {
        if (!localFS.mkdirs(starttimeDBPath)) {
          throw new IOException("Couldn't create directory for leveldb "
              + "timeline store " + starttimeDBPath);
        }
        localFS.setPermission(starttimeDBPath, LEVELDB_DIR_UMASK);
      }
      if (!localFS.exists(ownerDBPath)) {
        if (!localFS.mkdirs(ownerDBPath)) {
          throw new IOException("Couldn't create directory for leveldb "
              + "timeline store " + ownerDBPath);
        }
        localFS.setPermission(ownerDBPath, LEVELDB_DIR_UMASK);
      }
    }
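    // The tuning options below apply to the non-rolling databases opened
    // here; the rolling entity and index databases pick up their own
    // settings via RollingLevelDB.init(conf).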
    options.maxOpenFiles(conf.getInt(
        TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES));
    options.writeBufferSize(conf.getInt(
        TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
    LOG.info("Using leveldb path " + dbPath);
    domaindb = LeveldbUtils.loadOrRepairLevelDb(factory, domainDBPath, options);
    entitydb = new RollingLevelDB(ENTITY);
    entitydb.init(conf);
    indexdb = new RollingLevelDB(INDEX);
    indexdb.init(conf);
    starttimedb = LeveldbUtils.loadOrRepairLevelDb(factory, starttimeDBPath, options);
    ownerdb = LeveldbUtils.loadOrRepairLevelDb(factory, ownerDBPath, options);
    checkVersion();
    startTimeWriteCache = Collections.synchronizedMap(new LRUMap(
        getStartTimeWriteCacheSize(conf)));
    startTimeReadCache = Collections.synchronizedMap(new LRUMap(
        getStartTimeReadCacheSize(conf)));

    writeBatchSize = conf.getInt(
        TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE);

    super.serviceInit(conf);
  }

  @Override
  protected void serviceStart() throws Exception {
    if (getConfig().getBoolean(TIMELINE_SERVICE_TTL_ENABLE, true)) {
      deletionThread = new EntityDeletionThread(getConfig());
      deletionThread.start();
    }
    super.serviceStart();
  }

  @Override
  protected void serviceStop() throws Exception {
    if (deletionThread != null) {
      deletionThread.interrupt();
      LOG.info("Waiting for deletion thread to complete its current action");
      try {
        deletionThread.join();
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting for deletion thread to complete,"
            + " closing db now", e);
      }
    }
    IOUtils.cleanupWithLogger(LOG, domaindb);
    IOUtils.cleanupWithLogger(LOG, starttimedb);
    IOUtils.cleanupWithLogger(LOG, ownerdb);
    entitydb.stop();
    indexdb.stop();
    super.serviceStop();
  }

  private class EntityDeletionThread extends Thread {
    private final long ttl;
    private final long ttlInterval;

    EntityDeletionThread(Configuration conf) {
      ttl = conf.getLong(TIMELINE_SERVICE_TTL_MS,
          DEFAULT_TIMELINE_SERVICE_TTL_MS);
      ttlInterval = conf.getLong(
          TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
          DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
      LOG.info("Starting deletion thread with ttl " + ttl + " and cycle "
          + "interval " + ttlInterval);
    }

    @Override
    public void run() {
      Thread.currentThread().setName("Leveldb Timeline Store Retention");
      while (true) {
        long timestamp = System.currentTimeMillis() - ttl;
        try {
          discardOldEntities(timestamp);
          Thread.sleep(ttlInterval);
        } catch (IOException e) {
          LOG.error(e.toString());
        } catch (InterruptedException e) {
          LOG.info("Deletion thread received interrupt, exiting");
          break;
        }
      }
    }
  }

  @Override
  public TimelineEntity getEntity(String entityId, String entityType,
      EnumSet<Field> fields) throws IOException {
    Long revStartTime = getStartTimeLong(entityId, entityType);
    if (revStartTime == null) {
      LOG.debug("Could not find start time for {} {} ", entityType, entityId);
      return null;
    }
    byte[] prefix = KeyBuilder.newInstance().add(entityType)
        .add(writeReverseOrderedLong(revStartTime)).add(entityId)
        .getBytesForLookup();
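    // Entity data is spread across time-rolled leveldb instances; the
    // lookup below picks the instance whose window covers this start time.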

    DB db = entitydb.getDBForStartTime(revStartTime);
    if (db == null) {
      LOG.debug("Could not find db for {} {} ", entityType, entityId);
      return null;
    }
    try (DBIterator iterator = db.iterator()) {
      iterator.seek(prefix);

      return getEntity(entityId, entityType, revStartTime, fields, iterator,
          prefix, prefix.length);
    }
  }

  /**
   * Read entity from a db iterator. If no information is found in the specified
   * fields for this entity, return null.
   */
  private static TimelineEntity getEntity(String entityId, String entityType,
      Long startTime, EnumSet<Field> fields, DBIterator iterator,
      byte[] prefix, int prefixlen) throws IOException {
    if (fields == null) {
      fields = EnumSet.allOf(Field.class);
    }

    TimelineEntity entity = new TimelineEntity();
    boolean events = false;
    boolean lastEvent = false;
    if (fields.contains(Field.EVENTS)) {
      events = true;
    } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
      lastEvent = true;
    } else {
      entity.setEvents(null);
    }
    boolean relatedEntities = false;
    if (fields.contains(Field.RELATED_ENTITIES)) {
      relatedEntities = true;
    } else {
      entity.setRelatedEntities(null);
    }
    boolean primaryFilters = false;
    if (fields.contains(Field.PRIMARY_FILTERS)) {
      primaryFilters = true;
    } else {
      entity.setPrimaryFilters(null);
    }
    boolean otherInfo = false;
    if (fields.contains(Field.OTHER_INFO)) {
      otherInfo = true;
    } else {
      entity.setOtherInfo(null);
    }

    // iterate through the entity's entry, parsing information if it is part
    // of a requested field
    for (; iterator.hasNext(); iterator.next()) {
      byte[] key = iterator.peekNext().getKey();
      if (!prefixMatches(prefix, prefixlen, key)) {
        break;
      }
      if (key.length == prefixlen) {
        continue;
      }
      if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
        if (primaryFilters) {
          addPrimaryFilter(entity, key, prefixlen
              + PRIMARY_FILTERS_COLUMN.length);
        }
      } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
        if (otherInfo) {
          Object o = null;
          String keyStr = parseRemainingKey(key,
              prefixlen + OTHER_INFO_COLUMN.length);
          try {
            o = fstConf.asObject(iterator.peekNext().getValue());
            entity.addOtherInfo(keyStr, o);
          } catch (Exception ignore) {
            try {
              // Fall back to 2.24 parser
              o = fstConf224.asObject(iterator.peekNext().getValue());
              entity.addOtherInfo(keyStr, o);
            } catch (Exception e) {
              LOG.warn("Error while decoding "
                  + entityId + ":otherInfo:" + keyStr, e);
            }
          }
        }
      } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
        if (relatedEntities) {
          addRelatedEntity(entity, key, prefixlen
              + RELATED_ENTITIES_COLUMN.length);
        }
      } else if (key[prefixlen] == EVENTS_COLUMN[0]) {
        if (events || (lastEvent && entity.getEvents().size() == 0)) {
          TimelineEvent event = getEntityEvent(null, key, prefixlen
              + EVENTS_COLUMN.length, iterator.peekNext().getValue());
          if (event != null) {
            entity.addEvent(event);
          }
        }
      } else if (key[prefixlen] == DOMAIN_ID_COLUMN[0]) {
        byte[] v = iterator.peekNext().getValue();
        String domainId = new String(v, UTF_8);
        entity.setDomainId(domainId);
      } else {
        LOG.warn(String.format("Found unexpected column for entity %s of "
            + "type %s (0x%02x)", entityId, entityType, key[prefixlen]));
      }
    }

    entity.setEntityId(entityId);
    entity.setEntityType(entityType);
    entity.setStartTime(startTime);

    return entity;
  }

  @Override
  public TimelineEvents getEntityTimelines(String entityType,
      SortedSet<String> entityIds, Long limit, Long windowStart,
      Long windowEnd, Set<String> eventType) throws IOException {
    TimelineEvents events = new TimelineEvents();
    if (entityIds == null || entityIds.isEmpty()) {
      return events;
    }
    // create a lexicographically-ordered map from start time to entities
    Map<byte[], List<EntityIdentifier>> startTimeMap =
        new TreeMap<byte[], List<EntityIdentifier>>(
        new Comparator<byte[]>() {
          @Override
          public int compare(byte[] o1, byte[] o2) {
            return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0,
                o2.length);
          }
        });

    // look up start times for the specified entities
    // skip entities with no start time
    for (String entityId : entityIds) {
      byte[] startTime = getStartTime(entityId, entityType);
      if (startTime != null) {
        List<EntityIdentifier> entities = startTimeMap.get(startTime);
        if (entities == null) {
          entities = new ArrayList<EntityIdentifier>();
          startTimeMap.put(startTime, entities);
        }
        entities.add(new EntityIdentifier(entityId, entityType));
      }
    }
    for (Entry<byte[], List<EntityIdentifier>> entry : startTimeMap
          .entrySet()) {
      // look up the events matching the given parameters (limit,
      // start time, end time, event types) for entities whose start times
      // were found and add the entities to the return list
      byte[] revStartTime = entry.getKey();
      for (EntityIdentifier entityIdentifier : entry.getValue()) {
        EventsOfOneEntity entity = new EventsOfOneEntity();
        entity.setEntityId(entityIdentifier.getId());
        entity.setEntityType(entityType);
        events.addEvent(entity);
        KeyBuilder kb = KeyBuilder.newInstance().add(entityType)
            .add(revStartTime).add(entityIdentifier.getId())
            .add(EVENTS_COLUMN);
        byte[] prefix = kb.getBytesForLookup();
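        // Event keys embed the reverse-ordered event timestamp, so seeking
        // to reverse(windowEnd) positions the iterator at the newest event
        // that still falls inside the requested window.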
        if (windowEnd == null) {
          windowEnd = Long.MAX_VALUE;
        }
        byte[] revts = writeReverseOrderedLong(windowEnd);
        kb.add(revts);
        byte[] first = kb.getBytesForLookup();
        byte[] last = null;
        if (windowStart != null) {
          last = KeyBuilder.newInstance().add(prefix)
              .add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
        }
        if (limit == null) {
          limit = DEFAULT_LIMIT;
        }
        DB db = entitydb.getDBForStartTime(readReverseOrderedLong(
            revStartTime, 0));
        if (db == null) {
          continue;
        }
        try (DBIterator iterator = db.iterator()) {
          for (iterator.seek(first); entity.getEvents().size() < limit
              && iterator.hasNext(); iterator.next()) {
            byte[] key = iterator.peekNext().getKey();
            if (!prefixMatches(prefix, prefix.length, key)
                || (last != null && WritableComparator.compareBytes(key, 0,
                key.length, last, 0, last.length) > 0)) {
              break;
            }
            TimelineEvent event = getEntityEvent(eventType, key, prefix.length,
                iterator.peekNext().getValue());
            if (event != null) {
              entity.addEvent(event);
            }
          }
        }
      }
    }
    return events;
  }

  @Override
  public TimelineEntities getEntities(String entityType, Long limit,
      Long windowStart, Long windowEnd, String fromId, Long fromTs,
      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
      EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
    if (primaryFilter == null) {
      // if no primary filter is specified, prefix the lookup with
      // ENTITY_ENTRY_PREFIX
      return getEntityByTime(EMPTY_BYTES, entityType, limit, windowStart,
          windowEnd, fromId, fromTs, secondaryFilters, fields, checkAcl, false);
    } else {
      // if a primary filter is specified, prefix the lookup with
      // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
      // ENTITY_ENTRY_PREFIX
      byte[] base = KeyBuilder.newInstance().add(primaryFilter.getName())
          .add(fstConf.asByteArray(primaryFilter.getValue()), true)
          .getBytesForLookup();
      return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
          fromId, fromTs, secondaryFilters, fields, checkAcl, true);
    }
  }

  /**
   * Retrieves a list of entities satisfying given parameters.
   *
   * @param base
   *          A byte array prefix for the lookup
   * @param entityType
   *          The type of the entity
   * @param limit
   *          A limit on the number of entities to return
   * @param starttime
   *          The earliest entity start time to retrieve (exclusive)
   * @param endtime
   *          The latest entity start time to retrieve (inclusive)
   * @param fromId
   *          Retrieve entities starting with this entity
   * @param fromTs
   *          Ignore entities with insert timestamp later than this ts
   * @param secondaryFilters
   *          Filter pairs that the entities should match
   * @param fields
   *          The set of fields to retrieve
   * @param usingPrimaryFilter
   *          true if this query is using a primary filter
   * @return A list of entities
   * @throws IOException
   */
  private TimelineEntities getEntityByTime(byte[] base, String entityType,
      Long limit, Long starttime, Long endtime, String fromId, Long fromTs,
      Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields,
      CheckAcl checkAcl, boolean usingPrimaryFilter) throws IOException {
    KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
    // only db keys matching the prefix (base + entity type) will be parsed
    byte[] prefix = kb.getBytesForLookup();
    if (endtime == null) {
      // if end time is null, place no restriction on end time
      endtime = Long.MAX_VALUE;
    }

    // Sanitize the fields parameter
    if (fields == null) {
      fields = EnumSet.allOf(Field.class);
    }

    // construct a first key that will be seeked to using end time or fromId
    long firstStartTime = Long.MAX_VALUE;
    byte[] first = null;
    if (fromId != null) {
      Long fromIdStartTime = getStartTimeLong(fromId, entityType);
      if (fromIdStartTime == null) {
        // no start time for provided id, so return empty entities
        return new TimelineEntities();
      }
      if (fromIdStartTime <= endtime) {
        // if provided id's start time falls before the end of the window,
        // use it to construct the seek key
        firstStartTime = fromIdStartTime;
        first = kb.add(writeReverseOrderedLong(fromIdStartTime)).add(fromId)
            .getBytesForLookup();
      }
    }
    // if seek key wasn't constructed using fromId, construct it using end ts
    if (first == null) {
      firstStartTime = endtime;
      first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
    }
    byte[] last = null;
    if (starttime != null) {
      // if start time is not null, set a last key that will not be
      // iterated past
      last = KeyBuilder.newInstance().add(base).add(entityType)
          .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
    }
    if (limit == null) {
      // if limit is not specified, use the default
      limit = DEFAULT_LIMIT;
    }

    TimelineEntities entities = new TimelineEntities();
    RollingLevelDB rollingdb = null;
    if (usingPrimaryFilter) {
      rollingdb = indexdb;
    } else {
      rollingdb = entitydb;
    }

    DB db = rollingdb.getDBForStartTime(firstStartTime);
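    // Scan the window containing the seek key first, then walk backwards
    // through older rolling databases (getPreviousDB) until the limit is
    // reached or no earlier window exists.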
    while (entities.getEntities().size() < limit && db != null) {
      try (DBIterator iterator = db.iterator()) {
        iterator.seek(first);

        // iterate until one of the following conditions is met: limit is
        // reached, there are no more keys, the key prefix no longer matches,
        // or a start time has been specified and reached/exceeded
        while (entities.getEntities().size() < limit && iterator.hasNext()) {
          byte[] key = iterator.peekNext().getKey();
          if (!prefixMatches(prefix, prefix.length, key)
              || (last != null && WritableComparator.compareBytes(key, 0,
              key.length, last, 0, last.length) > 0)) {
            break;
          }
          // read the start time and entity id from the current key
          KeyParser kp = new KeyParser(key, prefix.length);
          Long startTime = kp.getNextLong();
          String entityId = kp.getNextString();

          if (fromTs != null) {
            long insertTime = readReverseOrderedLong(iterator.peekNext()
                .getValue(), 0);
            if (insertTime > fromTs) {
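              // the entity was inserted after fromTs, so skip all of its
              // remaining keys (they share the prefix up to kp.getOffset())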
              byte[] firstKey = key;
              while (iterator.hasNext()) {
                key = iterator.peekNext().getKey();
                iterator.next();
                if (!prefixMatches(firstKey, kp.getOffset(), key)) {
                  break;
                }
              }
              continue;
            }
          }
          // Even if other info and primary filter fields are not included, we
          // still need to load them to match secondary filters when they are
          // non-empty
          EnumSet<Field> queryFields = EnumSet.copyOf(fields);
          boolean addPrimaryFilters = false;
          boolean addOtherInfo = false;
          if (secondaryFilters != null && secondaryFilters.size() > 0) {
            if (!queryFields.contains(Field.PRIMARY_FILTERS)) {
              queryFields.add(Field.PRIMARY_FILTERS);
              addPrimaryFilters = true;
            }
            if (!queryFields.contains(Field.OTHER_INFO)) {
              queryFields.add(Field.OTHER_INFO);
              addOtherInfo = true;
            }
          }

          // parse the entity that owns this key, iterating over all keys for
          // the entity
          TimelineEntity entity = null;
          if (usingPrimaryFilter) {
            entity = getEntity(entityId, entityType, queryFields);
            iterator.next();
          } else {
            entity = getEntity(entityId, entityType, startTime, queryFields,
                iterator, key, kp.getOffset());
          }

          if (entity != null) {
            // determine if the retrieved entity matches the provided secondary
            // filters, and if so add it to the list of entities to return
            boolean filterPassed = true;
            if (secondaryFilters != null) {
              for (NameValuePair filter : secondaryFilters) {
                Object v = entity.getOtherInfo().get(filter.getName());
                if (v == null) {
                  Set<Object> vs = entity.getPrimaryFilters()
                          .get(filter.getName());
                  if (vs == null || !vs.contains(filter.getValue())) {
                    filterPassed = false;
                    break;
                  }
                } else if (!v.equals(filter.getValue())) {
                  filterPassed = false;
                  break;
                }
              }
            }
            if (filterPassed) {
              if (entity.getDomainId() == null) {
                entity.setDomainId(DEFAULT_DOMAIN_ID);
              }
              if (checkAcl == null || checkAcl.check(entity)) {
                // Remove primary filter and other info if they are added for
                // matching secondary filters
                if (addPrimaryFilters) {
                  entity.setPrimaryFilters(null);
                }
                if (addOtherInfo) {
                  entity.setOtherInfo(null);
                }
                entities.addEntity(entity);
              }
            }
          }
        }
        db = rollingdb.getPreviousDB(db);
      }
    }
    return entities;
  }

  /**
   * Put a single entity. If there is an error, add a TimelinePutError to the
   * given response.
   *
   * @param entityUpdates
   *          a map containing all the scheduled writes for this put to the
   *          entity db
   * @param indexUpdates
   *          a map containing all the scheduled writes for this put to the
   *          index db
   */
  private long putEntities(TreeMap<Long, RollingWriteBatch> entityUpdates,
      TreeMap<Long, RollingWriteBatch> indexUpdates, TimelineEntity entity,
      TimelinePutResponse response) {

    long putCount = 0;
    List<EntityIdentifier> relatedEntitiesWithoutStartTimes =
        new ArrayList<EntityIdentifier>();
    byte[] revStartTime = null;
    Map<String, Set<Object>> primaryFilters = null;
    try {
      List<TimelineEvent> events = entity.getEvents();
      // look up the start time for the entity
      Long startTime = getAndSetStartTime(entity.getEntityId(),
          entity.getEntityType(), entity.getStartTime(), events);
      if (startTime == null) {
        // if no start time is found, add an error and return
        TimelinePutError error = new TimelinePutError();
        error.setEntityId(entity.getEntityId());
        error.setEntityType(entity.getEntityType());
        error.setErrorCode(TimelinePutError.NO_START_TIME);
        response.addError(error);
        return putCount;
      }

      // Must have a domain
      if (StringUtils.isEmpty(entity.getDomainId())) {
        TimelinePutError error = new TimelinePutError();
        error.setEntityId(entity.getEntityId());
        error.setEntityType(entity.getEntityType());
        error.setErrorCode(TimelinePutError.NO_DOMAIN);
        response.addError(error);
        return putCount;
      }

      revStartTime = writeReverseOrderedLong(startTime);
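      // Writes are batched per rolling-db time window; reuse the batch
      // for this entity's window, or lazily create one if this is the
      // first write that falls into it.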
      long roundedStartTime = entitydb.computeCurrentCheckMillis(startTime);
      RollingWriteBatch rollingWriteBatch = entityUpdates.get(roundedStartTime);
      if (rollingWriteBatch == null) {
        DB db = entitydb.getDBForStartTime(startTime);
        if (db != null) {
          WriteBatch writeBatch = db.createWriteBatch();
          rollingWriteBatch = new RollingWriteBatch(db, writeBatch);
          entityUpdates.put(roundedStartTime, rollingWriteBatch);
        }
      }
      if (rollingWriteBatch == null) {
        // no rolling db covers this start time (entity expired), so add
        // an error and return
        TimelinePutError error = new TimelinePutError();
        error.setEntityId(entity.getEntityId());
        error.setEntityType(entity.getEntityType());
        error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
        response.addError(error);
        return putCount;
      }
      WriteBatch writeBatch = rollingWriteBatch.getWriteBatch();

      // Save off the getBytes conversion to avoid unnecessary cost
      byte[] entityIdBytes = entity.getEntityId().getBytes(UTF_8);
      byte[] entityTypeBytes = entity.getEntityType().getBytes(UTF_8);
      byte[] domainIdBytes = entity.getDomainId().getBytes(UTF_8);

      // write entity marker
      byte[] markerKey = KeyBuilder.newInstance(3).add(entityTypeBytes, true)
          .add(revStartTime).add(entityIdBytes, true).getBytesForLookup();
      writeBatch.put(markerKey, EMPTY_BYTES);
      ++putCount;

      // write domain id entry
      byte[] domainkey = KeyBuilder.newInstance(4).add(entityTypeBytes, true)
          .add(revStartTime).add(entityIdBytes, true).add(DOMAIN_ID_COLUMN)
          .getBytes();
      writeBatch.put(domainkey, domainIdBytes);
      ++putCount;

      // write event entries
      if (events != null) {
        for (TimelineEvent event : events) {
          byte[] revts = writeReverseOrderedLong(event.getTimestamp());
          byte[] key = KeyBuilder.newInstance().add(entityTypeBytes, true)
              .add(revStartTime).add(entityIdBytes, true).add(EVENTS_COLUMN)
              .add(revts).add(event.getEventType().getBytes(UTF_8)).getBytes();
          byte[] value = fstConf.asByteArray(event.getEventInfo());
          writeBatch.put(key, value);
          ++putCount;
        }
      }

      // write primary filter entries
      primaryFilters = entity.getPrimaryFilters();
      if (primaryFilters != null) {
        for (Entry<String, Set<Object>> primaryFilter : primaryFilters
            .entrySet()) {
          for (Object primaryFilterValue : primaryFilter.getValue()) {
            byte[] key = KeyBuilder.newInstance(6).add(entityTypeBytes, true)
                .add(revStartTime).add(entityIdBytes, true)
                .add(PRIMARY_FILTERS_COLUMN).add(primaryFilter.getKey())
                .add(fstConf.asByteArray(primaryFilterValue)).getBytes();
            writeBatch.put(key, EMPTY_BYTES);
            ++putCount;
          }
        }
      }

      // write other info entries
      Map<String, Object> otherInfo = entity.getOtherInfo();
      if (otherInfo != null) {
        for (Entry<String, Object> info : otherInfo.entrySet()) {
          byte[] key = KeyBuilder.newInstance(5).add(entityTypeBytes, true)
              .add(revStartTime).add(entityIdBytes, true)
              .add(OTHER_INFO_COLUMN).add(info.getKey()).getBytes();
          byte[] value = fstConf.asByteArray(info.getValue());
          writeBatch.put(key, value);
          ++putCount;
        }
      }

      // write related entity entries
      Map<String, Set<String>> relatedEntities = entity.getRelatedEntities();
      if (relatedEntities != null) {
        for (Entry<String, Set<String>> relatedEntityList : relatedEntities
            .entrySet()) {
          String relatedEntityType = relatedEntityList.getKey();
          for (String relatedEntityId : relatedEntityList.getValue()) {
            // look up start time of related entity
            Long relatedStartTimeLong = getStartTimeLong(relatedEntityId,
                relatedEntityType);
            // delay writing the related entity if no start time is found
            if (relatedStartTimeLong == null) {
              relatedEntitiesWithoutStartTimes.add(new EntityIdentifier(
                  relatedEntityId, relatedEntityType));
              continue;
            }

            byte[] relatedEntityStartTime =
                writeReverseOrderedLong(relatedStartTimeLong);
            long relatedRoundedStartTime = entitydb
                .computeCurrentCheckMillis(relatedStartTimeLong);
            RollingWriteBatch relatedRollingWriteBatch = entityUpdates
                .get(relatedRoundedStartTime);
            if (relatedRollingWriteBatch == null) {
              DB db = entitydb.getDBForStartTime(relatedStartTimeLong);
              if (db != null) {
                WriteBatch relatedWriteBatch = db.createWriteBatch();
                relatedRollingWriteBatch = new RollingWriteBatch(db,
                    relatedWriteBatch);
                entityUpdates.put(relatedRoundedStartTime,
                    relatedRollingWriteBatch);
              }
            }
            if (relatedRollingWriteBatch == null) {
              // no rolling db covers the related entity's start time
              // (expired), so add an error and skip this relation
              TimelinePutError error = new TimelinePutError();
              error.setEntityId(entity.getEntityId());
              error.setEntityType(entity.getEntityType());
              error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
              response.addError(error);
              continue;
            }
            // This is the existing entity
            byte[] relatedDomainIdBytes = relatedRollingWriteBatch.getDB().get(
                createDomainIdKey(relatedEntityId, relatedEntityType,
                    relatedEntityStartTime));
            // The timeline data created by the server before 2.6 won't have
            // the domain field. We assume this timeline data is in the
            // default timeline domain.
            String domainId = null;
            if (relatedDomainIdBytes == null) {
              domainId = TimelineDataManager.DEFAULT_DOMAIN_ID;
            } else {
              domainId = new String(relatedDomainIdBytes, UTF_8);
            }
            if (!domainId.equals(entity.getDomainId())) {
              // in this case the entity will be put, but the relation will be
              // ignored
              TimelinePutError error = new TimelinePutError();
              error.setEntityId(entity.getEntityId());
              error.setEntityType(entity.getEntityType());
              error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
              response.addError(error);
              continue;
            }
            // write "forward" entry (related entity -> entity)
            byte[] key = createRelatedEntityKey(relatedEntityId,
                relatedEntityType, relatedEntityStartTime,
                entity.getEntityId(), entity.getEntityType());
            WriteBatch relatedWriteBatch = relatedRollingWriteBatch
                .getWriteBatch();
            relatedWriteBatch.put(key, EMPTY_BYTES);
            ++putCount;
          }
        }
      }

      // write index entities
      RollingWriteBatch indexRollingWriteBatch = indexUpdates
          .get(roundedStartTime);
      if (indexRollingWriteBatch == null) {
        DB db = indexdb.getDBForStartTime(startTime);
        if (db != null) {
          WriteBatch indexWriteBatch = db.createWriteBatch();
          indexRollingWriteBatch = new RollingWriteBatch(db, indexWriteBatch);
          indexUpdates.put(roundedStartTime, indexRollingWriteBatch);
        }
      }
      if (indexRollingWriteBatch == null) {
        // no rolling index db covers this start time (entity expired),
        // so add an error and return
        TimelinePutError error = new TimelinePutError();
        error.setEntityId(entity.getEntityId());
        error.setEntityType(entity.getEntityType());
        error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
        response.addError(error);
        return putCount;
      }
      WriteBatch indexWriteBatch = indexRollingWriteBatch.getWriteBatch();
      putCount += writePrimaryFilterEntries(indexWriteBatch, primaryFilters,
          markerKey, EMPTY_BYTES);
    } catch (IOException e) {
      LOG.error("Error putting entity " + entity.getEntityId() + " of type "
          + entity.getEntityType(), e);
      TimelinePutError error = new TimelinePutError();
      error.setEntityId(entity.getEntityId());
      error.setEntityType(entity.getEntityType());
      error.setErrorCode(TimelinePutError.IO_EXCEPTION);
      response.addError(error);
    }
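    // Second pass: related entities whose start time was unknown above are
    // created now, with this entity's start time as the suggested start
    // time and this entity's domain id.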

    for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) {
      try {
        Long relatedEntityStartAndInsertTime = getAndSetStartTime(
            relatedEntity.getId(), relatedEntity.getType(),
            readReverseOrderedLong(revStartTime, 0), null);
        if (relatedEntityStartAndInsertTime == null) {
          throw new IOException("Error setting start time for related entity");
        }
        long relatedStartTimeLong = relatedEntityStartAndInsertTime;
        long relatedRoundedStartTime = entitydb
            .computeCurrentCheckMillis(relatedStartTimeLong);
        RollingWriteBatch relatedRollingWriteBatch = entityUpdates
            .get(relatedRoundedStartTime);
        if (relatedRollingWriteBatch == null) {
          DB db = entitydb.getDBForStartTime(relatedStartTimeLong);
          if (db != null) {
            WriteBatch relatedWriteBatch = db.createWriteBatch();
            relatedRollingWriteBatch = new RollingWriteBatch(db,
                relatedWriteBatch);
            entityUpdates
                .put(relatedRoundedStartTime, relatedRollingWriteBatch);
          }
        }
        if (relatedRollingWriteBatch == null) {
          // no rolling db covers the related entity's start time
          // (expired), so add an error and skip it
          TimelinePutError error = new TimelinePutError();
          error.setEntityId(entity.getEntityId());
          error.setEntityType(entity.getEntityType());
          error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
          response.addError(error);
          continue;
        }
        WriteBatch relatedWriteBatch = relatedRollingWriteBatch.getWriteBatch();
        byte[] relatedEntityStartTime =
            writeReverseOrderedLong(relatedEntityStartAndInsertTime);
        // This is the new entity, the domain should be the same
        byte[] key = createDomainIdKey(relatedEntity.getId(),
            relatedEntity.getType(), relatedEntityStartTime);
        relatedWriteBatch.put(key, entity.getDomainId().getBytes(UTF_8));
        ++putCount;
        relatedWriteBatch.put(
            createRelatedEntityKey(relatedEntity.getId(),
                relatedEntity.getType(), relatedEntityStartTime,
                entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES);
        ++putCount;
        relatedWriteBatch.put(
            createEntityMarkerKey(relatedEntity.getId(),
                relatedEntity.getType(), relatedEntityStartTime), EMPTY_BYTES);
        ++putCount;
      } catch (IOException e) {
        LOG.error(
            "Error putting related entity " + relatedEntity.getId()
                + " of type " + relatedEntity.getType() + " for entity "
                + entity.getEntityId() + " of type " + entity.getEntityType(),
            e);
        TimelinePutError error = new TimelinePutError();
        error.setEntityId(entity.getEntityId());
        error.setEntityType(entity.getEntityType());
        error.setErrorCode(TimelinePutError.IO_EXCEPTION);
        response.addError(error);
      }
    }

    return putCount;
  }

  /**
   * For a given key / value pair that has been written to the db, write
   * additional entries to the db for each primary filter.
   */
  private static long writePrimaryFilterEntries(WriteBatch writeBatch,
      Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)
      throws IOException {
    long putCount = 0;
    if (primaryFilters != null) {
      for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) {
        for (Object pfval : pf.getValue()) {
          writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval, key), value);
          ++putCount;
        }
      }
    }
    return putCount;
  }

  @Override
  public TimelinePutResponse put(TimelineEntities entities) {
    LOG.debug("Starting put");
    TimelinePutResponse response = new TimelinePutResponse();
    TreeMap<Long, RollingWriteBatch> entityUpdates =
        new TreeMap<Long, RollingWriteBatch>();
    TreeMap<Long, RollingWriteBatch> indexUpdates =
        new TreeMap<Long, RollingWriteBatch>();

    long entityCount = 0;
    long indexCount = 0;

    try {

      for (TimelineEntity entity : entities.getEntities()) {
        entityCount += putEntities(entityUpdates, indexUpdates, entity,
            response);
      }

      for (RollingWriteBatch entityUpdate : entityUpdates.values()) {
        entityUpdate.write();
      }

      for (RollingWriteBatch indexUpdate : indexUpdates.values()) {
        indexUpdate.write();
      }

    } finally {

      for (RollingWriteBatch entityRollingWriteBatch : entityUpdates.values()) {
        entityRollingWriteBatch.close();
      }
      for (RollingWriteBatch indexRollingWriteBatch : indexUpdates.values()) {
        indexRollingWriteBatch.close();
      }
    }
    LOG.debug("Put {} new leveldb entity entries and {} new leveldb index"
        + " entries from {} timeline entities", entityCount, indexCount,
        entities.getEntities().size());
    return response;
  }
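
  // Usage sketch (annotation for this article, not upstream code): a
  // caller batches entities and calls put(); the identifiers below are
  // illustrative.
  //
  //   TimelineEntity e = new TimelineEntity();
  //   e.setEntityType("YARN_APPLICATION");
  //   e.setEntityId("application_0001");
  //   e.setStartTime(System.currentTimeMillis());
  //   e.setDomainId(DEFAULT_DOMAIN_ID);
  //   TimelineEntities batch = new TimelineEntities();
  //   batch.addEntity(e);
  //   TimelinePutResponse resp = store.put(batch);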

  /**
   * Get the unique start time for a given entity as a byte array that sorts the
   * timestamps in reverse order (see
   * {@link GenericObjectMapper#writeReverseOrderedLong(long)}).
   *
   * @param entityId
   *          The id of the entity
   * @param entityType
   *          The type of the entity
   * @return A byte array, null if not found
   * @throws IOException
   */
  private byte[] getStartTime(String entityId, String entityType)
      throws IOException {
    Long l = getStartTimeLong(entityId, entityType);
    return l == null ? null : writeReverseOrderedLong(l);
  }

  /**
   * Get the unique start time for a given entity as a Long.
   *
   * @param entityId
   *          The id of the entity
   * @param entityType
   *          The type of the entity
   * @return A Long, null if not found
   * @throws IOException
   */
  private Long getStartTimeLong(String entityId, String entityType)
      throws IOException {
    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
    // start time is not provided, so try to look it up
    if (startTimeReadCache.containsKey(entity)) {
      // found the start time in the cache
      return startTimeReadCache.get(entity);
    } else {
      // try to look up the start time in the db
      byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
      byte[] v = starttimedb.get(b);
      if (v == null) {
        // did not find the start time in the db
        return null;
      } else {
        // found the start time in the db
        Long l = readReverseOrderedLong(v, 0);
        startTimeReadCache.put(entity, l);
        return l;
      }
    }
  }

  /**
   * Get the unique start time for a given entity as a byte array that sorts the
   * timestamps in reverse order (see
   * {@link GenericObjectMapper#writeReverseOrderedLong(long)}). If the start
   * time doesn't exist, set it based on the information provided.
   *
   * @param entityId
   *          The id of the entity
   * @param entityType
   *          The type of the entity
   * @param startTime
   *          The start time of the entity, or null
   * @param events
   *          A list of events for the entity, or null
   * @return A StartAndInsertTime
   * @throws IOException
   */
  private Long getAndSetStartTime(String entityId,
      String entityType, Long startTime, List<TimelineEvent> events)
      throws IOException {
    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
    Long time = startTimeWriteCache.get(entity);
    if (time != null) {
      // return the value in the cache
      return time;
    }
    if (startTime == null && events != null) {
      // calculate best guess start time based on lowest event time
      startTime = Long.MAX_VALUE;
      for (TimelineEvent e : events) {
        if (e.getTimestamp() < startTime) {
          startTime = e.getTimestamp();
        }
      }
    }
    // check the provided start time matches the db
    return checkStartTimeInDb(entity, startTime);
  }

  /**
   * Checks db for start time and returns it if it exists. If it doesn't exist,
   * writes the suggested start time (if it is not null). This is only called
   * when the start time is not found in the cache, so it adds it back into the
   * cache if it is found.
   */
  private Long checkStartTimeInDb(EntityIdentifier entity,
      Long suggestedStartTime) throws IOException {
    Long startAndInsertTime = null;
    // create lookup key for start time
    byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
    synchronized (this) {
      // retrieve value for key
      byte[] v = starttimedb.get(b);
      if (v == null) {
        // start time doesn't exist in db
        if (suggestedStartTime == null) {
          return null;
        }
        startAndInsertTime = suggestedStartTime;

        // write suggested start time
        starttimedb.put(b, writeReverseOrderedLong(suggestedStartTime));
      } else {
        // found start time in db, so ignore suggested start time
        startAndInsertTime = readReverseOrderedLong(v, 0);
      }
    }
    startTimeWriteCache.put(entity, startAndInsertTime);
    startTimeReadCache.put(entity, startAndInsertTime);
    return startAndInsertTime;
  }

  /**
   * Creates a key for looking up the start time of a given entity, of the form
   * START_TIME_LOOKUP_PREFIX + entity type + entity id.
   */
  private static byte[] createStartTimeLookupKey(String entityId,
      String entityType) throws IOException {
    return KeyBuilder.newInstance().add(entityType).add(entityId).getBytes();
  }

  /**
   * Creates an entity marker, serializing ENTITY_ENTRY_PREFIX + entity type +
   * revstarttime + entity id.
   */
  private static byte[] createEntityMarkerKey(String entityId,
      String entityType, byte[] revStartTime) throws IOException {
    return KeyBuilder.newInstance().add(entityType).add(revStartTime)
        .add(entityId).getBytesForLookup();
  }

  /**
   * Creates an index entry for the given key of the form INDEXED_ENTRY_PREFIX +
   * primaryfiltername + primaryfiltervalue + key.
   */
  private static byte[] addPrimaryFilterToKey(String primaryFilterName,
      Object primaryFilterValue, byte[] key) throws IOException {
    return KeyBuilder.newInstance().add(primaryFilterName)
        .add(fstConf.asByteArray(primaryFilterValue), true).add(key).getBytes();
  }

  /**
   * Creates an event object from the given key, offset, and value. If the event
   * type is not contained in the specified set of event types, returns null.
   */
  private static TimelineEvent getEntityEvent(Set<String> eventTypes,
      byte[] key, int offset, byte[] value) throws IOException {
    KeyParser kp = new KeyParser(key, offset);
    long ts = kp.getNextLong();
    String tstype = kp.getNextString();
    if (eventTypes == null || eventTypes.contains(tstype)) {
      TimelineEvent event = new TimelineEvent();
      event.setTimestamp(ts);
      event.setEventType(tstype);
      Object o = null;
      try {
        o = fstConf.asObject(value);
      } catch (Exception ignore) {
        try {
          // Fall back to 2.24 parser
          o = fstConf224.asObject(value);
        } catch (Exception e) {
          LOG.warn("Error while decoding " + tstype, e);
        }
      }
      if (o == null) {
        event.setEventInfo(null);
      } else if (o instanceof Map) {
        @SuppressWarnings("unchecked")
        Map<String, Object> m = (Map<String, Object>) o;
        event.setEventInfo(m);
      } else {
        throw new IOException("Couldn't deserialize event info map");
      }
      return event;
    }
    return null;
  }
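
  // Note: the timestamp parsed above was written reverse ordered, so a scan
  // over an entity's event keys yields the most recent event first. Event
  // info values are decoded with the current FST configuration first, falling
  // back to the 2.24 configuration for data written by older releases.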

  /**
   * Parses the primary filter from the given key at the given offset and adds
   * it to the given entity.
   */
  private static void addPrimaryFilter(TimelineEntity entity, byte[] key,
      int offset) throws IOException {
    KeyParser kp = new KeyParser(key, offset);
    String name = kp.getNextString();
    byte[] bytes = kp.getRemainingBytes();
    Object value = null;
    try {
      value = fstConf.asObject(bytes);
      entity.addPrimaryFilter(name, value);
    } catch (Exception ignore) {
      try {
        // Fall back to 2.24 parser
        value = fstConf224.asObject(bytes);
        entity.addPrimaryFilter(name, value);
      } catch (Exception e) {
        LOG.warn("Error while decoding " + name, e);
      }
    }
  }

  /**
   * Creates a string representation of the byte array from the given offset to
   * the end of the array (for parsing other info keys).
   */
  private static String parseRemainingKey(byte[] b, int offset) {
    return new String(b, offset, b.length - offset, UTF_8);
  }

  /**
   * Creates a related entity key, serializing entity type + revstarttime +
   * entity id + RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity
   * id.
   */
  private static byte[] createRelatedEntityKey(String entityId,
      String entityType, byte[] revStartTime, String relatedEntityId,
      String relatedEntityType) throws IOException {
    return KeyBuilder.newInstance().add(entityType).add(revStartTime)
        .add(entityId).add(RELATED_ENTITIES_COLUMN).add(relatedEntityType)
        .add(relatedEntityId).getBytes();
  }

  /**
   * Parses the related entity from the given key at the given offset and adds
   * it to the given entity.
   */
  private static void addRelatedEntity(TimelineEntity entity, byte[] key,
      int offset) throws IOException {
    KeyParser kp = new KeyParser(key, offset);
    String type = kp.getNextString();
    String id = kp.getNextString();
    entity.addRelatedEntity(type, id);
  }

  /**
   * Creates a domain id key, serializing entity type + revstarttime + entity
   * id + DOMAIN_ID_COLUMN.
   */
  private static byte[] createDomainIdKey(String entityId, String entityType,
      byte[] revStartTime) throws IOException {
    return KeyBuilder.newInstance().add(entityType).add(revStartTime)
        .add(entityId).add(DOMAIN_ID_COLUMN).getBytes();
  }

  /**
   * Clears the start time caches so that tests can exercise reloading start
   * times from leveldb (only used in tests).
   */
  @VisibleForTesting
  void clearStartTimeCache() {
    startTimeWriteCache.clear();
    startTimeReadCache.clear();
  }

  @VisibleForTesting
  static int getStartTimeReadCacheSize(Configuration conf) {
    return conf.getInt(TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
  }

  @VisibleForTesting
  static int getStartTimeWriteCacheSize(Configuration conf) {
    return conf.getInt(TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
  }
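
  // These sizes bound the LRU maps backing the start time read and write
  // caches (LRUMap instances created when the service is initialized).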

  @VisibleForTesting
  long evictOldStartTimes(long minStartTime) throws IOException {
    LOG.info("Searching for start times to evict earlier than " + minStartTime);

    long batchSize = 0;
    long totalCount = 0;
    long startTimesCount = 0;

    WriteBatch writeBatch = null;

    ReadOptions readOptions = new ReadOptions();
    readOptions.fillCache(false);
    try (DBIterator iterator = starttimedb.iterator(readOptions)) {

      // seek to the first start time entry
      iterator.seekToFirst();
      writeBatch = starttimedb.createWriteBatch();

      // evaluate each start time entry to see if it needs to be evicted or not
      while (iterator.hasNext()) {
        Map.Entry<byte[], byte[]> current = iterator.next();
        byte[] entityKey = current.getKey();
        byte[] entityValue = current.getValue();
        long startTime = readReverseOrderedLong(entityValue, 0);
        if (startTime < minStartTime) {
          ++batchSize;
          ++startTimesCount;
          writeBatch.delete(entityKey);

          // a large delete will hold the lock for too long
          if (batchSize >= writeBatchSize) {
            LOG.debug("Preparing to delete a batch of {} old start times",
                batchSize);
            starttimedb.write(writeBatch);
            LOG.debug("Deleted batch of {}. Total start times deleted"
                + " so far this cycle: {}", batchSize, startTimesCount);
            IOUtils.cleanupWithLogger(LOG, writeBatch);
            writeBatch = starttimedb.createWriteBatch();
            batchSize = 0;
          }
        }
        ++totalCount;
      }
      LOG.debug("Preparing to delete a batch of {} old start times",
          batchSize);
      starttimedb.write(writeBatch);
      LOG.debug("Deleted batch of {}. Total start times deleted so far"
          + " this cycle: {}", batchSize, startTimesCount);
      LOG.info("Deleted " + startTimesCount + "/" + totalCount
          + " start time entities earlier than " + minStartTime);
    } finally {
      IOUtils.cleanupWithLogger(LOG, writeBatch);
    }
    return startTimesCount;
  }
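
  // Deletions are grouped into write batches of at most writeBatchSize
  // entries and flushed periodically, so the full scan never holds the
  // leveldb write lock for one enormous batch; the final partial batch is
  // flushed after the loop.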

  /**
   * Discards entities with a start timestamp earlier than the given
   * timestamp.
   */
  @VisibleForTesting
  void discardOldEntities(long timestamp) throws IOException,
      InterruptedException {
    long totalCount = 0;
    long t1 = System.currentTimeMillis();
    try {
      totalCount += evictOldStartTimes(timestamp);
      indexdb.evictOldDBs();
      entitydb.evictOldDBs();
    } finally {
      long t2 = System.currentTimeMillis();
      LOG.info("Discarded " + totalCount + " entities for timestamp "
          + timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds");
    }
  }

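  /**
   * Loads the timeline store version from the start time database. A missing
   * or empty value is treated as version 1.0, i.e. a store created before
   * versioning was introduced.
   */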
  Version loadVersion() throws IOException {
    byte[] data = starttimedb.get(bytes(TIMELINE_STORE_VERSION_KEY));
    // if version is not stored previously, treat it as 1.0.
    if (data == null || data.length == 0) {
      return Version.newInstance(1, 0);
    }
    Version version = new VersionPBImpl(VersionProto.parseFrom(data));
    return version;
  }

  // Only used for tests.
  @VisibleForTesting
  void storeVersion(Version state) throws IOException {
    dbStoreVersion(state);
  }

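  /**
   * Persists the given version under TIMELINE_STORE_VERSION_KEY in the start
   * time database, rethrowing any DBException as an IOException.
   */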
  private void dbStoreVersion(Version state) throws IOException {
    String key = TIMELINE_STORE_VERSION_KEY;
    byte[] data = ((VersionPBImpl) state).getProto().toByteArray();
    try {
      starttimedb.put(bytes(key), data);
    } catch (DBException e) {
      throw new IOException(e);
    }
  }

  Version getCurrentVersion() {
    return CURRENT_VERSION_INFO;
  }

  /**
   * 1) The timeline store is versioned as major.minor, e.g. 1.0, 1.1,
   * 1.2...1.25, 2.0 etc.
   * 2) Any incompatible change to the store is a major version bump, and any
   * compatible change is a minor one.
   * 3) Within a minor upgrade, say 1.1 to 1.2: overwrite the version info and
   * proceed as normal.
   * 4) Across a major upgrade, say 1.2 to 2.0: throw an exception and direct
   * the user to a separate upgrade tool, or to remove the incompatible old
   * state.
   */
  private void checkVersion() throws IOException {
    Version loadedVersion = loadVersion();
    LOG.info("Loaded timeline store version info " + loadedVersion);
    if (loadedVersion.equals(getCurrentVersion())) {
      return;
    }
    if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
      LOG.info("Storing timeline store version info " + getCurrentVersion());
      dbStoreVersion(CURRENT_VERSION_INFO);
    } else {
      String incompatibleMessage = "Incompatible version for timeline store: "
          + "expecting version " + getCurrentVersion()
          + ", but loading version " + loadedVersion;
      LOG.error(incompatibleMessage);
      throw new IOException(incompatibleMessage);
    }
  }
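
  // For example (versions here are illustrative): stored 1.0 against current
  // 1.5 is a compatible, minor difference, so the version info is simply
  // overwritten; stored 1.5 against current 2.0 is incompatible and the
  // store refuses to start.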

  // TODO: make data retention work with the domain data as well
  @Override
  public void put(TimelineDomain domain) throws IOException {
    try (WriteBatch domainWriteBatch = domaindb.createWriteBatch();
         WriteBatch ownerWriteBatch = ownerdb.createWriteBatch()) {

      if (domain.getId() == null || domain.getId().length() == 0) {
        throw new IllegalArgumentException("Domain doesn't have an ID.");
      }
      if (domain.getOwner() == null || domain.getOwner().length() == 0) {
        throw new IllegalArgumentException("Domain doesn't have an owner.");
      }

      // Write description
      byte[] domainEntryKey = createDomainEntryKey(domain.getId(),
          DESCRIPTION_COLUMN);
      byte[] ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
          domain.getId(), DESCRIPTION_COLUMN);
      if (domain.getDescription() != null) {
        domainWriteBatch.put(domainEntryKey,
            domain.getDescription().getBytes(UTF_8));
        ownerWriteBatch.put(ownerLookupEntryKey, domain.getDescription()
            .getBytes(UTF_8));
      } else {
        domainWriteBatch.put(domainEntryKey, EMPTY_BYTES);
        ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
      }

      // Write owner
      domainEntryKey = createDomainEntryKey(domain.getId(), OWNER_COLUMN);
      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
          domain.getId(), OWNER_COLUMN);
      // The owner was already null-checked above
      domainWriteBatch.put(domainEntryKey, domain.getOwner().getBytes(UTF_8));
      ownerWriteBatch.put(ownerLookupEntryKey, domain.getOwner()
          .getBytes(UTF_8));

      // Write readers
      domainEntryKey = createDomainEntryKey(domain.getId(), READER_COLUMN);
      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
          domain.getId(), READER_COLUMN);
      if (domain.getReaders() != null && domain.getReaders().length() > 0) {
        domainWriteBatch.put(domainEntryKey, domain.getReaders()
            .getBytes(UTF_8));
        ownerWriteBatch.put(ownerLookupEntryKey,
            domain.getReaders().getBytes(UTF_8));
      } else {
        domainWriteBatch.put(domainEntryKey, EMPTY_BYTES);
        ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
      }

      // Write writers
      domainEntryKey = createDomainEntryKey(domain.getId(), WRITER_COLUMN);
      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
          domain.getId(), WRITER_COLUMN);
      if (domain.getWriters() != null && domain.getWriters().length() > 0) {
        domainWriteBatch.put(domainEntryKey, domain.getWriters()
            .getBytes(UTF_8));
        ownerWriteBatch.put(ownerLookupEntryKey,
            domain.getWriters().getBytes(UTF_8));
      } else {
        domainWriteBatch.put(domainEntryKey, EMPTY_BYTES);
        ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
      }

      // Write creation time and modification time
      // We put both timestamps together because they are always retrieved
      // together, and store them in the same way as we did for the entity's
      // start time and insert time.
      domainEntryKey = createDomainEntryKey(domain.getId(), TIMESTAMP_COLUMN);
      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
          domain.getId(), TIMESTAMP_COLUMN);
      long currentTimestamp = System.currentTimeMillis();
      byte[] timestamps = domaindb.get(domainEntryKey);
      if (timestamps == null) {
        timestamps = new byte[16];
        writeReverseOrderedLong(currentTimestamp, timestamps, 0);
        writeReverseOrderedLong(currentTimestamp, timestamps, 8);
      } else {
        writeReverseOrderedLong(currentTimestamp, timestamps, 8);
      }
      domainWriteBatch.put(domainEntryKey, timestamps);
      ownerWriteBatch.put(ownerLookupEntryKey, timestamps);
      domaindb.write(domainWriteBatch);
      ownerdb.write(ownerWriteBatch);
    }
  }
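
  // The timestamp value written above is a fixed 16-byte layout: the creation
  // time as a reverse-ordered long at offset 0 (written only on the first
  // put) and the modification time at offset 8 (rewritten on every put).
  // getTimelineDomain reads both fields back from the same offsets.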

  /**
   * Creates a domain entity key with column name suffix, of the form domain
   * id + column name.
   */
  private static byte[] createDomainEntryKey(String domainId, byte[] columnName)
      throws IOException {
    return KeyBuilder.newInstance().add(domainId).add(columnName).getBytes();
  }

  /**
   * Creates an owner lookup key with column name suffix, of the form owner +
   * domain id + column name.
   */
  private static byte[] createOwnerLookupKey(String owner, String domainId,
      byte[] columnName) throws IOException {
    return KeyBuilder.newInstance().add(owner).add(domainId).add(columnName)
        .getBytes();
  }

  @Override
  public TimelineDomain getDomain(String domainId) throws IOException {
    try (DBIterator iterator = domaindb.iterator()) {
      byte[] prefix = KeyBuilder.newInstance().add(domainId)
          .getBytesForLookup();
      iterator.seek(prefix);
      return getTimelineDomain(iterator, domainId, prefix);
    }
  }

  @Override
  public TimelineDomains getDomains(String owner) throws IOException {
    try (DBIterator iterator = ownerdb.iterator()) {
      byte[] prefix = KeyBuilder.newInstance().add(owner).getBytesForLookup();
      iterator.seek(prefix);
      List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
      while (iterator.hasNext()) {
        byte[] key = iterator.peekNext().getKey();
        if (!prefixMatches(prefix, prefix.length, key)) {
          break;
        }
        // Parse the domain id, then let getTimelineDomain consume the rows
        // of this individual domain from the iterator
        KeyParser kp = new KeyParser(key, prefix.length);
        String domainId = kp.getNextString();
        byte[] prefixExt = KeyBuilder.newInstance().add(owner).add(domainId)
            .getBytesForLookup();
        TimelineDomain domainToReturn = getTimelineDomain(iterator, domainId,
            prefixExt);
        if (domainToReturn != null) {
          domains.add(domainToReturn);
        }
      }
      // Sort the domains to return
      Collections.sort(domains, new Comparator<TimelineDomain>() {
        @Override
        public int compare(TimelineDomain domain1, TimelineDomain domain2) {
          int result = domain2.getCreatedTime().compareTo(
              domain1.getCreatedTime());
          if (result == 0) {
            return domain2.getModifiedTime().compareTo(
                domain1.getModifiedTime());
          } else {
            return result;
          }
        }
      });
      TimelineDomains domainsToReturn = new TimelineDomains();
      domainsToReturn.addDomains(domains);
      return domainsToReturn;
    }
  }

  private static TimelineDomain getTimelineDomain(DBIterator iterator,
      String domainId, byte[] prefix) throws IOException {
    // Iterate over all the rows whose key starts with prefix to retrieve the
    // domain information.
    TimelineDomain domain = new TimelineDomain();
    domain.setId(domainId);
    boolean noRows = true;
    for (; iterator.hasNext(); iterator.next()) {
      byte[] key = iterator.peekNext().getKey();
      if (!prefixMatches(prefix, prefix.length, key)) {
        break;
      }
      noRows = false;
      byte[] value = iterator.peekNext().getValue();
      if (value != null && value.length > 0) {
        if (key[prefix.length] == DESCRIPTION_COLUMN[0]) {
          domain.setDescription(new String(value, UTF_8));
        } else if (key[prefix.length] == OWNER_COLUMN[0]) {
          domain.setOwner(new String(value, UTF_8));
        } else if (key[prefix.length] == READER_COLUMN[0]) {
          domain.setReaders(new String(value, UTF_8));
        } else if (key[prefix.length] == WRITER_COLUMN[0]) {
          domain.setWriters(new String(value, UTF_8));
        } else if (key[prefix.length] == TIMESTAMP_COLUMN[0]) {
          domain.setCreatedTime(readReverseOrderedLong(value, 0));
          domain.setModifiedTime(readReverseOrderedLong(value, 8));
        } else {
          LOG.error("Unrecognized domain column: " + key[prefix.length]);
        }
      }
    }
    if (noRows) {
      return null;
    } else {
      return domain;
    }
  }
}
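
A minimal usage sketch (illustrative, not part of the Hadoop source above): the store is an AbstractService, so it is driven through the usual init/start/stop lifecycle, and entities are written through the TimelineStore put API. The leveldb path, entity type, and entity id below are assumptions made for the example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.RollingLevelDBTimelineStore;

public class RollingStoreExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();
    // Directory for the rolling leveldb instances (example path).
    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, "/tmp/timeline");

    RollingLevelDBTimelineStore store = new RollingLevelDBTimelineStore();
    store.init(conf);
    store.start();
    try {
      // Example entity; type and id are arbitrary illustrative values.
      TimelineEntity entity = new TimelineEntity();
      entity.setEntityType("YARN_APPLICATION");
      entity.setEntityId("application_1_1");
      entity.setStartTime(System.currentTimeMillis());

      TimelineEntities entities = new TimelineEntities();
      entities.addEntity(entity);
      TimelinePutResponse response = store.put(entities);
      System.out.println("Put errors: " + response.getErrors().size());
    } finally {
      store.stop();
    }
  }
}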

Related information

hadoop source code directory

Related articles

hadoop EntityIdentifier source code

hadoop GenericObjectMapper source code

hadoop KeyValueBasedTimelineStore source code

hadoop LeveldbTimelineStore source code

hadoop MemoryTimelineStore source code

hadoop NameValuePair source code

hadoop RollingLevelDB source code

hadoop TimelineDataManager source code

hadoop TimelineDataManagerMetrics source code

hadoop TimelineReader source code
