kafka LocalLog source code

  • 2022-10-20

kafka LocalLog code

File path: /core/src/main/scala/kafka/log/LocalLog.scala

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.log

import java.io.{File, IOException}
import java.nio.file.Files
import java.text.NumberFormat
import java.util.concurrent.atomic.AtomicLong
import java.util.regex.Pattern
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
import kafka.utils.{Logging, Scheduler}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.utils.{Time, Utils}

import scala.jdk.CollectionConverters._
import scala.collection.{Seq, immutable}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

/**
 * Holds the result of splitting a segment into one or more segments, see LocalLog.splitOverflowedSegment().
 *
 * @param deletedSegments segments deleted when splitting a segment
 * @param newSegments new segments created when splitting a segment
 */
case class SplitSegmentResult(deletedSegments: Iterable[LogSegment], newSegments: Iterable[LogSegment])

/**
 * An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset.
 * New log segments are created according to a configurable policy that controls the size in bytes or time interval
 * for a given segment.
 *
 * NOTE: this class is not thread-safe, and it relies on the thread safety provided by the Log class.
 *
 * @param _dir The directory in which log segments are created.
 * @param config The log configuration settings
 * @param segments The non-empty log segments recovered from disk
 * @param recoveryPoint The offset at which to begin the next recovery i.e. the first offset which has not been flushed to disk
 * @param nextOffsetMetadata The offset where the next message could be appended
 * @param scheduler The thread pool scheduler used for background actions
 * @param time The time instance used for checking the clock
 * @param topicPartition The topic partition associated with this log
 * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
 */
class LocalLog(@volatile private var _dir: File,
               @volatile private[log] var config: LogConfig,
               private[log] val segments: LogSegments,
               @volatile private[log] var recoveryPoint: Long,
               @volatile private var nextOffsetMetadata: LogOffsetMetadata,
               private[log] val scheduler: Scheduler,
               private[log] val time: Time,
               private[log] val topicPartition: TopicPartition,
               private[log] val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {

  import kafka.log.LocalLog._

  this.logIdent = s"[LocalLog partition=$topicPartition, dir=${dir.getParent}] "

  // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
  // After memory mapped buffer is closed, no disk IO operation should be performed for this log.
  @volatile private[log] var isMemoryMappedBufferClosed = false

  // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
  @volatile private var _parentDir: String = dir.getParent

  // Last time the log was flushed
  private val lastFlushedTime = new AtomicLong(time.milliseconds)

  private[log] def dir: File = _dir

  private[log] def name: String = dir.getName

  private[log] def parentDir: String = _parentDir

  private[log] def parentDirFile: File = new File(_parentDir)

  private[log] def isFuture: Boolean = dir.getName.endsWith(LocalLog.FutureDirSuffix)

  private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
    LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, msg) {
      fun
    }
  }

  /**
   * Rename the directory of the log
   * @param name the new dir name
   * @throws KafkaStorageException if rename fails
   */
  private[log] def renameDir(name: String): Boolean = {
    maybeHandleIOException(s"Error while renaming dir for $topicPartition in log dir ${dir.getParent}") {
      val renamedDir = new File(dir.getParent, name)
      Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
      if (renamedDir != dir) {
        _dir = renamedDir
        _parentDir = renamedDir.getParent
        segments.updateParentDir(renamedDir)
        true
      } else {
        false
      }
    }
  }

  /**
   * Update the existing configuration to the new provided configuration.
   * @param newConfig the new configuration to be updated to
   */
  private[log] def updateConfig(newConfig: LogConfig): Unit = {
    val oldConfig = config
    config = newConfig
    val oldRecordVersion = oldConfig.recordVersion
    val newRecordVersion = newConfig.recordVersion
    if (newRecordVersion.precedes(oldRecordVersion))
      warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
  }

  private[log] def checkIfMemoryMappedBufferClosed(): Unit = {
    if (isMemoryMappedBufferClosed)
      throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
  }

  private[log] def updateRecoveryPoint(newRecoveryPoint: Long): Unit = {
    recoveryPoint = newRecoveryPoint
  }

  /**
   * Update recoveryPoint to provided offset and mark the log as flushed, if the offset is greater
   * than the existing recoveryPoint.
   *
   * @param offset the offset to be updated
   */
  private[log] def markFlushed(offset: Long): Unit = {
    checkIfMemoryMappedBufferClosed()
    if (offset > recoveryPoint) {
      updateRecoveryPoint(offset)
      lastFlushedTime.set(time.milliseconds)
    }
  }

  /**
   * The number of messages appended to the log since the last flush
   */
  private[log] def unflushedMessages: Long = logEndOffset - recoveryPoint

  /**
   * Flush local log segments for all offsets up to offset-1.
   * Does not update the recovery point.
   *
   * @param offset The offset to flush up to (non-inclusive)
   */
  private[log] def flush(offset: Long): Unit = {
    val currentRecoveryPoint = recoveryPoint
    if (currentRecoveryPoint <= offset) {
      val segmentsToFlush = segments.values(currentRecoveryPoint, offset)
      segmentsToFlush.foreach(_.flush())
      // If there are any new segments, we need to flush the parent directory for crash consistency.
      if (segmentsToFlush.exists(_.baseOffset >= currentRecoveryPoint))
        Utils.flushDir(dir.toPath)
    }
  }
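  // Illustrative usage sketch (simplified; roughly how the higher-level log layer drives a flush,
  // names outside this file are assumptions):
  //   localLog.flush(offset)        // fsync segment data and index files below `offset`
  //   localLog.markFlushed(offset)  // then advance recoveryPoint and record the flush time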

  /**
   * The time this log is last known to have been fully flushed to disk
   */
  private[log] def lastFlushTime: Long = lastFlushedTime.get

  /**
   * The offset metadata of the next message that will be appended to the log
   */
  private[log] def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata

  /**
   * The offset of the next message that will be appended to the log
   */
  private[log] def logEndOffset: Long = nextOffsetMetadata.messageOffset

  /**
   * Update end offset of the log, and update the recoveryPoint.
   *
   * @param endOffset the new end offset of the log
   */
  private[log] def updateLogEndOffset(endOffset: Long): Unit = {
    nextOffsetMetadata = LogOffsetMetadata(endOffset, segments.activeSegment.baseOffset, segments.activeSegment.size)
    if (recoveryPoint > endOffset) {
      updateRecoveryPoint(endOffset)
    }
  }

  /**
   * Close file handlers used by log but don't write to disk.
   * This is called if the log directory is offline.
   */
  private[log] def closeHandlers(): Unit = {
    segments.closeHandlers()
    isMemoryMappedBufferClosed = true
  }

  /**
   * Closes the segments of the log.
   */
  private[log] def close(): Unit = {
    maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
      checkIfMemoryMappedBufferClosed()
      segments.close()
    }
  }

  /**
   * Completely delete this log directory with no delay.
   */
  private[log] def deleteEmptyDir(): Unit = {
    maybeHandleIOException(s"Error while deleting dir for $topicPartition in dir ${dir.getParent}") {
      if (segments.nonEmpty) {
        throw new IllegalStateException(s"Can not delete directory when ${segments.numberOfSegments} segments are still present")
      }
      if (!isMemoryMappedBufferClosed) {
        throw new IllegalStateException(s"Can not delete directory when memory mapped buffer for log of $topicPartition is still open.")
      }
      Utils.delete(dir)
    }
  }

  /**
   * Completely delete all segments with no delay.
   * @return the deleted segments
   */
  private[log] def deleteAllSegments(): Iterable[LogSegment] = {
    maybeHandleIOException(s"Error while deleting all segments for $topicPartition in dir ${dir.getParent}") {
      val deletableSegments = List[LogSegment]() ++ segments.values
      removeAndDeleteSegments(segments.values, asyncDelete = false, LogDeletion(this))
      isMemoryMappedBufferClosed = true
      deletableSegments
    }
  }

  /**
   * Find segments starting from the oldest until the user-supplied predicate is false.
   * A final segment that is empty will never be returned.
   *
   * @param predicate A function that takes in a candidate log segment and the next higher segment
   *                  (if there is one), and returns true iff the segment is deletable.
   * @return the segments ready to be deleted
   */
  private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
    if (segments.isEmpty) {
      Seq.empty
    } else {
      val deletable = ArrayBuffer.empty[LogSegment]
      val segmentsIterator = segments.values.iterator
      var segmentOpt = nextOption(segmentsIterator)
      while (segmentOpt.isDefined) {
        val segment = segmentOpt.get
        val nextSegmentOpt = nextOption(segmentsIterator)
        val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0
        if (predicate(segment, nextSegmentOpt) && !isLastSegmentAndEmpty) {
          deletable += segment
          segmentOpt = nextSegmentOpt
        } else {
          segmentOpt = Option.empty
        }
      }
      deletable
    }
  }
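  // Illustrative predicate (a hypothetical, simplified retention-by-size check a caller might pass in;
  // `retentionSizeBytes` is an assumed value, not defined in this file):
  //   var bytesToFree = segments.sizeInBytes - retentionSizeBytes
  //   deletableSegments { (segment, _) =>
  //     if (bytesToFree >= segment.size) { bytesToFree -= segment.size; true } else false
  //   }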

  /**
   * This method deletes the given log segments by doing the following for each of them:
   * - It removes the segment from the segment map so that it will no longer be used for reads.
   * - It renames the index and log files by appending .deleted to the respective file name
   * - It can either schedule an asynchronous delete operation to occur in the future or perform the deletion synchronously
   *
   * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
   * physically deleting a file while it is being read.
   *
   * This method does not convert IOException to KafkaStorageException, the immediate caller
   * is expected to catch and handle IOException.
   *
   * @param segmentsToDelete The log segments to schedule for deletion
   * @param asyncDelete Whether the segment files should be deleted asynchronously
   * @param reason The reason for the segment deletion
   */
  private[log] def removeAndDeleteSegments(segmentsToDelete: Iterable[LogSegment],
                                           asyncDelete: Boolean,
                                           reason: SegmentDeletionReason): Unit = {
    if (segmentsToDelete.nonEmpty) {
      // Most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
      // removing the deleted segment, we should force materialization of the iterator here, so that results of the
      // iteration remain valid and deterministic. We should also pass only the materialized view of the
      // iterator to the logic that actually deletes the segments.
      val toDelete = segmentsToDelete.toList
      reason.logReason(toDelete)
      toDelete.foreach { segment =>
        segments.remove(segment.baseOffset)
      }
      LocalLog.deleteSegmentFiles(toDelete, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
    }
  }

  /**
   * This method deletes the given segment and creates a new segment with the given new base offset. It ensures an
   * active segment exists in the log at all times during this process.
   *
   * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
   * physically deleting a file while it is being read.
   *
   * This method does not convert IOException to KafkaStorageException, the immediate caller
   * is expected to catch and handle IOException.
   *
   * @param newOffset The base offset of the new segment
   * @param segmentToDelete The old active segment to schedule for deletion
   * @param asyncDelete Whether the segment files should be deleted asynchronously
   * @param reason The reason for the segment deletion
   */
  private[log] def createAndDeleteSegment(newOffset: Long,
                                          segmentToDelete: LogSegment,
                                          asyncDelete: Boolean,
                                          reason: SegmentDeletionReason): LogSegment = {
    if (newOffset == segmentToDelete.baseOffset)
      segmentToDelete.changeFileSuffixes("", DeletedFileSuffix)

    val newSegment = LogSegment.open(dir,
      baseOffset = newOffset,
      config,
      time = time,
      initFileSize = config.initFileSize,
      preallocate = config.preallocate)
    segments.add(newSegment)

    reason.logReason(List(segmentToDelete))
    if (newOffset != segmentToDelete.baseOffset)
      segments.remove(segmentToDelete.baseOffset)
    LocalLog.deleteSegmentFiles(List(segmentToDelete), asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)

    newSegment
  }

  /**
   * Given a message offset, find its corresponding offset metadata in the log.
   * If the message offset is out of range, throw an OffsetOutOfRangeException
   */
  private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
    val fetchDataInfo = read(offset,
      maxLength = 1,
      minOneMessage = false,
      maxOffsetMetadata = nextOffsetMetadata,
      includeAbortedTxns = false)
    fetchDataInfo.fetchOffsetMetadata
  }

  /**
   * Read messages from the log.
   *
   * @param startOffset The offset to begin reading at
   * @param maxLength The maximum number of bytes to read
   * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
   * @param maxOffsetMetadata The metadata of the maximum offset to be fetched
   * @param includeAbortedTxns If true, aborted transactions are included
   * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset
   * @return The fetch data information including fetch starting offset metadata and messages read.
   */
  def read(startOffset: Long,
           maxLength: Int,
           minOneMessage: Boolean,
           maxOffsetMetadata: LogOffsetMetadata,
           includeAbortedTxns: Boolean): FetchDataInfo = {
    maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
      trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
        s"total length ${segments.sizeInBytes} bytes")

      val endOffsetMetadata = nextOffsetMetadata
      val endOffset = endOffsetMetadata.messageOffset
      var segmentOpt = segments.floorSegment(startOffset)

      // return error on attempt to read beyond the log end offset
      if (startOffset > endOffset || segmentOpt.isEmpty)
        throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
          s"but we only have log segments upto $endOffset.")

      if (startOffset == maxOffsetMetadata.messageOffset)
        emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
      else if (startOffset > maxOffsetMetadata.messageOffset)
        emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
      else {
        // Do the read on the segment with a base offset less than the target offset
        // but if that segment doesn't contain any messages with an offset greater than that
        // continue to read from successive segments until we get some messages or we reach the end of the log
        var fetchDataInfo: FetchDataInfo = null
        while (fetchDataInfo == null && segmentOpt.isDefined) {
          val segment = segmentOpt.get
          val baseOffset = segment.baseOffset

          val maxPosition =
          // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
            if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
            else segment.size

          fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
          if (fetchDataInfo != null) {
            if (includeAbortedTxns)
              fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
          } else segmentOpt = segments.higherSegment(baseOffset)
        }

        if (fetchDataInfo != null) fetchDataInfo
        else {
          // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
          // this can happen when all messages with offset larger than start offsets have been deleted.
          // In this case, we will return the empty set with log end offset metadata
          FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
        }
      }
    }
  }
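  // Illustrative call (hypothetical values): fetch up to 64 KiB starting at offset 100, bounded by the
  // current log end offset, without aborted-transaction metadata:
  //   val fetchInfo = read(startOffset = 100L, maxLength = 64 * 1024, minOneMessage = true,
  //                        maxOffsetMetadata = logEndOffsetMetadata, includeAbortedTxns = false)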

  private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
    segments.activeSegment.append(largestOffset = lastOffset, largestTimestamp = largestTimestamp,
      shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp, records = records)
    updateLogEndOffset(lastOffset + 1)
  }
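  // Note: `lastOffset` is the offset of the last record in `records`, so a successful append advances
  // the log end offset to lastOffset + 1.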

  private def addAbortedTransactions(startOffset: Long, segment: LogSegment,
                                     fetchInfo: FetchDataInfo): FetchDataInfo = {
    val fetchSize = fetchInfo.records.sizeInBytes
    val startOffsetPosition = OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
      fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
    val upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse {
      segments.higherSegment(segment.baseOffset).map(_.baseOffset).getOrElse(logEndOffset)
    }

    val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction]
    def accumulator(abortedTxns: List[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
    collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator)

    FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata,
      records = fetchInfo.records,
      firstEntryIncomplete = fetchInfo.firstEntryIncomplete,
      abortedTransactions = Some(abortedTransactions.toList))
  }

  private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long,
                                         startingSegment: LogSegment,
                                         accumulator: List[AbortedTxn] => Unit): Unit = {
    val higherSegments = segments.higherSegments(startingSegment.baseOffset).iterator
    var segmentEntryOpt = Option(startingSegment)
    while (segmentEntryOpt.isDefined) {
      val segment = segmentEntryOpt.get
      val searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset)
      accumulator(searchResult.abortedTransactions)
      if (searchResult.isComplete)
        return
      segmentEntryOpt = nextOption(higherSegments)
    }
  }

  private[log] def collectAbortedTransactions(logStartOffset: Long, baseOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {
    val segmentEntry = segments.floorSegment(baseOffset)
    val allAbortedTxns = ListBuffer.empty[AbortedTxn]
    def accumulator(abortedTxns: List[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns
    segmentEntry.foreach(segment => collectAbortedTransactions(logStartOffset, upperBoundOffset, segment, accumulator))
    allAbortedTxns.toList
  }

  /**
   * Roll the log over to a new active segment starting with the current logEndOffset.
   * This will trim the index to the exact size of the number of entries it currently contains.
   *
   * @param expectedNextOffset The expected next offset after the segment is rolled
   *
   * @return The newly rolled segment
   */
  private[log] def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
    maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
      val start = time.hiResClockMs()
      checkIfMemoryMappedBufferClosed()
      val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)
      val logFile = LocalLog.logFile(dir, newOffset)
      val activeSegment = segments.activeSegment
      if (segments.contains(newOffset)) {
        // segment with the same base offset already exists and loaded
        if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
          // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an
          // active segment of size zero because one of the indexes is "full" (due to _maxEntries == 0).
          warn(s"Trying to roll a new log segment with start offset $newOffset " +
            s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " +
            s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," +
            s" size of offset index: ${activeSegment.offsetIndex.entries}.")
          val newSegment = createAndDeleteSegment(newOffset, activeSegment, asyncDelete = true, LogRoll(this))
          updateLogEndOffset(nextOffsetMetadata.messageOffset)
          info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.")
          return newSegment
        } else {
          throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" +
            s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
            s"segment is ${segments.get(newOffset)}.")
        }
      } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {
        throw new KafkaException(
          s"Trying to roll a new log segment for topic partition $topicPartition with " +
            s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment")
      } else {
        val offsetIdxFile = offsetIndexFile(dir, newOffset)
        val timeIdxFile = timeIndexFile(dir, newOffset)
        val txnIdxFile = transactionIndexFile(dir, newOffset)

        for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
          warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
          Files.delete(file.toPath)
        }

        segments.lastSegment.foreach(_.onBecomeInactiveSegment())
      }

      val newSegment = LogSegment.open(dir,
        baseOffset = newOffset,
        config,
        time = time,
        initFileSize = config.initFileSize,
        preallocate = config.preallocate)
      segments.add(newSegment)

      // We need to update the segment base offset and append position data of the metadata when log rolls.
      // The next offset should not change.
      updateLogEndOffset(nextOffsetMetadata.messageOffset)

      info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.")

      newSegment
    }
  }

  /**
   *  Delete all data in the local log and start at the new offset.
   *
   *  @param newOffset The new offset to start the log with
   *  @return the list of segments that were scheduled for deletion
   */
  private[log] def truncateFullyAndStartAt(newOffset: Long): Iterable[LogSegment] = {
    maybeHandleIOException(s"Error while truncating the entire log for $topicPartition in dir ${dir.getParent}") {
      debug(s"Truncate and start at offset $newOffset")
      checkIfMemoryMappedBufferClosed()
      val segmentsToDelete = List[LogSegment]() ++ segments.values

      if (segmentsToDelete.nonEmpty) {
        removeAndDeleteSegments(segmentsToDelete.dropRight(1), asyncDelete = true, LogTruncation(this))
        // Use createAndDeleteSegment() to create new segment first and then delete the old last segment to prevent missing
        // active segment during the deletion process
        createAndDeleteSegment(newOffset, segmentsToDelete.last, asyncDelete = true, LogTruncation(this))
      }

      updateLogEndOffset(newOffset)

      segmentsToDelete
    }
  }

  /**
   * Truncate this log so that it ends with the greatest offset < targetOffset.
   *
   * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
   * @return the list of segments that were scheduled for deletion
   */
  private[log] def truncateTo(targetOffset: Long): Iterable[LogSegment] = {
    val deletableSegments = List[LogSegment]() ++ segments.filter(segment => segment.baseOffset > targetOffset)
    removeAndDeleteSegments(deletableSegments, asyncDelete = true, LogTruncation(this))
    segments.activeSegment.truncateTo(targetOffset)
    updateLogEndOffset(targetOffset)
    deletableSegments
  }
}

/**
 * Helper functions for logs
 */
object LocalLog extends Logging {

  /** a log file */
  private[log] val LogFileSuffix = ".log"

  /** an index file */
  private[log] val IndexFileSuffix = ".index"

  /** a time index file */
  private[log] val TimeIndexFileSuffix = ".timeindex"

  /** an (aborted) txn index */
  private[log] val TxnIndexFileSuffix = ".txnindex"

  /** a file that is scheduled to be deleted */
  private[log] val DeletedFileSuffix = ".deleted"

  /** A temporary file that is being used for log cleaning */
  private[log] val CleanedFileSuffix = ".cleaned"

  /** A temporary file used when swapping files into the log */
  private[log] val SwapFileSuffix = ".swap"

  /** a directory that is scheduled to be deleted */
  private[log] val DeleteDirSuffix = "-delete"

  /** a directory that is used for future partition */
  private[log] val FutureDirSuffix = "-future"

  private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
  private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")

  private[log] val UnknownOffset = -1L

  /**
   * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
   * so that ls sorts the files numerically.
   *
   * @param offset The offset to use in the file name
   * @return The filename
   */
  private[log] def filenamePrefixFromOffset(offset: Long): String = {
    val nf = NumberFormat.getInstance()
    nf.setMinimumIntegerDigits(20)
    nf.setMaximumFractionDigits(0)
    nf.setGroupingUsed(false)
    nf.format(offset)
  }
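  // e.g. filenamePrefixFromOffset(42L) returns "00000000000000000042" (zero-padded to 20 digits),
  // so the corresponding segment file is named 00000000000000000042.log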

  /**
   * Construct a log file name in the given dir with the given base offset and the given suffix
   *
   * @param dir The directory in which the log will reside
   * @param offset The base offset of the log file
   * @param suffix The suffix to be appended to the file name (e.g. "", ".deleted", ".cleaned", ".swap", etc.)
   */
  private[log] def logFile(dir: File, offset: Long, suffix: String = ""): File =
    new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix + suffix)

  /**
   * Return a directory name to rename the log directory to for async deletion.
   * The name will be in the following format: "topic-partitionId.uniqueId-delete".
   * If the topic name is too long, it will be truncated to prevent the total name
   * from exceeding 255 characters.
   */
  private[log] def logDeleteDirName(topicPartition: TopicPartition): String = {
    val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
    val suffix = s"-${topicPartition.partition()}.$uniqueId$DeleteDirSuffix"
    val prefixLength = Math.min(topicPartition.topic().size, 255 - suffix.size)
    s"${topicPartition.topic().substring(0, prefixLength)}$suffix"
  }
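  // e.g. for partition my-topic-0 this yields something like
  // "my-topic-0.3bdd3c4b2a6f4cd8a1b2c3d4e5f60718-delete" (the uniqueId part is a random UUID with the
  // dashes stripped, so the exact value differs on every call)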

  /**
   * Return a future directory name for the given topic partition. The name will be in the following
   * format: topic-partition.uniqueId-future where topic, partition and uniqueId are variables.
   */
  private[log] def logFutureDirName(topicPartition: TopicPartition): String = {
    logDirNameWithSuffix(topicPartition, FutureDirSuffix)
  }

  private[log] def logDirNameWithSuffix(topicPartition: TopicPartition, suffix: String): String = {
    val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
    s"${logDirName(topicPartition)}.$uniqueId$suffix"
  }

  /**
   * Return a directory name for the given topic partition. The name will be in the following
   * format: topic-partition where topic, partition are variables.
   */
  private[log] def logDirName(topicPartition: TopicPartition): String = {
    s"${topicPartition.topic}-${topicPartition.partition}"
  }

  /**
   * Construct an index file name in the given dir using the given base offset and the given suffix
   *
   * @param dir The directory in which the log will reside
   * @param offset The base offset of the log file
   * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.)
   */
  private[log] def offsetIndexFile(dir: File, offset: Long, suffix: String = ""): File =
    new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix + suffix)

  /**
   * Construct a time index file name in the given dir using the given base offset and the given suffix
   *
   * @param dir The directory in which the log will reside
   * @param offset The base offset of the log file
   * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.)
   */
  private[log] def timeIndexFile(dir: File, offset: Long, suffix: String = ""): File =
    new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix + suffix)

  /**
   * Construct a transaction index file name in the given dir using the given base offset and the given suffix
   *
   * @param dir The directory in which the log will reside
   * @param offset The base offset of the log file
   * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.)
   */
  private[log] def transactionIndexFile(dir: File, offset: Long, suffix: String = ""): File =
    new File(dir, filenamePrefixFromOffset(offset) + TxnIndexFileSuffix + suffix)

  private[log] def offsetFromFileName(filename: String): Long = {
    filename.substring(0, filename.indexOf('.')).toLong
  }

  private[log] def offsetFromFile(file: File): Long = {
    offsetFromFileName(file.getName)
  }

  /**
   * Parse the topic and partition out of the directory name of a log
   */
  private[log] def parseTopicPartitionName(dir: File): TopicPartition = {
    if (dir == null)
      throw new KafkaException("dir should not be null")

    def exception(dir: File): KafkaException = {
      new KafkaException(s"Found directory ${dir.getCanonicalPath}, '${dir.getName}' is not in the form of " +
        "topic-partition or topic-partition.uniqueId-delete (if marked for deletion).\n" +
        "Kafka's log directories (and children) should only contain Kafka topic data.")
    }

    val dirName = dir.getName
    if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
      throw exception(dir)
    if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches ||
      dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches)
      throw exception(dir)

    val name: String =
      if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix)) dirName.substring(0, dirName.lastIndexOf('.'))
      else dirName

    val index = name.lastIndexOf('-')
    val topic = name.substring(0, index)
    val partitionString = name.substring(index + 1)
    if (topic.isEmpty || partitionString.isEmpty)
      throw exception(dir)

    val partition =
      try partitionString.toInt
      catch { case _: NumberFormatException => throw exception(dir) }

    new TopicPartition(topic, partition)
  }
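  // e.g. a directory named "my-topic-0" parses to new TopicPartition("my-topic", 0); a directory marked
  // for deletion, such as "my-topic-0.<uniqueId>-delete", parses to the same topic partition.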

  private[log] def isIndexFile(file: File): Boolean = {
    val filename = file.getName
    filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix) || filename.endsWith(TxnIndexFileSuffix)
  }

  private[log] def isLogFile(file: File): Boolean =
    file.getPath.endsWith(LogFileSuffix)

  /**
   * Invokes the provided function and handles any IOException raised by the function by marking the
   * provided directory offline.
   *
   * @param logDirFailureChannel Used to asynchronously handle log directory failure.
   * @param logDir The log directory to be marked offline during an IOException.
   * @param errorMsg The error message to be used when marking the log directory offline.
   * @param fun The function to be executed.
   * @return The value returned by the function after a successful invocation
   */
  private[log] def maybeHandleIOException[T](logDirFailureChannel: LogDirFailureChannel,
                                             logDir: String,
                                             errorMsg: => String)(fun: => T): T = {
    if (logDirFailureChannel.hasOfflineLogDir(logDir)) {
      throw new KafkaStorageException(s"The log dir $logDir is already offline due to a previous IO exception.")
    }
    try {
      fun
    } catch {
      case e: IOException =>
        logDirFailureChannel.maybeAddOfflineLogDir(logDir, errorMsg, e)
        throw new KafkaStorageException(errorMsg, e)
    }
  }

  /**
   * Split a segment into one or more segments such that there is no offset overflow in any of them. The
   * resulting segments will contain the exact same messages that are present in the input segment. On successful
   * completion of this method, the input segment will be deleted and will be replaced by the resulting new segments.
   * See replaceSegments for recovery logic, in case the broker dies in the middle of this operation.
   *
   * Note that this method assumes we have already determined that the segment passed in contains records that cause
   * offset overflow.
   *
 * The split logic overloads the use of .cleaned files that LogCleaner typically uses to make the process of replacing
 * the input segment with multiple new segments atomic and recoverable in the event of a crash. See replaceSegments
 * and completeSwapOperations for the implementation to make this operation recoverable on crashes.
   *
   * @param segment Segment to split
   * @param existingSegments The existing segments of the log
   * @param dir The directory in which the log will reside
   * @param topicPartition The topic partition
   * @param config The log configuration settings
   * @param scheduler The thread pool scheduler used for background actions
   * @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
   * @param logPrefix The logging prefix
   * @return List of new segments that replace the input segment
   */
  private[log] def splitOverflowedSegment(segment: LogSegment,
                                          existingSegments: LogSegments,
                                          dir: File,
                                          topicPartition: TopicPartition,
                                          config: LogConfig,
                                          scheduler: Scheduler,
                                          logDirFailureChannel: LogDirFailureChannel,
                                          logPrefix: String): SplitSegmentResult = {
    require(isLogFile(segment.log.file), s"Cannot split file ${segment.log.file.getAbsoluteFile}")
    require(segment.hasOverflow, s"Split operation is only permitted for segments with overflow, and the problem path is ${segment.log.file.getAbsoluteFile}")

    info(s"${logPrefix}Splitting overflowed segment $segment")

    val newSegments = ListBuffer[LogSegment]()
    try {
      var position = 0
      val sourceRecords = segment.log

      while (position < sourceRecords.sizeInBytes) {
        val firstBatch = sourceRecords.batchesFrom(position).asScala.head
        val newSegment = createNewCleanedSegment(dir, config, firstBatch.baseOffset)
        newSegments += newSegment

        val bytesAppended = newSegment.appendFromFile(sourceRecords, position)
        if (bytesAppended == 0)
          throw new IllegalStateException(s"Failed to append records from position $position in $segment")

        position += bytesAppended
      }

      // prepare new segments
      var totalSizeOfNewSegments = 0
      newSegments.foreach { splitSegment =>
        splitSegment.onBecomeInactiveSegment()
        splitSegment.flush()
        splitSegment.lastModified = segment.lastModified
        totalSizeOfNewSegments += splitSegment.log.sizeInBytes
      }
      // size of all the new segments combined must equal size of the original segment
      if (totalSizeOfNewSegments != segment.log.sizeInBytes)
        throw new IllegalStateException("Inconsistent segment sizes after split" +
          s" before: ${segment.log.sizeInBytes} after: $totalSizeOfNewSegments")

      // replace old segment with new ones
      info(s"${logPrefix}Replacing overflowed segment $segment with split segments $newSegments")
      val newSegmentsToAdd = newSegments.toSeq
      val deletedSegments = LocalLog.replaceSegments(existingSegments, newSegmentsToAdd, List(segment),
        dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
      SplitSegmentResult(deletedSegments.toSeq, newSegmentsToAdd)
    } catch {
      case e: Exception =>
        newSegments.foreach { splitSegment =>
          splitSegment.close()
          splitSegment.deleteIfExists()
        }
        throw e
    }
  }

  /**
 * Swap one or more new segments in place and delete one or more existing segments in a crash-safe
   * manner. The old segments will be asynchronously deleted.
   *
   * This method does not need to convert IOException to KafkaStorageException because it is either
   * called before all logs are loaded or the caller will catch and handle IOException
   *
   * The sequence of operations is:
   *
   * - Cleaner creates one or more new segments with suffix .cleaned and invokes replaceSegments() on
   *   the Log instance. If broker crashes at this point, the clean-and-swap operation is aborted and
   *   the .cleaned files are deleted on recovery in LogLoader.
   * - New segments are renamed .swap. If the broker crashes before all segments were renamed to .swap, the
 *   clean-and-swap operation is aborted - .cleaned as well as .swap files are deleted on recovery in
 *   LogLoader. We detect this situation by maintaining a specific order in which files are renamed
 *   from .cleaned to .swap. Basically, files are renamed in descending order of offsets. On recovery,
 *   all .swap files whose offset is greater than the minimum-offset .cleaned file are deleted.
   * - If the broker crashes after all new segments were renamed to .swap, the operation is completed,
   *   the swap operation is resumed on recovery as described in the next step.
   * - Old segment files are renamed to .deleted and asynchronous delete is scheduled. If the broker
   *   crashes, any .deleted files left behind are deleted on recovery in LogLoader.
   *   replaceSegments() is then invoked to complete the swap with newSegment recreated from the
   *   .swap file and oldSegments containing segments which were not renamed before the crash.
   * - Swap segment(s) are renamed to replace the existing segments, completing this operation.
   *   If the broker crashes, any .deleted files which may be left behind are deleted
   *   on recovery in LogLoader.
   *
   * @param existingSegments The existing segments of the log
   * @param newSegments The new log segment to add to the log
   * @param oldSegments The old log segments to delete from the log
   * @param dir The directory in which the log will reside
   * @param topicPartition The topic partition
   * @param config The log configuration settings
   * @param scheduler The thread pool scheduler used for background actions
   * @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
   * @param logPrefix The logging prefix
   * @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash
   */
  private[log] def replaceSegments(existingSegments: LogSegments,
                                   newSegments: Seq[LogSegment],
                                   oldSegments: Seq[LogSegment],
                                   dir: File,
                                   topicPartition: TopicPartition,
                                   config: LogConfig,
                                   scheduler: Scheduler,
                                   logDirFailureChannel: LogDirFailureChannel,
                                   logPrefix: String,
                                   isRecoveredSwapFile: Boolean = false): Iterable[LogSegment] = {
    val sortedNewSegments = newSegments.sortBy(_.baseOffset)
    // Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments
    // but before this method is executed. We want to filter out those segments to avoid calling deleteSegmentFiles()
    // multiple times for the same segment.
    val sortedOldSegments = oldSegments.filter(seg => existingSegments.contains(seg.baseOffset)).sortBy(_.baseOffset)

    // need to do this in two phases to be crash safe AND do the delete asynchronously
    // if we crash in the middle of this we complete the swap in loadSegments()
    if (!isRecoveredSwapFile)
      sortedNewSegments.reverse.foreach(_.changeFileSuffixes(CleanedFileSuffix, SwapFileSuffix))
    sortedNewSegments.reverse.foreach(existingSegments.add)
    val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet

    // delete the old files
    val deletedNotReplaced = sortedOldSegments.map { seg =>
      // remove the index entry
      if (seg.baseOffset != sortedNewSegments.head.baseOffset)
        existingSegments.remove(seg.baseOffset)
      deleteSegmentFiles(
        List(seg),
        asyncDelete = true,
        dir,
        topicPartition,
        config,
        scheduler,
        logDirFailureChannel,
        logPrefix)
      if (newSegmentBaseOffsets.contains(seg.baseOffset)) Option.empty else Some(seg)
    }.filter(item => item.isDefined).map(item => item.get)

    // okay we are safe now, remove the swap suffix
    sortedNewSegments.foreach(_.changeFileSuffixes(SwapFileSuffix, ""))
    Utils.flushDir(dir.toPath)
    deletedNotReplaced
  }

  /**
   * Perform physical deletion of the index and log files for the given segment.
   * Prior to the deletion, the index and log files are renamed by appending .deleted to the
   * respective file name. Allows these files to be optionally deleted asynchronously.
   *
   * This method assumes that the file exists. It does not need to convert IOException
   * (thrown from changeFileSuffixes) to KafkaStorageException because it is either called before
   * all logs are loaded or the caller will catch and handle IOException.
   *
   * @param segmentsToDelete The segments to be deleted
   * @param asyncDelete If true, the deletion of the segments is done asynchronously
   * @param dir The directory in which the log will reside
   * @param topicPartition The topic partition
   * @param config The log configuration settings
   * @param scheduler The thread pool scheduler used for background actions
   * @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
   * @param logPrefix The logging prefix
   * @throws IOException if the file can't be renamed and still exists
   */
  private[log] def deleteSegmentFiles(segmentsToDelete: immutable.Iterable[LogSegment],
                                      asyncDelete: Boolean,
                                      dir: File,
                                      topicPartition: TopicPartition,
                                      config: LogConfig,
                                      scheduler: Scheduler,
                                      logDirFailureChannel: LogDirFailureChannel,
                                      logPrefix: String): Unit = {
    segmentsToDelete.foreach { segment =>
      if (!segment.hasSuffix(DeletedFileSuffix))
        segment.changeFileSuffixes("", DeletedFileSuffix)
    }

    def deleteSegments(): Unit = {
      info(s"${logPrefix}Deleting segment files ${segmentsToDelete.mkString(",")}")
      val parentDir = dir.getParent
      maybeHandleIOException(logDirFailureChannel, parentDir, s"Error while deleting segments for $topicPartition in dir $parentDir") {
        segmentsToDelete.foreach { segment =>
          segment.deleteIfExists()
        }
      }
    }

    if (asyncDelete)
      scheduler.schedule("delete-file", () => deleteSegments(), delay = config.fileDeleteDelayMs)
    else
      deleteSegments()
  }

  private[log] def emptyFetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata,
                                      includeAbortedTxns: Boolean): FetchDataInfo = {
    val abortedTransactions =
      if (includeAbortedTxns) Some(List.empty[FetchResponseData.AbortedTransaction])
      else None
    FetchDataInfo(fetchOffsetMetadata,
      MemoryRecords.EMPTY,
      abortedTransactions = abortedTransactions)
  }

  private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
    LogSegment.deleteIfExists(dir, baseOffset, fileSuffix = CleanedFileSuffix)
    LogSegment.open(dir, baseOffset, logConfig, Time.SYSTEM,
      fileSuffix = CleanedFileSuffix, initFileSize = logConfig.initFileSize, preallocate = logConfig.preallocate)
  }

  /**
   * Wraps the value of iterator.next() in an option.
   * Note: this facility is a part of the Iterator class starting from scala v2.13.
   *
   * @param iterator
   * @tparam T the type of object held within the iterator
   * @return Some(iterator.next) if a next element exists, None otherwise.
   */
  private def nextOption[T](iterator: Iterator[T]): Option[T] = {
    if (iterator.hasNext)
      Some(iterator.next())
    else
      None
  }
}

trait SegmentDeletionReason {
  def logReason(toDelete: List[LogSegment]): Unit
}

case class LogTruncation(log: LocalLog) extends SegmentDeletionReason {
  override def logReason(toDelete: List[LogSegment]): Unit = {
    log.info(s"Deleting segments as part of log truncation: ${toDelete.mkString(",")}")
  }
}

case class LogRoll(log: LocalLog) extends SegmentDeletionReason {
  override def logReason(toDelete: List[LogSegment]): Unit = {
    log.info(s"Deleting segments as part of log roll: ${toDelete.mkString(",")}")
  }
}

case class LogDeletion(log: LocalLog) extends SegmentDeletionReason {
  override def logReason(toDelete: List[LogSegment]): Unit = {
    log.info(s"Deleting segments as the log has been deleted: ${toDelete.mkString(",")}")
  }
}
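
For a quick feel of the file-naming convention defined in the companion object above, here is a small standalone sketch (a hypothetical demo object, using only the JDK) that mirrors the filenamePrefixFromOffset padding logic and prints the file names a segment with base offset 42 would use:

import java.text.NumberFormat

// Hypothetical demo; mirrors LocalLog.filenamePrefixFromOffset so it can run without the Kafka classes.
object SegmentFileNameDemo {
  private def filenamePrefixFromOffset(offset: Long): String = {
    val nf = NumberFormat.getInstance()
    nf.setMinimumIntegerDigits(20)   // pad to 20 digits so `ls` sorts segment files numerically
    nf.setMaximumFractionDigits(0)
    nf.setGroupingUsed(false)
    nf.format(offset)
  }

  def main(args: Array[String]): Unit = {
    val prefix = filenamePrefixFromOffset(42L)
    // Prints the four per-segment file names, e.g. 00000000000000000042.log
    Seq(".log", ".index", ".timeindex", ".txnindex").foreach(suffix => println(prefix + suffix))
  }
}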

Related information

kafka source code directory

Related articles

kafka AbstractIndex source code

kafka CleanerConfig source code

kafka CorruptIndexException source code

kafka IndexEntry source code

kafka LazyIndex source code

kafka LogCleaner source code

kafka LogCleanerManager source code

kafka LogConfig source code

kafka LogLoader source code

kafka LogManager source code
