hadoop JournaledEditsCache source code

  • 2022-10-20

hadoop JournaledEditsCache code

File path: /hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.qjournal.server;

import org.apache.hadoop.classification.VisibleForTesting;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.util.AutoCloseableLock;

/**
 * An in-memory cache of edits in their serialized form. This is used to serve
 * the {@link Journal#getJournaledEdits(long, int)} call, used by the
 * QJM when {@value DFSConfigKeys#DFS_HA_TAILEDITS_INPROGRESS_KEY} is
 * enabled.
 *
 * <p>When a batch of edits is received by the JournalNode, it is put into this
 * cache via {@link #storeEdits(byte[], long, long, int)}. Edits must be
 * stored contiguously; if a batch of edits is stored that does not align with
 * the previously stored edits, the cache will be cleared before storing new
 * edits to avoid gaps. This decision is made because gaps are only handled
 * when in recovery mode, which the cache is not intended to be used for.
 *
 * <p>Batches of edits are stored in a {@link TreeMap} mapping the starting
 * transaction ID of the batch to the data buffer. Upon retrieval, the
 * relevant data buffers are concatenated together and a header is added
 * to construct a fully-formed edit data stream.
 *
 * <p>The cache is of a limited size capacity determined by
 * {@value DFSConfigKeys#DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY}. If the capacity
 * is exceeded after adding a new batch of edits, batches of edits are removed
 * until the total size is less than the capacity, starting from the ones
 * containing the oldest transactions. Transactions range in size, but a
 * decent rule of thumb is that 200 bytes are needed per transaction. Monitoring
 * the {@link JournalMetrics#rpcRequestCacheMissAmount} metric is recommended
 * to determine if the cache is too small; it will indicate both how many
 * cache misses occurred, and how many more transactions would have been
 * needed in the cache to serve the request.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class JournaledEditsCache {

  private static final int INVALID_LAYOUT_VERSION = 0;
  private static final long INVALID_TXN_ID = -1;

  /** The capacity, in bytes, of this cache. */
  private final int capacity;

  /**
   * Read/write lock pair wrapped in AutoCloseable; these refer to the same
   * underlying lock.
   */
  private final AutoCloseableLock readLock;
  private final AutoCloseableLock writeLock;

  // ** Start lock-protected fields **

  /**
   * Stores the actual data as a mapping of the StartTxnId of a batch of edits
   * to the serialized batch of edits. Stores only contiguous ranges; that is,
   * the last transaction ID in one batch is always one less than the first
   * transaction ID in the next batch. Though the map is protected by the lock,
   * individual data buffers are immutable and can be accessed without locking.
   */
  private final NavigableMap<Long, byte[]> dataMap = new TreeMap<>();
  /** Stores the layout version currently present in the cache. */
  private int layoutVersion = INVALID_LAYOUT_VERSION;
  /** Stores the serialized version of the header for the current version. */
  private ByteBuffer layoutHeader;

  /**
   * The lowest/highest transaction IDs present in the cache.
   * {@value INVALID_TXN_ID} if there are no transactions in the cache.
   */
  private long lowestTxnId;
  private long highestTxnId;
  /**
   * The lowest transaction ID that was ever present in the cache since last
   * being reset (i.e. since initialization or since reset due to being out of
   * sync with the Journal). Until the cache size goes above capacity, this is
   * equal to lowestTxnId.
   */
  private long initialTxnId;
  /** The current total size of all buffers in this cache. */
  private int totalSize;

  // ** End lock-protected fields **

  JournaledEditsCache(Configuration conf) {
    capacity = conf.getInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
        DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_DEFAULT);
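    // Sanity check: warn when the configured capacity would consume most of
    // the JVM heap, since the cache shares memory with the JournalNode.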
    if (capacity > 0.9 * Runtime.getRuntime().maxMemory()) {
      Journal.LOG.warn(String.format("Cache capacity is set at %d bytes but " +
          "maximum JVM memory is only %d bytes. It is recommended that you " +
          "decrease the cache size or increase the heap size.",
          capacity, Runtime.getRuntime().maxMemory()));
    }
    Journal.LOG.info("Enabling the journaled edits cache with a capacity " +
        "of " + capacity + " bytes");
    ReadWriteLock lock = new ReentrantReadWriteLock(true);
    readLock = new AutoCloseableLock(lock.readLock());
    writeLock = new AutoCloseableLock(lock.writeLock());
    initialize(INVALID_TXN_ID);
  }

  /**
   * Fetch the data for edits starting at the specific transaction ID, fetching
   * up to {@code maxTxns} transactions. Populates a list of output buffers
   * which contains a serialized version of the edits, and returns the count of
   * edits contained within the serialized buffers. The serialized edits are
   * prefixed with a standard edit log header containing information about the
   * layout version. The transactions returned are guaranteed to have contiguous
   * transaction IDs.
   *
   * If {@code requestedStartTxn} is higher than the highest transaction which
   * has been added to this cache, a response with an empty buffer and a
   * transaction count of 0 will be returned. If {@code requestedStartTxn} is
   * lower than the lowest transaction currently contained in this cache, or no
   * transactions have yet been added to the cache, an exception will be thrown.
   *
   * @param requestedStartTxn The ID of the first transaction to return. If any
   *                          transactions are returned, it is guaranteed that
   *                          the first one will have this ID.
   * @param maxTxns The maximum number of transactions to return.
   * @param outputBuffers A list to populate with output buffers. When
   *                      concatenated, these form a full response.
   * @return The number of transactions contained within the set of output
   *         buffers.
   * @throws IOException If transactions are requested which cannot be served
   *                     by this cache.
   */
  int retrieveEdits(long requestedStartTxn, int maxTxns,
      List<ByteBuffer> outputBuffers) throws IOException {
    int txnCount = 0;

    try (AutoCloseableLock l = readLock.acquire()) {
      if (lowestTxnId == INVALID_TXN_ID || requestedStartTxn < lowestTxnId) {
        throw getCacheMissException(requestedStartTxn);
      } else if (requestedStartTxn > highestTxnId) {
        return 0;
      }
      outputBuffers.add(layoutHeader);
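      // floorKey() locates the batch whose start txn ID is at or below the
      // requested txn; the request may begin partway into that batch, so the
      // extra leading transactions are trimmed off after the loop.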
      Iterator<Map.Entry<Long, byte[]>> incrBuffIter =
          dataMap.tailMap(dataMap.floorKey(requestedStartTxn), true)
              .entrySet().iterator();
      long prevTxn = requestedStartTxn;
      byte[] prevBuf = null;
      // Stop when maximum transactions reached...
      while ((txnCount < maxTxns) &&
          // ... or there are no more entries ...
          (incrBuffIter.hasNext() || prevBuf != null)) {
        long currTxn;
        byte[] currBuf;
        if (incrBuffIter.hasNext()) {
          Map.Entry<Long, byte[]> ent = incrBuffIter.next();
          currTxn = ent.getKey();
          currBuf = ent.getValue();
        } else {
          // This accounts for the trailing entry
          currTxn = highestTxnId + 1;
          currBuf = null;
        }
        if (prevBuf != null) { // True except for the first loop iteration
          outputBuffers.add(ByteBuffer.wrap(prevBuf));
          // if prevTxn < requestedStartTxn, the extra transactions will get
          // removed after the loop, so don't include them in the txn count
          txnCount += currTxn - Math.max(requestedStartTxn, prevTxn);
        }
        prevTxn = currTxn;
        prevBuf = currBuf;
      }
      // Release the lock before doing operations on the buffers (deserializing
      // to find transaction boundaries, and copying into an output buffer)
    }
    // Remove extra leading transactions in the first buffer
    ByteBuffer firstBuf = outputBuffers.get(1); // 0th is the header
    firstBuf.position(
        findTransactionPosition(firstBuf.array(), requestedStartTxn));
    // Remove trailing transactions in the last buffer if necessary
    if (txnCount > maxTxns) {
      ByteBuffer lastBuf = outputBuffers.get(outputBuffers.size() - 1);
      int limit =
          findTransactionPosition(lastBuf.array(), requestedStartTxn + maxTxns);
      lastBuf.limit(limit);
      txnCount = maxTxns;
    }

    return txnCount;
  }

  /**
   * Store a batch of serialized edits into this cache. Removes old batches
   * as necessary to keep the total size of the cache below the capacity.
   * See the class Javadoc for more info.
   *
   * This attempts to always handle malformed inputs gracefully rather than
   * throwing an exception, to allow the rest of the Journal's operations
   * to proceed normally.
   *
   * @param inputData A buffer containing edits in serialized form
   * @param newStartTxn The txn ID of the first edit in {@code inputData}
   * @param newEndTxn The txn ID of the last edit in {@code inputData}
   * @param newLayoutVersion The version of the layout used to serialize
   *                         the edits
   */
  void storeEdits(byte[] inputData, long newStartTxn, long newEndTxn,
      int newLayoutVersion) {
    if (newStartTxn < 0 || newEndTxn < newStartTxn) {
      Journal.LOG.error(String.format("Attempted to cache data of length %d " +
          "with newStartTxn %d and newEndTxn %d",
          inputData.length, newStartTxn, newEndTxn));
      return;
    }
    try (AutoCloseableLock l = writeLock.acquire()) {
      if (newLayoutVersion != layoutVersion) {
        try {
          updateLayoutVersion(newLayoutVersion, newStartTxn);
        } catch (IOException ioe) {
          Journal.LOG.error(String.format("Unable to save new edits [%d, %d] " +
              "due to exception when updating to new layout version %d",
              newStartTxn, newEndTxn, newLayoutVersion), ioe);
          return;
        }
      } else if (lowestTxnId == INVALID_TXN_ID) {
        Journal.LOG.info("Initializing edits cache starting from txn ID " +
            newStartTxn);
        initialize(newStartTxn);
      } else if (highestTxnId + 1 != newStartTxn) {
        // Cache is out of sync; clear to avoid storing noncontiguous regions
        Journal.LOG.error(String.format("Edits cache is out of sync; " +
            "looked for next txn id at %d but got start txn id for " +
            "cache put request at %d. Reinitializing at new request.",
            highestTxnId + 1, newStartTxn));
        initialize(newStartTxn);
      }

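      // Evict the oldest batches until the new batch fits under capacity.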
      while ((totalSize + inputData.length) > capacity && !dataMap.isEmpty()) {
        Map.Entry<Long, byte[]> lowest = dataMap.firstEntry();
        dataMap.remove(lowest.getKey());
        totalSize -= lowest.getValue().length;
      }
      if (inputData.length > capacity) {
        initialize(INVALID_TXN_ID);
        Journal.LOG.warn(String.format("A single batch of edits was too " +
                "large to fit into the cache: startTxn = %d, endTxn = %d, " +
                "input length = %d. The capacity of the cache (%s) must be " +
                "increased for it to work properly (current capacity %d)." +
                "Cache is now empty.",
            newStartTxn, newEndTxn, inputData.length,
            DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity));
        return;
      }
      if (dataMap.isEmpty()) {
        lowestTxnId = newStartTxn;
      } else {
        lowestTxnId = dataMap.firstKey();
      }

      dataMap.put(newStartTxn, inputData);
      highestTxnId = newEndTxn;
      totalSize += inputData.length;
    }
  }

  /**
   * Skip through a given stream of edits until the given transaction ID is
   * found. Return the number of bytes that appear prior to the given
   * transaction.
   *
   * @param buf A buffer containing a stream of serialized edits
   * @param txnId The transaction ID to search for
   * @return The number of bytes appearing in {@code buf} <i>before</i>
   *         the start of the transaction with ID {@code txnId}.
   */
  private int findTransactionPosition(byte[] buf, long txnId)
      throws IOException {
    ByteArrayInputStream bais = new ByteArrayInputStream(buf);
    FSEditLogLoader.PositionTrackingInputStream tracker =
        new FSEditLogLoader.PositionTrackingInputStream(bais);
    FSEditLogOp.Reader reader = FSEditLogOp.Reader.create(
        new DataInputStream(tracker), tracker, layoutVersion);
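    // scanOp() reads just enough of each op to return its txn ID; advance
    // until txnId is reached, remembering the byte offset just before it.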
    long previousPos = 0;
    while (reader.scanOp() < txnId) {
      previousPos = tracker.getPos();
    }
    // tracker is backed by a byte[]; position cannot go above an integer
    return (int) previousPos;
  }

  /**
   * Update the layout version of the cache. This clears out all existing
   * entries, and populates the new layout version and header for that version.
   *
   * @param newLayoutVersion The new layout version to be stored in the cache
   * @param newStartTxn The new lowest transaction in the cache
   */
  private void updateLayoutVersion(int newLayoutVersion, long newStartTxn)
      throws IOException {
    StringBuilder logMsg = new StringBuilder()
        .append("Updating edits cache to use layout version ")
        .append(newLayoutVersion)
        .append(" starting from txn ID ")
        .append(newStartTxn);
    if (layoutVersion != INVALID_LAYOUT_VERSION) {
      logMsg.append("; previous version was ").append(layoutVersion)
          .append("; old entries will be cleared.");
    }
    Journal.LOG.info(logMsg.toString());
    initialize(newStartTxn);
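    // Serialize the standard edit log file header once; retrieveEdits()
    // prepends it to every response served from the cache.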
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    EditLogFileOutputStream.writeHeader(newLayoutVersion,
        new DataOutputStream(baos));
    layoutVersion = newLayoutVersion;
    layoutHeader = ByteBuffer.wrap(baos.toByteArray());
  }

  /**
   * Initialize the cache back to a clear state.
   *
   * @param newInitialTxnId The new lowest transaction ID stored in the cache.
   *                        This should be {@value INVALID_TXN_ID} if the cache
   *                        is to remain empty at this time.
   */
  private void initialize(long newInitialTxnId) {
    dataMap.clear();
    totalSize = 0;
    initialTxnId = newInitialTxnId;
    lowestTxnId = initialTxnId;
    highestTxnId = INVALID_TXN_ID; // this will be set later
  }

  /**
   * Return the underlying data buffer used to store information about the
   * given transaction ID.
   *
   * @param txnId Transaction ID whose containing buffer should be fetched.
   * @return The data buffer for the transaction
   */
  @VisibleForTesting
  byte[] getRawDataForTests(long txnId) {
    try (AutoCloseableLock l = readLock.acquire()) {
      return dataMap.floorEntry(txnId).getValue();
    }
  }

  private CacheMissException getCacheMissException(long requestedTxnId) {
    if (lowestTxnId == INVALID_TXN_ID) {
      return new CacheMissException(0, "Cache is empty; either it was never " +
          "written to or the last write overflowed the cache capacity.");
    } else if (requestedTxnId < initialTxnId) {
      return new CacheMissException(initialTxnId - requestedTxnId,
          "Cache started at txn ID %d but requested txns starting at %d.",
          initialTxnId, requestedTxnId);
    } else {
      return new CacheMissException(lowestTxnId - requestedTxnId,
          "Oldest txn ID available in the cache is %d, but requested txns " +
              "starting at %d. The cache size (%s) may need to be increased " +
              "to hold more transactions (currently %d bytes containing %d " +
              "transactions)", lowestTxnId, requestedTxnId,
          DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity,
          highestTxnId - lowestTxnId + 1);
    }
  }

  static class CacheMissException extends IOException {

    private static final long serialVersionUID = 0L;

    private final long cacheMissAmount;

    CacheMissException(long cacheMissAmount, String msgFormat,
        Object... msgArgs) {
      super(String.format(msgFormat, msgArgs));
      this.cacheMissAmount = cacheMissAmount;
    }

    long getCacheMissAmount() {
      return cacheMissAmount;
    }

  }

}
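
To make the usage contract concrete, here is a minimal, hypothetical sketch (not part of Hadoop) of how a caller in the same package could exercise the cache. The class name, the 1 MB capacity, the empty placeholder payload, and the layout version -64 are all illustrative assumptions; real batches arrive already serialized from the NameNode via the JournalNode's RPC path, and only then can an in-range retrieveEdits() call parse them.

package org.apache.hadoop.hdfs.qjournal.server;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

/**
 * Hypothetical sketch; it must live in this package because the cache is
 * package-private. The payload below is an empty placeholder, so only the
 * two boundary cases that never parse buffer contents are invoked.
 */
public class JournaledEditsCacheSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
        1024 * 1024); // assume a 1 MB cache for the sketch
    JournaledEditsCache cache = new JournaledEditsCache(conf);

    // Pretend this buffer holds serialized edits for txns 1 through 5.
    byte[] batch = new byte[0];
    cache.storeEdits(batch, 1, 5, -64 /* placeholder layout version */);

    List<ByteBuffer> buffers = new ArrayList<>();
    // Requests above the highest cached txn return 0 and add no buffers.
    System.out.println(cache.retrieveEdits(6, 100, buffers)); // prints 0

    // Requests below the lowest cached txn throw CacheMissException, whose
    // miss amount feeds JournalMetrics#rpcRequestCacheMissAmount.
    try {
      cache.retrieveEdits(0, 100, buffers);
    } catch (JournaledEditsCache.CacheMissException e) {
      System.out.println("missed by " + e.getCacheMissAmount() + " txns");
    }

    // With genuinely serialized edits, an in-range call like the following
    // would fill `buffers` with the layout header plus the matching batches:
    // int count = cache.retrieveEdits(3, 100, buffers);
  }
}

As a sizing aside, combining the class Javadoc's rule of thumb of roughly 200 bytes per transaction with the 1 MB capacity assumed above gives a cache that holds on the order of 5,000 transactions before eviction begins.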

Related information

hadoop source directory

Related articles

hadoop GetJournalEditServlet source code

hadoop JNStorage source code

hadoop Journal source code

hadoop JournalFaultInjector source code

hadoop JournalMetrics source code

hadoop JournalNode source code

hadoop JournalNodeHttpServer source code

hadoop JournalNodeMXBean source code

hadoop JournalNodeRpcServer source code

hadoop JournalNodeSyncer source code
