kafka FeatureCommand 源码

  • 2022-10-20
  • 浏览 (213)

kafka FeatureCommand 代码

文件路径:/core/src/main/scala/kafka/admin/FeatureCommand.scala

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.admin

import kafka.tools.TerseFailure
import kafka.utils.Exit
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{append, fileType, store, storeTrue}
import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace, Subparsers}
import net.sourceforge.argparse4j.internal.HelpScreenException
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.MetadataVersion

import java.io.{File, PrintStream}
import java.util.Properties
import scala.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
import scala.compat.java8.OptionConverters._

object FeatureCommand {
  def main(args: Array[String]): Unit = {
    val res = mainNoExit(args, System.out)
    Exit.exit(res)
  }

  // This is used for integration tests in order to avoid killing the test with Exit.exit,
  // and in order to capture the command output.
  def mainNoExit(
    args: Array[String],
    out: PrintStream
  ): Int = {
    val parser = ArgumentParsers.newArgumentParser("kafka-features")
      .defaultHelp(true)
      .description("This tool manages feature flags in Kafka.")
    parser.addArgument("--bootstrap-server")
      .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
      .required(true)

    parser.addArgument("--command-config")
      .`type`(fileType())
      .help("Property file containing configs to be passed to Admin Client.")
    val subparsers = parser.addSubparsers().dest("command")
    addDescribeParser(subparsers)
    addUpgradeParser(subparsers)
    addDowngradeParser(subparsers)
    addDisableParser(subparsers)

    try {
      val namespace = parser.parseArgs(args)
      val command = namespace.getString("command")

      val commandConfig = namespace.get[File]("command_config")
      val props = if (commandConfig != null) {
        if (!commandConfig.exists()) {
          throw new TerseFailure(s"Properties file ${commandConfig.getPath} does not exists!")
        }
        Utils.loadProps(commandConfig.getPath)
      } else {
        new Properties()
      }
      props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server"))
      val admin = Admin.create(props)
      try {
        command match {
          case "describe" => handleDescribe(out, admin)
          case "upgrade" => handleUpgrade(out, namespace, admin)
          case "downgrade" => handleDowngrade(out, namespace, admin)
          case "disable" => handleDisable(out, namespace, admin)
        }
      } finally {
        admin.close()
      }
      0
    } catch {
      case _: HelpScreenException =>
        0
      case e: ArgumentParserException =>
        System.err.println(s"Command line error: ${e.getMessage}. Type --help for help.")
        1
      case e: TerseFailure =>
        System.err.println(e.getMessage)
        1
    }
  }

  def addDescribeParser(subparsers: Subparsers): Unit = {
    subparsers.addParser("describe")
      .help("Describes the current active feature flags.")
  }

  def addUpgradeParser(subparsers: Subparsers): Unit = {
    val upgradeParser = subparsers.addParser("upgrade")
      .help("Upgrade one or more feature flags.")
    upgradeParser.addArgument("--metadata")
      .help("The level to which we should upgrade the metadata. For example, 3.3-IV3.")
      .action(store())
    upgradeParser.addArgument("--feature")
      .help("A feature upgrade we should perform, in feature=level format. For example: `metadata.version=5`.")
      .action(append())
    upgradeParser.addArgument("--dry-run")
      .help("Validate this upgrade, but do not perform it.")
      .action(storeTrue())
  }

  def addDowngradeParser(subparsers: Subparsers): Unit = {
    val downgradeParser = subparsers.addParser("downgrade")
      .help("Downgrade one or more feature flags.")
    downgradeParser.addArgument("--metadata")
      .help("The level to which we should downgrade the metadata. For example, 3.3-IV0.")
      .action(store())
    downgradeParser.addArgument("--feature")
      .help("A feature downgrade we should perform, in feature=level format. For example: `metadata.version=5`.")
      .action(append())
    downgradeParser.addArgument("--unsafe")
      .help("Perform this downgrade even if it may irreversibly destroy metadata.")
      .action(storeTrue())
    downgradeParser.addArgument("--dry-run")
      .help("Validate this downgrade, but do not perform it.")
      .action(storeTrue())
  }

  def addDisableParser(subparsers: Subparsers): Unit = {
    val disableParser = subparsers.addParser("disable")
      .help("Disable one or more feature flags. This is the same as downgrading the version to zero.")
    disableParser.addArgument("--feature")
      .help("A feature flag to disable.")
      .action(append())
    disableParser.addArgument("--unsafe")
      .help("Disable this feature flag even if it may irreversibly destroy metadata.")
      .action(storeTrue())
    disableParser.addArgument("--dry-run")
      .help("Perform a dry-run of this disable operation.")
      .action(storeTrue())
  }

  def levelToString(
    feature: String,
    level: Short
  ): String = {
    if (feature.equals(MetadataVersion.FEATURE_NAME)) {
      try {
        MetadataVersion.fromFeatureLevel(level).version()
      } catch {
        case e: Throwable => s"UNKNOWN [${level}]"
      }
    } else {
      level.toString
    }
  }

  def handleDescribe(
    out: PrintStream,
    admin: Admin
  ): Unit = {
    val featureMetadata = admin.describeFeatures().featureMetadata().get()
    val featureList = new java.util.TreeSet[String](featureMetadata.supportedFeatures().keySet())
      featureList.forEach {
      case feature =>
        val finalizedLevel = featureMetadata.finalizedFeatures().asScala.get(feature) match {
          case None => 0.toShort
          case Some(v) => v.maxVersionLevel()
        }
        val range = featureMetadata.supportedFeatures().get(feature)
        out.printf("Feature: %s\tSupportedMinVersion: %s\tSupportedMaxVersion: %s\tFinalizedVersionLevel: %s\tEpoch: %s%n",
          feature,
          levelToString(feature, range.minVersion()),
          levelToString(feature, range.maxVersion()),
          levelToString(feature, finalizedLevel),
          featureMetadata.finalizedFeaturesEpoch().asScala.flatMap(e => Some(e.toString)).getOrElse("-"))
    }
  }

