kafka KTable 源码
kafka KTable 代码
文件路径:/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.function.Function;
/**
* {@code KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
* Each record in this changelog stream is an update on the primary-keyed table with the record key as the primary key.
* <p>
* A {@code KTable} is either {@link StreamsBuilder#table(String) defined from a single Kafka topic} that is
* consumed message by message or the result of a {@code KTable} transformation.
* An aggregation of a {@link KStream} also yields a {@code KTable}.
* <p>
* A {@code KTable} can be transformed record by record, joined with another {@code KTable} or {@link KStream}, or
* can be re-partitioned and aggregated into a new {@code KTable}.
* <p>
* Some {@code KTable}s have an internal state (a {@link ReadOnlyKeyValueStore}) and are therefore queryable via the
* interactive queries API.
* For example:
* <pre>{@code
* final KTable table = ...
* ...
* final KafkaStreams streams = ...;
* streams.start()
* ...
* final String queryableStoreName = table.queryableStoreName(); // returns null if KTable is not queryable
* ReadOnlyKeyValueStore view = streams.store(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* view.get(key);
*}</pre>
*<p>
* Records from the source topic that have null keys are dropped.
*
* @param <K> Type of primary keys
* @param <V> Type of value changes
* @see KStream
* @see KGroupedTable
* @see GlobalKTable
* @see StreamsBuilder#table(String)
*/
public interface KTable<K, V> {
/**
* Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
* predicate, with default serializers, deserializers, and state store.
* All records that do not satisfy the predicate are dropped.
* For each {@code KTable} update, the filter is evaluated based on the current update
* record and then an update record is produced for the result {@code KTable}.
* This is a stateless record-by-record operation.
* <p>
* Note that {@code filter} for a <i>changelog stream</i> works differently than {@link KStream#filter(Predicate)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
* directly if required (i.e., if there is anything to be deleted).
* Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record
* is forwarded.
*
* @param predicate a filter {@link Predicate} that is applied to each record
* @return a {@code KTable} that contains only those records that satisfy the given predicate
* @see #filterNot(Predicate)
*/
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate);
/**
* Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
* predicate, with default serializers, deserializers, and state store.
* All records that do not satisfy the predicate are dropped.
* For each {@code KTable} update, the filter is evaluated based on the current update
* record and then an update record is produced for the result {@code KTable}.
* This is a stateless record-by-record operation.
* <p>
* Note that {@code filter} for a <i>changelog stream</i> works differently than {@link KStream#filter(Predicate)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
* directly if required (i.e., if there is anything to be deleted).
* Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record
* is forwarded.
*
* @param predicate a filter {@link Predicate} that is applied to each record
* @param named a {@link Named} config used to name the processor in the topology
* @return a {@code KTable} that contains only those records that satisfy the given predicate
* @see #filterNot(Predicate)
*/
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final Named named);
/**
* Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
* predicate, with the {@link Serde key serde}, {@link Serde value serde}, and the underlying
* {@link KeyValueStore materialized state storage} configured in the {@link Materialized} instance.
* All records that do not satisfy the predicate are dropped.
* For each {@code KTable} update, the filter is evaluated based on the current update
* record and then an update record is produced for the result {@code KTable}.
* This is a stateless record-by-record operation.
* <p>
* Note that {@code filter} for a <i>changelog stream</i> works differently than {@link KStream#filter(Predicate)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
* directly if required (i.e., if there is anything to be deleted).
* Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record
* is forwarded.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // filtering words
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>
*
* @param predicate a filter {@link Predicate} that is applied to each record
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @return a {@code KTable} that contains only those records that satisfy the given predicate
* @see #filterNot(Predicate, Materialized)
*/
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
* predicate, with the {@link Serde key serde}, {@link Serde value serde}, and the underlying
* {@link KeyValueStore materialized state storage} configured in the {@link Materialized} instance.
* All records that do not satisfy the predicate are dropped.
* For each {@code KTable} update, the filter is evaluated based on the current update
* record and then an update record is produced for the result {@code KTable}.
* This is a stateless record-by-record operation.
* <p>
* Note that {@code filter} for a <i>changelog stream</i> works differently than {@link KStream#filter(Predicate)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
* directly if required (i.e., if there is anything to be deleted).
* Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record
* is forwarded.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // filtering words
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>
*
* @param predicate a filter {@link Predicate} that is applied to each record
* @param named a {@link Named} config used to name the processor in the topology
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @return a {@code KTable} that contains only those records that satisfy the given predicate
* @see #filterNot(Predicate, Materialized)
*/
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
final Named named,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the
* given predicate, with default serializers, deserializers, and state store.
* All records that <em>do</em> satisfy the predicate are dropped.
* For each {@code KTable} update, the filter is evaluated based on the current update
* record and then an update record is produced for the result {@code KTable}.
* This is a stateless record-by-record operation.
* <p>
* Note that {@code filterNot} for a <i>changelog stream</i> works differently than {@link KStream#filterNot(Predicate)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
* directly if required (i.e., if there is anything to be deleted).
* Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
* forwarded.
*
* @param predicate a filter {@link Predicate} that is applied to each record
* @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
* @see #filter(Predicate)
*/
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
/**
* Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the
* given predicate, with default serializers, deserializers, and state store.
* All records that <em>do</em> satisfy the predicate are dropped.
* For each {@code KTable} update, the filter is evaluated based on the current update
* record and then an update record is produced for the result {@code KTable}.
* This is a stateless record-by-record operation.
* <p>
* Note that {@code filterNot} for a <i>changelog stream</i> works differently than {@link KStream#filterNot(Predicate)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
* directly if required (i.e., if there is anything to be deleted).
* Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
* forwarded.
*
* @param predicate a filter {@link Predicate} that is applied to each record
* @param named a {@link Named} config used to name the processor in the topology
* @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
* @see #filter(Predicate)
*/
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Named named);
/**
* Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the
* given predicate, with the {@link Serde key serde}, {@link Serde value serde}, and the underlying
* {@link KeyValueStore materialized state storage} configured in the {@link Materialized} instance.
* All records that <em>do</em> satisfy the predicate are dropped.
* For each {@code KTable} update, the filter is evaluated based on the current update
* record and then an update record is produced for the result {@code KTable}.
* This is a stateless record-by-record operation.
* <p>
* Note that {@code filterNot} for a <i>changelog stream</i> works differently than {@link KStream#filterNot(Predicate)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
* directly if required (i.e., if there is anything to be deleted).
* Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
* forwarded.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // filtering words
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>
* @param predicate a filter {@link Predicate} that is applied to each record
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
* @see #filter(Predicate, Materialized)
*/
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the
* given predicate, with the {@link Serde key serde}, {@link Serde value serde}, and the underlying
* {@link KeyValueStore materialized state storage} configured in the {@link Materialized} instance.
* All records that <em>do</em> satisfy the predicate are dropped.
* For each {@code KTable} update, the filter is evaluated based on the current update
* record and then an update record is produced for the result {@code KTable}.
* This is a stateless record-by-record operation.
* <p>
* Note that {@code filterNot} for a <i>changelog stream</i> works differently than {@link KStream#filterNot(Predicate)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
* directly if required (i.e., if there is anything to be deleted).
* Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
* forwarded.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // filtering words
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>
* @param predicate a filter {@link Predicate} that is applied to each record
* @param named a {@link Named} config used to name the processor in the topology
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
* @see #filter(Predicate, Materialized)
*/
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
final Named named,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type) in the new {@code KTable}, with default serializers, deserializers, and state store.
* For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the updated record and
* computes a new value for it, resulting in an updated record for the result {@code KTable}.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation.
* <p>
* The example below counts the number of token of the value string.
* <pre>{@code
* KTable<String, String> inputTable = builder.table("topic");
* KTable<String, Integer> outputTable = inputTable.mapValues(value -> value.split(" ").length);
* }</pre>
* <p>
* This operation preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
* the result {@code KTable}.
* <p>
* Note that {@code mapValues} for a <i>changelog stream</i> works differently than {@link KStream#mapValues(ValueMapper)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
* delete the corresponding record in the result {@code KTable}.
*
* @param mapper a {@link ValueMapper} that computes a new output value
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
*/
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type) in the new {@code KTable}, with default serializers, deserializers, and state store.
* For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the updated record and
* computes a new value for it, resulting in an updated record for the result {@code KTable}.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation.
* <p>
* The example below counts the number of token of the value string.
* <pre>{@code
* KTable<String, String> inputTable = builder.table("topic");
* KTable<String, Integer> outputTable = inputTable.mapValues(value -> value.split(" ").length, Named.as("countTokenValue"));
* }</pre>
* <p>
* This operation preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
* the result {@code KTable}.
* <p>
* Note that {@code mapValues} for a <i>changelog stream</i> works differently than {@link KStream#mapValues(ValueMapper)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
* delete the corresponding record in the result {@code KTable}.
*
* @param mapper a {@link ValueMapper} that computes a new output value
* @param named a {@link Named} config used to name the processor in the topology
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
*/
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Named named);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type) in the new {@code KTable}, with default serializers, deserializers, and state store.
* For each {@code KTable} update the provided {@link ValueMapperWithKey} is applied to the value of the update
* record and computes a new value for it, resulting in an updated record for the result {@code KTable}.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation.
* <p>
* The example below counts the number of token of value and key strings.
* <pre>{@code
* KTable<String, String> inputTable = builder.table("topic");
* KTable<String, Integer> outputTable =
* inputTable.mapValues((readOnlyKey, value) -> readOnlyKey.split(" ").length + value.split(" ").length);
* }</pre>
* <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* This operation preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
* the result {@code KTable}.
* <p>
* Note that {@code mapValues} for a <i>changelog stream</i> works differently than {@link KStream#mapValues(ValueMapperWithKey)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
* delete the corresponding record in the result {@code KTable}.
*
* @param mapper a {@link ValueMapperWithKey} that computes a new output value
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
*/
<VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type) in the new {@code KTable}, with default serializers, deserializers, and state store.
* For each {@code KTable} update the provided {@link ValueMapperWithKey} is applied to the value of the update
* record and computes a new value for it, resulting in an updated record for the result {@code KTable}.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation.
* <p>
* The example below counts the number of token of value and key strings.
* <pre>{@code
* KTable<String, String> inputTable = builder.table("topic");
* KTable<String, Integer> outputTable =
* inputTable.mapValues((readOnlyKey, value) -> readOnlyKey.split(" ").length + value.split(" ").length, Named.as("countTokenValueAndKey"));
* }</pre>
* <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* This operation preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
* the result {@code KTable}.
* <p>
* Note that {@code mapValues} for a <i>changelog stream</i> works differently than {@link KStream#mapValues(ValueMapperWithKey)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
* delete the corresponding record in the result {@code KTable}.
*
* @param mapper a {@link ValueMapperWithKey} that computes a new output value
* @param named a {@link Named} config used to name the processor in the topology
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
*/
<VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
final Named named);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type) in the new {@code KTable}, with the {@link Serde key serde}, {@link Serde value serde},
* and the underlying {@link KeyValueStore materialized state storage} configured in the {@link Materialized}
* instance.
* For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the updated record and
* computes a new value for it, resulting in an updated record for the result {@code KTable}.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation.
* <p>
* The example below counts the number of token of the value string.
* <pre>{@code
* KTable<String, String> inputTable = builder.table("topic");
* KTable<String, Integer> outputTable = inputTable.mapValue(new ValueMapper<String, Integer> {
* Integer apply(String value) {
* return value.split(" ").length;
* }
* });
* }</pre>
* <p>
* To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>
* This operation preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
* the result {@code KTable}.
* <p>
* Note that {@code mapValues} for a <i>changelog stream</i> works differently than {@link KStream#mapValues(ValueMapper)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
* delete the corresponding record in the result {@code KTable}.
*
* @param mapper a {@link ValueMapper} that computes a new output value
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
*
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
*/
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type) in the new {@code KTable}, with the {@link Serde key serde}, {@link Serde value serde},
* and the underlying {@link KeyValueStore materialized state storage} configured in the {@link Materialized}
* instance.
* For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the updated record and
* computes a new value for it, resulting in an updated record for the result {@code KTable}.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation.
* <p>
* The example below counts the number of token of the value string.
* <pre>{@code
* KTable<String, String> inputTable = builder.table("topic");
* KTable<String, Integer> outputTable = inputTable.mapValue(new ValueMapper<String, Integer> {
* Integer apply(String value) {
* return value.split(" ").length;
* }
* });
* }</pre>
* <p>
* To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>
* This operation preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
* the result {@code KTable}.
* <p>
* Note that {@code mapValues} for a <i>changelog stream</i> works differently than {@link KStream#mapValues(ValueMapper)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
* delete the corresponding record in the result {@code KTable}.
*
* @param mapper a {@link ValueMapper} that computes a new output value
* @param named a {@link Named} config used to name the processor in the topology
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
*
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
*/
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type) in the new {@code KTable}, with the {@link Serde key serde}, {@link Serde value serde},
* and the underlying {@link KeyValueStore materialized state storage} configured in the {@link Materialized}
* instance.
* For each {@code KTable} update the provided {@link ValueMapperWithKey} is applied to the value of the update
* record and computes a new value for it, resulting in an updated record for the result {@code KTable}.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation.
* <p>
* The example below counts the number of token of value and key strings.
* <pre>{@code
* KTable<String, String> inputTable = builder.table("topic");
* KTable<String, Integer> outputTable = inputTable.mapValue(new ValueMapperWithKey<String, String, Integer> {
* Integer apply(String readOnlyKey, String value) {
* return readOnlyKey.split(" ").length + value.split(" ").length;
* }
* });
* }</pre>
* <p>
* To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters)} KafkaStreams#store(...)}:
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* This operation preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
* the result {@code KTable}.
* <p>
* Note that {@code mapValues} for a <i>changelog stream</i> works differently than {@link KStream#mapValues(ValueMapper)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
* delete the corresponding record in the result {@code KTable}.
*
* @param mapper a {@link ValueMapperWithKey} that computes a new output value
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
*
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
*/
<VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type) in the new {@code KTable}, with the {@link Serde key serde}, {@link Serde value serde},
* and the underlying {@link KeyValueStore materialized state storage} configured in the {@link Materialized}
* instance.
* For each {@code KTable} update the provided {@link ValueMapperWithKey} is applied to the value of the update
* record and computes a new value for it, resulting in an updated record for the result {@code KTable}.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation.
* <p>
* The example below counts the number of token of value and key strings.
* <pre>{@code
* KTable<String, String> inputTable = builder.table("topic");
* KTable<String, Integer> outputTable = inputTable.mapValue(new ValueMapperWithKey<String, String, Integer> {
* Integer apply(String readOnlyKey, String value) {
* return readOnlyKey.split(" ").length + value.split(" ").length;
* }
* });
* }</pre>
* <p>
* To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* This operation preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
* the result {@code KTable}.
* <p>
* Note that {@code mapValues} for a <i>changelog stream</i> works differently than {@link KStream#mapValues(ValueMapper)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
* delete the corresponding record in the result {@code KTable}.
*
* @param mapper a {@link ValueMapperWithKey} that computes a new output value
* @param named a {@link Named} config used to name the processor in the topology
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
*
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
*/
<VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Convert this changelog stream to a {@link KStream}.
* <p>
* Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
* this changelog stream is no longer treated as an updated record (cf. {@link KStream} vs {@code KTable}).
*
* @return a {@link KStream} that contains the same records as this {@code KTable}
*/
KStream<K, V> toStream();
/**
* Convert this changelog stream to a {@link KStream}.
* <p>
* Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
* this changelog stream is no longer treated as an updated record (cf. {@link KStream} vs {@code KTable}).
*
* @param named a {@link Named} config used to name the processor in the topology
*
* @return a {@link KStream} that contains the same records as this {@code KTable}
*/
KStream<K, V> toStream(final Named named);
/**
* Convert this changelog stream to a {@link KStream} using the given {@link KeyValueMapper} to select the new key.
* <p>
* For example, you can compute the new key as the length of the value string.
* <pre>{@code
* KTable<String, String> table = builder.table("topic");
* KTable<Integer, String> keyedStream = table.toStream(new KeyValueMapper<String, String, Integer> {
* Integer apply(String key, String value) {
* return value.length();
* }
* });
* }</pre>
* Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or
* join) is applied to the result {@link KStream}.
* <p>
* This operation is equivalent to calling
* {@code table.}{@link #toStream() toStream}{@code ().}{@link KStream#selectKey(KeyValueMapper) selectKey(KeyValueMapper)}.
* <p>
* Note that {@link #toStream()} is a logical operation and only changes the "interpretation" of the stream, i.e.,
* each record of this changelog stream is no longer treated as an updated record (cf. {@link KStream} vs {@code KTable}).
*
* @param mapper a {@link KeyValueMapper} that computes a new key for each record
* @param <KR> the new key type of the result stream
* @return a {@link KStream} that contains the same records as this {@code KTable}
*/
<KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
/**
* Convert this changelog stream to a {@link KStream} using the given {@link KeyValueMapper} to select the new key.
* <p>
* For example, you can compute the new key as the length of the value string.
* <pre>{@code
* KTable<String, String> table = builder.table("topic");
* KTable<Integer, String> keyedStream = table.toStream(new KeyValueMapper<String, String, Integer> {
* Integer apply(String key, String value) {
* return value.length();
* }
* });
* }</pre>
* Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or
* join) is applied to the result {@link KStream}.
* <p>
* This operation is equivalent to calling
* {@code table.}{@link #toStream() toStream}{@code ().}{@link KStream#selectKey(KeyValueMapper) selectKey(KeyValueMapper)}.
* <p>
* Note that {@link #toStream()} is a logical operation and only changes the "interpretation" of the stream, i.e.,
* each record of this changelog stream is no longer treated as an updated record (cf. {@link KStream} vs {@code KTable}).
*
* @param mapper a {@link KeyValueMapper} that computes a new key for each record
* @param named a {@link Named} config used to name the processor in the topology
* @param <KR> the new key type of the result stream
* @return a {@link KStream} that contains the same records as this {@code KTable}
*/
<KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
final Named named);
/**
* Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration.
*
* This controls what updates downstream table and stream operations will receive.
*
* @param suppressed Configuration object determining what, if any, updates to suppress
* @return A new KTable with the desired suppression characteristics.
*/
KTable<K, V> suppress(final Suppressed<? super K> suppressed);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type), with default serializers, deserializers, and state store.
* A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to each input
* record value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is similar to {@link #mapValues(ValueMapperWithKey)}, but more flexible, allowing access to additional state-stores,
* and access to the {@link ProcessorContext}.
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional
* periodic actions can be performed.
* <p>
* If the downstream topology uses aggregation functions, (e.g. {@link KGroupedTable#reduce}, {@link KGroupedTable#aggregate}, etc),
* care must be taken when dealing with state, (either held in state-stores or transformer instances), to ensure correct aggregate results.
* In contrast, if the resulting KTable is materialized, (cf. {@link #transformValues(ValueTransformerWithKeySupplier, Materialized, String...)}),
* such concerns are handled for you.
* <p>
* In order to assign a state, the state must be created and registered beforehand:
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
* Serdes.String(),
* Serdes.String());
* // register store
* builder.addStateStore(keyValueStoreBuilder);
*
* KTable outputTable = inputTable.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
* }</pre>
* <p>
* Within the {@link ValueTransformerWithKey}, the state is obtained via the
* {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* <pre>{@code
* new ValueTransformerWithKeySupplier() {
* ValueTransformerWithKey get() {
* return new ValueTransformerWithKey() {
* private KeyValueStore<String, String> state;
*
* void init(ProcessorContext context) {
* this.state = (KeyValueStore<String, String>)context.getStateStore("myValueTransformState");
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
* }
*
* NewValueType transform(K readOnlyKey, V value) {
* // can access this.state and use read-only key
* return new NewValueType(readOnlyKey); // or null
* }
*
* void close() {
* // can access this.state
* }
* }
* }
* }
* }</pre>
* <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* Setting a new value preserves data co-location with respect to the key.
*
* @param transformerSupplier a instance of {@link ValueTransformerWithKeySupplier} that generates a
* {@link ValueTransformerWithKey}.
* At least one transformer instance will be created per streaming task.
* Transformers do not need to be thread-safe.
* @param stateStoreNames the names of the state stores used by the processor
* @param <VR> the value type of the result table
* @return a {@code KTable} that contains records with unmodified key and new values (possibly of different type)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
*/
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final String... stateStoreNames);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type), with default serializers, deserializers, and state store.
* A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to each input
* record value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is similar to {@link #mapValues(ValueMapperWithKey)}, but more flexible, allowing access to additional state-stores,
* and access to the {@link ProcessorContext}.
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional
* periodic actions can be performed.
* <p>
* If the downstream topology uses aggregation functions, (e.g. {@link KGroupedTable#reduce}, {@link KGroupedTable#aggregate}, etc),
* care must be taken when dealing with state, (either held in state-stores or transformer instances), to ensure correct aggregate results.
* In contrast, if the resulting KTable is materialized, (cf. {@link #transformValues(ValueTransformerWithKeySupplier, Materialized, String...)}),
* such concerns are handled for you.
* <p>
* In order to assign a state, the state must be created and registered beforehand:
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
* Serdes.String(),
* Serdes.String());
* // register store
* builder.addStateStore(keyValueStoreBuilder);
*
* KTable outputTable = inputTable.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
* }</pre>
* <p>
* Within the {@link ValueTransformerWithKey}, the state is obtained via the
* {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* <pre>{@code
* new ValueTransformerWithKeySupplier() {
* ValueTransformerWithKey get() {
* return new ValueTransformerWithKey() {
* private KeyValueStore<String, String> state;
*
* void init(ProcessorContext context) {
* this.state = (KeyValueStore<String, String>)context.getStateStore("myValueTransformState");
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
* }
*
* NewValueType transform(K readOnlyKey, V value) {
* // can access this.state and use read-only key
* return new NewValueType(readOnlyKey); // or null
* }
*
* void close() {
* // can access this.state
* }
* }
* }
* }
* }</pre>
* <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* Setting a new value preserves data co-location with respect to the key.
*
* @param transformerSupplier a instance of {@link ValueTransformerWithKeySupplier} that generates a
* {@link ValueTransformerWithKey}.
* At least one transformer instance will be created per streaming task.
* Transformers do not need to be thread-safe.
* @param named a {@link Named} config used to name the processor in the topology
* @param stateStoreNames the names of the state stores used by the processor
* @param <VR> the value type of the result table
* @return a {@code KTable} that contains records with unmodified key and new values (possibly of different type)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
*/
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final Named named,
final String... stateStoreNames);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type), with the {@link Serde key serde}, {@link Serde value serde}, and the underlying
* {@link KeyValueStore materialized state storage} configured in the {@link Materialized} instance.
* A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to each input
* record value and computes a new value for it.
* This is similar to {@link #mapValues(ValueMapperWithKey)}, but more flexible, allowing stateful, rather than stateless,
* record-by-record operation, access to additional state-stores, and access to the {@link ProcessorContext}.
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional
* periodic actions can be performed.
* The resulting {@code KTable} is materialized into another state store (additional to the provided state store names)
* as specified by the user via {@link Materialized} parameter, and is queryable through its given name.
* <p>
* In order to assign a state, the state must be created and registered beforehand:
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
* Serdes.String(),
* Serdes.String());
* // register store
* builder.addStateStore(keyValueStoreBuilder);
*
* KTable outputTable = inputTable.transformValues(
* new ValueTransformerWithKeySupplier() { ... },
* Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("outputTable")
* .withKeySerde(Serdes.String())
* .withValueSerde(Serdes.String()),
* "myValueTransformState");
* }</pre>
* <p>
* Within the {@link ValueTransformerWithKey}, the state is obtained via the
* {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* <pre>{@code
* new ValueTransformerWithKeySupplier() {
* ValueTransformerWithKey get() {
* return new ValueTransformerWithKey() {
* private KeyValueStore<String, String> state;
*
* void init(ProcessorContext context) {
* this.state = (KeyValueStore<String, String>)context.getStateStore("myValueTransformState");
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
* }
*
* NewValueType transform(K readOnlyKey, V value) {
* // can access this.state and use read-only key
* return new NewValueType(readOnlyKey); // or null
* }
*
* void close() {
* // can access this.state
* }
* }
* }
* }
* }</pre>
* <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* Setting a new value preserves data co-location with respect to the key.
*
* @param transformerSupplier a instance of {@link ValueTransformerWithKeySupplier} that generates a
* {@link ValueTransformerWithKey}.
* At least one transformer instance will be created per streaming task.
* Transformers do not need to be thread-safe.
* @param materialized an instance of {@link Materialized} used to describe how the state store of the
* resulting table should be materialized.
* Cannot be {@code null}
* @param stateStoreNames the names of the state stores used by the processor
* @param <VR> the value type of the result table
* @return a {@code KTable} that contains records with unmodified key and new values (possibly of different type)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
*/
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final String... stateStoreNames);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type), with the {@link Serde key serde}, {@link Serde value serde}, and the underlying
* {@link KeyValueStore materialized state storage} configured in the {@link Materialized} instance.
* A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to each input
* record value and computes a new value for it.
* This is similar to {@link #mapValues(ValueMapperWithKey)}, but more flexible, allowing stateful, rather than stateless,
* record-by-record operation, access to additional state-stores, and access to the {@link ProcessorContext}.
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional
* periodic actions can be performed.
* The resulting {@code KTable} is materialized into another state store (additional to the provided state store names)
* as specified by the user via {@link Materialized} parameter, and is queryable through its given name.
* <p>
* In order to assign a state, the state must be created and registered beforehand:
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
* Serdes.String(),
* Serdes.String());
* // register store
* builder.addStateStore(keyValueStoreBuilder);
*
* KTable outputTable = inputTable.transformValues(
* new ValueTransformerWithKeySupplier() { ... },
* Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("outputTable")
* .withKeySerde(Serdes.String())
* .withValueSerde(Serdes.String()),
* "myValueTransformState");
* }</pre>
* <p>
* Within the {@link ValueTransformerWithKey}, the state is obtained via the
* {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* <pre>{@code
* new ValueTransformerWithKeySupplier() {
* ValueTransformerWithKey get() {
* return new ValueTransformerWithKey() {
* private KeyValueStore<String, String> state;
*
* void init(ProcessorContext context) {
* this.state = (KeyValueStore<String, String>)context.getStateStore("myValueTransformState");
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
* }
*
* NewValueType transform(K readOnlyKey, V value) {
* // can access this.state and use read-only key
* return new NewValueType(readOnlyKey); // or null
* }
*
* void close() {
* // can access this.state
* }
* }
* }
* }
* }</pre>
* <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* Setting a new value preserves data co-location with respect to the key.
*
* @param transformerSupplier a instance of {@link ValueTransformerWithKeySupplier} that generates a
* {@link ValueTransformerWithKey}.
* At least one transformer instance will be created per streaming task.
* Transformers do not need to be thread-safe.
* @param materialized an instance of {@link Materialized} used to describe how the state store of the
* resulting table should be materialized.
* Cannot be {@code null}
* @param named a {@link Named} config used to name the processor in the topology
* @param stateStoreNames the names of the state stores used by the processor
* @param <VR> the value type of the result table
* @return a {@code KTable} that contains records with unmodified key and new values (possibly of different type)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
*/
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final Named named,
final String... stateStoreNames);
/**
* Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper} and default serializers
* and deserializers.
* Each {@link KeyValue} pair of this {@code KTable} is mapped to a new {@link KeyValue} pair by applying the
* provided {@link KeyValueMapper}.
* Re-grouping a {@code KTable} is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedTable}).
* The {@link KeyValueMapper} selects a new key and value (with should both have unmodified type).
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}
* <p>
* Because a new key is selected, an internal repartitioning topic will be created in Kafka.
* This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
* an internally generated name, and "-repartition" is a fixed suffix.
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* <p>
* All data of this {@code KTable} will be redistributed through the repartitioning topic by writing all update
* records to and rereading all updated records from it, such that the resulting {@link KGroupedTable} is partitioned
* on the new key.
* <p>
* If the key or value type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Grouped)}
* instead.
*
* @param selector a {@link KeyValueMapper} that computes a new grouping key and value to be aggregated
* @param <KR> the key type of the result {@link KGroupedTable}
* @param <VR> the value type of the result {@link KGroupedTable}
* @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable}
*/
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
/**
* Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper}
* and {@link Serde}s as specified by {@link Grouped}.
* Each {@link KeyValue} pair of this {@code KTable} is mapped to a new {@link KeyValue} pair by applying the
* provided {@link KeyValueMapper}.
* Re-grouping a {@code KTable} is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedTable}).
* The {@link KeyValueMapper} selects a new key and value (where both could the same type or a new type).
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}
* <p>
* Because a new key is selected, an internal repartitioning topic will be created in Kafka.
* This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
* either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name.
*
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* <p>
* All data of this {@code KTable} will be redistributed through the repartitioning topic by writing all update
* records to and rereading all updated records from it, such that the resulting {@link KGroupedTable} is partitioned
* on the new key.
*
* @param selector a {@link KeyValueMapper} that computes a new grouping key and value to be aggregated
* @param grouped the {@link Grouped} instance used to specify {@link org.apache.kafka.common.serialization.Serdes}
* and the name for a repartition topic if repartitioning is required.
* @param <KR> the key type of the result {@link KGroupedTable}
* @param <VR> the value type of the result {@link KGroupedTable}
* @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable}
*/
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
final Grouped<KR, VR> grouped);
/**
* Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join,
* with default serializers, deserializers, and state store.
* The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
* The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
* of the join.
* <p>
* The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
* matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
* This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
* {@code KTable} the result gets updated.
* <p>
* For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* <p>
* Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
* Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded
* directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
* <p>
* Input records with {@code null} key will be dropped and no join computation is performed.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>thisKTable</th>
* <th>thisState</th>
* <th>otherKTable</th>
* <th>otherState</th>
* <th>result updated record</th>
* </tr>
* <tr>
* <td><K1:A></td>
* <td><K1:A></td>
* <td></td>
* <td></td>
* <td></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:A></td>
* <td><K1:b></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(A,b)></td>
* </tr>
* <tr>
* <td><K1:C></td>
* <td><K1:C></td>
* <td></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(C,b)></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:C></td>
* <td><K1:null></td>
* <td></td>
* <td><K1:null></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param <VO> the value type of the other {@code KTable}
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key
* @see #leftJoin(KTable, ValueJoiner)
* @see #outerJoin(KTable, ValueJoiner)
*/
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
/**
* Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join,
* with default serializers, deserializers, and state store.
* The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
* The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
* of the join.
* <p>
* The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
* matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
* This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
* {@code KTable} the result gets updated.
* <p>
* For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* <p>
* Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
* Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded
* directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
* <p>
* Input records with {@code null} key will be dropped and no join computation is performed.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>thisKTable</th>
* <th>thisState</th>
* <th>otherKTable</th>
* <th>otherState</th>
* <th>result updated record</th>
* </tr>
* <tr>
* <td><K1:A></td>
* <td><K1:A></td>
* <td></td>
* <td></td>
* <td></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:A></td>
* <td><K1:b></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(A,b)></td>
* </tr>
* <tr>
* <td><K1:C></td>
* <td><K1:C></td>
* <td></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(C,b)></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:C></td>
* <td><K1:null></td>
* <td></td>
* <td><K1:null></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param <VO> the value type of the other {@code KTable}
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key
* @see #leftJoin(KTable, ValueJoiner)
* @see #outerJoin(KTable, ValueJoiner)
*/
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Named named);
/**
* Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join,
* with the {@link Materialized} instance for configuration of the {@link Serde key serde},
* {@link Serde the result table's value serde}, and {@link KeyValueStore state store}.
* The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
* The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
* of the join.
* <p>
* The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
* matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
* This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
* {@code KTable} the result gets updated.
* <p>
* For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* <p>
* Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
* Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded
* directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
* <p>
* Input records with {@code null} key will be dropped and no join computation is performed.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>thisKTable</th>
* <th>thisState</th>
* <th>otherKTable</th>
* <th>otherState</th>
* <th>result updated record</th>
* </tr>
* <tr>
* <td><K1:A></td>
* <td><K1:A></td>
* <td></td>
* <td></td>
* <td></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:A></td>
* <td><K1:b></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(A,b)></td>
* </tr>
* <tr>
* <td><K1:C></td>
* <td><K1:C></td>
* <td></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(C,b)></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:C></td>
* <td><K1:null></td>
* <td></td>
* <td><K1:null></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized.
* Cannot be {@code null}
* @param <VO> the value type of the other {@code KTable}
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key
* @see #leftJoin(KTable, ValueJoiner, Materialized)
* @see #outerJoin(KTable, ValueJoiner, Materialized)
*/
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join,
* with the {@link Materialized} instance for configuration of the {@link Serde key serde},
* {@link Serde the result table's value serde}, and {@link KeyValueStore state store}.
* The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
* The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
* of the join.
* <p>
* The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
* matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
* This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
* {@code KTable} the result gets updated.
* <p>
* For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* <p>
* Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
* Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded
* directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
* <p>
* Input records with {@code null} key will be dropped and no join computation is performed.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>thisKTable</th>
* <th>thisState</th>
* <th>otherKTable</th>
* <th>otherState</th>
* <th>result updated record</th>
* </tr>
* <tr>
* <td><K1:A></td>
* <td><K1:A></td>
* <td></td>
* <td></td>
* <td></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:A></td>
* <td><K1:b></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(A,b)></td>
* </tr>
* <tr>
* <td><K1:C></td>
* <td><K1:C></td>
* <td></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(C,b)></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:C></td>
* <td><K1:null></td>
* <td></td>
* <td><K1:null></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized.
* Cannot be {@code null}
* @param <VO> the value type of the other {@code KTable}
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key
* @see #leftJoin(KTable, ValueJoiner, Materialized)
* @see #outerJoin(KTable, ValueJoiner, Materialized)
*/
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
* non-windowed left equi join, with default serializers, deserializers, and state store.
* The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
* In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce
* an output record (cf. below).
* The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
* of the join.
* <p>
* The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
* matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
* This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
* {@code KTable} the result gets updated.
* <p>
* For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
* provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* Additionally, for each record of left {@code KTable} that does not find a corresponding record in the
* right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue =
* null} to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* <p>
* Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
* For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
* forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be
* deleted).
* <p>
* Input records with {@code null} key will be dropped and no join computation is performed.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>thisKTable</th>
* <th>thisState</th>
* <th>otherKTable</th>
* <th>otherState</th>
* <th>result updated record</th>
* </tr>
* <tr>
* <td><K1:A></td>
* <td><K1:A></td>
* <td></td>
* <td></td>
* <td><K1:ValueJoiner(A,null)></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:A></td>
* <td><K1:b></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(A,b)></td>
* </tr>
* <tr>
* <td><K1:null></td>
* <td></td>
* <td></td>
* <td><K1:b></td>
* <td><K1:null></td>
* </tr>
* <tr>
* <td></td>
* <td></td>
* <td><K1:null></td>
* <td></td>
* <td></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param <VO> the value type of the other {@code KTable}
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
* left {@code KTable}
* @see #join(KTable, ValueJoiner)
* @see #outerJoin(KTable, ValueJoiner)
*/
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
/**
* Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
* non-windowed left equi join, with default serializers, deserializers, and state store.
* The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
* In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce
* an output record (cf. below).
* The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
* of the join.
* <p>
* The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
* matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
* This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
* {@code KTable} the result gets updated.
* <p>
* For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
* provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* Additionally, for each record of left {@code KTable} that does not find a corresponding record in the
* right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue =
* null} to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* <p>
* Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
* For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
* forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be
* deleted).
* <p>
* Input records with {@code null} key will be dropped and no join computation is performed.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>thisKTable</th>
* <th>thisState</th>
* <th>otherKTable</th>
* <th>otherState</th>
* <th>result updated record</th>
* </tr>
* <tr>
* <td><K1:A></td>
* <td><K1:A></td>
* <td></td>
* <td></td>
* <td><K1:ValueJoiner(A,null)></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:A></td>
* <td><K1:b></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(A,b)></td>
* </tr>
* <tr>
* <td><K1:null></td>
* <td></td>
* <td></td>
* <td><K1:b></td>
* <td><K1:null></td>
* </tr>
* <tr>
* <td></td>
* <td></td>
* <td><K1:null></td>
* <td></td>
* <td></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param <VO> the value type of the other {@code KTable}
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
* left {@code KTable}
* @see #join(KTable, ValueJoiner)
* @see #outerJoin(KTable, ValueJoiner)
*/
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Named named);
/**
* Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
* non-windowed left equi join, with the {@link Materialized} instance for configuration of the {@link Serde key serde},
* {@link Serde the result table's value serde}, and {@link KeyValueStore state store}.
* The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
* In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce
* an output record (cf. below).
* The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
* of the join.
* <p>
* The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
* matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
* This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
* {@code KTable} the result gets updated.
* <p>
* For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
* provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* Additionally, for each record of left {@code KTable} that does not find a corresponding record in the
* right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue =
* null} to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* <p>
* Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
* For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
* forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be
* deleted).
* <p>
* Input records with {@code null} key will be dropped and no join computation is performed.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>thisKTable</th>
* <th>thisState</th>
* <th>otherKTable</th>
* <th>otherState</th>
* <th>result updated record</th>
* </tr>
* <tr>
* <td><K1:A></td>
* <td><K1:A></td>
* <td></td>
* <td></td>
* <td><K1:ValueJoiner(A,null)></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:A></td>
* <td><K1:b></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(A,b)></td>
* </tr>
* <tr>
* <td><K1:null></td>
* <td></td>
* <td></td>
* <td><K1:b></td>
* <td><K1:null></td>
* </tr>
* <tr>
* <td></td>
* <td></td>
* <td><K1:null></td>
* <td></td>
* <td></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized.
* Cannot be {@code null}
* @param <VO> the value type of the other {@code KTable}
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
* left {@code KTable}
* @see #join(KTable, ValueJoiner, Materialized)
* @see #outerJoin(KTable, ValueJoiner, Materialized)
*/
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
* non-windowed left equi join, with the {@link Materialized} instance for configuration of the {@link Serde key serde},
* {@link Serde the result table's value serde}, and {@link KeyValueStore state store}.
* The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
* In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce
* an output record (cf. below).
* The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
* of the join.
* <p>
* The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
* matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
* This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
* {@code KTable} the result gets updated.
* <p>
* For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
* provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* Additionally, for each record of left {@code KTable} that does not find a corresponding record in the
* right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue =
* null} to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* <p>
* Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
* For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
* forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be
* deleted).
* <p>
* Input records with {@code null} key will be dropped and no join computation is performed.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>thisKTable</th>
* <th>thisState</th>
* <th>otherKTable</th>
* <th>otherState</th>
* <th>result updated record</th>
* </tr>
* <tr>
* <td><K1:A></td>
* <td><K1:A></td>
* <td></td>
* <td></td>
* <td><K1:ValueJoiner(A,null)></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:A></td>
* <td><K1:b></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(A,b)></td>
* </tr>
* <tr>
* <td><K1:null></td>
* <td></td>
* <td></td>
* <td><K1:b></td>
* <td><K1:null></td>
* </tr>
* <tr>
* <td></td>
* <td></td>
* <td><K1:null></td>
* <td></td>
* <td></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized.
* Cannot be {@code null}
* @param <VO> the value type of the other {@code KTable}
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
* left {@code KTable}
* @see #join(KTable, ValueJoiner, Materialized)
* @see #outerJoin(KTable, ValueJoiner, Materialized)
*/
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
* non-windowed outer equi join, with default serializers, deserializers, and state store.
* The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
* In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join},
* all records from both input {@code KTable}s will produce an output record (cf. below).
* The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
* of the join.
* <p>
* The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
* matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
* This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
* {@code KTable} the result gets updated.
* <p>
* For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
* provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* Additionally, for each record that does not find a corresponding record in the corresponding other
* {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the
* corresponding other value to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* <p>
* Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
* Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
* to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
* <p>
* Input records with {@code null} key will be dropped and no join computation is performed.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>thisKTable</th>
* <th>thisState</th>
* <th>otherKTable</th>
* <th>otherState</th>
* <th>result updated record</th>
* </tr>
* <tr>
* <td><K1:A></td>
* <td><K1:A></td>
* <td></td>
* <td></td>
* <td><K1:ValueJoiner(A,null)></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:A></td>
* <td><K1:b></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(A,b)></td>
* </tr>
* <tr>
* <td><K1:null></td>
* <td></td>
* <td></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(null,b)></td>
* </tr>
* <tr>
* <td></td>
* <td></td>
* <td><K1:null></td>
* <td></td>
* <td><K1:null></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param <VO> the value type of the other {@code KTable}
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
* both {@code KTable}s
* @see #join(KTable, ValueJoiner)
* @see #leftJoin(KTable, ValueJoiner)
*/
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
/**
* Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
* non-windowed outer equi join, with default serializers, deserializers, and state store.
* The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
* In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join},
* all records from both input {@code KTable}s will produce an output record (cf. below).
* The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
* of the join.
* <p>
* The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
* matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
* This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
* {@code KTable} the result gets updated.
* <p>
* For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
* provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* Additionally, for each record that does not find a corresponding record in the corresponding other
* {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the
* corresponding other value to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* <p>
* Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
* Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
* to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
* <p>
* Input records with {@code null} key will be dropped and no join computation is performed.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>thisKTable</th>
* <th>thisState</th>
* <th>otherKTable</th>
* <th>otherState</th>
* <th>result updated record</th>
* </tr>
* <tr>
* <td><K1:A></td>
* <td><K1:A></td>
* <td></td>
* <td></td>
* <td><K1:ValueJoiner(A,null)></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:A></td>
* <td><K1:b></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(A,b)></td>
* </tr>
* <tr>
* <td><K1:null></td>
* <td></td>
* <td></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(null,b)></td>
* </tr>
* <tr>
* <td></td>
* <td></td>
* <td><K1:null></td>
* <td></td>
* <td><K1:null></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param <VO> the value type of the other {@code KTable}
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
* both {@code KTable}s
* @see #join(KTable, ValueJoiner)
* @see #leftJoin(KTable, ValueJoiner)
*/
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Named named);
/**
* Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
* non-windowed outer equi join, with the {@link Materialized} instance for configuration of the {@link Serde key serde},
* {@link Serde the result table's value serde}, and {@link KeyValueStore state store}.
* The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
* In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join},
* all records from both input {@code KTable}s will produce an output record (cf. below).
* The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
* of the join.
* <p>
* The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
* matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
* This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
* {@code KTable} the result gets updated.
* <p>
* For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
* provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* Additionally, for each record that does not find a corresponding record in the corresponding other
* {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the
* corresponding other value to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* <p>
* Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
* Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
* to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
* <p>
* Input records with {@code null} key will be dropped and no join computation is performed.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>thisKTable</th>
* <th>thisState</th>
* <th>otherKTable</th>
* <th>otherState</th>
* <th>result updated record</th>
* </tr>
* <tr>
* <td><K1:A></td>
* <td><K1:A></td>
* <td></td>
* <td></td>
* <td><K1:ValueJoiner(A,null)></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:A></td>
* <td><K1:b></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(A,b)></td>
* </tr>
* <tr>
* <td><K1:null></td>
* <td></td>
* <td></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(null,b)></td>
* </tr>
* <tr>
* <td></td>
* <td></td>
* <td><K1:null></td>
* <td></td>
* <td><K1:null></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized.
* Cannot be {@code null}
* @param <VO> the value type of the other {@code KTable}
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
* both {@code KTable}s
* @see #join(KTable, ValueJoiner)
* @see #leftJoin(KTable, ValueJoiner)
*/
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using
* non-windowed outer equi join, with the {@link Materialized} instance for configuration of the {@link Serde key serde},
* {@link Serde the result table's value serde}, and {@link KeyValueStore state store}.
* The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
* In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join},
* all records from both input {@code KTable}s will produce an output record (cf. below).
* The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result
* of the join.
* <p>
* The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
* matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}.
* This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
* {@code KTable} the result gets updated.
* <p>
* For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
* provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* Additionally, for each record that does not find a corresponding record in the corresponding other
* {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the
* corresponding other value to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* <p>
* Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
* Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
* to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
* <p>
* Input records with {@code null} key will be dropped and no join computation is performed.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>thisKTable</th>
* <th>thisState</th>
* <th>otherKTable</th>
* <th>otherState</th>
* <th>result updated record</th>
* </tr>
* <tr>
* <td><K1:A></td>
* <td><K1:A></td>
* <td></td>
* <td></td>
* <td><K1:ValueJoiner(A,null)></td>
* </tr>
* <tr>
* <td></td>
* <td><K1:A></td>
* <td><K1:b></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(A,b)></td>
* </tr>
* <tr>
* <td><K1:null></td>
* <td></td>
* <td></td>
* <td><K1:b></td>
* <td><K1:ValueJoiner(null,b)></td>
* </tr>
* <tr>
* <td></td>
* <td></td>
* <td><K1:null></td>
* <td></td>
* <td><K1:null></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized.
* Cannot be {@code null}
* @param <VO> the value type of the other {@code KTable}
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
* both {@code KTable}s
* @see #join(KTable, ValueJoiner)
* @see #leftJoin(KTable, ValueJoiner)
*/
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*
* @deprecated since 3.1, removal planned for 4.0. Use {@link #join(KTable, Function, ValueJoiner, TableJoined)} instead.
*/
@Deprecated
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join,
* using the {@link TableJoined} instance for optional configurations including
* {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
* and also the base name for components of the join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final TableJoined<K, KO> tableJoined);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*
* @deprecated since 3.1, removal planned for 4.0. Use {@link #join(KTable, Function, ValueJoiner, TableJoined, Materialized)} instead.
*/
@Deprecated
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join,
* using the {@link TableJoined} instance for optional configurations including
* {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
* and also the base name for components of the join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains only those records that satisfy the given predicate
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*
* @deprecated since 3.1, removal planned for 4.0. Use {@link #leftJoin(KTable, Function, ValueJoiner, TableJoined)} instead.
*/
@Deprecated
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join,
* using the {@link TableJoined} instance for optional configurations including
* {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
* and also the base name for components of the join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final TableJoined<K, KO> tableJoined);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*
* @deprecated since 3.1, removal planned for 4.0. Use {@link #leftJoin(KTable, Function, ValueJoiner, TableJoined, Materialized)} instead.
*/
@Deprecated
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join,
* using the {@link TableJoined} instance for optional configurations including
* {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
* and also the base name for components of the join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Get the name of the local state store used that can be used to query this {@code KTable}.
*
* @return the underlying state store name, or {@code null} if this {@code KTable} cannot be queried.
*/
String queryableStoreName();
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