kafka InternalTopologyBuilder source code

  • 2022-10-20

kafka InternalTopologyBuilder code

File path: /streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
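
Before the full listing, here is a minimal usage sketch (not part of the original file) of how Kafka Streams itself drives this builder: register a source, a processor, and a sink, then build the topology. It assumes a Kafka Streams 3.x classpath; the node, topic, and application-id names ("source", "input", "demo-app", etc.) are illustrative.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;

public class InternalTopologyBuilderSketch {
    public static void main(final String[] args) {
        final InternalTopologyBuilder builder = new InternalTopologyBuilder();

        // Source node reading "input" with String deserializers; a null offset
        // reset and null timestamp extractor fall back to the defaults.
        builder.addSource(null, "source", null,
            Serdes.String().deserializer(), Serdes.String().deserializer(), "input");

        // A no-op processor. The supplier must hand back a fresh Processor on
        // every get() call (the builder's ApiUtils.checkSupplier call rejects
        // suppliers that return a shared instance), hence the anonymous class.
        final ProcessorSupplier<String, String, String, String> supplier =
            () -> new Processor<String, String, String, String>() {
                @Override
                public void process(final Record<String, String> record) { }
            };
        builder.addProcessor("process", supplier, "source");

        // Sink writing to "output"; a null partitioner means the default is used.
        builder.addSink("sink", "output",
            Serdes.String().serializer(), Serdes.String().serializer(), null, "process");

        // buildTopology() requires the application id to be set first.
        builder.setApplicationId("demo-app");
        final ProcessorTopology topology = builder.buildTopology();
        System.out.println(topology);
    }
}

The full source follows.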

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.kafka.clients.consumer.OffsetResetStrategy.EARLIEST;
import static org.apache.kafka.clients.consumer.OffsetResetStrategy.LATEST;
import static org.apache.kafka.clients.consumer.OffsetResetStrategy.NONE;

public class InternalTopologyBuilder {

    public InternalTopologyBuilder() {
        this.topologyName = null;
    }

    public InternalTopologyBuilder(final TopologyConfig topologyConfigs) {
        this.topologyConfigs = topologyConfigs;
        this.topologyName = topologyConfigs.topologyName;
    }

    private static final Logger log = LoggerFactory.getLogger(InternalTopologyBuilder.class);
    private static final String[] NO_PREDECESSORS = {};

    // node factories in a topological order
    private final Map<String, NodeFactory<?, ?, ?, ?>> nodeFactories = new LinkedHashMap<>();

    private final Map<String, StateStoreFactory<?>> stateFactories = new HashMap<>();

    private final Map<String, StoreBuilder<?>> globalStateBuilders = new LinkedHashMap<>();

    // built global state stores
    private final Map<String, StateStore> globalStateStores = new LinkedHashMap<>();

    // Raw names of all source topics, without the application id/named topology prefix for repartition sources
    private final Set<String> rawSourceTopicNames = new HashSet<>();

    // Full names of all source topics, including the application id/named topology prefix for repartition sources
    private List<String> fullSourceTopicNames = null;

    // String representing pattern that matches all subscribed topics, including patterns and full source topic names
    private String sourceTopicPatternString = null;

    // all internal topics with their corresponding properties auto-created by the topology builder and used in source / sink processors
    private final Map<String, InternalTopicProperties> internalTopicNamesWithProperties = new HashMap<>();

    // groups of source processors that need to be copartitioned
    private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();

    // map from source processor names to subscribed topics (without application-id prefix for internal topics)
    private final Map<String, List<String>> nodeToSourceTopics = new HashMap<>();

    // map from source processor names to regex subscription patterns
    private final Map<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();

    // map from sink processor names to sink topic (without application-id prefix for internal topics)
    private final Map<String, String> nodeToSinkTopic = new HashMap<>();

    // map from state store names to raw name (without application id/topology name prefix) of all topics subscribed
    // from source processors that are connected to these state stores
    private final Map<String, Set<String>> stateStoreNameToRawSourceTopicNames = new HashMap<>();

    // map from state store names to all the regex subscribed topics from source processors that
    // are connected to these state stores
    private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new HashMap<>();

    // map from state store names to this state store's corresponding changelog topic if possible
    private final Map<String, String> storeToChangelogTopic = new HashMap<>();

    // map from changelog topic name to its corresponding state store.
    private final Map<String, String> changelogTopicToStore = new HashMap<>();

    // all global topics
    private final Set<String> globalTopics = new HashSet<>();

    private final Set<String> earliestResetTopics = new HashSet<>();

    private final Set<String> latestResetTopics = new HashSet<>();

    private final Set<Pattern> earliestResetPatterns = new HashSet<>();

    private final Set<Pattern> latestResetPatterns = new HashSet<>();

    private final QuickUnion<String> nodeGrouper = new QuickUnion<>();

    // Used to capture subscribed topics via Patterns discovered during the partition assignment process.
    private final Set<String> subscriptionUpdates = new HashSet<>();

    private String applicationId = null;

    private Map<Integer, Set<String>> nodeGroups = null;

    // The name of the topology this builder belongs to, or null if this is not a NamedTopology
    private final String topologyName;
    // TODO KAFKA-13336: we can remove this reference once we make the Topology/NamedTopology class into an interface and implement it
    private NamedTopology namedTopology;

    // TODO KAFKA-13283: once we enforce all configs be passed in when constructing the topology builder then we can set
    //  this up front and make it final, but for now we have to wait for the global app configs passed in to rewriteTopology
    private TopologyConfig topologyConfigs;  // the configs for this topology, including overrides and global defaults

    private boolean hasPersistentStores = false;

    public static class StateStoreFactory<S extends StateStore> {
        private final StoreBuilder<S> builder;
        private final Set<String> users = new HashSet<>();

        private StateStoreFactory(final StoreBuilder<S> builder) {
            this.builder = builder;
        }

        public S build() {
            return builder.build();
        }

        long retentionPeriod() {
            if (builder instanceof WindowStoreBuilder) {
                return ((WindowStoreBuilder<?, ?>) builder).retentionPeriod();
            } else if (builder instanceof TimestampedWindowStoreBuilder) {
                return ((TimestampedWindowStoreBuilder<?, ?>) builder).retentionPeriod();
            } else if (builder instanceof SessionStoreBuilder) {
                return ((SessionStoreBuilder<?, ?>) builder).retentionPeriod();
            } else {
                throw new IllegalStateException("retentionPeriod is not supported when not a window store");
            }
        }

        private Set<String> users() {
            return users;
        }

        public boolean loggingEnabled() {
            return builder.loggingEnabled();
        }

        private String name() {
            return builder.name();
        }

        private boolean isWindowStore() {
            return builder instanceof WindowStoreBuilder
                || builder instanceof TimestampedWindowStoreBuilder
                || builder instanceof SessionStoreBuilder;
        }

        // Apparently Java strips the generics from this method because we're using the raw type for builder,
        // even though this method doesn't use builder's (missing) type parameter. Our usage seems obviously
        // correct, though, hence the suppression.
        @SuppressWarnings("unchecked")
        private Map<String, String> logConfig() {
            return builder.logConfig();
        }
    }

    private static abstract class NodeFactory<KIn, VIn, KOut, VOut> {
        final String name;
        final String[] predecessors;

        NodeFactory(final String name,
                    final String[] predecessors) {
            this.name = name;
            this.predecessors = predecessors;
        }

        public abstract ProcessorNode<KIn, VIn, KOut, VOut> build();

        abstract AbstractNode describe();
    }

    private static class ProcessorNodeFactory<KIn, VIn, KOut, VOut> extends NodeFactory<KIn, VIn, KOut, VOut> {
        private final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier;
        private final Set<String> stateStoreNames = new HashSet<>();

        ProcessorNodeFactory(final String name,
                             final String[] predecessors,
                             final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier) {
            super(name, predecessors.clone());
            this.supplier = supplier;
        }

        public void addStateStore(final String stateStoreName) {
            stateStoreNames.add(stateStoreName);
        }

        @Override
        public ProcessorNode<KIn, VIn, KOut, VOut> build() {
            return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
        }

        @Override
        Processor describe() {
            return new Processor(name, new HashSet<>(stateStoreNames));
        }
    }

    private static class FixedKeyProcessorNodeFactory<KIn, VIn, VOut> extends ProcessorNodeFactory<KIn, VIn, KIn, VOut> {
        private final FixedKeyProcessorSupplier<KIn, VIn, VOut> supplier;
        private final Set<String> stateStoreNames = new HashSet<>();

        FixedKeyProcessorNodeFactory(final String name,
                             final String[] predecessors,
                             final FixedKeyProcessorSupplier<KIn, VIn, VOut> supplier) {
            super(name, predecessors.clone(), null);
            this.supplier = supplier;
        }

        public void addStateStore(final String stateStoreName) {
            stateStoreNames.add(stateStoreName);
        }

        @Override
        public ProcessorNode<KIn, VIn, KIn, VOut> build() {
            return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
        }

        @Override
        Processor describe() {
            return new Processor(name, new HashSet<>(stateStoreNames));
        }
    }

    // Map from topics to their matched regex patterns, used to ensure that each topic is consumed by only one
    // source node even if it can be matched by multiple regex patterns. Only used by SourceNodeFactory.
    private final Map<String, Pattern> topicToPatterns = new HashMap<>();

    private class SourceNodeFactory<KIn, VIn> extends NodeFactory<KIn, VIn, KIn, VIn> {
        private final List<String> topics;
        private final Pattern pattern;
        private final Deserializer<KIn> keyDeserializer;
        private final Deserializer<VIn> valDeserializer;
        private final TimestampExtractor timestampExtractor;

        private SourceNodeFactory(final String name,
                                  final String[] topics,
                                  final Pattern pattern,
                                  final TimestampExtractor timestampExtractor,
                                  final Deserializer<KIn> keyDeserializer,
                                  final Deserializer<VIn> valDeserializer) {
            super(name, NO_PREDECESSORS);
            this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<>();
            this.pattern = pattern;
            this.keyDeserializer = keyDeserializer;
            this.valDeserializer = valDeserializer;
            this.timestampExtractor = timestampExtractor;
        }

        List<String> getTopics(final Collection<String> subscribedTopics) {
            // if subscribed via a pattern, it is possible that the topic metadata has not been updated
            // yet, and hence the map from source node to topics is stale; in this case we put the pattern in as a placeholder.
            // This should only happen for debugging, since at runtime this function is always called after the metadata has been updated.
            if (subscribedTopics.isEmpty()) {
                return Collections.singletonList(String.valueOf(pattern));
            }

            final List<String> matchedTopics = new ArrayList<>();
            for (final String update : subscribedTopics) {
                if (pattern == topicToPatterns.get(update)) {
                    matchedTopics.add(update);
                } else if (topicToPatterns.containsKey(update) && isMatch(update)) {
                    // the same topic cannot be matched to more than one pattern
                    // TODO: we should lift this requirement in the future
                    throw new TopologyException("Topic " + update +
                        " is already matched for another regex pattern " + topicToPatterns.get(update) +
                        " and hence cannot be matched to this regex pattern " + pattern + " any more.");
                } else if (isMatch(update)) {
                    topicToPatterns.put(update, pattern);
                    matchedTopics.add(update);
                }
            }
            return matchedTopics;
        }
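
        // Illustrative walkthrough of the rule above (hypothetical patterns): with
        // pattern A = "foo.*" on this source and pattern B = "foo-bar.*" on another,
        // a topic "foo-bar-1" first claimed by A is pinned to A in topicToPatterns;
        // if B's source later matches the same topic, getTopics throws a
        // TopologyException rather than letting two sources consume one topic.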

        @Override
        public ProcessorNode<KIn, VIn, KIn, VIn> build() {
            return new SourceNode<>(name, timestampExtractor, keyDeserializer, valDeserializer);
        }

        private boolean isMatch(final String topic) {
            return pattern.matcher(topic).matches();
        }

        @Override
        Source describe() {
            return new Source(name, topics.size() == 0 ? null : new HashSet<>(topics), pattern);
        }
    }

    private class SinkNodeFactory<KIn, VIn> extends NodeFactory<KIn, VIn, Void, Void> {
        private final Serializer<KIn> keySerializer;
        private final Serializer<VIn> valSerializer;
        private final StreamPartitioner<? super KIn, ? super VIn> partitioner;
        private final TopicNameExtractor<KIn, VIn> topicExtractor;

        private SinkNodeFactory(final String name,
                                final String[] predecessors,
                                final TopicNameExtractor<KIn, VIn> topicExtractor,
                                final Serializer<KIn> keySerializer,
                                final Serializer<VIn> valSerializer,
                                final StreamPartitioner<? super KIn, ? super VIn> partitioner) {
            super(name, predecessors.clone());
            this.topicExtractor = topicExtractor;
            this.keySerializer = keySerializer;
            this.valSerializer = valSerializer;
            this.partitioner = partitioner;
        }

        @Override
        public ProcessorNode<KIn, VIn, Void, Void> build() {
            if (topicExtractor instanceof StaticTopicNameExtractor) {
                final String topic = ((StaticTopicNameExtractor<KIn, VIn>) topicExtractor).topicName;
                if (internalTopicNamesWithProperties.containsKey(topic)) {
                    // prefix the internal topic name with the application id
                    return new SinkNode<>(name, new StaticTopicNameExtractor<>(decorateTopic(topic)), keySerializer, valSerializer, partitioner);
                } else {
                    return new SinkNode<>(name, topicExtractor, keySerializer, valSerializer, partitioner);
                }
            } else {
                return new SinkNode<>(name, topicExtractor, keySerializer, valSerializer, partitioner);
            }
        }

        @Override
        Sink<KIn, VIn> describe() {
            return new Sink<>(name, topicExtractor);
        }
    }

    // public for testing only
    public final InternalTopologyBuilder setApplicationId(final String applicationId) {
        Objects.requireNonNull(applicationId, "applicationId can't be null");
        this.applicationId = applicationId;

        return this;
    }

    public synchronized final void setStreamsConfig(final StreamsConfig applicationConfig) {
        Objects.requireNonNull(applicationConfig, "config can't be null");
        topologyConfigs = new TopologyConfig(applicationConfig);
    }

    public synchronized final void setNamedTopology(final NamedTopology namedTopology) {
        this.namedTopology = namedTopology;
    }

    public synchronized TopologyConfig topologyConfigs() {
        return topologyConfigs;
    }

    public String topologyName() {
        return topologyName;
    }

    public NamedTopology namedTopology() {
        return namedTopology;
    }

    public synchronized final InternalTopologyBuilder rewriteTopology(final StreamsConfig config) {
        Objects.requireNonNull(config, "config can't be null");

        setApplicationId(config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
        setStreamsConfig(config);

        // maybe strip out caching layers
        if (topologyConfigs.cacheSize == 0L) {
            for (final StateStoreFactory<?> storeFactory : stateFactories.values()) {
                storeFactory.builder.withCachingDisabled();
            }

            for (final StoreBuilder<?> storeBuilder : globalStateBuilders.values()) {
                storeBuilder.withCachingDisabled();
            }
        }

        // build global state stores
        for (final StoreBuilder<?> storeBuilder : globalStateBuilders.values()) {
            globalStateStores.put(storeBuilder.name(), storeBuilder.build());
        }

        return this;
    }
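
    // For illustration: rewriteTopology is the point where the application-level
    // configs are bound to this builder; e.g. setting cache.max.bytes.buffering to 0
    // makes the loop above strip the caching layer from every registered store
    // builder before the global stores are materialized.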

    public final void addSource(final Topology.AutoOffsetReset offsetReset,
                                final String name,
                                final TimestampExtractor timestampExtractor,
                                final Deserializer<?> keyDeserializer,
                                final Deserializer<?> valDeserializer,
                                final String... topics) {
        if (topics.length == 0) {
            throw new TopologyException("You must provide at least one topic");
        }
        Objects.requireNonNull(name, "name must not be null");
        if (nodeFactories.containsKey(name)) {
            throw new TopologyException("Processor " + name + " is already added.");
        }

        for (final String topic : topics) {
            Objects.requireNonNull(topic, "topic names cannot be null");
            validateTopicNotAlreadyRegistered(topic);
            maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic);
            rawSourceTopicNames.add(topic);
        }

        nodeFactories.put(name, new SourceNodeFactory<>(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
        nodeToSourceTopics.put(name, Arrays.asList(topics));
        nodeGrouper.add(name);
        nodeGroups = null;
    }

    public final void addSource(final Topology.AutoOffsetReset offsetReset,
                                final String name,
                                final TimestampExtractor timestampExtractor,
                                final Deserializer<?> keyDeserializer,
                                final Deserializer<?> valDeserializer,
                                final Pattern topicPattern) {
        Objects.requireNonNull(topicPattern, "topicPattern can't be null");
        Objects.requireNonNull(name, "name can't be null");

        if (nodeFactories.containsKey(name)) {
            throw new TopologyException("Processor " + name + " is already added.");
        }

        for (final String sourceTopicName : rawSourceTopicNames) {
            if (topicPattern.matcher(sourceTopicName).matches()) {
                throw new TopologyException("Pattern " + topicPattern + " will match a topic that has already been registered by another source.");
            }
        }

        maybeAddToResetList(earliestResetPatterns, latestResetPatterns, offsetReset, topicPattern);

        nodeFactories.put(name, new SourceNodeFactory<>(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
        nodeToSourcePatterns.put(name, topicPattern);
        nodeGrouper.add(name);
        nodeGroups = null;
    }

    public final <K, V> void addSink(final String name,
                                     final String topic,
                                     final Serializer<K> keySerializer,
                                     final Serializer<V> valSerializer,
                                     final StreamPartitioner<? super K, ? super V> partitioner,
                                     final String... predecessorNames) {
        Objects.requireNonNull(name, "name must not be null");
        Objects.requireNonNull(topic, "topic must not be null");
        Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
        if (predecessorNames.length == 0) {
            throw new TopologyException("Sink " + name + " must have at least one parent");
        }

        addSink(name, new StaticTopicNameExtractor<>(topic), keySerializer, valSerializer, partitioner, predecessorNames);
        nodeToSinkTopic.put(name, topic);
        nodeGroups = null;
    }

    public final <K, V> void addSink(final String name,
                                     final TopicNameExtractor<K, V> topicExtractor,
                                     final Serializer<K> keySerializer,
                                     final Serializer<V> valSerializer,
                                     final StreamPartitioner<? super K, ? super V> partitioner,
                                     final String... predecessorNames) {
        Objects.requireNonNull(name, "name must not be null");
        Objects.requireNonNull(topicExtractor, "topic extractor must not be null");
        Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
        if (nodeFactories.containsKey(name)) {
            throw new TopologyException("Processor " + name + " is already added.");
        }
        if (predecessorNames.length == 0) {
            throw new TopologyException("Sink " + name + " must have at least one parent");
        }

        for (final String predecessor : predecessorNames) {
            Objects.requireNonNull(predecessor, "predecessor name can't be null");
            if (predecessor.equals(name)) {
                throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
            }
            if (!nodeFactories.containsKey(predecessor)) {
                throw new TopologyException("Predecessor processor " + predecessor + " is not added yet.");
            }
            if (nodeToSinkTopic.containsKey(predecessor)) {
                throw new TopologyException("Sink " + predecessor + " cannot be used a parent.");
            }
        }

        nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topicExtractor, keySerializer, valSerializer, partitioner));
        nodeGrouper.add(name);
        nodeGrouper.unite(name, predecessorNames);
        nodeGroups = null;
    }
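
    // Illustrative use of the dynamic-routing overload above (hypothetical extractor):
    //   builder.addSink("sink", (key, value, recordContext) -> "events-" + key,
    //                   keySerializer, valueSerializer, null, "process");
    // TopicNameExtractor#extract is invoked per record, so the target topic can vary.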

    public final <KIn, VIn, KOut, VOut> void addProcessor(final String name,
                                                          final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
                                                          final String... predecessorNames) {
        Objects.requireNonNull(name, "name must not be null");
        Objects.requireNonNull(supplier, "supplier must not be null");
        Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
        ApiUtils.checkSupplier(supplier);
        if (nodeFactories.containsKey(name)) {
            throw new TopologyException("Processor " + name + " is already added.");
        }
        if (predecessorNames.length == 0) {
            throw new TopologyException("Processor " + name + " must have at least one parent");
        }

        for (final String predecessor : predecessorNames) {
            Objects.requireNonNull(predecessor, "predecessor name must not be null");
            if (predecessor.equals(name)) {
                throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
            }
            if (!nodeFactories.containsKey(predecessor)) {
                throw new TopologyException("Predecessor processor " + predecessor + " is not added yet for " + name);
            }
        }

        nodeFactories.put(name, new ProcessorNodeFactory<>(name, predecessorNames, supplier));
        nodeGrouper.add(name);
        nodeGrouper.unite(name, predecessorNames);
        nodeGroups = null;
    }

    public final <KIn, VIn, VOut> void addProcessor(final String name,
                                                    final FixedKeyProcessorSupplier<KIn, VIn, VOut> supplier,
                                                    final String... predecessorNames) {
        Objects.requireNonNull(name, "name must not be null");
        Objects.requireNonNull(supplier, "supplier must not be null");
        Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
        ApiUtils.checkSupplier(supplier);
        if (nodeFactories.containsKey(name)) {
            throw new TopologyException("Processor " + name + " is already added.");
        }
        if (predecessorNames.length == 0) {
            throw new TopologyException("Processor " + name + " must have at least one parent");
        }

        for (final String predecessor : predecessorNames) {
            Objects.requireNonNull(predecessor, "predecessor name must not be null");
            if (predecessor.equals(name)) {
                throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
            }
            if (!nodeFactories.containsKey(predecessor)) {
                throw new TopologyException("Predecessor processor " + predecessor + " is not added yet for " + name);
            }
        }

        nodeFactories.put(name, new FixedKeyProcessorNodeFactory<>(name, predecessorNames, supplier));
        nodeGrouper.add(name);
        nodeGrouper.unite(name, predecessorNames);
        nodeGroups = null;
    }

    public final void addStateStore(final StoreBuilder<?> storeBuilder,
                                    final String... processorNames) {
        addStateStore(storeBuilder, false, processorNames);
    }

    public final void addStateStore(final StoreBuilder<?> storeBuilder,
                                    final boolean allowOverride,
                                    final String... processorNames) {
        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
        final StateStoreFactory<?> stateFactory = stateFactories.get(storeBuilder.name());
        if (!allowOverride && stateFactory != null && !stateFactory.builder.equals(storeBuilder)) {
            throw new TopologyException("A different StateStore has already been added with the name " + storeBuilder.name());
        }
        if (globalStateBuilders.containsKey(storeBuilder.name())) {
            throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeBuilder.name());
        }

        stateFactories.put(storeBuilder.name(), new StateStoreFactory<>(storeBuilder));

        if (processorNames != null) {
            for (final String processorName : processorNames) {
                Objects.requireNonNull(processorName, "processor name must not be null");
                connectProcessorAndStateStore(processorName, storeBuilder.name());
            }
        }
        nodeGroups = null;
    }
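
    // Illustrative registration (hypothetical store and processor names):
    //   builder.addStateStore(Stores.keyValueStoreBuilder(
    //       Stores.persistentKeyValueStore("counts"),
    //       Serdes.String(), Serdes.Long()), "process");
    // Passing no processor names registers the store unattached; processors can be
    // connected later via connectProcessorAndStateStores.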

    public final <KIn, VIn> void addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                final String sourceName,
                                                final TimestampExtractor timestampExtractor,
                                                final Deserializer<KIn> keyDeserializer,
                                                final Deserializer<VIn> valueDeserializer,
                                                final String topic,
                                                final String processorName,
                                                final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
        Objects.requireNonNull(storeBuilder, "store builder must not be null");
        ApiUtils.checkSupplier(stateUpdateSupplier);
        validateGlobalStoreArguments(sourceName,
                                     topic,
                                     processorName,
                                     stateUpdateSupplier,
                                     storeBuilder.name(),
                                     storeBuilder.loggingEnabled());
        validateTopicNotAlreadyRegistered(topic);

        final String[] topics = {topic};
        final String[] predecessors = {sourceName};

        final ProcessorNodeFactory<KIn, VIn, Void, Void> nodeFactory = new ProcessorNodeFactory<>(
            processorName,
            predecessors,
            stateUpdateSupplier
        );

        globalTopics.add(topic);
        nodeFactories.put(sourceName, new SourceNodeFactory<>(
            sourceName,
            topics,
            null,
            timestampExtractor,
            keyDeserializer,
            valueDeserializer)
        );
        nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
        nodeGrouper.add(sourceName);
        nodeFactory.addStateStore(storeBuilder.name());
        nodeFactories.put(processorName, nodeFactory);
        nodeGrouper.add(processorName);
        nodeGrouper.unite(processorName, predecessors);
        globalStateBuilders.put(storeBuilder.name(), storeBuilder);
        connectSourceStoreAndTopic(storeBuilder.name(), topic);
        nodeGroups = null;
    }

    private void validateTopicNotAlreadyRegistered(final String topic) {
        if (rawSourceTopicNames.contains(topic) || globalTopics.contains(topic)) {
            throw new TopologyException("Topic " + topic + " has already been registered by another source.");
        }

        for (final Pattern pattern : nodeToSourcePatterns.values()) {
            if (pattern.matcher(topic).matches()) {
                throw new TopologyException("Topic " + topic + " matches a Pattern already registered by another source.");
            }
        }
    }

    public final void connectProcessorAndStateStores(final String processorName,
                                                     final String... stateStoreNames) {
        Objects.requireNonNull(processorName, "processorName can't be null");
        Objects.requireNonNull(stateStoreNames, "state store list must not be null");
        if (stateStoreNames.length == 0) {
            throw new TopologyException("Must provide at least one state store name.");
        }
        for (final String stateStoreName : stateStoreNames) {
            Objects.requireNonNull(stateStoreName, "state store name must not be null");
            connectProcessorAndStateStore(processorName, stateStoreName);
        }
        nodeGroups = null;
    }

    public String getStoreForChangelogTopic(final String topicName) {
        return changelogTopicToStore.get(topicName);
    }

    public void connectSourceStoreAndTopic(final String sourceStoreName,
                                           final String topic) {
        if (storeToChangelogTopic.containsKey(sourceStoreName)) {
            throw new TopologyException("Source store " + sourceStoreName + " is already added.");
        }
        storeToChangelogTopic.put(sourceStoreName, topic);
        changelogTopicToStore.put(topic, sourceStoreName);
    }

    public final void addInternalTopic(final String topicName,
                                       final InternalTopicProperties internalTopicProperties) {
        Objects.requireNonNull(topicName, "topicName can't be null");
        Objects.requireNonNull(internalTopicProperties, "internalTopicProperties can't be null");

        internalTopicNamesWithProperties.put(topicName, internalTopicProperties);
    }

    public final void copartitionSources(final Collection<String> sourceNodes) {
        copartitionSourceGroups.add(new HashSet<>(sourceNodes));
    }

    public final void maybeUpdateCopartitionSourceGroups(final String replacedNodeName,
                                                         final String optimizedNodeName) {
        for (final Set<String> copartitionSourceGroup : copartitionSourceGroups) {
            if (copartitionSourceGroup.contains(replacedNodeName)) {
                copartitionSourceGroup.remove(replacedNodeName);
                copartitionSourceGroup.add(optimizedNodeName);
            }
        }
    }

    public void validateCopartition() {
        // allCopartitionedSourceTopics takes the list of co-partitioned source nodes and
        // replaces each node name with the corresponding source topic names
        final List<Set<String>> allCopartitionedSourceTopics =
                copartitionSourceGroups
                        .stream()
                        .map(sourceGroup -> sourceGroup
                                .stream()
                                .flatMap(sourceNodeName -> nodeToSourceTopics.getOrDefault(sourceNodeName,
                                        Collections.emptyList()).stream())
                                .collect(Collectors.toSet())
                        ).collect(Collectors.toList());
        for (final Set<String> copartition : allCopartitionedSourceTopics) {
            final Map<String, Integer> numberOfPartitionsPerTopic = new HashMap<>();
            copartition.forEach(topic -> {
                final InternalTopicProperties prop = internalTopicNamesWithProperties.get(topic);
                if (prop != null && prop.getNumberOfPartitions().isPresent()) {
                    numberOfPartitionsPerTopic.put(topic, prop.getNumberOfPartitions().get());
                }
            });
            if (!numberOfPartitionsPerTopic.isEmpty() && copartition.equals(numberOfPartitionsPerTopic.keySet())) {
                final Collection<Integer> partitionNumbers = numberOfPartitionsPerTopic.values();
                final Integer first = partitionNumbers.iterator().next();
                for (final Integer partitionNumber : partitionNumbers) {
                    if (!partitionNumber.equals(first)) {
                        final String msg = String.format("Following topics do not have the same number of " +
                                "partitions: [%s]", new TreeMap<>(numberOfPartitionsPerTopic));
                        throw new TopologyException(msg);

                    }
                }
            }
        }
    }

    private void validateGlobalStoreArguments(final String sourceName,
                                              final String topic,
                                              final String processorName,
                                              final ProcessorSupplier<?, ?, Void, Void> stateUpdateSupplier,
                                              final String storeName,
                                              final boolean loggingEnabled) {
        Objects.requireNonNull(sourceName, "sourceName must not be null");
        Objects.requireNonNull(topic, "topic must not be null");
        Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
        Objects.requireNonNull(processorName, "processorName must not be null");
        if (nodeFactories.containsKey(sourceName)) {
            throw new TopologyException("Processor " + sourceName + " is already added.");
        }
        if (nodeFactories.containsKey(processorName)) {
            throw new TopologyException("Processor " + processorName + " is already added.");
        }
        if (stateFactories.containsKey(storeName)) {
            throw new TopologyException("A different StateStore has already been added with the name " + storeName);
        }
        if (globalStateBuilders.containsKey(storeName)) {
            throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeName);
        }
        if (loggingEnabled) {
            throw new TopologyException("StateStore " + storeName + " for global table must not have logging enabled.");
        }
        if (sourceName.equals(processorName)) {
            throw new TopologyException("sourceName and processorName must be different.");
        }
    }

    private void connectProcessorAndStateStore(final String processorName,
                                               final String stateStoreName) {
        if (globalStateBuilders.containsKey(stateStoreName)) {
            throw new TopologyException("Global StateStore " + stateStoreName +
                    " can be used by a Processor without being specified; it should not be explicitly passed.");
        }
        if (!stateFactories.containsKey(stateStoreName)) {
            throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
        }
        if (!nodeFactories.containsKey(processorName)) {
            throw new TopologyException("Processor " + processorName + " is not added yet.");
        }

        final StateStoreFactory<?> stateStoreFactory = stateFactories.get(stateStoreName);
        final Iterator<String> iter = stateStoreFactory.users().iterator();
        if (iter.hasNext()) {
            final String user = iter.next();
            nodeGrouper.unite(user, processorName);
        }
        stateStoreFactory.users().add(processorName);

        final NodeFactory<?, ?, ?, ?> nodeFactory = nodeFactories.get(processorName);
        if (nodeFactory instanceof ProcessorNodeFactory) {
            final ProcessorNodeFactory<?, ?, ?, ?> processorNodeFactory = (ProcessorNodeFactory<?, ?, ?, ?>) nodeFactory;
            processorNodeFactory.addStateStore(stateStoreName);
            connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
        } else {
            throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
        }
    }

    private Set<SourceNodeFactory<?, ?>> findSourcesForProcessorPredecessors(final String[] predecessors) {
        final Set<SourceNodeFactory<?, ?>> sourceNodes = new HashSet<>();
        for (final String predecessor : predecessors) {
            final NodeFactory<?, ?, ?, ?> nodeFactory = nodeFactories.get(predecessor);
            if (nodeFactory instanceof SourceNodeFactory) {
                sourceNodes.add((SourceNodeFactory<?, ?>) nodeFactory);
            } else if (nodeFactory instanceof ProcessorNodeFactory) {
                sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory<?, ?, ?, ?>) nodeFactory).predecessors));
            }
        }
        return sourceNodes;
    }

    private <KIn, VIn, KOut, VOut> void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
                                                                                     final ProcessorNodeFactory<KIn, VIn, KOut, VOut> processorNodeFactory) {
        // we should never update the mapping from state store names to source topics if the store name already exists
        // in the map; this scenario is possible when, for example, a state store underlying a source KTable is
        // connected to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic.

        if (stateStoreNameToRawSourceTopicNames.containsKey(stateStoreName)
            || stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
            return;
        }

        final Set<String> sourceTopics = new HashSet<>();
        final Set<Pattern> sourcePatterns = new HashSet<>();
        final Set<SourceNodeFactory<?, ?>> sourceNodesForPredecessor =
            findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);

        for (final SourceNodeFactory<?, ?> sourceNodeFactory : sourceNodesForPredecessor) {
            if (sourceNodeFactory.pattern != null) {
                sourcePatterns.add(sourceNodeFactory.pattern);
            } else {
                sourceTopics.addAll(sourceNodeFactory.topics);
            }
        }

        if (!sourceTopics.isEmpty()) {
            stateStoreNameToRawSourceTopicNames.put(
                stateStoreName,
                Collections.unmodifiableSet(sourceTopics)
            );
        }

        if (!sourcePatterns.isEmpty()) {
            stateStoreNameToSourceRegex.put(
                stateStoreName,
                Collections.unmodifiableSet(sourcePatterns)
            );
        }

    }

    private <T> void maybeAddToResetList(final Collection<T> earliestResets,
                                         final Collection<T> latestResets,
                                         final Topology.AutoOffsetReset offsetReset,
                                         final T item) {
        if (offsetReset != null) {
            switch (offsetReset) {
                case EARLIEST:
                    earliestResets.add(item);
                    break;
                case LATEST:
                    latestResets.add(item);
                    break;
                default:
                    throw new TopologyException(String.format("Unrecognized reset format %s", offsetReset));
            }
        }
    }

    public synchronized Map<Integer, Set<String>> nodeGroups() {
        if (nodeGroups == null) {
            nodeGroups = makeNodeGroups();
        }
        return nodeGroups;
    }

    // Order node groups by their position in the actual topology, i.e. upstream subtopologies come before downstream ones
    private Map<Integer, Set<String>> makeNodeGroups() {
        final Map<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
        final Map<String, Set<String>> rootToNodeGroup = new HashMap<>();

        int nodeGroupId = 0;

        // Traverse in topological order
        for (final String nodeName : nodeFactories.keySet()) {
            nodeGroupId = putNodeGroupName(nodeName, nodeGroupId, nodeGroups, rootToNodeGroup);
        }

        return nodeGroups;
    }

    private int putNodeGroupName(final String nodeName,
                                 final int nodeGroupId,
                                 final Map<Integer, Set<String>> nodeGroups,
                                 final Map<String, Set<String>> rootToNodeGroup) {
        int newNodeGroupId = nodeGroupId;
        final String root = nodeGrouper.root(nodeName);
        Set<String> nodeGroup = rootToNodeGroup.get(root);
        if (nodeGroup == null) {
            nodeGroup = new HashSet<>();
            rootToNodeGroup.put(root, nodeGroup);
            nodeGroups.put(newNodeGroupId++, nodeGroup);
        }
        nodeGroup.add(nodeName);
        return newNodeGroupId;
    }
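
    // Example of the grouping above: for a chain source -> process -> sink, the
    // QuickUnion nodeGrouper unites all three under one root, so they form a single
    // node group (one subtopology, instantiated as one task per input partition).
    // Chains that share neither a node nor a state store land in separate groups.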

    /**
     * @return the full topology minus any global state
     */
    public synchronized ProcessorTopology buildTopology() {
        final Set<String> nodeGroup = new HashSet<>();
        for (final Set<String> value : nodeGroups().values()) {
            nodeGroup.addAll(value);
        }
        nodeGroup.removeAll(globalNodeGroups());

        initializeSubscription();
        return build(nodeGroup);
    }

    /**
     * @param topicGroupId group of topics corresponding to a single subtopology
     * @return subset of the full topology
     */
    public synchronized ProcessorTopology buildSubtopology(final int topicGroupId) {
        final Set<String> nodeGroup = nodeGroups().get(topicGroupId);
        return build(nodeGroup);
    }

    /**
     * Builds the topology for any global state stores
     * @return ProcessorTopology of global state
     */
    public synchronized ProcessorTopology buildGlobalStateTopology() {
        Objects.requireNonNull(applicationId, "topology has not completed optimization");

        final Set<String> globalGroups = globalNodeGroups();
        if (globalGroups.isEmpty()) {
            return null;
        }
        return build(globalGroups);
    }

    private Set<String> globalNodeGroups() {
        final Set<String> globalGroups = new HashSet<>();
        for (final Map.Entry<Integer, Set<String>> nodeGroup : nodeGroups().entrySet()) {
            final Set<String> nodes = nodeGroup.getValue();
            for (final String node : nodes) {
                if (isGlobalSource(node)) {
                    globalGroups.addAll(nodes);
                }
            }
        }
        return globalGroups;
    }

    @SuppressWarnings("unchecked")
    private ProcessorTopology build(final Set<String> nodeGroup) {
        Objects.requireNonNull(applicationId, "topology has not completed optimization");

        final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap = new LinkedHashMap<>();
        final Map<String, SourceNode<?, ?>> topicSourceMap = new HashMap<>();
        final Map<String, SinkNode<?, ?>> topicSinkMap = new HashMap<>();
        final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
        final Set<String> repartitionTopics = new HashSet<>();

        // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
        // also make sure the state store map values follow the insertion ordering
        for (final NodeFactory<?, ?, ?, ?> factory : nodeFactories.values()) {
            if (nodeGroup == null || nodeGroup.contains(factory.name)) {
                final ProcessorNode<?, ?, ?, ?> node = factory.build();
                processorMap.put(node.name(), node);

                if (factory instanceof ProcessorNodeFactory) {
                    buildProcessorNode(processorMap,
                                       stateStoreMap,
                                       (ProcessorNodeFactory<?, ?, ?, ?>) factory,
                                       (ProcessorNode<Object, Object, Object, Object>) node);

                } else if (factory instanceof SourceNodeFactory) {
                    buildSourceNode(topicSourceMap,
                                    repartitionTopics,
                                    (SourceNodeFactory<?, ?>) factory,
                                    (SourceNode<?, ?>) node);

                } else if (factory instanceof SinkNodeFactory) {
                    buildSinkNode(processorMap,
                                  topicSinkMap,
                                  repartitionTopics,
                                  (SinkNodeFactory<?, ?>) factory,
                                  (SinkNode<?, ?>) node);
                } else {
                    throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
                }
            }
        }

        return new ProcessorTopology(new ArrayList<>(processorMap.values()),
                                     topicSourceMap,
                                     topicSinkMap,
                                     new ArrayList<>(stateStoreMap.values()),
                                     new ArrayList<>(globalStateStores.values()),
                                     storeToChangelogTopic,
                                     repartitionTopics);
    }

    private void buildSinkNode(final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
                               final Map<String, SinkNode<?, ?>> topicSinkMap,
                               final Set<String> repartitionTopics,
                               final SinkNodeFactory<?, ?> sinkNodeFactory,
                               final SinkNode<?, ?> node) {
        @SuppressWarnings("unchecked") final ProcessorNode<Object, Object, ?, ?> sinkNode =
            (ProcessorNode<Object, Object, ?, ?>) node;

        for (final String predecessorName : sinkNodeFactory.predecessors) {
            final ProcessorNode<Object, Object, Object, Object> processor = getProcessor(processorMap, predecessorName);
            processor.addChild(sinkNode);
            if (sinkNodeFactory.topicExtractor instanceof StaticTopicNameExtractor) {
                final String topic = ((StaticTopicNameExtractor<?, ?>) sinkNodeFactory.topicExtractor).topicName;

                if (internalTopicNamesWithProperties.containsKey(topic)) {
                    // prefix the internal topic name with the application id
                    final String decoratedTopic = decorateTopic(topic);
                    topicSinkMap.put(decoratedTopic, node);
                    repartitionTopics.add(decoratedTopic);
                } else {
                    topicSinkMap.put(topic, node);
                }

            }
        }
    }

    @SuppressWarnings("unchecked")
    private static <KIn, VIn, KOut, VOut> ProcessorNode<KIn, VIn, KOut, VOut> getProcessor(
        final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
        final String predecessor) {

        return (ProcessorNode<KIn, VIn, KOut, VOut>) processorMap.get(predecessor);
    }

    private void buildSourceNode(final Map<String, SourceNode<?, ?>> topicSourceMap,
                                 final Set<String> repartitionTopics,
                                 final SourceNodeFactory<?, ?> sourceNodeFactory,
                                 final SourceNode<?, ?> node) {

        final List<String> topics = (sourceNodeFactory.pattern != null) ?
            sourceNodeFactory.getTopics(subscriptionUpdates()) :
            sourceNodeFactory.topics;

        for (final String topic : topics) {
            if (internalTopicNamesWithProperties.containsKey(topic)) {
                // prefix the internal topic name with the application id
                final String decoratedTopic = decorateTopic(topic);
                topicSourceMap.put(decoratedTopic, node);
                repartitionTopics.add(decoratedTopic);
            } else {
                topicSourceMap.put(topic, node);
            }
        }
    }

    private void buildProcessorNode(final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
                                    final Map<String, StateStore> stateStoreMap,
                                    final ProcessorNodeFactory<?, ?, ?, ?> factory,
                                    final ProcessorNode<Object, Object, Object, Object> node) {

        for (final String predecessor : factory.predecessors) {
            final ProcessorNode<Object, Object, Object, Object> predecessorNode = getProcessor(processorMap, predecessor);
            predecessorNode.addChild(node);
        }
        for (final String stateStoreName : factory.stateStoreNames) {
            if (!stateStoreMap.containsKey(stateStoreName)) {
                final StateStore store;
                if (stateFactories.containsKey(stateStoreName)) {
                    final StateStoreFactory<?> stateStoreFactory = stateFactories.get(stateStoreName);

                    // remember the changelog topic if this state store is change-logging enabled
                    if (stateStoreFactory.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
                        final String prefix = topologyConfigs == null ?
                                applicationId :
                                ProcessorContextUtils.getPrefix(topologyConfigs.applicationConfigs.originals(), applicationId);
                        final String changelogTopic =
                            ProcessorStateManager.storeChangelogTopic(prefix, stateStoreName, topologyName);
                        storeToChangelogTopic.put(stateStoreName, changelogTopic);
                        changelogTopicToStore.put(changelogTopic, stateStoreName);
                    }
                    store = stateStoreFactory.build();
                    stateStoreMap.put(stateStoreName, store);
                } else {
                    store = globalStateStores.get(stateStoreName);
                    stateStoreMap.put(stateStoreName, store);
                }

                if (store.persistent()) {
                    hasPersistentStores = true;
                }
            }
        }
    }
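
    // Changelog naming used above: for a store "counts" under application id
    // "demo-app" (illustrative names), ProcessorStateManager.storeChangelogTopic
    // yields "demo-app-counts-changelog"; for a NamedTopology the topology name is
    // folded into the prefix as well.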

    /**
     * Get any global {@link StateStore}s that are part of the
     * topology
     * @return map containing all global {@link StateStore}s
     */
    public Map<String, StateStore> globalStateStores() {
        Objects.requireNonNull(applicationId, "topology has not completed optimization");

        return Collections.unmodifiableMap(globalStateStores);
    }

    public Set<String> allStateStoreNames() {
        Objects.requireNonNull(applicationId, "topology has not completed optimization");

        final Set<String> allNames = new HashSet<>(stateFactories.keySet());
        allNames.addAll(globalStateStores.keySet());
        return Collections.unmodifiableSet(allNames);
    }

    public boolean hasStore(final String name) {
        return stateFactories.containsKey(name) || globalStateStores.containsKey(name);
    }

    public boolean hasPersistentStores() {
        return hasPersistentStores;
    }

    /**
     * Returns the map of topic groups keyed by the group id.
     * A topic group is a group of topics in the same task.
     *
     * @return groups of topic names
     */
    public synchronized Map<Subtopology, TopicsInfo> subtopologyToTopicsInfo() {
        final Map<Subtopology, TopicsInfo> topicGroups = new LinkedHashMap<>();

        if (nodeGroups == null) {
            nodeGroups = makeNodeGroups();
        }

        for (final Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
            final Set<String> sinkTopics = new HashSet<>();
            final Set<String> sourceTopics = new HashSet<>();
            final Map<String, InternalTopicConfig> repartitionTopics = new HashMap<>();
            final Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>();
            for (final String node : entry.getValue()) {
                // if the node is a source node, add to the source topics
                final List<String> topics = nodeToSourceTopics.get(node);
                if (topics != null) {
                    // if some of the topics are internal, add them to the internal topics
                    for (final String topic : topics) {
                        // skip global topic as they don't need partition assignment
                        if (globalTopics.contains(topic)) {
                            continue;
                        }
                        if (internalTopicNamesWithProperties.containsKey(topic)) {
                            // prefix the internal topic name with the application id
                            final String internalTopic = decorateTopic(topic);

                            final RepartitionTopicConfig repartitionTopicConfig = buildRepartitionTopicConfig(
                                internalTopic,
                                internalTopicNamesWithProperties.get(topic).getNumberOfPartitions()
                            );

                            repartitionTopics.put(repartitionTopicConfig.name(), repartitionTopicConfig);
                            sourceTopics.add(repartitionTopicConfig.name());
                        } else {
                            sourceTopics.add(topic);
                        }
                    }
                }

                // if the node is a sink node, add to the sink topics
                final String topic = nodeToSinkTopic.get(node);
                if (topic != null) {
                    if (internalTopicNamesWithProperties.containsKey(topic)) {
                        // prefix the internal topic name with the application id
                        sinkTopics.add(decorateTopic(topic));
                    } else {
                        sinkTopics.add(topic);
                    }
                }

                // if the node is connected to a state store whose changelog topics are not predefined,
                // add to the changelog topics
                for (final StateStoreFactory<?> stateFactory : stateFactories.values()) {
                    if (stateFactory.users().contains(node) && storeToChangelogTopic.containsKey(stateFactory.name())) {
                        final String topicName = storeToChangelogTopic.get(stateFactory.name());
                        if (!stateChangelogTopics.containsKey(topicName)) {
                            final InternalTopicConfig internalTopicConfig =
                                createChangelogTopicConfig(stateFactory, topicName);
                            stateChangelogTopics.put(topicName, internalTopicConfig);
                        }
                    }
                }
            }
            if (!sourceTopics.isEmpty()) {
                topicGroups.put(new Subtopology(entry.getKey(), topologyName), new TopicsInfo(
                        Collections.unmodifiableSet(sinkTopics),
                        Collections.unmodifiableSet(sourceTopics),
                        Collections.unmodifiableMap(repartitionTopics),
                        Collections.unmodifiableMap(stateChangelogTopics)));
            }
        }

        return Collections.unmodifiableMap(topicGroups);
    }
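
    // Illustrative sketch (hypothetical caller, not from this file): the partition
    // assignor walks the returned map to find the internal topics each subtopology
    // needs before its tasks can run, e.g.
    //
    //   for (final Map.Entry<Subtopology, TopicsInfo> group : builder.subtopologyToTopicsInfo().entrySet()) {
    //       final TopicsInfo info = group.getValue();
    //       // info.repartitionSourceTopics and info.stateChangelogTopics hold the
    //       // InternalTopicConfigs that still need to be created
    //   }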

    public Map<String, List<String>> nodeToSourceTopics() {
        return Collections.unmodifiableMap(nodeToSourceTopics);
    }

    private RepartitionTopicConfig buildRepartitionTopicConfig(final String internalTopic,
                                                               final Optional<Integer> numberOfPartitions) {
        return numberOfPartitions
            .map(partitions -> new RepartitionTopicConfig(internalTopic,
                                                          Collections.emptyMap(),
                                                          partitions,
                                                          true))
            .orElse(new RepartitionTopicConfig(internalTopic, Collections.emptyMap()));
    }
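
    // For instance (hypothetical values): Optional.of(4) pins the repartition topic
    // to exactly 4 partitions (the `true` flag appears to request that this count be
    // enforced at creation time), while Optional.empty() leaves the partition count
    // to be resolved later from the upstream topics.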

    private void setRegexMatchedTopicsToSourceNodes() {
        if (hasSubscriptionUpdates()) {
            for (final String nodeName : nodeToSourcePatterns.keySet()) {
                final SourceNodeFactory<?, ?> sourceNode = (SourceNodeFactory<?, ?>) nodeFactories.get(nodeName);
                final List<String> sourceTopics = sourceNode.getTopics(subscriptionUpdates);
                // need to update nodeToSourceTopics and rawSourceTopicNames with topics matched from the given regex
                nodeToSourceTopics.put(nodeName, sourceTopics);
                rawSourceTopicNames.addAll(sourceTopics);
            }
            log.debug("Updated nodeToSourceTopics: {}", nodeToSourceTopics);
        }
    }

    private void setRegexMatchedTopicToStateStore() {
        if (hasSubscriptionUpdates()) {
            for (final Map.Entry<String, Set<Pattern>> storePattern : stateStoreNameToSourceRegex.entrySet()) {
                final Set<String> updatedTopicsForStateStore = new HashSet<>();
                for (final String subscriptionUpdateTopic : subscriptionUpdates()) {
                    for (final Pattern pattern : storePattern.getValue()) {
                        if (pattern.matcher(subscriptionUpdateTopic).matches()) {
                            updatedTopicsForStateStore.add(subscriptionUpdateTopic);
                        }
                    }
                }
                if (!updatedTopicsForStateStore.isEmpty()) {
                    final Collection<String> storeTopics = stateStoreNameToRawSourceTopicNames.get(storePattern.getKey());
                    if (storeTopics != null) {
                        updatedTopicsForStateStore.addAll(storeTopics);
                    }
                    stateStoreNameToRawSourceTopicNames.put(
                        storePattern.getKey(),
                        Collections.unmodifiableSet(updatedTopicsForStateStore));
                }
            }
        }
    }

    private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory<S> factory,
                                                                                  final String name) {
        if (factory.isWindowStore()) {
            final WindowedChangelogTopicConfig config = new WindowedChangelogTopicConfig(name, factory.logConfig());
            config.setRetentionMs(factory.retentionPeriod());
            return config;
        } else {
            return new UnwindowedChangelogTopicConfig(name, factory.logConfig());
        }
    }
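
    // Example (hypothetical store): a window store with a 24h retention period yields
    // a WindowedChangelogTopicConfig whose retention is set from retentionPeriod(),
    // while a plain key-value store yields an UnwindowedChangelogTopicConfig carrying
    // only the user-supplied log config.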

    public boolean hasOffsetResetOverrides() {
        return !(earliestResetTopics.isEmpty() && earliestResetPatterns.isEmpty()
            && latestResetTopics.isEmpty() && latestResetPatterns.isEmpty());
    }

    public OffsetResetStrategy offsetResetStrategy(final String topic) {
        if (maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
            earliestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) {
            return EARLIEST;
        } else if (maybeDecorateInternalSourceTopics(latestResetTopics).contains(topic) ||
            latestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) {
            return LATEST;
        } else if (containsTopic(topic)) {
            return NONE;
        } else {
            throw new IllegalStateException(String.format(
                "Unable to look up offset reset strategy for the following topic as it does not exist in the topology%s: %s",
                hasNamedTopology() ? " " + topologyName : "",
                topic)
            );
        }
    }
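
    // Resolution example (hypothetical topics): a topic registered with an EARLIEST
    // reset override (directly or via a matching pattern) returns EARLIEST; likewise
    // for LATEST; any other topic known to the topology returns NONE, deferring to
    // the consumer's own auto.offset.reset; unknown topics are an error.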

    /**
     * @return  map from state store name to full names (including application id/topology name prefix)
     *          of all source topics whose processors are connected to it
     */
    public Map<String, List<String>> stateStoreNameToFullSourceTopicNames() {
        final Map<String, List<String>> results = new HashMap<>();
        for (final Map.Entry<String, Set<String>> entry : stateStoreNameToRawSourceTopicNames.entrySet()) {
            results.put(entry.getKey(), maybeDecorateInternalSourceTopics(entry.getValue()));
        }
        return results;
    }

    /**
     * @return  the full names (including application id/topology name prefix) of all source topics whose
     *          processors are connected to the given state store
     */
    public Collection<String> sourceTopicsForStore(final String storeName) {
        return maybeDecorateInternalSourceTopics(stateStoreNameToRawSourceTopicNames.get(storeName));
    }

    public synchronized Collection<Set<String>> copartitionGroups() {
        // compute transitive closures of copartitionGroups so that registering code
        // does not need to know all members of a copartitionGroup at the same time
        final List<Set<String>> copartitionSourceTopics =
            copartitionSourceGroups
                .stream()
                .map(sourceGroup ->
                         sourceGroup
                             .stream()
                             .flatMap(node -> maybeDecorateInternalSourceTopics(nodeToSourceTopics.get(node)).stream())
                             .collect(Collectors.toSet())
                ).collect(Collectors.toList());

        final Map<String, Set<String>> topicsToCopartitionGroup = new LinkedHashMap<>();
        for (final Set<String> topics : copartitionSourceTopics) {
            if (topics != null) {
                Set<String> copartitionGroup = null;
                for (final String topic : topics) {
                    copartitionGroup = topicsToCopartitionGroup.get(topic);
                    if (copartitionGroup != null) {
                        break;
                    }
                }
                if (copartitionGroup == null) {
                    copartitionGroup = new HashSet<>();
                }
                copartitionGroup.addAll(maybeDecorateInternalSourceTopics(topics));
                for (final String topic : topics) {
                    topicsToCopartitionGroup.put(topic, copartitionGroup);
                }
            }
        }
        final Set<Set<String>> uniqueCopartitionGroups = new HashSet<>(topicsToCopartitionGroup.values());
        return Collections.unmodifiableList(new ArrayList<>(uniqueCopartitionGroups));
    }
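
    // Transitive-closure example (hypothetical topics): if the groups {"A", "B"} and
    // {"B", "C"} were registered separately, the shared topic "B" links them, so a
    // single group {"A", "B", "C"} is returned and the assignor can verify that all
    // three topics have the same number of partitions.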

    private List<String> maybeDecorateInternalSourceTopics(final Collection<String> sourceTopics) {
        if (sourceTopics == null) {
            return Collections.emptyList();
        }
        final List<String> decoratedTopics = new ArrayList<>();
        for (final String topic : sourceTopics) {
            if (internalTopicNamesWithProperties.containsKey(topic)) {
                decoratedTopics.add(decorateTopic(topic));
            } else {
                decoratedTopics.add(topic);
            }
        }
        return decoratedTopics;
    }

    public String decoratePseudoTopic(final String topic) {
        return decorateTopic(topic);
    }

    private String decorateTopic(final String topic) {
        if (applicationId == null) {
            throw new TopologyException("there are internal topics and "
                                            + "applicationId hasn't been set. Call "
                                            + "setApplicationId first");
        }
        final String prefix = topologyConfigs == null ?
                                applicationId :
                                ProcessorContextUtils.getPrefix(topologyConfigs.applicationConfigs.originals(), applicationId);

        if (hasNamedTopology()) {
            return prefix + "-" + topologyName + "-" + topic;
        } else {
            return prefix + "-" + topic;
        }
    }
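
    // Example (hypothetical values): with applicationId "my-app" and no named
    // topology, the internal topic "count-repartition" is decorated to
    // "my-app-count-repartition"; with a named topology "t1" it becomes
    // "my-app-t1-count-repartition".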


    void initializeSubscription() {
        if (usesPatternSubscription()) {
            log.debug("Found pattern subscribed source topics, initializing consumer's subscription pattern.");
            sourceTopicPatternString = buildSourceTopicsPatternString();
        } else {
            log.debug("No source topics using pattern subscription found, initializing consumer's subscription collection.");
            fullSourceTopicNames = maybeDecorateInternalSourceTopics(rawSourceTopicNames);
            Collections.sort(fullSourceTopicNames);
        }
    }

    private String buildSourceTopicsPatternString() {
        final List<String> allSourceTopics = maybeDecorateInternalSourceTopics(rawSourceTopicNames);
        Collections.sort(allSourceTopics);

        final StringBuilder builder = new StringBuilder();

        for (final String topic : allSourceTopics) {
            builder.append(topic).append("|");
        }

        for (final Pattern sourcePattern : nodeToSourcePatterns.values()) {
            builder.append(sourcePattern.pattern()).append("|");
        }

        if (builder.length() > 0) {
            builder.setLength(builder.length() - 1);
        }

        return builder.toString();
    }
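
    // Example (hypothetical sources): plain source topics ["bar", "foo"] plus a
    // source pattern "numbers-\d+" yield the alternation "bar|foo|numbers-\d+",
    // which the consumer later compiles into a single subscription Pattern.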

    boolean usesPatternSubscription() {
        return !nodeToSourcePatterns.isEmpty();
    }

    /**
     * @return names of all source topics, including the application id/named topology prefix for repartition sources
     */
    public synchronized List<String> fullSourceTopicNames() {
        if (fullSourceTopicNames == null) {
            fullSourceTopicNames = maybeDecorateInternalSourceTopics(rawSourceTopicNames);
            Collections.sort(fullSourceTopicNames);
        }
        return fullSourceTopicNames;
    }

    synchronized String sourceTopicPatternString() {
        // With a NamedTopology, this topology may not use pattern subscription even though another one does,
        // in which case we still need to initialize the pattern string here
        if (sourceTopicPatternString == null) {
            sourceTopicPatternString = buildSourceTopicsPatternString();
        }
        return sourceTopicPatternString;
    }

    public boolean containsTopic(final String topic) {
        return fullSourceTopicNames().contains(topic)
            || (usesPatternSubscription() && Pattern.compile(sourceTopicPatternString()).matcher(topic).matches())
            || changelogTopicToStore.containsKey(topic);
    }

    public boolean hasNoLocalTopology() {
        return nodeToSourcePatterns.isEmpty() && rawSourceTopicNames.isEmpty();
    }

    public boolean hasGlobalStores() {
        return !globalStateStores.isEmpty();
    }

    private boolean isGlobalSource(final String nodeName) {
        final NodeFactory<?, ?, ?, ?> nodeFactory = nodeFactories.get(nodeName);

        if (nodeFactory instanceof SourceNodeFactory) {
            final List<String> topics = ((SourceNodeFactory<?, ?>) nodeFactory).topics;
            return topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0));
        }
        return false;
    }

    public TopologyDescription describe() {
        final TopologyDescription description = new TopologyDescription(topologyName);

        for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {

            final Set<String> allNodesOfGroups = nodeGroup.getValue();
            final boolean isNodeGroupOfGlobalStores = nodeGroupContainsGlobalSourceNode(allNodesOfGroups);

            if (!isNodeGroupOfGlobalStores) {
                describeSubtopology(description, nodeGroup.getKey(), allNodesOfGroups);
            } else {
                describeGlobalStore(description, allNodesOfGroups, nodeGroup.getKey());
            }
        }

        return description;
    }
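
    // A minimal usage sketch (hypothetical topic and node names); end users reach
    // this method through the public API:
    //
    //   final Topology topology = new Topology();
    //   topology.addSource("source", "input-topic");
    //   topology.addSink("sink", "output-topic", "source");
    //   System.out.println(topology.describe());   // delegates to this method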

    private void describeGlobalStore(final TopologyDescription description,
                                     final Set<String> nodes,
                                     final int id) {
        final Iterator<String> it = nodes.iterator();
        while (it.hasNext()) {
            final String node = it.next();

            if (isGlobalSource(node)) {
                // we found a GlobalStore node group; those contain exactly two nodes: {sourceNode, processorNode}
                it.remove(); // remove sourceNode from group
                final String processorNode = nodes.iterator().next(); // get remaining processorNode

                description.addGlobalStore(new GlobalStore(
                    node,
                    processorNode,
                    ((ProcessorNodeFactory<?, ?, ?, ?>) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
                    nodeToSourceTopics.get(node).get(0),
                    id
                ));
                break;
            }
        }
    }

    private boolean nodeGroupContainsGlobalSourceNode(final Set<String> allNodesOfGroups) {
        for (final String node : allNodesOfGroups) {
            if (isGlobalSource(node)) {
                return true;
            }
        }
        return false;
    }

    private static class NodeComparator implements Comparator<TopologyDescription.Node>, Serializable {

        @Override
        public int compare(final TopologyDescription.Node node1,
                           final TopologyDescription.Node node2) {
            if (node1.equals(node2)) {
                return 0;
            }
            final int size1 = ((AbstractNode) node1).size;
            final int size2 = ((AbstractNode) node2).size;

            // it is possible that two nodes have the same sub-tree size (think of two nodes connected via state stores);
            // in this case, fall back to comparing the node names
            if (size1 != size2) {
                return size2 - size1;
            } else {
                return node1.name().compareTo(node2.name());
            }
        }
    }

    private final static NodeComparator NODE_COMPARATOR = new NodeComparator();

    private static void updateSize(final AbstractNode node,
                                   final int delta) {
        node.size += delta;

        for (final TopologyDescription.Node predecessor : node.predecessors()) {
            updateSize((AbstractNode) predecessor, delta);
        }
    }

    private void describeSubtopology(final TopologyDescription description,
                                     final Integer subtopologyId,
                                     final Set<String> nodeNames) {

        final Map<String, AbstractNode> nodesByName = new HashMap<>();

        // add all nodes
        for (final String nodeName : nodeNames) {
            nodesByName.put(nodeName, nodeFactories.get(nodeName).describe());
        }

        // connect each node to its predecessors and successors
        for (final AbstractNode node : nodesByName.values()) {
            for (final String predecessorName : nodeFactories.get(node.name()).predecessors) {
                final AbstractNode predecessor = nodesByName.get(predecessorName);
                node.addPredecessor(predecessor);
                predecessor.addSuccessor(node);
                updateSize(predecessor, node.size);
            }
        }

        description.addSubtopology(new SubtopologyDescription(
                subtopologyId,
                new HashSet<>(nodesByName.values())));
    }

    public final static class GlobalStore implements TopologyDescription.GlobalStore {
        private final Source source;
        private final Processor processor;
        private final int id;

        public GlobalStore(final String sourceName,
                           final String processorName,
                           final String storeName,
                           final String topicName,
                           final int id) {
            source = new Source(sourceName, Collections.singleton(topicName), null);
            processor = new Processor(processorName, Collections.singleton(storeName));
            source.successors.add(processor);
            processor.predecessors.add(source);
            this.id = id;
        }

        @Override
        public int id() {
            return id;
        }

        @Override
        public TopologyDescription.Source source() {
            return source;
        }

        @Override
        public TopologyDescription.Processor processor() {
            return processor;
        }

        @Override
        public String toString() {
            return "Sub-topology: " + id + " for global store (will not generate tasks)\n"
                    + "    " + source.toString() + "\n"
                    + "    " + processor.toString() + "\n";
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            final GlobalStore that = (GlobalStore) o;
            return source.equals(that.source)
                && processor.equals(that.processor);
        }

        @Override
        public int hashCode() {
            return Objects.hash(source, processor);
        }
    }

    public abstract static class AbstractNode implements TopologyDescription.Node {
        final String name;
        final Set<TopologyDescription.Node> predecessors = new TreeSet<>(NODE_COMPARATOR);
        final Set<TopologyDescription.Node> successors = new TreeSet<>(NODE_COMPARATOR);

        // size of the sub-topology rooted at this node, including the node itself
        int size;

        AbstractNode(final String name) {
            Objects.requireNonNull(name, "name cannot be null");
            this.name = name;
            this.size = 1;
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public Set<TopologyDescription.Node> predecessors() {
            return Collections.unmodifiableSet(predecessors);
        }

        @Override
        public Set<TopologyDescription.Node> successors() {
            return Collections.unmodifiableSet(successors);
        }

        public void addPredecessor(final TopologyDescription.Node predecessor) {
            predecessors.add(predecessor);
        }

        public void addSuccessor(final TopologyDescription.Node successor) {
            successors.add(successor);
        }
    }

    public final static class Source extends AbstractNode implements TopologyDescription.Source {
        private final Set<String> topics;
        private final Pattern topicPattern;

        public Source(final String name,
                      final Set<String> topics,
                      final Pattern pattern) {
            super(name);
            if (topics == null && pattern == null) {
                throw new IllegalArgumentException("Either topics or pattern must be not-null, but both are null.");
            }
            if (topics != null && pattern != null) {
                throw new IllegalArgumentException("Either topics or pattern must be null, but both are not null.");
            }

            this.topics = topics;
            this.topicPattern = pattern;
        }

        @Override
        public Set<String> topicSet() {
            return topics;
        }

        @Override
        public Pattern topicPattern() {
            return topicPattern;
        }

        @Override
        public void addPredecessor(final TopologyDescription.Node predecessor) {
            throw new UnsupportedOperationException("Sources don't have predecessors.");
        }

        @Override
        public String toString() {
            final String topicsString = topics == null ? topicPattern.toString() : topics.toString();

            return "Source: " + name + " (topics: " + topicsString + ")\n      --> " + nodeNames(successors);
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            final Source source = (Source) o;
            // omit successor to avoid infinite loops
            return name.equals(source.name)
                && Objects.equals(topics, source.topics)
                && (topicPattern == null ?
                        source.topicPattern == null :
                        topicPattern.pattern().equals(source.topicPattern.pattern()));
        }

        @Override
        public int hashCode() {
            // omit successor as it might change and alter the hash code
            return Objects.hash(name, topics, topicPattern);
        }
    }

    public final static class Processor extends AbstractNode implements TopologyDescription.Processor {
        private final Set<String> stores;

        public Processor(final String name,
                         final Set<String> stores) {
            super(name);
            this.stores = stores;
        }

        @Override
        public Set<String> stores() {
            return Collections.unmodifiableSet(stores);
        }

        @Override
        public String toString() {
            return "Processor: " + name + " (stores: " + stores + ")\n      --> "
                + nodeNames(successors) + "\n      <-- " + nodeNames(predecessors);
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            final Processor processor = (Processor) o;
            // omit successor to avoid infinite loops
            return name.equals(processor.name)
                && stores.equals(processor.stores)
                && predecessors.equals(processor.predecessors);
        }

        @Override
        public int hashCode() {
            // omit successor as it might change and alter the hash code
            return Objects.hash(name, stores);
        }
    }

    public final static class Sink<K, V> extends AbstractNode implements TopologyDescription.Sink {
        private final TopicNameExtractor<K, V> topicNameExtractor;
        public Sink(final String name,
                    final TopicNameExtractor<K, V> topicNameExtractor) {
            super(name);
            this.topicNameExtractor = topicNameExtractor;
        }

        public Sink(final String name,
                    final String topic) {
            super(name);
            this.topicNameExtractor = new StaticTopicNameExtractor<>(topic);
        }

        @Override
        public String topic() {
            if (topicNameExtractor instanceof StaticTopicNameExtractor) {
                return ((StaticTopicNameExtractor<K, V>) topicNameExtractor).topicName;
            } else {
                return null;
            }
        }

        @Override
        public TopicNameExtractor<K, V> topicNameExtractor() {
            if (topicNameExtractor instanceof StaticTopicNameExtractor) {
                return null;
            } else {
                return topicNameExtractor;
            }
        }

        @Override
        public void addSuccessor(final TopologyDescription.Node successor) {
            throw new UnsupportedOperationException("Sinks don't have successors.");
        }

        @Override
        public String toString() {
            if (topicNameExtractor instanceof StaticTopicNameExtractor) {
                return "Sink: " + name + " (topic: " + topic() + ")\n      <-- " + nodeNames(predecessors);
            }
            return "Sink: " + name + " (extractor class: " + topicNameExtractor + ")\n      <-- "
                + nodeNames(predecessors);
        }

        @SuppressWarnings("unchecked")
        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            final Sink<K, V> sink = (Sink<K, V>) o;
            return name.equals(sink.name)
                && topicNameExtractor.equals(sink.topicNameExtractor)
                && predecessors.equals(sink.predecessors);
        }

        @Override
        public int hashCode() {
            // omit predecessors as it might change and alter the hash code
            return Objects.hash(name, topicNameExtractor);
        }
    }

    public final static class SubtopologyDescription implements TopologyDescription.Subtopology {
        private final int id;
        private final Set<TopologyDescription.Node> nodes;

        public SubtopologyDescription(final int id, final Set<TopologyDescription.Node> nodes) {
            this.id = id;
            this.nodes = new TreeSet<>(NODE_COMPARATOR);
            this.nodes.addAll(nodes);
        }

        @Override
        public int id() {
            return id;
        }

        @Override
        public Set<TopologyDescription.Node> nodes() {
            return Collections.unmodifiableSet(nodes);
        }

        // visible for testing
        Iterator<TopologyDescription.Node> nodesInOrder() {
            return nodes.iterator();
        }

        @Override
        public String toString() {
            return "Sub-topology: " + id + "\n" + nodesAsString() + "\n";
        }

        private String nodesAsString() {
            final StringBuilder sb = new StringBuilder();
            for (final TopologyDescription.Node node : nodes) {
                sb.append("    ");
                sb.append(node);
                sb.append('\n');
            }
            return sb.toString();
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            final SubtopologyDescription that = (SubtopologyDescription) o;
            return id == that.id
                && nodes.equals(that.nodes);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, nodes);
        }
    }

    public static class TopicsInfo {
        public final Set<String> sinkTopics;
        public final Set<String> sourceTopics;
        public final Map<String, InternalTopicConfig> stateChangelogTopics;
        public final Map<String, InternalTopicConfig> repartitionSourceTopics;

        TopicsInfo(final Set<String> sinkTopics,
                   final Set<String> sourceTopics,
                   final Map<String, InternalTopicConfig> repartitionSourceTopics,
                   final Map<String, InternalTopicConfig> stateChangelogTopics) {
            this.sinkTopics = sinkTopics;
            this.sourceTopics = sourceTopics;
            this.stateChangelogTopics = stateChangelogTopics;
            this.repartitionSourceTopics = repartitionSourceTopics;
        }

        /**
         * Returns the config for any changelogs that must be prepared for this topic group,
         * i.e., excluding any source topics that are reused as a changelog.
         */
        public Set<InternalTopicConfig> nonSourceChangelogTopics() {
            final Set<InternalTopicConfig> topicConfigs = new HashSet<>();
            for (final Map.Entry<String, InternalTopicConfig> changelogTopicEntry : stateChangelogTopics.entrySet()) {
                if (!sourceTopics.contains(changelogTopicEntry.getKey())) {
                    topicConfigs.add(changelogTopicEntry.getValue());
                }
            }
            return topicConfigs;
        }

        /**
         * Returns the topic names for any optimized source changelogs
         */
        public Set<String> sourceTopicChangelogs() {
            return sourceTopics.stream().filter(stateChangelogTopics::containsKey).collect(Collectors.toSet());
        }
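
        // Example (hypothetical, source-changelog optimization): if a KTable is built
        // directly from source topic "users" and that topic is reused as the store's
        // changelog, "users" appears both in sourceTopics and as a key of
        // stateChangelogTopics, so it is returned here and excluded from
        // nonSourceChangelogTopics() above.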

        @Override
        public boolean equals(final Object o) {
            if (o instanceof TopicsInfo) {
                final TopicsInfo other = (TopicsInfo) o;
                return other.sourceTopics.equals(sourceTopics) && other.stateChangelogTopics.equals(stateChangelogTopics);
            } else {
                return false;
            }
        }

        @Override
        public int hashCode() {
            final long n = ((long) sourceTopics.hashCode() << 32) | (long) stateChangelogTopics.hashCode();
            return (int) (n % 0xFFFFFFFFL);
        }

        @Override
        public String toString() {
            return "TopicsInfo{" +
                "sinkTopics=" + sinkTopics +
                ", sourceTopics=" + sourceTopics +
                ", repartitionSourceTopics=" + repartitionSourceTopics +
                ", stateChangelogTopics=" + stateChangelogTopics +
                '}';
        }
    }

    private static class GlobalStoreComparator implements Comparator<TopologyDescription.GlobalStore>, Serializable {
        @Override
        public int compare(final TopologyDescription.GlobalStore globalStore1,
                           final TopologyDescription.GlobalStore globalStore2) {
            if (globalStore1.equals(globalStore2)) {
                return 0;
            }
            return globalStore1.id() - globalStore2.id();
        }
    }

    private final static GlobalStoreComparator GLOBALSTORE_COMPARATOR = new GlobalStoreComparator();

    private static class SubtopologyComparator implements Comparator<TopologyDescription.Subtopology>, Serializable {
        @Override
        public int compare(final TopologyDescription.Subtopology subtopology1,
                           final TopologyDescription.Subtopology subtopology2) {
            if (subtopology1.equals(subtopology2)) {
                return 0;
            }
            return subtopology1.id() - subtopology2.id();
        }
    }

    private final static SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new SubtopologyComparator();

    public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription {
        private final TreeSet<TopologyDescription.Subtopology> subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR);
        private final TreeSet<TopologyDescription.GlobalStore> globalStores = new TreeSet<>(GLOBALSTORE_COMPARATOR);
        private final String namedTopology;

        public TopologyDescription() {
            this(null);
        }

        public TopologyDescription(final String namedTopology) {
            this.namedTopology = namedTopology;
        }

        public void addSubtopology(final TopologyDescription.Subtopology subtopology) {
            subtopologies.add(subtopology);
        }

        public void addGlobalStore(final TopologyDescription.GlobalStore globalStore) {
            globalStores.add(globalStore);
        }

        @Override
        public Set<TopologyDescription.Subtopology> subtopologies() {
            return Collections.unmodifiableSet(subtopologies);
        }

        @Override
        public Set<TopologyDescription.GlobalStore> globalStores() {
            return Collections.unmodifiableSet(globalStores);
        }

        @Override
        public String toString() {
            final StringBuilder sb = new StringBuilder();

            if (namedTopology == null) {
                sb.append("Topologies:\n ");
            } else {
                sb.append("Topology: ").append(namedTopology).append(":\n ");
            }
            final TopologyDescription.Subtopology[] sortedSubtopologies =
                subtopologies.descendingSet().toArray(new TopologyDescription.Subtopology[0]);
            final TopologyDescription.GlobalStore[] sortedGlobalStores =
                globalStores.descendingSet().toArray(new GlobalStore[0]);
            int expectedId = 0;
            int subtopologiesIndex = sortedSubtopologies.length - 1;
            int globalStoresIndex = sortedGlobalStores.length - 1;
            while (subtopologiesIndex != -1 && globalStoresIndex != -1) {
                sb.append("  ");
                final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex];
                final TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex];
                if (subtopology.id() == expectedId) {
                    sb.append(subtopology);
                    subtopologiesIndex--;
                } else {
                    sb.append(globalStore);
                    globalStoresIndex--;
                }
                expectedId++;
            }
            while (subtopologiesIndex != -1) {
                final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex];
                sb.append("  ");
                sb.append(subtopology);
                subtopologiesIndex--;
            }
            while (globalStoresIndex != -1) {
                final TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex];
                sb.append("  ");
                sb.append(globalStore);
                globalStoresIndex--;
            }
            return sb.toString();
        }
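
        // Interleaving example (hypothetical ids): with subtopologies {0, 2} and a
        // global store with id 1, the loop above advances expectedId through 0, 1, 2
        // and appends them in that order, so global stores are printed in id order
        // in between regular subtopologies.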

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            final TopologyDescription that = (TopologyDescription) o;
            return subtopologies.equals(that.subtopologies)
                && globalStores.equals(that.globalStores);
        }

        @Override
        public int hashCode() {
            return Objects.hash(subtopologies, globalStores);
        }

    }

    private static String nodeNames(final Set<TopologyDescription.Node> nodes) {
        final StringBuilder sb = new StringBuilder();
        if (!nodes.isEmpty()) {
            for (final TopologyDescription.Node n : nodes) {
                sb.append(n.name());
                sb.append(", ");
            }
            sb.deleteCharAt(sb.length() - 1);
            sb.deleteCharAt(sb.length() - 1);
        } else {
            return "none";
        }
        return sb.toString();
    }

    private Set<String> subscriptionUpdates() {
        return Collections.unmodifiableSet(subscriptionUpdates);
    }

    private boolean hasSubscriptionUpdates() {
        return !subscriptionUpdates.isEmpty();
    }

    synchronized void addSubscribedTopicsFromAssignment(final List<TopicPartition> partitions, final String logPrefix) {
        if (usesPatternSubscription()) {
            final Set<String> assignedTopics = new HashSet<>();
            for (final TopicPartition topicPartition : partitions) {
                assignedTopics.add(topicPartition.topic());
            }

            final Collection<String> existingTopics = subscriptionUpdates();

            if (!existingTopics.equals(assignedTopics)) {
                assignedTopics.addAll(existingTopics);
                updateSubscribedTopics(assignedTopics, logPrefix);
            }
        }
    }

    synchronized void addSubscribedTopicsFromMetadata(final Set<String> topics, final String logPrefix) {
        if (usesPatternSubscription() && !subscriptionUpdates().equals(topics)) {
            updateSubscribedTopics(topics, logPrefix);
        }
    }

    private void updateSubscribedTopics(final Set<String> topics, final String logPrefix) {
        subscriptionUpdates.clear();
        subscriptionUpdates.addAll(topics);

        log.debug("{}found {} topics possibly matching subscription", logPrefix, topics.size());

        setRegexMatchedTopicsToSourceNodes();
        setRegexMatchedTopicToStateStore();
    }
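
    // Flow sketch (illustrative): when consumer metadata reveals topics matching a
    // subscription pattern, e.g. {"numbers-1", "numbers-2"}, the update above
    // re-resolves pattern-subscribed source nodes to concrete topic lists and
    // extends the affected state stores' source-topic mappings accordingly.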

    /**
     * @return a copy of the string representation of any pattern subscribed source nodes
     */
    public synchronized List<String> allSourcePatternStrings() {
        return nodeToSourcePatterns.values().stream().map(Pattern::pattern).collect(Collectors.toList());
    }

    public boolean hasNamedTopology() {
        return topologyName != null;
    }

    public synchronized Map<String, StateStoreFactory<?>> stateStores() {
        return stateFactories;
    }
}

Related information

kafka source code directory

Related articles

kafka AbstractProcessorContext source code

kafka AbstractReadOnlyDecorator source code

kafka AbstractReadWriteDecorator source code

kafka AbstractTask source code

kafka ActiveTaskCreator source code

kafka ChangelogReader source code

kafka ChangelogRecordDeserializationHelper source code

kafka ChangelogRegister source code

kafka ChangelogTopics source code

kafka ClientUtils source code
