kafka ClientState 源码

  • 2022-10-20
  • 浏览 (331)

kafka ClientState 代码

文件路径:/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static java.util.Collections.unmodifiableSet;
import static java.util.Comparator.comparing;
import static java.util.Comparator.comparingLong;
import static org.apache.kafka.common.utils.Utils.union;
import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;

public class ClientState {
    private static final Logger LOG = LoggerFactory.getLogger(ClientState.class);
    public static final Comparator<TopicPartition> TOPIC_PARTITION_COMPARATOR = comparing(TopicPartition::topic).thenComparing(TopicPartition::partition);

    private final Map<String, String> clientTags;
    private final Map<TaskId, Long> taskOffsetSums; // contains only stateful tasks we previously owned
    private final Map<TaskId, Long> taskLagTotals;  // contains lag for all stateful tasks in the app topology
    private final Map<TopicPartition, String> ownedPartitions = new TreeMap<>(TOPIC_PARTITION_COMPARATOR);
    private final Map<String, Set<TaskId>> consumerToPreviousStatefulTaskIds = new TreeMap<>();

    private final ClientStateTask assignedActiveTasks = new ClientStateTask(new TreeSet<>(), new TreeMap<>());
    private final ClientStateTask assignedStandbyTasks = new ClientStateTask(new TreeSet<>(), new TreeMap<>());
    private final ClientStateTask previousActiveTasks = new ClientStateTask(null, new TreeMap<>());
    private final ClientStateTask previousStandbyTasks = new ClientStateTask(null, null);
    private final ClientStateTask revokingActiveTasks = new ClientStateTask(null, new TreeMap<>());

    private int capacity;

    public ClientState() {
        this(0);
    }

    public ClientState(final Map<String, String> clientTags) {
        this(0, clientTags);
    }

    ClientState(final int capacity) {
        this(capacity, Collections.emptyMap());
    }

    ClientState(final int capacity, final Map<String, String> clientTags) {
        previousStandbyTasks.taskIds(new TreeSet<>());
        previousActiveTasks.taskIds(new TreeSet<>());
        taskOffsetSums = new TreeMap<>();
        taskLagTotals = new TreeMap<>();
        this.capacity = capacity;
        this.clientTags = unmodifiableMap(clientTags);
    }

    // For testing only
    public ClientState(final Set<TaskId> previousActiveTasks,
                       final Set<TaskId> previousStandbyTasks,
                       final Map<TaskId, Long> taskLagTotals,
                       final Map<String, String> clientTags,
                       final int capacity) {
        this.previousStandbyTasks.taskIds(unmodifiableSet(new TreeSet<>(previousStandbyTasks)));
        this.previousActiveTasks.taskIds(unmodifiableSet(new TreeSet<>(previousActiveTasks)));
        taskOffsetSums = emptyMap();
        this.taskLagTotals = unmodifiableMap(taskLagTotals);
        this.capacity = capacity;
        this.clientTags = unmodifiableMap(clientTags);
    }

    int capacity() {
        return capacity;
    }

    public void incrementCapacity() {
        capacity++;
    }

    boolean reachedCapacity() {
        return assignedTaskCount() >= capacity;
    }

    public Set<TaskId> activeTasks() {
        return unmodifiableSet(assignedActiveTasks.taskIds());
    }

    public int activeTaskCount() {
        return assignedActiveTasks.taskIds().size();
    }

    double activeTaskLoad() {
        return ((double) activeTaskCount()) / capacity;
    }

    public void assignActiveTasks(final Collection<TaskId> tasks) {
        assignedActiveTasks.taskIds().addAll(tasks);
    }

    public void assignActiveToConsumer(final TaskId task, final String consumer) {
        if (!assignedActiveTasks.taskIds().contains(task)) {
            throw new IllegalStateException("added not assign active task " + task + " to this client state.");
        }
        assignedActiveTasks.consumerToTaskIds()
                           .computeIfAbsent(consumer, k -> new HashSet<>()).add(task);
    }

    public void assignStandbyToConsumer(final TaskId task, final String consumer) {
        assignedStandbyTasks.consumerToTaskIds().computeIfAbsent(consumer, k -> new HashSet<>()).add(task);
    }

    public void revokeActiveFromConsumer(final TaskId task, final String consumer) {
        revokingActiveTasks.consumerToTaskIds().computeIfAbsent(consumer, k -> new HashSet<>()).add(task);
    }

    public Map<String, Set<TaskId>> prevOwnedActiveTasksByConsumer() {
        return previousActiveTasks.consumerToTaskIds();
    }

