kafka StateSerdes 源码

  • 2022-10-20
  • 浏览 (285)

kafka StateSerdes 代码

文件路径:/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.state;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer;

import java.util.Objects;

/**
 * Factory for creating serializers / deserializers for state stores in Kafka Streams.
 *
 * @param <K> key type of serde
 * @param <V> value type of serde
 */
public final class StateSerdes<K, V> {

    public static final int TIMESTAMP_SIZE = 8;
    public static final int BOOLEAN_SIZE = 1;

    /**
     * Create a new instance of {@link StateSerdes} for the given state name and key-/value-type classes.
     *
     * @param topic      the topic name
     * @param keyClass   the class of the key type
     * @param valueClass the class of the value type
     * @param <K>        the key type
     * @param <V>        the value type
     * @return a new instance of {@link StateSerdes}
     */
    public static <K, V> StateSerdes<K, V> withBuiltinTypes(
        final String topic,
        final Class<K> keyClass,
        final Class<V> valueClass) {
        return new StateSerdes<>(topic, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
    }

    private final String topic;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;

    /**
     * Create a context for serialization using the specified serializers and deserializers which
     * <em>must</em> match the key and value types used as parameters for this object; the state changelog topic
     * is provided to bind this serde factory to, so that future calls for serialize / deserialize do not
     * need to provide the topic name any more.
     *
     * @param topic         the topic name
     * @param keySerde      the serde for keys; cannot be null
     * @param valueSerde    the serde for values; cannot be null
     * @throws IllegalArgumentException if key or value serde is null
     */
    public StateSerdes(final String topic,
                       final Serde<K> keySerde,
                       final Serde<V> valueSerde) {
        Objects.requireNonNull(topic, "topic cannot be null");
        Objects.requireNonNull(keySerde, "key serde cannot be null");
        Objects.requireNonNull(valueSerde, "value serde cannot be null");

        this.topic = topic;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    /**
     * Return the key serde.
     *
     * @return the key serde
     */
    public Serde<K> keySerde() {
        return keySerde;
    }

    /**
     * Return the value serde.
     *
     * @return the value serde
     */
    public Serde<V> valueSerde() {
        return valueSerde;
    }

    /**
     * Return the key deserializer.
     *
     * @return the key deserializer
     */
    public Deserializer<K> keyDeserializer() {
        return keySerde.deserializer();
    }

    /**
     * Return the key serializer.
     *
     * @return the key serializer
     */
    public Serializer<K> keySerializer() {
        return keySerde.serializer();
    }

    /**
     * Return the value deserializer.
     *
     * @return the value deserializer
     */
    public Deserializer<V> valueDeserializer() {
        return valueSerde.deserializer();
    }

    /**
     * Return the value serializer.
     *
     * @return the value serializer
     */
    public Serializer<V> valueSerializer() {
        return valueSerde.serializer();
    }

    /**
     * Return the topic.
     *
     * @return the topic
     */
    public String topic() {
        return topic;
    }

    /**
     * Deserialize the key from raw bytes.
     *
     * @param rawKey  the key as raw bytes
     * @return        the key as typed object
     */
    public K keyFrom(final byte[] rawKey) {
        return keySerde.deserializer().deserialize(topic, rawKey);
    }

    /**
     * Deserialize the value from raw bytes.
     *
     * @param rawValue  the value as raw bytes
     * @return          the value as typed object
     */
    public V valueFrom(final byte[] rawValue) {
        return valueSerde.deserializer().deserialize(topic, rawValue);
    }

    /**
     * Serialize the given key.
     *
     * @param key  the key to be serialized
     * @return     the serialized key
     */
    public byte[] rawKey(final K key) {
        try {
            return keySerde.serializer().serialize(topic, key);
        } catch (final ClassCastException e) {
            final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
            throw new StreamsException(
                    String.format("A serializer (%s) is not compatible to the actual key type " +
                                    "(key type: %s). Change the default Serdes in StreamConfig or " +
                                    "provide correct Serdes via method parameters.",
                            keySerializer().getClass().getName(),
                            keyClass),
                    e);
        }
    }

    /**
     * Serialize the given value.
     *
     * @param value  the value to be serialized
     * @return       the serialized value
     */
    public byte[] rawValue(final V value) {
        try {
            return valueSerde.serializer().serialize(topic, value);
        } catch (final ClassCastException e) {
            final String valueClass;
            final Class<? extends Serializer> serializerClass;
            if (valueSerializer() instanceof ValueAndTimestampSerializer) {
                serializerClass = ((ValueAndTimestampSerializer) valueSerializer()).valueSerializer.getClass();
                valueClass = value == null ? "unknown because value is null" : ((ValueAndTimestamp) value).value().getClass().getName();
            } else {
                serializerClass = valueSerializer().getClass();
                valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
            }
            throw new StreamsException(
                    String.format("A serializer (%s) is not compatible to the actual value type " +
                                    "(value type: %s). Change the default Serdes in StreamConfig or " +
                                    "provide correct Serdes via method parameters.",
                            serializerClass.getName(),
                            valueClass),
                    e);
        }
    }
}

相关信息

kafka 源码目录

相关文章

kafka HostInfo 源码

kafka KeyValueBytesStoreSupplier 源码

kafka KeyValueIterator 源码

kafka KeyValueStore 源码

kafka QueryableStoreType 源码

kafka QueryableStoreTypes 源码

kafka ReadOnlyKeyValueStore 源码

kafka ReadOnlySessionStore 源码

kafka ReadOnlyWindowStore 源码

kafka RocksDBConfigSetter 源码

0  赞