kafka StorageTool source code

  • 2022-10-20

kafka StorageTool code

File path: /core/src/main/scala/kafka/tools/StorageTool.scala
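
StorageTool backs the bin/kafka-storage.sh command. It exposes three subcommands: info, which reports the state of the Kafka log directories configured on this node; format, which writes meta.properties and bootstrap metadata into the unformatted log directories of a KRaft-mode node; and random-uuid, which prints a random cluster ID for use with format.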

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.tools

import java.io.PrintStream
import java.nio.file.{Files, Paths}
import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, RawMetaProperties}
import kafka.utils.{Exit, Logging}
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue}
import net.sourceforge.argparse4j.inf.Namespace
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
import org.apache.kafka.server.common.MetadataVersion

import java.util.Optional
import scala.collection.mutable

object StorageTool extends Logging {
  def main(args: Array[String]): Unit = {
    try {
      val namespace = parseArguments(args)
      val command = namespace.getString("command")
      val config = Option(namespace.getString("config")).flatMap(
        p => Some(new KafkaConfig(Utils.loadProps(p))))
      command match {
        case "info" =>
          val directories = configToLogDirectories(config.get)
          val selfManagedMode = configToSelfManagedMode(config.get)
          Exit.exit(infoCommand(System.out, selfManagedMode, directories))

        case "format" =>
          val directories = configToLogDirectories(config.get)
          val clusterId = namespace.getString("cluster_id")
          val metadataVersion = getMetadataVersion(namespace, Option(config.get.interBrokerProtocolVersionString))
          if (!metadataVersion.isKRaftSupported) {
            throw new TerseFailure(s"Must specify a valid KRaft metadata version of at least 3.0.")
          }
          val metaProperties = buildMetadataProperties(clusterId, config.get)
          val ignoreFormatted = namespace.getBoolean("ignore_formatted")
          if (!configToSelfManagedMode(config.get)) {
            throw new TerseFailure("The kafka configuration file appears to be for " +
              "a legacy cluster. Formatting is only supported for clusters in KRaft mode.")
          }
          Exit.exit(formatCommand(System.out, directories, metaProperties, metadataVersion, ignoreFormatted))

        case "random-uuid" =>
          System.out.println(Uuid.randomUuid)
          Exit.exit(0)

        case _ =>
          throw new RuntimeException(s"Unknown command $command")
      }
    } catch {
      case e: TerseFailure =>
        System.err.println(e.getMessage)
        System.exit(1)
    }
  }

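  // CLI definition: three subcommands (info, format, random-uuid). info and format require
  // --config; format additionally accepts --cluster-id, --ignore-formatted and --release-version.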
  def parseArguments(args: Array[String]): Namespace = {
    val parser = ArgumentParsers.
      newArgumentParser("kafka-storage").
      defaultHelp(true).
      description("The Kafka storage tool.")
    val subparsers = parser.addSubparsers().dest("command")

    val infoParser = subparsers.addParser("info").
      help("Get information about the Kafka log directories on this node.")
    val formatParser = subparsers.addParser("format").
      help("Format the Kafka log directories on this node.")
    subparsers.addParser("random-uuid").help("Print a random UUID.")
    List(infoParser, formatParser).foreach(parser => {
      parser.addArgument("--config", "-c").
        action(store()).
        required(true).
        help("The Kafka configuration file to use.")
    })
    formatParser.addArgument("--cluster-id", "-t").
      action(store()).
      required(true).
      help("The cluster ID to use.")
    formatParser.addArgument("--ignore-formatted", "-g").
      action(storeTrue())
    formatParser.addArgument("--release-version", "-r").
      action(store()).
      help(s"A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is ${MetadataVersion.latest().version()}")

    parser.parseArgsOrFail(args)
  }

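  // Collects log.dirs plus the metadata log directory into a sorted, de-duplicated sequence.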
  def configToLogDirectories(config: KafkaConfig): Seq[String] = {
    val directories = new mutable.TreeSet[String]
    directories ++= config.logDirs
    Option(config.metadataLogDir).foreach(directories.add)
    directories.toSeq
  }

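  // A node is self-managed (KRaft mode) when process.roles is non-empty.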
  def configToSelfManagedMode(config: KafkaConfig): Boolean = config.processRoles.nonEmpty

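  // Resolves the metadata version to format with: the --release-version argument if present,
  // otherwise defaultVersionString (inter.broker.protocol.version when called from main),
  // otherwise the latest known MetadataVersion.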
  def getMetadataVersion(
    namespace: Namespace,
    defaultVersionString: Option[String]
  ): MetadataVersion = {
    val defaultValue = defaultVersionString match {
      case Some(versionString) => MetadataVersion.fromVersionString(versionString)
      case None => MetadataVersion.latest()
    }

    Option(namespace.getString("release_version"))
      .map(ver => MetadataVersion.fromVersionString(ver))
      .getOrElse(defaultValue)
  }

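  // Reports the formatting state of every configured log directory and returns 1 if any
  // problems were found, 0 otherwise.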
  def infoCommand(stream: PrintStream, selfManagedMode: Boolean, directories: Seq[String]): Int = {
    val problems = new mutable.ArrayBuffer[String]
    val foundDirectories = new mutable.ArrayBuffer[String]
    var prevMetadata: Option[RawMetaProperties] = None
    directories.sorted.foreach(directory => {
      val directoryPath = Paths.get(directory)
      if (!Files.isDirectory(directoryPath)) {
        if (!Files.exists(directoryPath)) {
          problems += s"$directoryPath does not exist"
        } else {
          problems += s"$directoryPath is not a directory"
        }
      } else {
        foundDirectories += directoryPath.toString
        val metaPath = directoryPath.resolve("meta.properties")
        if (!Files.exists(metaPath)) {
          problems += s"$directoryPath is not formatted."
        } else {
          val properties = Utils.loadProps(metaPath.toString)
          val rawMetaProperties = new RawMetaProperties(properties)

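          // meta.properties version 0 is written by legacy (ZooKeeper-based) brokers,
          // version 1 by KRaft nodes.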
          val curMetadata = rawMetaProperties.version match {
            case 0 | 1 => Some(rawMetaProperties)
            case v =>
              problems += s"Unsupported version for $metaPath: $v"
              None
          }

          if (prevMetadata.isEmpty) {
            prevMetadata = curMetadata
          } else {
            if (!prevMetadata.get.equals(curMetadata.get)) {
              problems += s"Metadata for $metaPath was ${curMetadata.get}, " +
                s"but other directories featured ${prevMetadata.get}"
            }
          }
        }
      }
    })

    prevMetadata.foreach { prev =>
      if (selfManagedMode) {
        if (prev.version == 0) {
          problems += "The kafka configuration file appears to be for a cluster in KRaft mode, but " +
            "the directories are formatted for legacy mode."
        }
      } else if (prev.version == 1) {
        problems += "The kafka configuration file appears to be for a legacy cluster, but " +
          "the directories are formatted for a cluster in KRaft mode."
      }
    }

    if (directories.isEmpty) {
      stream.println("No directories specified.")
      0
    } else {
      if (foundDirectories.nonEmpty) {
        if (foundDirectories.size == 1) {
          stream.println("Found log directory:")
        } else {
          stream.println("Found log directories:")
        }
        foundDirectories.foreach(d => stream.println("  %s".format(d)))
        stream.println("")
      }

      prevMetadata.foreach { prev =>
        stream.println(s"Found metadata: ${prev}")
        stream.println("")
      }

      if (problems.nonEmpty) {
        if (problems.size == 1) {
          stream.println("Found problem:")
        } else {
          stream.println("Found problems:")
        }
        problems.foreach(d => stream.println("  %s".format(d)))
        stream.println("")
        1
      } else {
        0
      }
    }
  }

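  // Validates that the cluster ID string parses as a Uuid and that node.id is non-negative,
  // then builds the MetaProperties that the format command writes to each directory.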
  def buildMetadataProperties(
    clusterIdStr: String,
    config: KafkaConfig
  ): MetaProperties = {
    val effectiveClusterId = try {
      Uuid.fromString(clusterIdStr)
    } catch {
      case e: Throwable => throw new TerseFailure(s"Cluster ID string $clusterIdStr " +
        s"does not appear to be a valid UUID: ${e.getMessage}")
    }
    if (config.nodeId < 0) {
      throw new TerseFailure(s"The node.id must be set to a non-negative integer. We saw ${config.nodeId}")
    }
    new MetaProperties(effectiveClusterId.toString, config.nodeId)
  }

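  // Creates any missing directories, then writes meta.properties and the bootstrap metadata
  // file into each unformatted directory. An already-formatted directory is an error unless
  // --ignore-formatted was passed.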
  def formatCommand(stream: PrintStream,
                    directories: Seq[String],
                    metaProperties: MetaProperties,
                    metadataVersion: MetadataVersion,
                    ignoreFormatted: Boolean): Int = {
    if (directories.isEmpty) {
      throw new TerseFailure("No log directories found in the configuration.")
    }
    val unformattedDirectories = directories.filter(directory => {
      if (!Files.isDirectory(Paths.get(directory)) || !Files.exists(Paths.get(directory, "meta.properties"))) {
          true
      } else if (!ignoreFormatted) {
        throw new TerseFailure(s"Log directory $directory is already formatted. " +
          "Use --ignore-formatted to ignore this directory and format the others.")
      } else {
        false
      }
    })
    if (unformattedDirectories.isEmpty) {
      stream.println("All of the log directories are already formatted.")
    }
    unformattedDirectories.foreach(directory => {
      try {
        Files.createDirectories(Paths.get(directory))
      } catch {
        case e: Throwable => throw new TerseFailure(s"Unable to create storage " +
          s"directory $directory: ${e.getMessage}")
      }
      val metaPropertiesPath = Paths.get(directory, "meta.properties")
      val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
      checkpoint.write(metaProperties.toProperties)

      val bootstrapMetadata = BootstrapMetadata.fromVersion(metadataVersion, "format command")
      val bootstrapDirectory = new BootstrapDirectory(directory, Optional.empty())
      bootstrapDirectory.writeBinaryFile(bootstrapMetadata)

      stream.println(s"Formatting ${directory} with metadata.version ${metadataVersion}.")
    })
    0
  }
}
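
For reference, here is a minimal sketch of driving the tool programmatically, mirroring what the bin/kafka-storage.sh wrapper does when it invokes kafka.tools.StorageTool.main. The configuration path and cluster ID below are placeholders, and main terminates the JVM through Exit.exit once the subcommand finishes.

import kafka.tools.StorageTool

object FormatStorageExample {
  def main(args: Array[String]): Unit = {
    // Equivalent to: bin/kafka-storage.sh format -c server.properties -t <cluster-id>
    // StorageTool.main calls Exit.exit, so nothing after this call will run.
    StorageTool.main(Array(
      "format",
      "--config", "/path/to/kraft/server.properties", // placeholder: a KRaft-mode server config
      "--cluster-id", "q1Sh-9_ISia_zwGINzRvyQ"        // placeholder: generate one with the random-uuid subcommand
    ))
  }
}

Running the info subcommand with the same --config afterwards should list each directory under "Found log directories:" together with the metadata written during formatting.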

Related information

kafka source code directory

Related articles

kafka ClusterTool source code

kafka ConsoleConsumer source code

kafka ConsoleProducer source code

kafka ConsumerPerformance source code

kafka DumpLogSegments source code

kafka EndToEndLatency source code

kafka GetOffsetShell source code

kafka JmxTool source code

kafka MirrorMaker source code

kafka PerfConfig source code
