kafka ControllerApis 源码

  • 2022-10-20
  • 浏览 (211)

kafka ControllerApis 代码

文件路径:/core/src/main/scala/kafka/server/ControllerApis.scala

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.server

import java.util
import java.util.{Collections, OptionalLong}
import java.util.Map.Entry
import java.util.concurrent.CompletableFuture
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.common.Uuid.ZERO_UUID
import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE, DESCRIBE_CONFIGS}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidRequestException, TopicDeletionDisabledException}
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.message.{CreateTopicsRequestData, _}
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, Uuid}
import org.apache.kafka.controller.ControllerRequestContext.requestTimeoutMsToDeadlineNs
import org.apache.kafka.controller.{Controller, ControllerRequestContext}
import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion

import scala.jdk.CollectionConverters._


/**
 * Request handler for Controller APIs
 */
class ControllerApis(val requestChannel: RequestChannel,
                     val authorizer: Option[Authorizer],
                     val quotas: QuotaManagers,
                     val time: Time,
                     val controller: Controller,
                     val raftManager: RaftManager[ApiMessageAndVersion],
                     val config: KafkaConfig,
                     val metaProperties: MetaProperties,
                     val controllerNodes: Seq[Node],
                     val apiVersionManager: ApiVersionManager) extends ApiRequestHandler with Logging {

  this.logIdent = s"[ControllerApis nodeId=${config.nodeId}] "
  val authHelper = new AuthHelper(authorizer)
  val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
  private val aclApis = new AclApis(authHelper, authorizer, requestHelper, "controller", config)

  def isClosed: Boolean = aclApis.isClosed

  def close(): Unit = aclApis.close()

  override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
    try {
      val handlerFuture: CompletableFuture[Unit] = request.header.apiKey match {
        case ApiKeys.FETCH => handleFetch(request)
        case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request)
        case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
        case ApiKeys.DELETE_TOPICS => handleDeleteTopics(request)
        case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
        case ApiKeys.ALTER_CONFIGS => handleLegacyAlterConfigs(request)
        case ApiKeys.VOTE => handleVote(request)
        case ApiKeys.BEGIN_QUORUM_EPOCH => handleBeginQuorumEpoch(request)
        case ApiKeys.END_QUORUM_EPOCH => handleEndQuorumEpoch(request)
        case ApiKeys.DESCRIBE_QUORUM => handleDescribeQuorum(request)
        case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request)
        case ApiKeys.BROKER_REGISTRATION => handleBrokerRegistration(request)
        case ApiKeys.BROKER_HEARTBEAT => handleBrokerHeartBeatRequest(request)
        case ApiKeys.UNREGISTER_BROKER => handleUnregisterBroker(request)
        case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotas(request)
        case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request)
        case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignments(request)
        case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignments(request)
        case ApiKeys.ENVELOPE => handleEnvelopeRequest(request, requestLocal)
        case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
        case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
        case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
        case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
        case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)
        case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request)
        case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request)
        case ApiKeys.ELECT_LEADERS => handleElectLeaders(request)
        case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request)
        case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
      }

      // This catches exceptions in the future and subsequent completion stages returned by the request handlers.
      handlerFuture.whenComplete { (_, exception) =>
        if (exception != null) {
          // CompletionException does not include the stack frames in its "cause" exception, so we need to
          // log the original exception here
          error(s"Unexpected error handling request ${request.requestDesc(true)} " +
            s"with context ${request.context}", exception)
          requestHelper.handleError(request, exception)
        }
      }
    } catch {
      case e: FatalExitError => throw e
      case t: Throwable => {
        // This catches exceptions in the blocking parts of the request handlers
        error(s"Unexpected error handling request ${request.requestDesc(true)} " +
          s"with context ${request.context}", t)
        requestHelper.handleError(request, t)
      }
    } finally {
      // Only record local completion time if it is unset.
      if (request.apiLocalCompleteTimeNanos < 0) {
        request.apiLocalCompleteTimeNanos = time.nanoseconds
      }
    }
  }

  def handleEnvelopeRequest(request: RequestChannel.Request, requestLocal: RequestLocal): CompletableFuture[Unit] = {
    if (!authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
      requestHelper.sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException(
        s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope"))
    } else {
      EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle(_, requestLocal))
    }
    CompletableFuture.completedFuture[Unit](())
  }

  def handleSaslHandshakeRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val responseData = new SaslHandshakeResponseData().setErrorCode(ILLEGAL_SASL_STATE.code)
    requestHelper.sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(responseData))
    CompletableFuture.completedFuture[Unit](())
  }

  def handleSaslAuthenticateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val responseData = new SaslAuthenticateResponseData()
      .setErrorCode(ILLEGAL_SASL_STATE.code)
      .setErrorMessage("SaslAuthenticate request received after successful authentication")
    requestHelper.sendResponseMaybeThrottle(request, _ => new SaslAuthenticateResponse(responseData))
    CompletableFuture.completedFuture[Unit](())
  }

  def handleFetch(request: RequestChannel.Request): CompletableFuture[Unit] = {
    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
    handleRaftRequest(request, response => new FetchResponse(response.asInstanceOf[FetchResponseData]))
  }

  def handleFetchSnapshot(request: RequestChannel.Request): CompletableFuture[Unit] = {
    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
    handleRaftRequest(request, response => new FetchSnapshotResponse(response.asInstanceOf[FetchSnapshotResponseData]))
  }

  def handleDeleteTopics(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val deleteTopicsRequest = request.body[DeleteTopicsRequest]
    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
      requestTimeoutMsToDeadlineNs(time, deleteTopicsRequest.data.timeoutMs))
    val future = deleteTopics(context,
      deleteTopicsRequest.data,
      request.context.apiVersion,
      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME, logIfDenied = false),
      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
    future.handle[Unit] { (results, exception) =>
      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
        if (exception != null) {
          deleteTopicsRequest.getErrorResponse(throttleTimeMs, exception)
        } else {
          val responseData = new DeleteTopicsResponseData().
            setResponses(new DeletableTopicResultCollection(results.iterator)).
            setThrottleTimeMs(throttleTimeMs)
          new DeleteTopicsResponse(responseData)
        }
      })
    }
  }

  def deleteTopics(
    context: ControllerRequestContext,
    request: DeleteTopicsRequestData,
    apiVersion: Int,
    hasClusterAuth: Boolean,
    getDescribableTopics: Iterable[String] => Set[String],
    getDeletableTopics: Iterable[String] => Set[String]
  ): CompletableFuture[util.List[DeletableTopicResult]] = {
    // Check if topic deletion is enabled at all.
    if (!config.deleteTopicEnable) {
      if (apiVersion < 3) {
        throw new InvalidRequestException("Topic deletion is disabled.")
      } else {
        throw new TopicDeletionDisabledException()
      }
    }
    // The first step is to load up the names and IDs that have been provided by the
    // request.  This is a bit messy because we support multiple ways of referring to
    // topics (both by name and by id) and because we need to check for duplicates or
    // other invalid inputs.
    val responses = new util.ArrayList[DeletableTopicResult]
    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
      responses.add(new DeletableTopicResult().
        setName(name).
        setTopicId(id).
        setErrorCode(error.error.code).
        setErrorMessage(error.message))
    }
    val providedNames = new util.HashSet[String]
    val duplicateProvidedNames = new util.HashSet[String]
    val providedIds = new util.HashSet[Uuid]
    val duplicateProvidedIds = new util.HashSet[Uuid]
    def addProvidedName(name: String): Unit = {
      if (duplicateProvidedNames.contains(name) || !providedNames.add(name)) {
        duplicateProvidedNames.add(name)
        providedNames.remove(name)
      }
    }
    request.topicNames.forEach(addProvidedName)
    request.topics.forEach {
      topic => if (topic.name == null) {
        if (topic.topicId.equals(ZERO_UUID)) {
          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
            "Neither topic name nor id were specified."))
        } else if (duplicateProvidedIds.contains(topic.topicId) || !providedIds.add(topic.topicId)) {
          duplicateProvidedIds.add(topic.topicId)
          providedIds.remove(topic.topicId)
        }
      } else {
        if (topic.topicId.equals(ZERO_UUID)) {
          addProvidedName(topic.name)
        } else {
          appendResponse(topic.name, topic.topicId, new ApiError(INVALID_REQUEST,
            "You may not specify both topic name and topic id."))
        }
      }
    }
    // Create error responses for duplicates.
    duplicateProvidedNames.forEach(name => appendResponse(name, ZERO_UUID,
      new ApiError(INVALID_REQUEST, "Duplicate topic name.")))
    duplicateProvidedIds.forEach(id => appendResponse(null, id,
      new ApiError(INVALID_REQUEST, "Duplicate topic id.")))
    // At this point we have all the valid names and IDs that have been provided.
    // However, the Authorizer needs topic names as inputs, not topic IDs.  So
    // we need to resolve all IDs to names.
    val toAuthenticate = new util.HashSet[String]
    toAuthenticate.addAll(providedNames)
    val idToName = new util.HashMap[Uuid, String]
    controller.findTopicNames(context, providedIds).thenCompose { topicNames =>
      topicNames.forEach { (id, nameOrError) =>
        if (nameOrError.isError) {
          appendResponse(null, id, nameOrError.error())
        } else {
          toAuthenticate.add(nameOrError.result())
          idToName.put(id, nameOrError.result())
        }
      }
      // Get the list of deletable topics (those we can delete) and the list of describeable
      // topics.
      val topicsToAuthenticate = toAuthenticate.asScala
      val (describeable, deletable) = if (hasClusterAuth) {
        (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
      } else {
        (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
      }
      // For each topic that was provided by ID, check if authentication failed.
      // If so, remove it from the idToName map and create an error response for it.
      val iterator = idToName.entrySet().iterator()
      while (iterator.hasNext) {
        val entry = iterator.next()
        val id = entry.getKey
        val name = entry.getValue
        if (!deletable.contains(name)) {
          if (describeable.contains(name)) {
            appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
          } else {
            appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
          }
          iterator.remove()
        }
      }
      // For each topic that was provided by name, check if authentication failed.
      // If so, create an error response for it. Otherwise, add it to the idToName map.
      controller.findTopicIds(context, providedNames).thenCompose { topicIds =>
        topicIds.forEach { (name, idOrError) =>
          if (!describeable.contains(name)) {
            appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
          } else if (idOrError.isError) {
            appendResponse(name, ZERO_UUID, idOrError.error)
          } else if (deletable.contains(name)) {
            val id = idOrError.result()
            if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != null) {
              // This is kind of a weird case: what if we supply topic ID X and also a name
              // that maps to ID X?  In that case, _if authorization succeeds_, we end up
              // here.  If authorization doesn't succeed, we refrain from commenting on the
              // situation since it would reveal topic ID mappings.
              duplicateProvidedIds.add(id)
              idToName.remove(id)
              appendResponse(name, id, new ApiError(INVALID_REQUEST,
                "The provided topic name maps to an ID that was already supplied."))
            }
          } else {
            appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
          }
        }
        // Finally, the idToName map contains all the topics that we are authorized to delete.
        // Perform the deletion and create responses for each one.
        controller.deleteTopics(context, idToName.keySet).thenApply { idToError =>
          idToError.forEach { (id, error) =>
            appendResponse(idToName.get(id), id, error)
          }
          // Shuffle the responses so that users can not use patterns in their positions to
          // distinguish between absent topics and topics we are not permitted to see.
          Collections.shuffle(responses)
          responses
        }
      }
    }
  }

  def handleCreateTopics(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val createTopicsRequest = request.body[CreateTopicsRequest]
    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
      requestTimeoutMsToDeadlineNs(time, createTopicsRequest.data.timeoutMs))
    val future = createTopics(context,
        createTopicsRequest.data,
        authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false),
        names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(identity),
        names => authHelper.filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
            names, logIfDenied = false)(identity))
    future.handle[Unit] { (result, exception) =>
      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
        if (exception != null) {
          createTopicsRequest.getErrorResponse(throttleTimeMs, exception)
        } else {
          result.setThrottleTimeMs(throttleTimeMs)
          new CreateTopicsResponse(result)
        }
      })
    }
  }

  def createTopics(
    context: ControllerRequestContext,
    request: CreateTopicsRequestData,
    hasClusterAuth: Boolean,
    getCreatableTopics: Iterable[String] => Set[String],
    getDescribableTopics: Iterable[String] => Set[String]
  ): CompletableFuture[CreateTopicsResponseData] = {
    val topicNames = new util.HashSet[String]()
    val duplicateTopicNames = new util.HashSet[String]()
    request.topics().forEach { topicData =>
      if (!duplicateTopicNames.contains(topicData.name())) {
        if (!topicNames.add(topicData.name())) {
          topicNames.remove(topicData.name())
          duplicateTopicNames.add(topicData.name())
        }
      }
    }
    val authorizedTopicNames = if (hasClusterAuth) {
      topicNames.asScala
    } else {
      getCreatableTopics.apply(topicNames.asScala)
    }
    val describableTopicNames = getDescribableTopics.apply(topicNames.asScala).asJava
    val effectiveRequest = request.duplicate()
    val iterator = effectiveRequest.topics().iterator()
    while (iterator.hasNext) {
      val creatableTopic = iterator.next()
      if (duplicateTopicNames.contains(creatableTopic.name()) ||
          !authorizedTopicNames.contains(creatableTopic.name())) {
        iterator.remove()
      }
    }
    controller.createTopics(context, effectiveRequest, describableTopicNames).thenApply { response =>
      duplicateTopicNames.forEach { name =>
        response.topics().add(new CreatableTopicResult().
          setName(name).
          setErrorCode(INVALID_REQUEST.code).
          setErrorMessage("Duplicate topic name."))
      }
      topicNames.forEach { name =>
        if (!authorizedTopicNames.contains(name)) {
          response.topics().add(new CreatableTopicResult().
            setName(name).
            setErrorCode(TOPIC_AUTHORIZATION_FAILED.code))
        }
      }
      response
    }
  }

  def handleApiVersionsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
    // Note that broker returns its full list of supported ApiKeys and versions regardless of current
    // authentication state (e.g., before SASL authentication on an SASL listener, do note that no
    // Kafka protocol requests may take place on an SSL listener before the SSL handshake is finished).
    // If this is considered to leak information about the broker version a workaround is to use SSL
    // with client authentication which is performed at an earlier stage of the connection where the
    // ApiVersionRequest is not available.
    def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
      val apiVersionRequest = request.body[ApiVersionsRequest]
      if (apiVersionRequest.hasUnsupportedRequestVersion) {
        apiVersionRequest.getErrorResponse(requestThrottleMs, UNSUPPORTED_VERSION.exception)
      } else if (!apiVersionRequest.isValid) {
        apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception)
      } else {
        apiVersionManager.apiVersionResponse(requestThrottleMs)
      }
    }
    requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
    CompletableFuture.completedFuture[Unit](())
  }

  def authorizeAlterResource(requestContext: RequestContext,
                             resource: ConfigResource): ApiError = {
    resource.`type` match {
      case ConfigResource.Type.BROKER =>
        if (authHelper.authorize(requestContext, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
          new ApiError(NONE)
        } else {
          new ApiError(CLUSTER_AUTHORIZATION_FAILED)
        }
      case ConfigResource.Type.TOPIC =>
        if (authHelper.authorize(requestContext, ALTER_CONFIGS, TOPIC, resource.name)) {
          new ApiError(NONE)
        } else {
          new ApiError(TOPIC_AUTHORIZATION_FAILED)
        }
      case rt => new ApiError(INVALID_REQUEST, s"Unexpected resource type $rt.")
    }
  }

  def handleLegacyAlterConfigs(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val response = new AlterConfigsResponseData()
    val alterConfigsRequest = request.body[AlterConfigsRequest]
    val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
    val duplicateResources = new util.HashSet[ConfigResource]
    val configChanges = new util.HashMap[ConfigResource, util.Map[String, String]]()
    alterConfigsRequest.data.resources.forEach { resource =>
      val configResource = new ConfigResource(
        ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
      if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
        response.responses().add(new OldAlterConfigsResourceResponse().
          setErrorCode(UNSUPPORTED_VERSION.code()).
          setErrorMessage("Unknown resource type " + resource.resourceType() + ".").
          setResourceName(resource.resourceName()).
          setResourceType(resource.resourceType()))
      } else if (!duplicateResources.contains(configResource)) {
        val configs = new util.HashMap[String, String]()
        resource.configs().forEach(config => configs.put(config.name(), config.value()))
        if (configChanges.put(configResource, configs) != null) {
          duplicateResources.add(configResource)
          configChanges.remove(configResource)
          response.responses().add(new OldAlterConfigsResourceResponse().
            setErrorCode(INVALID_REQUEST.code()).
            setErrorMessage("Duplicate resource.").
            setResourceName(resource.resourceName()).
            setResourceType(resource.resourceType()))
        }
      }
    }
    val iterator = configChanges.keySet().iterator()
    while (iterator.hasNext) {
      val resource = iterator.next()
      val apiError = authorizeAlterResource(request.context, resource)
      if (apiError.isFailure) {
        response.responses().add(new OldAlterConfigsResourceResponse().
          setErrorCode(apiError.error().code()).
          setErrorMessage(apiError.message()).
          setResourceName(resource.name()).
          setResourceType(resource.`type`().id()))
        iterator.remove()
      }
    }
    controller.legacyAlterConfigs(context, configChanges, alterConfigsRequest.data.validateOnly)
      .handle[Unit] { (controllerResults, exception) =>
        if (exception != null) {
          requestHelper.handleError(request, exception)
        } else {
          controllerResults.entrySet().forEach(entry => response.responses().add(
            new OldAlterConfigsResourceResponse().
              setErrorCode(entry.getValue.error().code()).
              setErrorMessage(entry.getValue.message()).
              setResourceName(entry.getKey.name()).
              setResourceType(entry.getKey.`type`().id())))
          requestHelper.sendResponseMaybeThrottle(request, throttleMs =>
            new AlterConfigsResponse(response.setThrottleTimeMs(throttleMs)))
        }
      }
  }

  def handleVote(request: RequestChannel.Request): CompletableFuture[Unit] = {
    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
    handleRaftRequest(request, response => new VoteResponse(response.asInstanceOf[VoteResponseData]))
  }

  def handleBeginQuorumEpoch(request: RequestChannel.Request): CompletableFuture[Unit] = {
    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
    handleRaftRequest(request, response => new BeginQuorumEpochResponse(response.asInstanceOf[BeginQuorumEpochResponseData]))
  }

  def handleEndQuorumEpoch(request: RequestChannel.Request): CompletableFuture[Unit] = {
    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
    handleRaftRequest(request, response => new EndQuorumEpochResponse(response.asInstanceOf[EndQuorumEpochResponseData]))
  }

  def handleDescribeQuorum(request: RequestChannel.Request): CompletableFuture[Unit] = {
    authHelper.authorizeClusterOperation(request, DESCRIBE)
    handleRaftRequest(request, response => new DescribeQuorumResponse(response.asInstanceOf[DescribeQuorumResponseData]))
  }

  def handleElectLeaders(request: RequestChannel.Request): CompletableFuture[Unit] = {
    authHelper.authorizeClusterOperation(request, ALTER)
    val electLeadersRequest = request.body[ElectLeadersRequest]
    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
      requestTimeoutMsToDeadlineNs(time, electLeadersRequest.data.timeoutMs))
    val future = controller.electLeaders(context, electLeadersRequest.data)
    future.handle[Unit] { (responseData, exception) =>
      if (exception != null) {
        requestHelper.sendResponseMaybeThrottle(request, throttleMs => {
          electLeadersRequest.getErrorResponse(throttleMs, exception)
        })
      } else {
        requestHelper.sendResponseMaybeThrottle(request, throttleMs => {
          new ElectLeadersResponse(responseData.setThrottleTimeMs(throttleMs))
        })
      }
    }
  }

  def handleAlterPartitionRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val alterPartitionRequest = request.body[AlterPartitionRequest]
    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
      OptionalLong.empty())
    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
    val future = controller.alterPartition(context, alterPartitionRequest.data)
    future.handle[Unit] { (result, exception) =>
      val response = if (exception != null) {
        alterPartitionRequest.getErrorResponse(exception)
      } else {
        new AlterPartitionResponse(result)
      }
      requestHelper.sendResponseExemptThrottle(request, response)
    }
  }

  def handleBrokerHeartBeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val heartbeatRequest = request.body[BrokerHeartbeatRequest]
    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
      requestTimeoutMsToDeadlineNs(time, config.brokerHeartbeatIntervalMs))
    controller.processBrokerHeartbeat(context, heartbeatRequest.data).handle[Unit] { (reply, e) =>
      def createResponseCallback(requestThrottleMs: Int,
                                 reply: BrokerHeartbeatReply,
                                 e: Throwable): BrokerHeartbeatResponse = {
        if (e != null) {
          new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData().
            setThrottleTimeMs(requestThrottleMs).
            setErrorCode(Errors.forException(e).code))
        } else {
          new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData().
            setThrottleTimeMs(requestThrottleMs).
            setErrorCode(NONE.code).
            setIsCaughtUp(reply.isCaughtUp).
            setIsFenced(reply.isFenced).
            setShouldShutDown(reply.shouldShutDown))
        }
      }
      requestHelper.sendResponseMaybeThrottle(request,
        requestThrottleMs => createResponseCallback(requestThrottleMs, reply, e))
    }
  }

  def handleUnregisterBroker(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val decommissionRequest = request.body[UnregisterBrokerRequest]
    authHelper.authorizeClusterOperation(request, ALTER)
    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
      OptionalLong.empty())

    controller.unregisterBroker(context, decommissionRequest.data.brokerId).handle[Unit] { (_, e) =>
      def createResponseCallback(requestThrottleMs: Int,
                                 e: Throwable): UnregisterBrokerResponse = {
        if (e != null) {
          new UnregisterBrokerResponse(new UnregisterBrokerResponseData().
            setThrottleTimeMs(requestThrottleMs).
            setErrorCode(Errors.forException(e).code))
        } else {
          new UnregisterBrokerResponse(new UnregisterBrokerResponseData().
            setThrottleTimeMs(requestThrottleMs))
        }
      }
      requestHelper.sendResponseMaybeThrottle(request,
        requestThrottleMs => createResponseCallback(requestThrottleMs, e))
    }
  }

  def handleBrokerRegistration(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val registrationRequest = request.body[BrokerRegistrationRequest]
    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
      OptionalLong.empty())

    controller.registerBroker(context, registrationRequest.data).handle[Unit] { (reply, e) =>
      def createResponseCallback(requestThrottleMs: Int,
                                 reply: BrokerRegistrationReply,
                                 e: Throwable): BrokerRegistrationResponse = {
        if (e != null) {
          new BrokerRegistrationResponse(new BrokerRegistrationResponseData().
            setThrottleTimeMs(requestThrottleMs).
            setErrorCode(Errors.forException(e).code))
        } else {
          new BrokerRegistrationResponse(new BrokerRegistrationResponseData().
            setThrottleTimeMs(requestThrottleMs).
            setErrorCode(NONE.code).
            setBrokerEpoch(reply.epoch))
        }
      }
      requestHelper.sendResponseMaybeThrottle(request,
        requestThrottleMs => createResponseCallback(requestThrottleMs, reply, e))
    }
  }

  private def handleRaftRequest(request: RequestChannel.Request,
                                buildResponse: ApiMessage => AbstractResponse): CompletableFuture[Unit] = {
    val requestBody = request.body[AbstractRequest]
    val future = raftManager.handleRequest(request.header, requestBody.data, time.milliseconds())
    future.handle[Unit] { (responseData, exception) =>
      val response = if (exception != null) {
        requestBody.getErrorResponse(exception)
      } else {
        buildResponse(responseData)
      }
      requestHelper.sendResponseExemptThrottle(request, response)
    }
  }

  def handleAlterClientQuotas(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val quotaRequest = request.body[AlterClientQuotasRequest]
    authHelper.authorizeClusterOperation(request, ALTER_CONFIGS)
    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
      OptionalLong.empty())
    controller.alterClientQuotas(context, quotaRequest.entries, quotaRequest.validateOnly)
      .handle[Unit] { (results, exception) =>
        if (exception != null) {
          requestHelper.handleError(request, exception)
        } else {
          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
            AlterClientQuotasResponse.fromQuotaEntities(results, requestThrottleMs))
        }
      }
  }

  def handleIncrementalAlterConfigs(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val response = new IncrementalAlterConfigsResponseData()
    val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
      OptionalLong.empty())
    val duplicateResources = new util.HashSet[ConfigResource]
    val configChanges = new util.HashMap[ConfigResource,
      util.Map[String, Entry[AlterConfigOp.OpType, String]]]()
    alterConfigsRequest.data.resources.forEach { resource =>
      val configResource = new ConfigResource(
        ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
      if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
        response.responses().add(new AlterConfigsResourceResponse().
          setErrorCode(UNSUPPORTED_VERSION.code()).
          setErrorMessage("Unknown resource type " + resource.resourceType() + ".").
          setResourceName(resource.resourceName()).
          setResourceType(resource.resourceType()))
      } else if (!duplicateResources.contains(configResource)) {
        val altersByName = new util.HashMap[String, Entry[AlterConfigOp.OpType, String]]()
        resource.configs.forEach { config =>
          altersByName.put(config.name, new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
            AlterConfigOp.OpType.forId(config.configOperation), config.value))
        }
        if (configChanges.put(configResource, altersByName) != null) {
          duplicateResources.add(configResource)
          configChanges.remove(configResource)
          response.responses().add(new AlterConfigsResourceResponse().
            setErrorCode(INVALID_REQUEST.code()).
            setErrorMessage("Duplicate resource.").
            setResourceName(resource.resourceName()).
            setResourceType(resource.resourceType()))
        }
      }
    }
    val iterator = configChanges.keySet().iterator()
    while (iterator.hasNext) {
      val resource = iterator.next()
      val apiError = authorizeAlterResource(request.context, resource)
      if (apiError.isFailure) {
        response.responses().add(new AlterConfigsResourceResponse().
          setErrorCode(apiError.error().code()).
          setErrorMessage(apiError.message()).
          setResourceName(resource.name()).
          setResourceType(resource.`type`().id()))
        iterator.remove()
      }
    }
    controller.incrementalAlterConfigs(context, configChanges, alterConfigsRequest.data.validateOnly)
      .handle[Unit] { (controllerResults, exception) =>
        if (exception != null) {
          requestHelper.handleError(request, exception)
        } else {
          controllerResults.entrySet().forEach(entry => response.responses().add(
            new AlterConfigsResourceResponse().
              setErrorCode(entry.getValue.error().code()).
              setErrorMessage(entry.getValue.message()).
              setResourceName(entry.getKey.name()).
              setResourceType(entry.getKey.`type`().id())))
          requestHelper.sendResponseMaybeThrottle(request, throttleMs =>
            new IncrementalAlterConfigsResponse(response.setThrottleTimeMs(throttleMs)))
        }
      }
  }

  def handleCreatePartitions(request: RequestChannel.Request): CompletableFuture[Unit] = {
    def filterAlterAuthorizedTopics(topics: Iterable[String]): Set[String] = {
      authHelper.filterByAuthorized(request.context, ALTER, TOPIC, topics)(n => n)
    }
    val createPartitionsRequest = request.body[CreatePartitionsRequest]
    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
      requestTimeoutMsToDeadlineNs(time, createPartitionsRequest.data.timeoutMs))
    val future = createPartitions(context,
      createPartitionsRequest.data(),
      filterAlterAuthorizedTopics)
    future.handle[Unit] { (responses, exception) =>
      if (exception != null) {
        requestHelper.handleError(request, exception)
      } else {
        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
          val responseData = new CreatePartitionsResponseData().
            setResults(responses).
            setThrottleTimeMs(requestThrottleMs)
          new CreatePartitionsResponse(responseData)
        })
      }
    }
  }

  def createPartitions(
    context: ControllerRequestContext,
    request: CreatePartitionsRequestData,
    getAlterAuthorizedTopics: Iterable[String] => Set[String]
  ): CompletableFuture[util.List[CreatePartitionsTopicResult]] = {
    val responses = new util.ArrayList[CreatePartitionsTopicResult]()
    val duplicateTopicNames = new util.HashSet[String]()
    val topicNames = new util.HashSet[String]()
    request.topics().forEach {
      topic =>
        if (!topicNames.add(topic.name())) {
          duplicateTopicNames.add(topic.name())
        }
    }
    duplicateTopicNames.forEach { topicName =>
      responses.add(new CreatePartitionsTopicResult().
        setName(topicName).
        setErrorCode(INVALID_REQUEST.code).
        setErrorMessage("Duplicate topic name."))
        topicNames.remove(topicName)
    }
    val authorizedTopicNames = getAlterAuthorizedTopics(topicNames.asScala)
    val topics = new util.ArrayList[CreatePartitionsTopic]
    topicNames.forEach { topicName =>
      if (authorizedTopicNames.contains(topicName)) {
        topics.add(request.topics().find(topicName))
      } else {
        responses.add(new CreatePartitionsTopicResult().
          setName(topicName).
          setErrorCode(TOPIC_AUTHORIZATION_FAILED.code))
      }
    }
    controller.createPartitions(context, topics, request.validateOnly).thenApply { results =>
      results.forEach(response => responses.add(response))
      responses
    }
  }

  def handleAlterPartitionReassignments(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val alterRequest = request.body[AlterPartitionReassignmentsRequest]
    authHelper.authorizeClusterOperation(request, ALTER)
    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
      requestTimeoutMsToDeadlineNs(time, alterRequest.data.timeoutMs))
    controller.alterPartitionReassignments(context, alterRequest.data)
      .thenApply[Unit] { response =>
        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
          new AlterPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
      }
  }

  def handleListPartitionReassignments(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val listRequest = request.body[ListPartitionReassignmentsRequest]
    authHelper.authorizeClusterOperation(request, DESCRIBE)
    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
      OptionalLong.empty())
    controller.listPartitionReassignments(context, listRequest.data)
      .thenApply[Unit] { response =>
        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
          new ListPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
      }
  }

  def handleAllocateProducerIdsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val allocatedProducerIdsRequest = request.body[AllocateProducerIdsRequest]
    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
        OptionalLong.empty())
    controller.allocateProducerIds(context, allocatedProducerIdsRequest.data)
      .handle[Unit] { (results, exception) =>
        if (exception != null) {
          requestHelper.handleError(request, exception)
        } else {
          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
            results.setThrottleTimeMs(requestThrottleMs)
            new AllocateProducerIdsResponse(results)
          })
        }
      }
  }

  def handleUpdateFeatures(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
    authHelper.authorizeClusterOperation(request, ALTER)
    val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
      OptionalLong.empty())
    controller.updateFeatures(context, updateFeaturesRequest.data)
      .handle[Unit] { (response, exception) =>
        if (exception != null) {
          requestHelper.handleError(request, exception)
        } else {
          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
            new UpdateFeaturesResponse(response.setThrottleTimeMs(requestThrottleMs)))
        }
      }
  }
}

相关信息

kafka 源码目录

相关文章

kafka AbstractFetcherManager 源码

kafka AbstractFetcherThread 源码

kafka AclApis 源码

kafka ActionQueue 源码

kafka AlterPartitionManager 源码

kafka ApiVersionManager 源码

kafka AuthHelper 源码

kafka AutoTopicCreationManager 源码

kafka BrokerBlockingSender 源码

kafka BrokerFeatures 源码

0  赞