hadoop LevelDBCacheTimelineStore 源码

  • 2022-10-20
  • 浏览 (226)

haddop LevelDBCacheTimelineStore 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timeline;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * LevelDB implementation of {@link KeyValueBasedTimelineStore}. This
 * implementation stores the entity hash map into a LevelDB instance.
 * There are two partitions of the key space. One partition is to store a
 * entity id to start time mapping:
 *
 * i!ENTITY_ID!ENTITY_TYPE to ENTITY_START_TIME
 *
 * The other partition is to store the actual data:
 *
 * e!START_TIME!ENTITY_ID!ENTITY_TYPE to ENTITY_BYTES
 *
 * This storage does not have any garbage collection mechanism, and is designed
 * mainly for caching usages.
 */
@Private
@Unstable
public class LevelDBCacheTimelineStore extends KeyValueBasedTimelineStore {
  private static final Logger LOG
      = LoggerFactory.getLogger(LevelDBCacheTimelineStore.class);
  private static final String CACHED_LDB_FILE_PREFIX = "-timeline-cache.ldb";
  private String dbId;
  private DB entityDb;
  private Configuration configuration;
  private static final AtomicInteger DB_COUNTER = new AtomicInteger(0);
  private static final String CACHED_LDB_FILENAME = "db";

  public LevelDBCacheTimelineStore(String id, String name) {
    super(name);
    dbId = id;
    entityInsertTimes = new MemoryTimelineStore.HashMapStoreAdapter<>();
    domainById = new MemoryTimelineStore.HashMapStoreAdapter<>();
    domainsByOwner = new MemoryTimelineStore.HashMapStoreAdapter<>();
  }

  public LevelDBCacheTimelineStore(String id) {
    this(id, LevelDBCacheTimelineStore.class.getName());
  }

  public LevelDBCacheTimelineStore() {
    this(CACHED_LDB_FILENAME + String.valueOf(DB_COUNTER.getAndIncrement()),
        LevelDBCacheTimelineStore.class.getName());
  }

  @Override
  protected synchronized void serviceInit(Configuration conf) throws Exception {
    configuration = conf;
    Options options = new Options();
    options.createIfMissing(true);
    options.cacheSize(conf.getLong(
        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE,
        YarnConfiguration.
            DEFAULT_TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE));
    JniDBFactory factory = new JniDBFactory();
    Path dbPath = new Path(
        conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH),
        dbId + CACHED_LDB_FILE_PREFIX);
    FileSystem localFS = null;

    try {
      localFS = FileSystem.getLocal(conf);
      if (!localFS.exists(dbPath)) {
        if (!localFS.mkdirs(dbPath)) {
          throw new IOException("Couldn't create directory for leveldb " +
              "timeline store " + dbPath);
        }
        localFS.setPermission(dbPath, LeveldbUtils.LEVELDB_DIR_UMASK);
      }
    } finally {
      IOUtils.cleanupWithLogger(LOG, localFS);
    }
    LOG.info("Using leveldb path " + dbPath);
    entityDb = factory.open(new File(dbPath.toString()), options);
    entities = new LevelDBMapAdapter<>(entityDb);