    public Map<String, Set<TaskId>> prevOwnedStandbyByConsumer() {
        // standbys are just those stateful tasks minus active tasks
        final Map<String, Set<TaskId>> consumerToPreviousStandbyTaskIds = new TreeMap<>();
        final Map<String, Set<TaskId>> consumerToPreviousActiveTaskIds = previousActiveTasks.consumerToTaskIds();

        for (final Map.Entry<String, Set<TaskId>> entry: consumerToPreviousStatefulTaskIds.entrySet()) {
            final Set<TaskId> standbyTaskIds = new HashSet<>(entry.getValue());
            if (consumerToPreviousActiveTaskIds.containsKey(entry.getKey()))
                standbyTaskIds.removeAll(consumerToPreviousActiveTaskIds.get(entry.getKey()));
            consumerToPreviousStandbyTaskIds.put(entry.getKey(), standbyTaskIds);
        }

        return consumerToPreviousStandbyTaskIds;
    }

    // including both active and standby tasks
    public Set<TaskId> prevOwnedStatefulTasksByConsumer(final String memberId) {
        return consumerToPreviousStatefulTaskIds.get(memberId);
    }

    public Map<String, Set<TaskId>> assignedActiveTasksByConsumer() {
        return assignedActiveTasks.consumerToTaskIds();
    }

    public Map<String, Set<TaskId>> revokingActiveTasksByConsumer() {
        return revokingActiveTasks.consumerToTaskIds();
    }

    public Map<String, Set<TaskId>> assignedStandbyTasksByConsumer() {
        return assignedStandbyTasks.consumerToTaskIds();
    }

    public void assignActive(final TaskId task) {
        assertNotAssigned(task);
        assignedActiveTasks.taskIds().add(task);
    }

    public void unassignActive(final TaskId task) {
        final Set<TaskId> taskIds = assignedActiveTasks.taskIds();
        if (!taskIds.contains(task)) {
            throw new IllegalArgumentException("Tried to unassign active task " + task + ", but it is not currently assigned: " + this);
        }
        taskIds.remove(task);
    }

    public Set<TaskId> standbyTasks() {
        return unmodifiableSet(assignedStandbyTasks.taskIds());
    }

    boolean hasStandbyTask(final TaskId taskId) {
        return assignedStandbyTasks.taskIds().contains(taskId);
    }

    int standbyTaskCount() {
        return assignedStandbyTasks.taskIds().size();
    }

    public void assignStandby(final TaskId task) {
        assertNotAssigned(task);
        assignedStandbyTasks.taskIds().add(task);
    }

    void unassignStandby(final TaskId task) {
        final Set<TaskId> taskIds = assignedStandbyTasks.taskIds();
        if (!taskIds.contains(task)) {
            throw new IllegalArgumentException("Tried to unassign standby task " + task + ", but it is not currently assigned: " + this);
        }
        taskIds.remove(task);
    }

    Set<TaskId> assignedTasks() {
        final Set<TaskId> assignedActiveTaskIds = assignedActiveTasks.taskIds();
        final Set<TaskId> assignedStandbyTaskIds = assignedStandbyTasks.taskIds();
        // Since we're copying it, it's not strictly necessary to make it unmodifiable also.
        // I'm just trying to prevent subtle bugs if we write code that thinks it can update
        // the assignment by updating the returned set.
        return unmodifiableSet(
            union(
                () -> new HashSet<>(assignedActiveTaskIds.size() + assignedStandbyTaskIds.size()),
                assignedActiveTaskIds,
                assignedStandbyTaskIds
            )
        );
    }

    public int assignedTaskCount() {
        return activeTaskCount() + standbyTaskCount();
    }

    double assignedTaskLoad() {
        return ((double) assignedTaskCount()) / capacity;
    }

    boolean hasAssignedTask(final TaskId taskId) {
        return assignedActiveTasks.taskIds().contains(taskId) || assignedStandbyTasks.taskIds().contains(taskId);
    }

    Set<TaskId> prevActiveTasks() {
        return unmodifiableSet(previousActiveTasks.taskIds());
    }

    private void addPreviousActiveTask(final TaskId task) {
        previousActiveTasks.taskIds().add(task);
    }

    void addPreviousActiveTasks(final Set<TaskId> prevTasks) {
        previousActiveTasks.taskIds().addAll(prevTasks);
    }

    Set<TaskId> prevStandbyTasks() {
        return unmodifiableSet(previousStandbyTasks.taskIds());
    }

    private void addPreviousStandbyTask(final TaskId task) {
        previousStandbyTasks.taskIds().add(task);
    }

    void addPreviousStandbyTasks(final Set<TaskId> standbyTasks) {
        previousStandbyTasks.taskIds().addAll(standbyTasks);
    }

    Set<TaskId> previousAssignedTasks() {
        final Set<TaskId> previousActiveTaskIds = previousActiveTasks.taskIds();
        final Set<TaskId> previousStandbyTaskIds = previousStandbyTasks.taskIds();
        return union(() -> new HashSet<>(previousActiveTaskIds.size() + previousStandbyTaskIds.size()),
                     previousActiveTaskIds,
                     previousStandbyTaskIds);
    }

