Kafka TransactionIndex Source Code

  • 2022-10-20

Kafka TransactionIndex code

File path: /core/src/main/scala/kafka/log/TransactionIndex.scala

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.log

import java.io.{File, IOException}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, StandardOpenOption}
import kafka.utils.{Logging, nonthreadsafe}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.utils.Utils

import scala.collection.mutable.ListBuffer

private[log] case class TxnIndexSearchResult(abortedTransactions: List[AbortedTxn], isComplete: Boolean)

/**
 * The transaction index maintains metadata about the aborted transactions for each segment. This includes
 * the start and end offsets for the aborted transactions and the last stable offset (LSO) at the time of
 * the abort. This index is used to find the aborted transactions in the range of a given fetch request at
 * the READ_COMMITTED isolation level.
 *
 * There is at most one transaction index for each log segment. The entries correspond to the transactions
 * whose commit markers were written in the corresponding log segment. Note, however, that individual transactions
 * may span multiple segments. Recovering the index therefore requires scanning the earlier segments in
 * order to find the start of the transactions.
 */
@nonthreadsafe
class TransactionIndex(val startOffset: Long, @volatile private var _file: File) extends Logging {

  // note that the file is not created until we need it
  @volatile private var maybeChannel: Option[FileChannel] = None
  private var lastOffset: Option[Long] = None

  if (_file.exists)
    openChannel()

  def append(abortedTxn: AbortedTxn): Unit = {
    lastOffset.foreach { offset =>
      if (offset >= abortedTxn.lastOffset)
        throw new IllegalArgumentException(s"The last offset of appended transactions must increase sequentially, but " +
          s"${abortedTxn.lastOffset} is not greater than current last offset $offset of index ${file.getAbsolutePath}")
    }
    lastOffset = Some(abortedTxn.lastOffset)
    Utils.writeFully(channel(), abortedTxn.buffer.duplicate())
  }

  def flush(): Unit = maybeChannel.foreach(_.force(true))

  def file: File = _file

  def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName)

  /**
   * Delete this index.
   *
   * @throws IOException if deletion fails due to an I/O error
   * @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did
   *         not exist
   */
  def deleteIfExists(): Boolean = {
    close()
    Files.deleteIfExists(file.toPath)
  }

  private def channel(): FileChannel = {
    maybeChannel match {
      case Some(channel) => channel
      case None => openChannel()
    }
  }

  private def openChannel(): FileChannel = {
    val channel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.READ,
      StandardOpenOption.WRITE)
    maybeChannel = Some(channel)
    channel.position(channel.size)
    channel
  }

  /**
   * Remove all the entries from the index. Unlike `AbstractIndex`, this index is not resized ahead of time.
   */
  def reset(): Unit = {
    maybeChannel.foreach(_.truncate(0))
    lastOffset = None
  }

  def close(): Unit = {
    maybeChannel.foreach(_.close())
    maybeChannel = None
  }

  def renameTo(f: File): Unit = {
    try {
      if (file.exists)
        Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
    } finally _file = f
  }

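  /**
   * Truncate the index at the first entry whose last offset is greater than or equal to the given offset,
   * so that only entries ending strictly before that offset are retained.
   */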
  def truncateTo(offset: Long): Unit = {
    val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
    var newLastOffset: Option[Long] = None
    for ((abortedTxn, position) <- iterator(() => buffer)) {
      if (abortedTxn.lastOffset >= offset) {
        channel().truncate(position)
        lastOffset = newLastOffset
        return
      }
      newLastOffset = Some(abortedTxn.lastOffset)
    }
  }

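  // Lazily iterate over (entry, file position) pairs, reading one fixed-size record at a time from the
  // channel; returns an empty iterator if the index file has never been opened.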
  private def iterator(allocate: () => ByteBuffer = () => ByteBuffer.allocate(AbortedTxn.TotalSize)): Iterator[(AbortedTxn, Int)] = {
    maybeChannel match {
      case None => Iterator.empty
      case Some(channel) =>
        var position = 0

        new Iterator[(AbortedTxn, Int)] {
          override def hasNext: Boolean = channel.position - position >= AbortedTxn.TotalSize

          override def next(): (AbortedTxn, Int) = {
            try {
              val buffer = allocate()
              Utils.readFully(channel, buffer, position)
              buffer.flip()

              val abortedTxn = new AbortedTxn(buffer)
              if (abortedTxn.version > AbortedTxn.CurrentVersion)
                throw new KafkaException(s"Unexpected aborted transaction version ${abortedTxn.version} " +
                  s"in transaction index ${file.getAbsolutePath}, current version is ${AbortedTxn.CurrentVersion}")
              val nextEntry = (abortedTxn, position)
              position += AbortedTxn.TotalSize
              nextEntry
            } catch {
              case e: IOException =>
                // We received an unexpected error reading from the index file. We propagate this as an
                // UNKNOWN error to the consumer, which will cause it to retry the fetch.
                throw new KafkaException(s"Failed to read from the transaction index ${file.getAbsolutePath}", e)
            }
          }
        }
    }
  }

  def allAbortedTxns: List[AbortedTxn] = {
    iterator().map(_._1).toList
  }

  /**
   * Collect all aborted transactions which overlap with a given fetch range.
   *
   * @param fetchOffset Inclusive first offset of the fetch range
   * @param upperBoundOffset Exclusive last offset in the fetch range
   * @return An object containing the aborted transactions and whether the search needs to continue
   *         into the next log segment.
   */
  def collectAbortedTxns(fetchOffset: Long, upperBoundOffset: Long): TxnIndexSearchResult = {
    val abortedTransactions = ListBuffer.empty[AbortedTxn]
    for ((abortedTxn, _) <- iterator()) {
      if (abortedTxn.lastOffset >= fetchOffset && abortedTxn.firstOffset < upperBoundOffset)
        abortedTransactions += abortedTxn

      if (abortedTxn.lastStableOffset >= upperBoundOffset)
        return TxnIndexSearchResult(abortedTransactions.toList, isComplete = true)
    }
    TxnIndexSearchResult(abortedTransactions.toList, isComplete = false)
  }

  /**
   * Do a basic sanity check on this index to detect obvious problems.
   *
   * @throws CorruptIndexException if any problems are found.
   */
  def sanityCheck(): Unit = {
    val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
    for ((abortedTxn, _) <- iterator(() => buffer)) {
      if (abortedTxn.lastOffset < startOffset)
        throw new CorruptIndexException(s"Last offset of aborted transaction $abortedTxn in index " +
          s"${file.getAbsolutePath} is less than start offset $startOffset")
    }
  }

}

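// On-disk layout of one index entry (34 bytes, derived from the offsets and sizes below):
//   version (2 bytes) | producerId (8) | firstOffset (8) | lastOffset (8) | lastStableOffset (8)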
private[log] object AbortedTxn {
  val VersionOffset = 0
  val VersionSize = 2
  val ProducerIdOffset = VersionOffset + VersionSize
  val ProducerIdSize = 8
  val FirstOffsetOffset = ProducerIdOffset + ProducerIdSize
  val FirstOffsetSize = 8
  val LastOffsetOffset = FirstOffsetOffset + FirstOffsetSize
  val LastOffsetSize = 8
  val LastStableOffsetOffset = LastOffsetOffset + LastOffsetSize
  val LastStableOffsetSize = 8
  val TotalSize = LastStableOffsetOffset + LastStableOffsetSize

  val CurrentVersion: Short = 0
}

private[log] class AbortedTxn(val buffer: ByteBuffer) {
  import AbortedTxn._

  def this(producerId: Long,
           firstOffset: Long,
           lastOffset: Long,
           lastStableOffset: Long) = {
    this(ByteBuffer.allocate(AbortedTxn.TotalSize))
    buffer.putShort(CurrentVersion)
    buffer.putLong(producerId)
    buffer.putLong(firstOffset)
    buffer.putLong(lastOffset)
    buffer.putLong(lastStableOffset)
    buffer.flip()
  }

  def this(completedTxn: CompletedTxn, lastStableOffset: Long) =
    this(completedTxn.producerId, completedTxn.firstOffset, completedTxn.lastOffset, lastStableOffset)

  def version: Short = buffer.getShort(VersionOffset)

  def producerId: Long = buffer.getLong(ProducerIdOffset)

  def firstOffset: Long = buffer.getLong(FirstOffsetOffset)

  def lastOffset: Long = buffer.getLong(LastOffsetOffset)

  def lastStableOffset: Long = buffer.getLong(LastStableOffsetOffset)

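  // The aborted-transaction entry returned in fetch responses carries only the producer id and first offset.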
  def asAbortedTransaction: FetchResponseData.AbortedTransaction = new FetchResponseData.AbortedTransaction()
    .setProducerId(producerId)
    .setFirstOffset(firstOffset)

  override def toString: String =
    s"AbortedTxn(version=$version, producerId=$producerId, firstOffset=$firstOffset, " +
      s"lastOffset=$lastOffset, lastStableOffset=$lastStableOffset)"

  override def equals(any: Any): Boolean = {
    any match {
      case that: AbortedTxn => this.buffer.equals(that.buffer)
      case _ => false
    }
  }

  override def hashCode(): Int = buffer.hashCode
}

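TransactionIndex usage sketch

To make the append and lookup paths above concrete, the following is a minimal, hypothetical driver (not part of Kafka) for exercising the index directly. It assumes the Kafka core classes are on the classpath and is placed in package kafka.log so it can see the private[log] AbortedTxn and TxnIndexSearchResult types; the object name, file name, and all offsets are made up for illustration.

package kafka.log

import java.io.File
import java.nio.file.Files

// Hypothetical example object: exercises append, collectAbortedTxns, and close.
object TransactionIndexExample extends App {
  // Back the index with a temporary directory; the index file itself is created lazily on first append.
  val dir = Files.createTempDirectory("txn-index-example").toFile
  val index = new TransactionIndex(0L, new File(dir, "00000000000000000000.txnindex"))

  // lastOffset must be strictly increasing across appended entries, otherwise append throws.
  index.append(new AbortedTxn(producerId = 1L, firstOffset = 5L, lastOffset = 10L, lastStableOffset = 3L))
  index.append(new AbortedTxn(producerId = 2L, firstOffset = 12L, lastOffset = 20L, lastStableOffset = 11L))
  index.flush()

  // A READ_COMMITTED fetch of the range [0, 12) overlaps only the first aborted transaction.
  val result: TxnIndexSearchResult = index.collectAbortedTxns(fetchOffset = 0L, upperBoundOffset = 12L)
  println(result.abortedTransactions) // List(AbortedTxn(version=0, producerId=1, firstOffset=5, lastOffset=10, lastStableOffset=3))
  println(result.isComplete)          // false: no entry's last stable offset reached the upper bound

  index.close()
}

Because each entry is a fixed 34-byte record appended in offset order, a lookup is a simple linear scan of the file and truncation can cut the file cleanly at an entry boundary.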