    super.serviceInit(conf);
  }

  @Override
  protected synchronized void serviceStop() throws Exception {
    IOUtils.cleanupWithLogger(LOG, entityDb);
    Path dbPath = new Path(
        configuration.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH),
        dbId + CACHED_LDB_FILE_PREFIX);
    FileSystem localFS = null;
    try {
      localFS = FileSystem.getLocal(configuration);
      if (!localFS.delete(dbPath, true)) {
          throw new IOException("Couldn't delete data file for leveldb " +
              "timeline store " + dbPath);
      }
    } finally {
      IOUtils.cleanupWithLogger(LOG, localFS);
    }
    super.serviceStop();
  }

  /**
   * A specialized hash map storage that uses LevelDB for storing entity id to
   * entity mappings.
   *
   * @param <K> an {@link EntityIdentifier} typed hash key
   * @param <V> a {@link TimelineEntity} typed value
   */
  static class LevelDBMapAdapter<K extends EntityIdentifier,
      V extends TimelineEntity> implements TimelineStoreMapAdapter<K, V> {
    private static final String TIME_INDEX_PREFIX = "i";
    private static final String ENTITY_STORAGE_PREFIX = "e";
    DB entityDb;

    public LevelDBMapAdapter(DB currLevelDb) {
      entityDb = currLevelDb;
    }

    @Override
    public V get(K entityId) {
      V result = null;
      // Read the start time from the index
      byte[] startTimeBytes = entityDb.get(getStartTimeKey(entityId));
      if (startTimeBytes == null) {
        return null;
      }

      // Build the key for the entity storage and read it
      try {
        result = getEntityForKey(getEntityKey(entityId, startTimeBytes));
      } catch (IOException e) {
        LOG.error("GenericObjectMapper cannot read key from key "
            + entityId.toString()
            + " into an object. Read aborted! ");
        LOG.error(e.getMessage());
      }

      return result;
    }

    @Override
    public void put(K entityId, V entity) {
      Long startTime = entity.getStartTime();
      if (startTime == null) {
        startTime = System.currentTimeMillis();
      }
      // Build the key for the entity storage and read it
      byte[] startTimeBytes = GenericObjectMapper.writeReverseOrderedLong(
          startTime);
      try {
        byte[] valueBytes = GenericObjectMapper.write(entity);
        entityDb.put(getEntityKey(entityId, startTimeBytes), valueBytes);
      } catch (IOException e) {
        LOG.error("GenericObjectMapper cannot write "
            + entity.getClass().getName()
            + " into a byte array. Write aborted! ");
        LOG.error(e.getMessage());
      }

      // Build the key for the start time index
      entityDb.put(getStartTimeKey(entityId), startTimeBytes);
    }

    @Override
    public void remove(K entityId) {
      // Read the start time from the index (key starts with an "i") then delete
      // the record
      LeveldbUtils.KeyBuilder startTimeKeyBuilder
          = LeveldbUtils.KeyBuilder.newInstance();
      startTimeKeyBuilder.add(TIME_INDEX_PREFIX).add(entityId.getId())
          .add(entityId.getType());
      byte[] startTimeBytes = entityDb.get(startTimeKeyBuilder.getBytes());
      if (startTimeBytes == null) {
        return;
      }
      entityDb.delete(startTimeKeyBuilder.getBytes());

      // Build the key for the entity storage and delete it
      entityDb.delete(getEntityKey(entityId, startTimeBytes));
    }

    @Override
    public CloseableIterator<V> valueSetIterator() {
      return getIterator(null, Long.MAX_VALUE);
    }

    @Override
    public CloseableIterator<V> valueSetIterator(V minV) {
      return getIterator(
          new EntityIdentifier(minV.getEntityId(), minV.getEntityType()),
          minV.getStartTime());
    }

    private CloseableIterator<V> getIterator(
        EntityIdentifier startId, long startTimeMax) {

      final DBIterator internalDbIterator = entityDb.iterator();

      // we need to iterate from the first element with key greater than or
      // equal to ENTITY_STORAGE_PREFIX!maxTS(!startId), but stop on the first
      // key who does not have prefix ENTITY_STORATE_PREFIX

      // decide end prefix
      LeveldbUtils.KeyBuilder entityPrefixKeyBuilder
          = LeveldbUtils.KeyBuilder.newInstance();
      entityPrefixKeyBuilder.add(ENTITY_STORAGE_PREFIX);
      final byte[] prefixBytes = entityPrefixKeyBuilder.getBytesForLookup();
      // decide start prefix on top of end prefix and seek
      final byte[] startTimeBytes
          = GenericObjectMapper.writeReverseOrderedLong(startTimeMax);
      entityPrefixKeyBuilder.add(startTimeBytes, true);
      if (startId != null) {
        entityPrefixKeyBuilder.add(startId.getId());
      }
      final byte[] startPrefixBytes
          = entityPrefixKeyBuilder.getBytesForLookup();
      internalDbIterator.seek(startPrefixBytes);

      return new CloseableIterator<V>() {
        @Override
        public boolean hasNext() {
          if (!internalDbIterator.hasNext()) {
            return false;
          }
          Map.Entry<byte[], byte[]> nextEntry = internalDbIterator.peekNext();
          if (LeveldbUtils.prefixMatches(
              prefixBytes, prefixBytes.length, nextEntry.getKey())) {
            return true;
          }
          return false;
        }

        @Override
        public V next() {
          if (hasNext()) {
            Map.Entry<byte[], byte[]> nextRaw = internalDbIterator.next();
            try {
              V result = getEntityForKey(nextRaw.getKey());
              return result;
            } catch (IOException e) {
              LOG.error("GenericObjectMapper cannot read key from key "
                  + nextRaw.getKey()
                  + " into an object. Read aborted! ");
              LOG.error(e.getMessage());
            }
          }
          return null;
        }

        // We do not support remove operations within one iteration
        @Override
        public void remove() {
          LOG.error("LevelDB map adapter does not support iterate-and-remove"
              + " use cases. ");
        }

        @Override
        public void close() throws IOException {
          internalDbIterator.close();
        }
      };
    }
    static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @SuppressWarnings("unchecked")
    private V getEntityForKey(byte[] key) throws IOException {
      byte[] resultRaw = entityDb.get(key);
      if (resultRaw == null) {
        return null;
      }
      return (V) OBJECT_MAPPER.readValue(resultRaw, TimelineEntity.class);
    }

    private byte[] getStartTimeKey(K entityId) {
      LeveldbUtils.KeyBuilder startTimeKeyBuilder
          = LeveldbUtils.KeyBuilder.newInstance();
      startTimeKeyBuilder.add(TIME_INDEX_PREFIX).add(entityId.getId())
          .add(entityId.getType());
      return startTimeKeyBuilder.getBytes();
    }

    private byte[] getEntityKey(K entityId, byte[] startTimeBytes) {
      LeveldbUtils.KeyBuilder entityKeyBuilder
          = LeveldbUtils.KeyBuilder.newInstance();
      entityKeyBuilder.add(ENTITY_STORAGE_PREFIX).add(startTimeBytes, true)
          .add(entityId.getId()).add(entityId.getType());
      return entityKeyBuilder.getBytes();
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop EntityCacheItem 源码

hadoop EntityGroupFSTimelineStore 源码

hadoop EntityGroupFSTimelineStoreMetrics 源码

hadoop LogInfo 源码

hadoop TimelineEntityGroupPlugin 源码

hadoop package-info 源码

0  赞