  def metadataVersionsToString(first: MetadataVersion, last: MetadataVersion): String = {
    MetadataVersion.VERSIONS.toList.asJava.
      subList(first.ordinal(), last.ordinal() + 1).
      asScala.mkString(", ")
  }

  def handleUpgrade(out: PrintStream, namespace: Namespace, admin: Admin): Unit = {
    handleUpgradeOrDowngrade("upgrade", out, namespace, admin, UpgradeType.UPGRADE)
  }

  def downgradeType(namespace: Namespace): UpgradeType = {
    val unsafe = namespace.getBoolean("unsafe")
    if (unsafe == null || !unsafe) {
      UpgradeType.SAFE_DOWNGRADE
    } else {
      UpgradeType.UNSAFE_DOWNGRADE
    }
  }

  def handleDowngrade(out: PrintStream, namespace: Namespace, admin: Admin): Unit = {
    handleUpgradeOrDowngrade("downgrade", out, namespace, admin, downgradeType(namespace))
  }

  def parseNameAndLevel(input: String): (String, Short) = {
    val equalsIndex = input.indexOf("=")
    if (equalsIndex < 0) {
      throw new TerseFailure(s"Can't parse feature=level string ${input}: equals sign not found.")
    }
    val name = input.substring(0, equalsIndex).trim
    val levelString = input.substring(equalsIndex + 1).trim
    val level = try {
      levelString.toShort
    } catch {
      case e: Throwable => throw new TerseFailure(s"Can't parse feature=level string ${input}: " +
        s"unable to parse ${levelString} as a short.")
    }
    (name, level)
  }

  def handleUpgradeOrDowngrade(
    op: String,
    out: PrintStream,
    namespace: Namespace,
    admin: Admin,
    upgradeType: UpgradeType
  ): Unit = {
    val updates = new java.util.HashMap[String, FeatureUpdate]()
    Option(namespace.getString("metadata")).foreach(metadata => {
      val version = try {
        MetadataVersion.fromVersionString(metadata)
      } catch {
        case e: Throwable => throw new TerseFailure("Unsupported metadata version " + metadata +
          ". Supported metadata versions are " + metadataVersionsToString(
          MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, MetadataVersion.latest()))
      }
      updates.put(MetadataVersion.FEATURE_NAME, new FeatureUpdate(version.featureLevel(), upgradeType))
    })
    Option(namespace.getList[String]("feature")).foreach(features => {
      features.forEach(feature => {
        val (name, level) = parseNameAndLevel(feature)
        if (updates.put(name, new FeatureUpdate(level, upgradeType)) != null) {
          throw new TerseFailure(s"Feature ${name} was specified more than once.")
        }
      })
    })
    update(op, out, admin, updates, namespace.getBoolean("dry-run"))
  }

  def handleDisable(out: PrintStream, namespace: Namespace, admin: Admin): Unit = {
    val upgradeType = downgradeType(namespace)
    val updates = new java.util.HashMap[String, FeatureUpdate]()
    Option(namespace.getList[String]("feature")).foreach(features => {
      features.forEach(name =>
        if (updates.put(name, new FeatureUpdate(0.toShort, upgradeType)) != null) {
          throw new TerseFailure(s"Feature ${name} was specified more than once.")
        })
      }
    )
    update("disable", out, admin, updates, namespace.getBoolean("dry-run"))
  }

  def update(
    op: String,
    out: PrintStream,
    admin: Admin,
    updates: java.util.HashMap[String, FeatureUpdate],
    dryRun: Boolean
  ): Unit = {
    if (updates.isEmpty) {
      throw new TerseFailure(s"You must specify at least one feature to ${op}")
    }
    val result =  admin.updateFeatures(updates, new UpdateFeaturesOptions().validateOnly(dryRun))
    val errors = result.values().asScala.map { case (feature, future) =>
      try {
        future.get()
        feature -> None
      } catch {
        case e: ExecutionException => feature -> Some(e.getCause)
        case t: Throwable => feature -> Some(t)
      }
    }
    var numFailures = 0
    errors.keySet.toList.sorted.foreach { feature =>
      val maybeThrowable = errors(feature)
      val level = updates.get(feature).maxVersionLevel()
      if (maybeThrowable.isDefined) {
        val helper = if (dryRun) {
          "Can not"
        } else {
          "Could not"
        }
        val suffix = if (op.equals("disable")) {
          s"disable ${feature}"
        } else {
          s"${op} ${feature} to ${level}"
        }
        out.println(s"${helper} ${suffix}. ${maybeThrowable.get.getMessage}")
        numFailures = numFailures + 1
      } else {
        val verb = if (dryRun) {
          "can be"
        } else {
          "was"
        }
        val obj = if (op.equals("disable")) {
          "disabled."
        } else {
          s"${op}d to ${level}."
        }
        out.println(s"${feature} ${verb} ${obj}")
      }
    }
    if (numFailures > 0) {
      throw new TerseFailure(s"${numFailures} out of ${updates.size} operation(s) failed.")
    }
  }
}

相关信息

kafka 源码目录

相关文章

kafka AclCommand 源码

kafka AdminOperationException 源码

kafka AdminUtils 源码

kafka BrokerApiVersionsCommand 源码

kafka BrokerMetadata 源码

kafka ConfigCommand 源码

kafka ConsumerGroupCommand 源码

kafka DelegationTokenCommand 源码

kafka DeleteRecordsCommand 源码

kafka LeaderElectionCommand 源码

0  赞