    // May return null
    public String previousOwnerForPartition(final TopicPartition partition) {
        return ownedPartitions.get(partition);
    }

    public Map<String, String> clientTags() {
        return clientTags;
    }

    public void addOwnedPartitions(final Collection<TopicPartition> ownedPartitions, final String consumer) {
        for (final TopicPartition tp : ownedPartitions) {
            this.ownedPartitions.put(tp, consumer);
        }
    }

    public void addPreviousTasksAndOffsetSums(final String consumerId, final Map<TaskId, Long> taskOffsetSums) {
        this.taskOffsetSums.putAll(taskOffsetSums);
        consumerToPreviousStatefulTaskIds.put(consumerId, taskOffsetSums.keySet());
    }

    public void initializePrevTasks(final Map<TopicPartition, TaskId> taskForPartitionMap,
                                    final boolean hasNamedTopologies) {
        if (!previousActiveTasks.taskIds().isEmpty() || !previousStandbyTasks.taskIds().isEmpty()) {
            throw new IllegalStateException("Already added previous tasks to this client state.");
        }

        maybeFilterUnknownPrevTasksAndPartitions(taskForPartitionMap, hasNamedTopologies);
        initializePrevActiveTasksFromOwnedPartitions(taskForPartitionMap);
        initializeRemainingPrevTasksFromTaskOffsetSums();
    }

    private void maybeFilterUnknownPrevTasksAndPartitions(final Map<TopicPartition, TaskId> taskForPartitionMap,
                                                          final boolean hasNamedTopologies) {
        // If this application uses named topologies, then it's possible for members to report tasks
        // or partitions in their subscription that belong to a named topology that the group leader
        // doesn't currently recognize, eg because it was just removed
        if (hasNamedTopologies) {
            ownedPartitions.keySet().retainAll(taskForPartitionMap.keySet());
            previousActiveTasks.taskIds().retainAll(taskForPartitionMap.values());
            previousStandbyTasks.taskIds().retainAll(taskForPartitionMap.values());
        }
    }

