kafka BrokerMetadataListener 源码
kafka BrokerMetadataListener 代码
文件路径:/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.metadata
import java.util
import java.util.concurrent.{CompletableFuture, TimeUnit}
import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.image.writer.{ImageWriterOptions, RecordListWriter}
import org.apache.kafka.metadata.util.SnapshotReason
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.snapshot.SnapshotReader
import java.util.concurrent.atomic.AtomicBoolean
object BrokerMetadataListener {
val MetadataBatchProcessingTimeUs = "MetadataBatchProcessingTimeUs"
val MetadataBatchSizes = "MetadataBatchSizes"
}
class BrokerMetadataListener(
val brokerId: Int,
time: Time,
threadNamePrefix: Option[String],
val maxBytesBetweenSnapshots: Long,
val snapshotter: Option[MetadataSnapshotter],
brokerMetrics: BrokerServerMetrics,
_metadataLoadingFaultHandler: FaultHandler
) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
private val metadataFaultOccurred = new AtomicBoolean(false)
private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
// If the broker has any kind of error handling metadata records or publishing a new image
// we will disable taking new snapshots in order to preserve the local metadata log. Once we
// encounter a metadata processing error, the broker will be in an undetermined state.
if (metadataFaultOccurred.compareAndSet(false, true)) {
error("Disabling metadata snapshots until this broker is restarted.")
}
_metadataLoadingFaultHandler.handleFault(failureMessage, cause)
}
}
private val logContext = new LogContext(s"[BrokerMetadataListener id=$brokerId] ")
private val log = logContext.logger(classOf[BrokerMetadataListener])
logIdent = logContext.logPrefix()
/**
* A histogram tracking the time in microseconds it took to process batches of events.
*/
private val batchProcessingTimeHist = newHistogram(BrokerMetadataListener.MetadataBatchProcessingTimeUs)
/**
* A histogram tracking the sizes of batches that we have processed.
*/
private val metadataBatchSizeHist = newHistogram(BrokerMetadataListener.MetadataBatchSizes)
/**
* The highest metadata offset that we've seen. Written only from the event queue thread.
*/
@volatile var _highestOffset = -1L
/**
* The highest metadata log time that we've seen. Written only from the event queue thread.
*/
private var _highestTimestamp = -1L
/**
* The current broker metadata image. Accessed only from the event queue thread.
*/
private var _image = MetadataImage.EMPTY
/**
* The current metadata delta. Accessed only from the event queue thread.
*/
private var _delta = new MetadataDelta(_image)
/**
* The object to use to publish new metadata changes, or None if this listener has not
* been activated yet. Accessed only from the event queue thread.
*/
private var _publisher: Option[MetadataPublisher] = None
/**
* The number of bytes of records that we have read since the last snapshot we took.
* This does not include records we read from a snapshot.
* Accessed only from the event queue thread.
*/
private var _bytesSinceLastSnapshot: Long = 0L
/**
* The event queue which runs this listener.
*/
val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
/**
* Returns the highest metadata-offset. Thread-safe.
*/
def highestMetadataOffset: Long = _highestOffset
/**
* Handle new metadata records.
*/
override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit =
eventQueue.append(new HandleCommitsEvent(reader))
class HandleCommitsEvent(reader: BatchReader[ApiMessageAndVersion])
extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
val results = try {
val loadResults = loadBatches(_delta, reader, None, None, None, None)
if (isDebugEnabled) {
debug(s"Loaded new commits: $loadResults")
}
loadResults
} catch {
case e: Throwable =>
metadataLoadingFaultHandler.handleFault(s"Unable to load metadata commits " +
s"from the BatchReader starting at base offset ${reader.baseOffset()}", e)
return
} finally {
reader.close()
}
_bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
val shouldTakeSnapshot: Set[SnapshotReason] = shouldSnapshot()
if (shouldTakeSnapshot.nonEmpty) {
maybeStartSnapshot(shouldTakeSnapshot)
}
_publisher.foreach(publish)
}
}
private def shouldSnapshot(): Set[SnapshotReason] = {
val metadataVersionHasChanged = metadataVersionChanged()
val maxBytesHaveExceeded = (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots)
if (maxBytesHaveExceeded && metadataVersionHasChanged) {
Set(SnapshotReason.MetadataVersionChanged, SnapshotReason.MaxBytesExceeded)
} else if (maxBytesHaveExceeded) {
Set(SnapshotReason.MaxBytesExceeded)
} else if (metadataVersionHasChanged) {
Set(SnapshotReason.MetadataVersionChanged)
} else {
Set()
}
}
private def metadataVersionChanged(): Boolean = {
// The _publisher is empty before starting publishing, and we won't compute feature delta
// until we starting publishing
_publisher.nonEmpty && Option(_delta.featuresDelta()).exists { featuresDelta =>
featuresDelta.metadataVersionChange().isPresent
}
}
private def maybeStartSnapshot(reason: Set[SnapshotReason]): Unit = {
snapshotter.foreach { snapshotter =>
if (metadataFaultOccurred.get()) {
trace("Not starting metadata snapshot since we previously had an error")
} else if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply(), reason)) {
_bytesSinceLastSnapshot = 0L
}
}
}
/**
* Handle metadata snapshots
*/
override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit =
eventQueue.append(new HandleSnapshotEvent(reader))
class HandleSnapshotEvent(reader: SnapshotReader[ApiMessageAndVersion])
extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
val snapshotName = s"${reader.snapshotId().offset}-${reader.snapshotId().epoch}"
try {
info(s"Loading snapshot ${snapshotName}")
_delta = new MetadataDelta(_image) // Discard any previous deltas.
val loadResults = loadBatches(_delta,
reader,
Some(reader.lastContainedLogTimestamp),
Some(reader.lastContainedLogOffset),
Some(reader.lastContainedLogEpoch),
Some(snapshotName))
try {
_delta.finishSnapshot()
} catch {
case e: Throwable => metadataLoadingFaultHandler.handleFault(
s"Error finishing snapshot ${snapshotName}", e)
}
info(s"Loaded snapshot ${snapshotName}: ${loadResults}")
} catch {
case t: Throwable => metadataLoadingFaultHandler.handleFault("Uncaught exception while " +
s"loading broker metadata from Metadata snapshot ${snapshotName}", t)
} finally {
reader.close()
}
_publisher.foreach(publish)
}
}
case class BatchLoadResults(numBatches: Int, numRecords: Int, elapsedUs: Long, numBytes: Long) {
override def toString: String = {
s"$numBatches batch(es) with $numRecords record(s) in $numBytes bytes " +
s"ending at offset $highestMetadataOffset in $elapsedUs microseconds"
}
}
/**
* Load and replay the batches to the metadata delta.
*
* When loading and replay a snapshot the appendTimestamp and snapshotId parameter should be provided.
* In a snapshot the append timestamp, offset and epoch reported by the batch is independent of the ones
* reported by the metadata log.
*
* @param delta metadata delta on which to replay the records
* @param iterator sequence of metadata record bacthes to replay
* @param lastAppendTimestamp optional append timestamp to use instead of the batches timestamp
* @param lastCommittedOffset optional offset to use instead of the batches offset
* @param lastCommittedEpoch optional epoch to use instead of the batches epoch
*/
private def loadBatches(
delta: MetadataDelta,
iterator: util.Iterator[Batch[ApiMessageAndVersion]],
lastAppendTimestamp: Option[Long],
lastCommittedOffset: Option[Long],
lastCommittedEpoch: Option[Int],
snapshotName: Option[String]
): BatchLoadResults = {
val startTimeNs = time.nanoseconds()
var numBatches = 0
var numRecords = 0
var numBytes = 0L
while (iterator.hasNext) {
val batch = iterator.next()
val epoch = lastCommittedEpoch.getOrElse(batch.epoch())
_highestTimestamp = lastAppendTimestamp.getOrElse(batch.appendTimestamp())
var index = 0
batch.records().forEach { messageAndVersion =>
if (isTraceEnabled) {
trace(s"Metadata batch ${batch.lastOffset}: processing [${index + 1}/${batch.records.size}]:" +
s" ${messageAndVersion.message}")
}
_highestOffset = lastCommittedOffset.getOrElse(batch.baseOffset() + index)
try {
delta.replay(highestMetadataOffset, epoch, messageAndVersion.message())
} catch {
case e: Throwable => snapshotName match {
case None => metadataLoadingFaultHandler.handleFault(
s"Error replaying metadata log record at offset ${_highestOffset}", e)
case Some(name) => metadataLoadingFaultHandler.handleFault(
s"Error replaying record ${index} from snapshot ${name} at offset ${_highestOffset}", e)
}
} finally {
numRecords += 1
index += 1
}
}
numBytes = numBytes + batch.sizeInBytes()
metadataBatchSizeHist.update(batch.records().size())
numBatches = numBatches + 1
}
val endTimeNs = time.nanoseconds()
val elapsedUs = TimeUnit.MICROSECONDS.convert(endTimeNs - startTimeNs, TimeUnit.NANOSECONDS)
batchProcessingTimeHist.update(elapsedUs)
BatchLoadResults(numBatches, numRecords, elapsedUs, numBytes)
}
def startPublishing(publisher: MetadataPublisher): CompletableFuture[Void] = {
val event = new StartPublishingEvent(publisher)
eventQueue.append(event)
event.future
}
class StartPublishingEvent(publisher: MetadataPublisher)
extends EventQueue.FailureLoggingEvent(log) {
val future = new CompletableFuture[Void]()
override def run(): Unit = {
_publisher = Some(publisher)
log.info(s"Starting to publish metadata events at offset $highestMetadataOffset.")
try {
if (metadataVersionChanged()) {
maybeStartSnapshot(Set(SnapshotReason.MetadataVersionChanged))
}
publish(publisher)
future.complete(null)
} catch {
case e: Throwable =>
future.completeExceptionally(e)
throw e
}
}
}
// This is used in tests to alter the publisher that is in use by the broker.
def alterPublisher(publisher: MetadataPublisher): CompletableFuture[Void] = {
val event = new AlterPublisherEvent(publisher)
eventQueue.append(event)
event.future
}
class AlterPublisherEvent(publisher: MetadataPublisher)
extends EventQueue.FailureLoggingEvent(log) {
val future = new CompletableFuture[Void]()
override def run(): Unit = {
_publisher = Some(publisher)
log.info(s"Set publisher to ${publisher}")
future.complete(null)
}
}
private def publish(publisher: MetadataPublisher): Unit = {
val delta = _delta
try {
_image = _delta.apply()
} catch {
case t: Throwable =>
// If we cannot apply the delta, this publish event will throw and we will not publish a new image.
// The broker will continue applying metadata records and attempt to publish again.
throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
}
_delta = new MetadataDelta(_image)
if (isDebugEnabled) {
debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.")
}
// This publish call is done with its own try-catch and fault handler
publisher.publish(delta, _image)
// Update the metrics since the publisher handled the lastest image
brokerMetrics.lastAppliedRecordOffset.set(_highestOffset)
brokerMetrics.lastAppliedRecordTimestamp.set(_highestTimestamp)
}
override def handleLeaderChange(leaderAndEpoch: LeaderAndEpoch): Unit = {
// Nothing to do.
}
override def beginShutdown(): Unit = {
eventQueue.beginShutdown("beginShutdown", new ShutdownEvent())
}
class ShutdownEvent extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
brokerMetrics.close()
removeMetric(BrokerMetadataListener.MetadataBatchProcessingTimeUs)
removeMetric(BrokerMetadataListener.MetadataBatchSizes)
}
}
def close(): Unit = {
beginShutdown()
eventQueue.close()
}
// VisibleForTesting
private[kafka] def getImageRecords(): CompletableFuture[util.List[ApiMessageAndVersion]] = {
val future = new CompletableFuture[util.List[ApiMessageAndVersion]]()
eventQueue.append(new GetImageRecordsEvent(future))
future
}
class GetImageRecordsEvent(future: CompletableFuture[util.List[ApiMessageAndVersion]])
extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
val writer = new RecordListWriter()
val options = new ImageWriterOptions.Builder().
setMetadataVersion(_image.features().metadataVersion()).
build()
try {
_image.write(writer, options)
} finally {
writer.close()
}
future.complete(writer.records())
}
}
}
相关信息
相关文章
kafka BrokerMetadataPublisher 源码
kafka BrokerMetadataSnapshotter 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