kafka IncrementalCooperativeAssignor 源码

  • 2022-10-20
  • 浏览 (218)

kafka IncrementalCooperativeAssignor 代码

文件路径:/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.runtime.distributed;

import java.util.Arrays;
import java.util.Map.Entry;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.ConnectorsAndTasks;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.Assignment;
import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.LeaderState;
import static org.apache.kafka.connect.util.ConnectUtils.combineCollections;
import static org.apache.kafka.connect.util.ConnectUtils.transformValues;

/**
 * An assignor that computes a distribution of connectors and tasks according to the incremental
 * cooperative strategy for rebalancing. Note that this class is NOT thread-safe.
 * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect">
 * KIP-415 for a description of the assignment policy. </a>
 *
 */
public class IncrementalCooperativeAssignor implements ConnectAssignor {
    private final Logger log;
    private final Time time;
    private final int maxDelay;
    private ConnectorsAndTasks previousAssignment;
    private final ConnectorsAndTasks previousRevocation;
    private boolean canRevoke;
    // visible for testing
    protected final Set<String> candidateWorkersForReassignment;
    protected long scheduledRebalance;
    protected int delay;
    protected int previousGenerationId;
    protected Set<String> previousMembers;

    public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxDelay) {
        this.log = logContext.logger(IncrementalCooperativeAssignor.class);
        this.time = time;
        this.maxDelay = maxDelay;
        this.previousAssignment = ConnectorsAndTasks.EMPTY;
        this.previousRevocation = new ConnectorsAndTasks.Builder().build();
        this.canRevoke = true;
        this.scheduledRebalance = 0;
        this.candidateWorkersForReassignment = new LinkedHashSet<>();
        this.delay = 0;
        this.previousGenerationId = -1;
        this.previousMembers = Collections.emptySet();
    }

    @Override
    public Map<String, ByteBuffer> performAssignment(String leaderId, String protocol,
                                                     List<JoinGroupResponseMember> allMemberMetadata,
                                                     WorkerCoordinator coordinator) {
        log.debug("Performing task assignment");

        Map<String, ExtendedWorkerState> memberConfigs = new HashMap<>();
        for (JoinGroupResponseMember member : allMemberMetadata) {
            memberConfigs.put(
                    member.memberId(),
                    IncrementalCooperativeConnectProtocol.deserializeMetadata(ByteBuffer.wrap(member.metadata())));
        }
        log.debug("Member configs: {}", memberConfigs);

        // The new config offset is the maximum seen by any member. We always perform assignment using this offset,
        // even if some members have fallen behind. The config offset used to generate the assignment is included in
        // the response so members that have fallen behind will not use the assignment until they have caught up.
        long maxOffset = memberConfigs.values().stream().map(ExtendedWorkerState::offset).max(Long::compare).get();
        log.debug("Max config offset root: {}, local snapshot config offsets root: {}",
                  maxOffset, coordinator.configSnapshot().offset());

        short protocolVersion = ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion();

        Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator);
        if (leaderOffset == null) {
            Map<String, ExtendedAssignment> assignments = fillAssignments(
                    memberConfigs.keySet(), Assignment.CONFIG_MISMATCH,
                    leaderId, memberConfigs.get(leaderId).url(), maxOffset,
                    ClusterAssignment.EMPTY, 0, protocolVersion);
            return serializeAssignments(assignments, protocolVersion);
        }
        return performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator, protocolVersion);
    }

    private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
        // If this leader is behind some other members, we can't do assignment
        if (coordinator.configSnapshot().offset() < maxOffset) {
            // We might be able to take a new snapshot to catch up immediately and avoid another round of syncing here.
            // Alternatively, if this node has already passed the maximum reported by any other member of the group, it
            // is also safe to use this newer state.
            ClusterConfigState updatedSnapshot = coordinator.configFreshSnapshot();
            if (updatedSnapshot.offset() < maxOffset) {
                log.info("Was selected to perform assignments, but do not have latest config found in sync request. "
                         + "Returning an empty configuration to trigger re-sync.");
                return null;
            } else {
                coordinator.configSnapshot(updatedSnapshot);
                return updatedSnapshot.offset();
            }
        }
        return maxOffset;
    }

    /**
     * Performs task assignment based on the incremental cooperative connect protocol.
     * Read more on the design and implementation in:
     * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect">
     * KIP-415</a>
     *
     * @param leaderId the ID of the group leader
     * @param maxOffset the latest known offset of the configuration topic
     * @param memberConfigs the metadata of all the members of the group as gather in the current
     * round of rebalancing
     * @param coordinator the worker coordinator instance that provide the configuration snapshot
     * and get assigned the leader state during this assignment
     * @param protocolVersion the Connect subprotocol version
     * @return the serialized assignment of tasks to the whole group, including assigned or
     * revoked tasks
     */
    protected Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset,
                                                            Map<String, ExtendedWorkerState> memberConfigs,
                                                            WorkerCoordinator coordinator, short protocolVersion) {
        log.debug("Performing task assignment during generation: {} with memberId: {}",
                coordinator.generationId(), coordinator.memberId());
        Map<String, ConnectorsAndTasks> memberAssignments = transformValues(
                memberConfigs,
                memberConfig -> new ConnectorsAndTasks.Builder()
                        .with(memberConfig.assignment().connectors(), memberConfig.assignment().tasks())
                        .build()
        );
        ClusterAssignment clusterAssignment = performTaskAssignment(
                coordinator.configSnapshot(),
                coordinator.lastCompletedGenerationId(),
                coordinator.generationId(),
                memberAssignments
        );

        coordinator.leaderState(new LeaderState(memberConfigs, clusterAssignment.allAssignedConnectors(), clusterAssignment.allAssignedTasks()));

        Map<String, ExtendedAssignment> assignments =
                fillAssignments(memberConfigs.keySet(), Assignment.NO_ERROR, leaderId,
                        memberConfigs.get(leaderId).url(), maxOffset,
                        clusterAssignment,
                        delay, protocolVersion);

        log.debug("Actual assignments: {}", assignments);
        return serializeAssignments(assignments, protocolVersion);
    }

    // Visible for testing
    ClusterAssignment performTaskAssignment(
            ClusterConfigState configSnapshot,
            int lastCompletedGenerationId,
            int currentGenerationId,
            Map<String, ConnectorsAndTasks> memberAssignments
    ) {
        // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
        // can be used to calculate derived sets
        log.debug("Previous assignments: {}", previousAssignment);
        if (previousGenerationId != lastCompletedGenerationId) {
            log.debug("Clearing the view of previous assignments due to generation mismatch between "
                    + "previous generation ID {} and last completed generation ID {}. This can "
                    + "happen if the leader fails to sync the assignment within a rebalancing round. "
                    + "The following view of previous assignments might be outdated and will be "
                    + "ignored by the leader in the current computation of new assignments. "
                    + "Possibly outdated previous assignments: {}",
                    previousGenerationId, lastCompletedGenerationId, previousAssignment);
            this.previousAssignment = ConnectorsAndTasks.EMPTY;
        }

        Set<String> configuredConnectors = new TreeSet<>(configSnapshot.connectors());
        Set<ConnectorTaskId> configuredTasks = combineCollections(configuredConnectors, configSnapshot::tasks, Collectors.toSet());

        // Base set: The set of configured connectors-and-tasks is a standalone snapshot that can
        // be used to calculate derived sets
        ConnectorsAndTasks configured = new ConnectorsAndTasks.Builder()
                .with(configuredConnectors, configuredTasks).build();
        log.debug("Configured assignments: {}", configured);

        // Base set: The set of active connectors-and-tasks is a standalone snapshot that can be
        // used to calculate derived sets
        ConnectorsAndTasks activeAssignments = assignment(memberAssignments);
        log.debug("Active assignments: {}", activeAssignments);

        // This means that a previous revocation did not take effect. In this case, reset
        // appropriately and be ready to re-apply revocation of tasks
        if (!previousRevocation.isEmpty()) {
            if (previousRevocation.connectors().stream().anyMatch(c -> activeAssignments.connectors().contains(c))
                    || previousRevocation.tasks().stream().anyMatch(t -> activeAssignments.tasks().contains(t))) {
                previousAssignment = activeAssignments;
                canRevoke = true;
            }
            previousRevocation.connectors().clear();
            previousRevocation.tasks().clear();
        }

        // Derived set: The set of deleted connectors-and-tasks is a derived set from the set
        // difference of previous - configured
        ConnectorsAndTasks deleted = diff(previousAssignment, configured);
        log.debug("Deleted assignments: {}", deleted);

        // Derived set: The set of remaining active connectors-and-tasks is a derived set from the
        // set difference of active - deleted
        ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
        log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive);

        // Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from
        // the set difference of previous - active - deleted
        ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted);
        log.debug("Lost assignments: {}", lostAssignments);

        // Derived set: The set of new connectors-and-tasks is a derived set from the set
        // difference of configured - previous - active
        ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments);
        log.debug("New assignments: {}", newSubmissions);

        // A collection of the complete assignment
        List<WorkerLoad> completeWorkerAssignment = workerAssignment(memberAssignments, ConnectorsAndTasks.EMPTY);
        log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment);

        // Per worker connector assignments without removing deleted connectors yet
        Map<String, Collection<String>> connectorAssignments =
                completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
        log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments);

        // Per worker task assignments without removing deleted connectors yet
        Map<String, Collection<ConnectorTaskId>> taskAssignments =
                completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
        log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments);

        // A collection of the current assignment excluding the connectors-and-tasks to be deleted
        List<WorkerLoad> currentWorkerAssignment = workerAssignment(memberAssignments, deleted);

        Map<String, ConnectorsAndTasks> toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments);
        log.debug("Connector and task to delete assignments: {}", toRevoke);

        // Revoking redundant connectors/tasks if the workers have duplicate assignments
        toRevoke.putAll(computeDuplicatedAssignments(memberAssignments, connectorAssignments, taskAssignments));
        log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke);

        // Recompute the complete assignment excluding the deleted connectors-and-tasks
        completeWorkerAssignment = workerAssignment(memberAssignments, deleted);
        connectorAssignments =
                completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
        taskAssignments =
                completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));

        handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);

        // Do not revoke resources for re-assignment while a delayed rebalance is active
        // Also we do not revoke in two consecutive rebalances by the same leader
        canRevoke = delay == 0 && canRevoke;

        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
        // account the deleted ones.
        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
        if (canRevoke) {
            Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
                    performTaskRevocation(activeAssignments, currentWorkerAssignment);

            log.debug("Connector and task to revoke assignments: {}", toRevoke);

            toExplicitlyRevoke.forEach(
                (worker, assignment) -> {
                    ConnectorsAndTasks existing = toRevoke.computeIfAbsent(
                        worker,
                        v -> new ConnectorsAndTasks.Builder().build());
                    existing.connectors().addAll(assignment.connectors());
                    existing.tasks().addAll(assignment.tasks());
                }
            );
            canRevoke = toExplicitlyRevoke.size() == 0;
        } else {
            canRevoke = delay == 0;
        }

        assignConnectors(completeWorkerAssignment, newSubmissions.connectors());
        assignTasks(completeWorkerAssignment, newSubmissions.tasks());
        log.debug("Current complete assignments: {}", currentWorkerAssignment);
        log.debug("New complete assignments: {}", completeWorkerAssignment);

        Map<String, Collection<String>> currentConnectorAssignments =
                currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
        Map<String, Collection<ConnectorTaskId>> currentTaskAssignments =
                currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
        Map<String, Collection<String>> incrementalConnectorAssignments =
                diff(connectorAssignments, currentConnectorAssignments);
        Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments =
                diff(taskAssignments, currentTaskAssignments);

        previousAssignment = computePreviousAssignment(toRevoke, connectorAssignments, taskAssignments, lostAssignments);
        previousGenerationId = currentGenerationId;
        previousMembers = memberAssignments.keySet();

        log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments);
        log.debug("Incremental task assignments: {}", incrementalTaskAssignments);

        Map<String, Collection<String>> revokedConnectors = transformValues(toRevoke, ConnectorsAndTasks::connectors);
        Map<String, Collection<ConnectorTaskId>> revokedTasks = transformValues(toRevoke, ConnectorsAndTasks::tasks);

        return new ClusterAssignment(
                incrementalConnectorAssignments,
                incrementalTaskAssignments,
                revokedConnectors,
                revokedTasks,
                diff(connectorAssignments, revokedConnectors),
                diff(taskAssignments, revokedTasks)
        );
    }

    private Map<String, ConnectorsAndTasks> computeDeleted(ConnectorsAndTasks deleted,
                                                           Map<String, Collection<String>> connectorAssignments,
                                                           Map<String, Collection<ConnectorTaskId>> taskAssignments) {
        // Connector to worker reverse lookup map
        Map<String, String> connectorOwners = WorkerCoordinator.invertAssignment(connectorAssignments);
        // Task to worker reverse lookup map
        Map<ConnectorTaskId, String> taskOwners = WorkerCoordinator.invertAssignment(taskAssignments);

        Map<String, ConnectorsAndTasks> toRevoke = new HashMap<>();
        // Add the connectors that have been deleted to the revoked set
        deleted.connectors().forEach(c ->
                toRevoke.computeIfAbsent(
                    connectorOwners.get(c),
                    v -> new ConnectorsAndTasks.Builder().build()
                ).connectors().add(c));
        // Add the tasks that have been deleted to the revoked set
        deleted.tasks().forEach(t ->
                toRevoke.computeIfAbsent(
                    taskOwners.get(t),
                    v -> new ConnectorsAndTasks.Builder().build()
                ).tasks().add(t));
        log.debug("Connectors and tasks to delete assignments: {}", toRevoke);
        return toRevoke;
    }

    private ConnectorsAndTasks computePreviousAssignment(Map<String, ConnectorsAndTasks> toRevoke,
                                                         Map<String, Collection<String>> connectorAssignments,
                                                         Map<String, Collection<ConnectorTaskId>> taskAssignments,
                                                         ConnectorsAndTasks lostAssignments) {
        ConnectorsAndTasks previousAssignment = new ConnectorsAndTasks.Builder().with(
                ConnectUtils.combineCollections(connectorAssignments.values()),
                ConnectUtils.combineCollections(taskAssignments.values())
        ).build();

        for (ConnectorsAndTasks revoked : toRevoke.values()) {
            previousAssignment.connectors().removeAll(revoked.connectors());
            previousAssignment.tasks().removeAll(revoked.tasks());
            previousRevocation.connectors().addAll(revoked.connectors());
            previousRevocation.tasks().addAll(revoked.tasks());
        }

        // Depends on the previous assignment's collections being sets at the moment.
        // TODO: make it independent
        previousAssignment.connectors().addAll(lostAssignments.connectors());
        previousAssignment.tasks().addAll(lostAssignments.tasks());

        return previousAssignment;
    }

    private ConnectorsAndTasks duplicatedAssignments(Map<String, ConnectorsAndTasks> memberAssignments) {
        Map<String, Long> connectorInstanceCounts = combineCollections(
                memberAssignments.values(),
                ConnectorsAndTasks::connectors,
                Collectors.groupingBy(Function.identity(), Collectors.counting())
        );
        Set<String> duplicatedConnectors = connectorInstanceCounts
                .entrySet().stream()
                .filter(entry -> entry.getValue() > 1L)
                .map(Entry::getKey)
                .collect(Collectors.toSet());

        Map<ConnectorTaskId, Long> taskInstanceCounts = combineCollections(
                memberAssignments.values(),
                ConnectorsAndTasks::tasks,
                Collectors.groupingBy(Function.identity(), Collectors.counting())
        );
        Set<ConnectorTaskId> duplicatedTasks = taskInstanceCounts
                .entrySet().stream()
                .filter(entry -> entry.getValue() > 1L)
                .map(Entry::getKey)
                .collect(Collectors.toSet());

        return new ConnectorsAndTasks.Builder().with(duplicatedConnectors, duplicatedTasks).build();
    }

    private Map<String, ConnectorsAndTasks> computeDuplicatedAssignments(Map<String, ConnectorsAndTasks> memberAssignments,
                                             Map<String, Collection<String>> connectorAssignments,
                                             Map<String, Collection<ConnectorTaskId>> taskAssignment) {
        ConnectorsAndTasks duplicatedAssignments = duplicatedAssignments(memberAssignments);
        log.debug("Duplicated assignments: {}", duplicatedAssignments);

        Map<String, ConnectorsAndTasks> toRevoke = new HashMap<>();
        if (!duplicatedAssignments.connectors().isEmpty()) {
            connectorAssignments.entrySet().stream()
                    .forEach(entry -> {
                        Set<String> duplicatedConnectors = new HashSet<>(duplicatedAssignments.connectors());
                        duplicatedConnectors.retainAll(entry.getValue());
                        if (!duplicatedConnectors.isEmpty()) {
                            toRevoke.computeIfAbsent(
                                entry.getKey(),
                                v -> new ConnectorsAndTasks.Builder().build()
                            ).connectors().addAll(duplicatedConnectors);
                        }
                    });
        }
        if (!duplicatedAssignments.tasks().isEmpty()) {
            taskAssignment.entrySet().stream()
                    .forEach(entry -> {
                        Set<ConnectorTaskId> duplicatedTasks = new HashSet<>(duplicatedAssignments.tasks());
                        duplicatedTasks.retainAll(entry.getValue());
                        if (!duplicatedTasks.isEmpty()) {
                            toRevoke.computeIfAbsent(
                                entry.getKey(),
                                v -> new ConnectorsAndTasks.Builder().build()
                            ).tasks().addAll(duplicatedTasks);
                        }
                    });
        }
        return toRevoke;
    }

    // visible for testing
    protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
                                         ConnectorsAndTasks newSubmissions,
                                         List<WorkerLoad> completeWorkerAssignment) {
        if (lostAssignments.isEmpty()) {
            resetDelay();
            return;
        }

        final long now = time.milliseconds();
        log.debug("Found the following connectors and tasks missing from previous assignments: "
                + lostAssignments);

        Set<String> activeMembers = completeWorkerAssignment.stream()
                .map(WorkerLoad::worker)
                .collect(Collectors.toSet());
        if (scheduledRebalance <= 0 && activeMembers.containsAll(previousMembers)) {
            log.debug("No worker seems to have departed the group during the rebalance. The "
                    + "missing assignments that the leader is detecting are probably due to some "
                    + "workers failing to receive the new assignments in the previous rebalance. "
                    + "Will reassign missing tasks as new tasks");
            newSubmissions.connectors().addAll(lostAssignments.connectors());
            newSubmissions.tasks().addAll(lostAssignments.tasks());
            return;
        }

        if (scheduledRebalance > 0 && now >= scheduledRebalance) {
            // delayed rebalance expired and it's time to assign resources
            log.debug("Delayed rebalance expired. Reassigning lost tasks");
            List<WorkerLoad> candidateWorkerLoad = Collections.emptyList();
            if (!candidateWorkersForReassignment.isEmpty()) {
                candidateWorkerLoad = pickCandidateWorkerForReassignment(completeWorkerAssignment);
            }

            if (!candidateWorkerLoad.isEmpty()) {
                log.debug("Assigning lost tasks to {} candidate workers: {}", 
                        candidateWorkerLoad.size(),
                        candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(",")));
                Iterator<WorkerLoad> candidateWorkerIterator = candidateWorkerLoad.iterator();
                for (String connector : lostAssignments.connectors()) {
                    // Loop over the candidate workers as many times as it takes
                    if (!candidateWorkerIterator.hasNext()) {
                        candidateWorkerIterator = candidateWorkerLoad.iterator();
                    }
                    WorkerLoad worker = candidateWorkerIterator.next();
                    log.debug("Assigning connector id {} to member {}", connector, worker.worker());
                    worker.assign(connector);
                }
                candidateWorkerIterator = candidateWorkerLoad.iterator();
                for (ConnectorTaskId task : lostAssignments.tasks()) {
                    if (!candidateWorkerIterator.hasNext()) {
                        candidateWorkerIterator = candidateWorkerLoad.iterator();
                    }
                    WorkerLoad worker = candidateWorkerIterator.next();
                    log.debug("Assigning task id {} to member {}", task, worker.worker());
                    worker.assign(task);
                }
            } else {
                log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
                newSubmissions.connectors().addAll(lostAssignments.connectors());
                newSubmissions.tasks().addAll(lostAssignments.tasks());
            }
            resetDelay();
        } else {
            candidateWorkersForReassignment
                    .addAll(candidateWorkersForReassignment(completeWorkerAssignment));
            if (now < scheduledRebalance) {
                // a delayed rebalance is in progress, but it's not yet time to reassign
                // unaccounted resources
                delay = calculateDelay(now);
                log.debug("Delayed rebalance in progress. Task reassignment is postponed. New computed rebalance delay: {}", delay);
            } else {
                // This means scheduledRebalance == 0
                // We could also extract the current minimum delay from the group, to make
                // independent of consecutive leader failures, but this optimization is skipped
                // at the moment
                delay = maxDelay;
                log.debug("Resetting rebalance delay to the max: {}. scheduledRebalance: {} now: {} diff scheduledRebalance - now: {}",
                        delay, scheduledRebalance, now, scheduledRebalance - now);
            }
            scheduledRebalance = now + delay;
        }
    }

    private void resetDelay() {
        candidateWorkersForReassignment.clear();
        scheduledRebalance = 0;
        if (delay != 0) {
            log.debug("Resetting delay from previous value: {} to 0", delay);
        }
        delay = 0;
    }

    private Set<String> candidateWorkersForReassignment(List<WorkerLoad> completeWorkerAssignment) {
        return completeWorkerAssignment.stream()
                .filter(WorkerLoad::isEmpty)
                .map(WorkerLoad::worker)
                .collect(Collectors.toSet());
    }

    private List<WorkerLoad> pickCandidateWorkerForReassignment(List<WorkerLoad> completeWorkerAssignment) {
        Map<String, WorkerLoad> activeWorkers = completeWorkerAssignment.stream()
                .collect(Collectors.toMap(WorkerLoad::worker, Function.identity()));
        return candidateWorkersForReassignment.stream()
                .map(activeWorkers::get)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    /**
     * Task revocation is based on a rough estimation of the lower average number of tasks before
     * and after new workers join the group. If no new workers join, no revocation takes place.
     * Based on this estimation, tasks are revoked until the new floor average is reached for
     * each existing worker. The revoked tasks, once assigned to the new workers will maintain
     * a balanced load among the group.
     *
     * @param activeAssignments
     * @param completeWorkerAssignment
     * @return
     */
    private Map<String, ConnectorsAndTasks> performTaskRevocation(ConnectorsAndTasks activeAssignments,
                                                                  Collection<WorkerLoad> completeWorkerAssignment) {
        int totalActiveConnectorsNum = activeAssignments.connectors().size();
        int totalActiveTasksNum = activeAssignments.tasks().size();
        Collection<WorkerLoad> existingWorkers = completeWorkerAssignment.stream()
                .filter(wl -> wl.size() > 0)
                .collect(Collectors.toList());
        int existingWorkersNum = existingWorkers.size();
        int totalWorkersNum = completeWorkerAssignment.size();
        int newWorkersNum = totalWorkersNum - existingWorkersNum;

        if (log.isDebugEnabled()) {
            completeWorkerAssignment.forEach(wl -> log.debug(
                    "Per worker current load size; worker: {} connectors: {} tasks: {}",
                    wl.worker(), wl.connectorsSize(), wl.tasksSize()));
        }

        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
        // If there are no new workers, or no existing workers to revoke tasks from return early
        // after logging the status
        if (!(newWorkersNum > 0 && existingWorkersNum > 0)) {
            log.debug("No task revocation required; workers with existing load: {} workers with "
                    + "no load {} total workers {}",
                    existingWorkersNum, newWorkersNum, totalWorkersNum);
            // This is intentionally empty but mutable, because the map is used to include deleted
            // connectors and tasks as well
            return revoking;
        }

        log.debug("Task revocation is required; workers with existing load: {} workers with "
                + "no load {} total workers {}",
                existingWorkersNum, newWorkersNum, totalWorkersNum);

        // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
        log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);


        log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
        int floorTasks = totalActiveTasksNum / totalWorkersNum;
        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
        int numToRevoke;

        for (WorkerLoad existing : existingWorkers) {
            Iterator<String> connectors = existing.connectors().iterator();
            numToRevoke = existing.connectorsSize() - ceilConnectors;
            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
                ConnectorsAndTasks resources = revoking.computeIfAbsent(
                    existing.worker(),
                    w -> new ConnectorsAndTasks.Builder().build());
                resources.connectors().add(connectors.next());
            }
        }

        for (WorkerLoad existing : existingWorkers) {
            Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
            numToRevoke = existing.tasksSize() - ceilTasks;
            log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", existing, numToRevoke);
            for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) {
                ConnectorsAndTasks resources = revoking.computeIfAbsent(
                    existing.worker(),
                    w -> new ConnectorsAndTasks.Builder().build());
                resources.tasks().add(tasks.next());
            }
        }

        return revoking;
    }

    private Map<String, ExtendedAssignment> fillAssignments(Collection<String> members, short error,
                                                            String leaderId, String leaderUrl, long maxOffset,
                                                            ClusterAssignment clusterAssignment,
                                                            int delay, short protocolVersion) {
        Map<String, ExtendedAssignment> groupAssignment = new HashMap<>();
        for (String member : members) {
            Collection<String> connectorsToStart = clusterAssignment.newlyAssignedConnectors(member);
            Collection<ConnectorTaskId> tasksToStart = clusterAssignment.newlyAssignedTasks(member);
            Collection<String> connectorsToStop = clusterAssignment.newlyRevokedConnectors(member);
            Collection<ConnectorTaskId> tasksToStop = clusterAssignment.newlyRevokedTasks(member);
            ExtendedAssignment assignment =
                    new ExtendedAssignment(protocolVersion, error, leaderId, leaderUrl, maxOffset,
                            connectorsToStart, tasksToStart, connectorsToStop, tasksToStop, delay);
            log.debug("Filling assignment: {} -> {}", member, assignment);
            groupAssignment.put(member, assignment);
        }
        log.debug("Finished assignment");
        return groupAssignment;
    }

    /**
     * From a map of workers to assignment object generate the equivalent map of workers to byte
     * buffers of serialized assignments.
     *
     * @param assignments the map of worker assignments
     * @return the serialized map of assignments to workers
     */
    protected Map<String, ByteBuffer> serializeAssignments(Map<String, ExtendedAssignment> assignments, short protocolVersion) {
        boolean sessioned = protocolVersion >= CONNECT_PROTOCOL_V2;
        return assignments.entrySet()
                .stream()
                .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    e -> IncrementalCooperativeConnectProtocol.serializeAssignment(e.getValue(), sessioned)));
    }

    private static ConnectorsAndTasks diff(ConnectorsAndTasks base,
                                           ConnectorsAndTasks... toSubtract) {
        Collection<String> connectors = new TreeSet<>(base.connectors());
        Collection<ConnectorTaskId> tasks = new TreeSet<>(base.tasks());
        for (ConnectorsAndTasks sub : toSubtract) {
            connectors.removeAll(sub.connectors());
            tasks.removeAll(sub.tasks());
        }
        return new ConnectorsAndTasks.Builder().with(connectors, tasks).build();
    }

    private static <T> Map<String, Collection<T>> diff(Map<String, Collection<T>> base,
                                                       Map<String, Collection<T>> toSubtract) {
        Map<String, Collection<T>> incremental = new HashMap<>();
        for (Map.Entry<String, Collection<T>> entry : base.entrySet()) {
            List<T> values = new ArrayList<>(entry.getValue());
            values.removeAll(toSubtract.getOrDefault(entry.getKey(), Collections.emptySet()));
            incremental.put(entry.getKey(), values);
        }
        return incremental;
    }

    private ConnectorsAndTasks assignment(Map<String, ConnectorsAndTasks> memberAssignments) {
        log.debug("Received assignments: {}", memberAssignments);
        return new ConnectorsAndTasks.Builder().with(
                ConnectUtils.combineCollections(memberAssignments.values(), ConnectorsAndTasks::connectors),
                ConnectUtils.combineCollections(memberAssignments.values(), ConnectorsAndTasks::tasks)
        ).build();
    }

    private int calculateDelay(long now) {
        long diff = scheduledRebalance - now;
        return diff > 0 ? (int) Math.min(diff, maxDelay) : 0;
    }

    /**
     * Perform a round-robin assignment of connectors to workers with existing worker load. This
     * assignment tries to balance the load between workers, by assigning connectors to workers
     * that have equal load, starting with the least loaded workers.
     *
     * @param workerAssignment the current worker assignment; assigned connectors are added to this list
     * @param connectors the connectors to be assigned
     */
    protected void assignConnectors(List<WorkerLoad> workerAssignment, Collection<String> connectors) {
        workerAssignment.sort(WorkerLoad.connectorComparator());
        WorkerLoad first = workerAssignment.get(0);

        Iterator<String> load = connectors.iterator();
        while (load.hasNext()) {
            int firstLoad = first.connectorsSize();
            int upTo = IntStream.range(0, workerAssignment.size())
                    .filter(i -> workerAssignment.get(i).connectorsSize() > firstLoad)
                    .findFirst()
                    .orElse(workerAssignment.size());
            for (WorkerLoad worker : workerAssignment.subList(0, upTo)) {
                String connector = load.next();
                log.debug("Assigning connector {} to {}", connector, worker.worker());
                worker.assign(connector);
                if (!load.hasNext()) {
                    break;
                }
            }
        }
    }

    /**
     * Perform a round-robin assignment of tasks to workers with existing worker load. This
     * assignment tries to balance the load between workers, by assigning tasks to workers that
     * have equal load, starting with the least loaded workers.
     *
     * @param workerAssignment the current worker assignment; assigned tasks are added to this list
     * @param tasks the tasks to be assigned
     */
    protected void assignTasks(List<WorkerLoad> workerAssignment, Collection<ConnectorTaskId> tasks) {
        workerAssignment.sort(WorkerLoad.taskComparator());
        WorkerLoad first = workerAssignment.get(0);

        Iterator<ConnectorTaskId> load = tasks.iterator();
        while (load.hasNext()) {
            int firstLoad = first.tasksSize();
            int upTo = IntStream.range(0, workerAssignment.size())
                    .filter(i -> workerAssignment.get(i).tasksSize() > firstLoad)
                    .findFirst()
                    .orElse(workerAssignment.size());
            for (WorkerLoad worker : workerAssignment.subList(0, upTo)) {
                ConnectorTaskId task = load.next();
                log.debug("Assigning task {} to {}", task, worker.worker());
                worker.assign(task);
                if (!load.hasNext()) {
                    break;
                }
            }
        }
    }

    private static List<WorkerLoad> workerAssignment(Map<String, ConnectorsAndTasks> memberAssignments,
                                                     ConnectorsAndTasks toExclude) {
        ConnectorsAndTasks ignore = new ConnectorsAndTasks.Builder()
                .with(new HashSet<>(toExclude.connectors()), new HashSet<>(toExclude.tasks()))
                .build();

        return memberAssignments.entrySet().stream()
                .map(e -> new WorkerLoad.Builder(e.getKey()).with(
                        e.getValue().connectors().stream()
                                .filter(v -> !ignore.connectors().contains(v))
                                .collect(Collectors.toList()),
                        e.getValue().tasks().stream()
                                .filter(v -> !ignore.tasks().contains(v))
                                .collect(Collectors.toList())
                        ).build()
                ).collect(Collectors.toList());
    }

    static class ClusterAssignment {

        private final Map<String, Collection<String>> newlyAssignedConnectors;
        private final Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks;
        private final Map<String, Collection<String>> newlyRevokedConnectors;
        private final Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks;
        private final Map<String, Collection<String>> allAssignedConnectors;
        private final Map<String, Collection<ConnectorTaskId>> allAssignedTasks;
        private final Set<String> allWorkers;

        public static final ClusterAssignment EMPTY = new ClusterAssignment(
                Collections.emptyMap(),
                Collections.emptyMap(),
                Collections.emptyMap(),
                Collections.emptyMap(),
                Collections.emptyMap(),
                Collections.emptyMap()
        );

        public ClusterAssignment(
                Map<String, Collection<String>> newlyAssignedConnectors,
                Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks,
                Map<String, Collection<String>> newlyRevokedConnectors,
                Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks,
                Map<String, Collection<String>> allAssignedConnectors,
                Map<String, Collection<ConnectorTaskId>> allAssignedTasks
        ) {
            this.newlyAssignedConnectors = newlyAssignedConnectors;
            this.newlyAssignedTasks = newlyAssignedTasks;
            this.newlyRevokedConnectors = newlyRevokedConnectors;
            this.newlyRevokedTasks = newlyRevokedTasks;
            this.allAssignedConnectors = allAssignedConnectors;
            this.allAssignedTasks = allAssignedTasks;
            this.allWorkers = combineCollections(
                    Arrays.asList(newlyAssignedConnectors, newlyAssignedTasks, newlyRevokedConnectors, newlyRevokedTasks, allAssignedConnectors, allAssignedTasks),
                    Map::keySet,
                    Collectors.toSet()
            );
        }

        public Map<String, Collection<String>> newlyAssignedConnectors() {
            return newlyAssignedConnectors;
        }

        public Collection<String> newlyAssignedConnectors(String worker) {
            return newlyAssignedConnectors.getOrDefault(worker, Collections.emptySet());
        }

        public Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks() {
            return newlyAssignedTasks;
        }

        public Collection<ConnectorTaskId> newlyAssignedTasks(String worker) {
            return newlyAssignedTasks.getOrDefault(worker, Collections.emptySet());
        }

        public Map<String, Collection<String>> newlyRevokedConnectors() {
            return newlyRevokedConnectors;
        }

        public Collection<String> newlyRevokedConnectors(String worker) {
            return newlyRevokedConnectors.getOrDefault(worker, Collections.emptySet());
        }

        public Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks() {
            return newlyRevokedTasks;
        }

        public Collection<ConnectorTaskId> newlyRevokedTasks(String worker) {
            return newlyRevokedTasks.getOrDefault(worker, Collections.emptySet());
        }

        public Map<String, Collection<String>> allAssignedConnectors() {
            return allAssignedConnectors;
        }

        public Map<String, Collection<ConnectorTaskId>> allAssignedTasks() {
            return allAssignedTasks;
        }

        public Set<String> allWorkers() {
            return allWorkers;
        }

        @Override
        public String toString() {
            return "ClusterAssignment{"
                    + "newlyAssignedConnectors=" + newlyAssignedConnectors
                    + ", newlyAssignedTasks=" + newlyAssignedTasks
                    + ", newlyRevokedConnectors=" + newlyRevokedConnectors
                    + ", newlyRevokedTasks=" + newlyRevokedTasks
                    + ", allAssignedConnectors=" + allAssignedConnectors
                    + ", allAssignedTasks=" + allAssignedTasks
                    + ", allWorkers=" + allWorkers
                    + '}';
        }
    }

}

相关信息

kafka 源码目录

相关文章

kafka ConnectAssignor 源码

kafka ConnectProtocol 源码

kafka ConnectProtocolCompatibility 源码

kafka DistributedConfig 源码

kafka DistributedHerder 源码

kafka EagerAssignor 源码

kafka ExtendedAssignment 源码

kafka ExtendedWorkerState 源码

kafka IncrementalCooperativeConnectProtocol 源码

kafka NotAssignedException 源码

0  赞