    /**
     * Compute the lag for each stateful task, including tasks this client did not previously have.
     */
    public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndOffsetSums) {
        if (!taskLagTotals.isEmpty()) {
            throw new IllegalStateException("Already computed task lags for this client.");
        }

        for (final Map.Entry<TaskId, Long> taskEntry : allTaskEndOffsetSums.entrySet()) {
            final TaskId task = taskEntry.getKey();
            final Long endOffsetSum = taskEntry.getValue();
            final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L);

            if (offsetSum == Task.LATEST_OFFSET) {
                taskLagTotals.put(task, Task.LATEST_OFFSET);
            } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
                taskLagTotals.put(task, UNKNOWN_OFFSET_SUM);
            } else if (endOffsetSum < offsetSum) {
                LOG.warn("Task " + task + " had endOffsetSum=" + endOffsetSum + " smaller than offsetSum=" +
                             offsetSum + " on member " + uuid + ". This probably means the task is corrupted," +
                             " which in turn indicates that it will need to restore from scratch if it gets assigned." +
                             " The assignor will de-prioritize returning this task to this member in the hopes that" +
                             " some other member may be able to re-use its state.");
                taskLagTotals.put(task, endOffsetSum);
            } else {
                taskLagTotals.put(task, endOffsetSum - offsetSum);
            }
        }
    }

    /**
     * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client
     * did not have any state for this task on disk.
     *
     * @return end offset sum - offset sum
     *          Task.LATEST_OFFSET if this was previously an active running task on this client
     */
    public long lagFor(final TaskId task) {
        final Long totalLag = taskLagTotals.get(task);
        if (totalLag == null) {
            throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
        }
        return totalLag;
    }

    /**
     * @return the previous tasks assigned to this consumer ordered by lag, filtered for any tasks that don't exist in this assignment
     */
    public SortedSet<TaskId> prevTasksByLag(final String consumer) {
        final SortedSet<TaskId> prevTasksByLag = new TreeSet<>(comparingLong(this::lagFor).thenComparing(TaskId::compareTo));
        for (final TaskId task : prevOwnedStatefulTasksByConsumer(consumer)) {
            if (taskLagTotals.containsKey(task)) {
                prevTasksByLag.add(task);
            } else {
                LOG.debug("Skipping previous task {} since it's not part of the current assignment", task);
            }
        }
        return prevTasksByLag;
    }

    public Set<TaskId> statefulActiveTasks() {
        return assignedActiveTasks.taskIds().stream().filter(this::isStateful).collect(Collectors.toSet());
    }

    public Set<TaskId> statelessActiveTasks() {
        return assignedActiveTasks.taskIds().stream().filter(task -> !isStateful(task)).collect(Collectors.toSet());
    }

    boolean hasUnfulfilledQuota(final int tasksPerThread) {
        return assignedActiveTasks.taskIds().size() < capacity * tasksPerThread;
    }

    boolean hasMoreAvailableCapacityThan(final ClientState other) {
        if (capacity <= 0) {
            throw new IllegalStateException("Capacity of this ClientState must be greater than 0.");
        }

        if (other.capacity <= 0) {
            throw new IllegalStateException("Capacity of other ClientState must be greater than 0");
        }

        final double otherLoad = (double) other.assignedTaskCount() / other.capacity;
        final double thisLoad = (double) assignedTaskCount() / capacity;

        if (thisLoad < otherLoad) {
            return true;
        } else if (thisLoad > otherLoad) {
            return false;
        } else {
            return capacity > other.capacity;
        }
    }

    public String consumers() {
        return consumerToPreviousStatefulTaskIds.keySet().toString();
    }

    public String currentAssignment() {
        return "[activeTasks: (" + assignedActiveTasks.taskIds() +
               ") standbyTasks: (" + assignedStandbyTasks.taskIds() + ")]";
    }

    @Override
    public String toString() {
        return "[activeTasks: (" + assignedActiveTasks.taskIds() +
               ") standbyTasks: (" + assignedStandbyTasks.taskIds() +
               ") prevActiveTasks: (" + previousActiveTasks.taskIds() +
               ") prevStandbyTasks: (" + previousStandbyTasks.taskIds() +
               ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() +
               ") taskLagTotals: (" + taskLagTotals.entrySet() +
               ") clientTags: (" + clientTags.entrySet() +
               ") capacity: " + capacity +
               " assigned: " + assignedTaskCount() +
               "]";
    }

    private boolean isStateful(final TaskId task) {
        return taskLagTotals.containsKey(task);
    }

    private void initializePrevActiveTasksFromOwnedPartitions(final Map<TopicPartition, TaskId> taskForPartitionMap) {
        // there are three cases where we need to construct some or all of the prevTasks from the ownedPartitions:
        // 1) COOPERATIVE clients on version 2.4-2.5 do not encode active tasks at all and rely on ownedPartitions
        // 2) future client during version probing, when we can't decode the future subscription info's prev tasks
        // 3) stateless tasks are not encoded in the task lags, and must be figured out from the ownedPartitions
        for (final Map.Entry<TopicPartition, String> partitionEntry : ownedPartitions.entrySet()) {
            final TopicPartition tp = partitionEntry.getKey();
            final TaskId task = taskForPartitionMap.get(tp);
            if (task != null) {
                addPreviousActiveTask(task);
                previousActiveTasks.consumerToTaskIds().computeIfAbsent(partitionEntry.getValue(), k -> new HashSet<>()).add(task);
            } else {
                LOG.error("No task found for topic partition {}", tp);
            }
        }
    }

    private void initializeRemainingPrevTasksFromTaskOffsetSums() {
        final Set<TaskId> previousActiveTaskIds = previousActiveTasks.taskIds();
        if (previousActiveTaskIds.isEmpty() && !ownedPartitions.isEmpty()) {
            LOG.error("Tried to process tasks in offset sum map before processing tasks from ownedPartitions = {}", ownedPartitions);
            throw new IllegalStateException("Must initialize prevActiveTasks from ownedPartitions before initializing remaining tasks.");
        }
        for (final Map.Entry<TaskId, Long> taskEntry : taskOffsetSums.entrySet()) {
            final TaskId task = taskEntry.getKey();
            if (!previousActiveTaskIds.contains(task)) {
                final long offsetSum = taskEntry.getValue();
                if (offsetSum == Task.LATEST_OFFSET) {
                    addPreviousActiveTask(task);
                } else {
                    addPreviousStandbyTask(task);
                }
            }
        }
    }

    private void assertNotAssigned(final TaskId task) {
        if (assignedStandbyTasks.taskIds().contains(task) || assignedActiveTasks.taskIds().contains(task)) {
            throw new IllegalArgumentException("Tried to assign task " + task + ", but it is already assigned: " + this);
        }
    }
}

相关信息

kafka 源码目录

相关文章

kafka AssignmentInfo 源码

kafka AssignorConfiguration 源码

kafka AssignorError 源码

kafka ClientStateTask 源码

kafka ClientTagAwareStandbyTaskAssignor 源码

kafka ConstrainedPrioritySet 源码

kafka ConsumerProtocolUtils 源码

kafka CopartitionedTopicsEnforcer 源码

kafka DefaultStandbyTaskAssignor 源码

kafka FallbackPriorTaskAssignor 源码

0  赞