kafka SchemaBuilder 源码
kafka SchemaBuilder 代码
文件路径:/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.data;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.SchemaBuilderException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
 * <p>
 *     SchemaBuilder provides a fluent API for constructing {@link Schema} objects. It allows you to set each of the
 *     properties for the schema and each call returns the SchemaBuilder so the calls can be chained. When nested types
 *     are required, use one of the predefined schemas from {@link Schema} or use a second SchemaBuilder inline.
 * </p>
 * <p>
 *     Here is an example of building a struct schema:
 *     <pre>
 *     Schema dateSchema = SchemaBuilder.struct()
 *         .name("com.example.CalendarDate").version(2).doc("A calendar date including month, day, and year.")
 *         .field("month", Schema.STRING_SCHEMA)
 *         .field("day", Schema.INT8_SCHEMA)
 *         .field("year", Schema.INT16_SCHEMA)
 *         .build();
 *     </pre>
 * </p>
 * <p>
 *     Here is an example of using a second SchemaBuilder to construct complex, nested types:
 *     <pre>
 *     Schema userListSchema = SchemaBuilder.array(
 *         SchemaBuilder.struct().name("com.example.User").field("username", Schema.STRING_SCHEMA).field("id", Schema.INT64_SCHEMA).build()
 *     ).build();
 *     </pre>
 * </p>
 */
public class SchemaBuilder implements Schema {
    // Property names used only to build the messages thrown by checkCanSet/checkNotNull.
    private static final String TYPE_FIELD = "type";
    private static final String OPTIONAL_FIELD = "optional";
    private static final String DEFAULT_FIELD = "default";
    private static final String NAME_FIELD = "name";
    private static final String VERSION_FIELD = "version";
    private static final String DOC_FIELD = "doc";

    private final Type type;
    // Stays null until optional() or required() is called; isOptional() reports false while unset.
    private Boolean optional = null;
    private Object defaultValue = null;
    // Struct fields keyed by name, in insertion order. Only allocated when type is STRUCT.
    private Map<String, Field> fields = null;
    private Schema keySchema = null;
    private Schema valueSchema = null;

    private String name;
    private Integer version;
    // Optional human readable documentation describing this schema.
    private String doc;
    // Additional parameters for logical types.
    private Map<String, String> parameters;

    /**
     * Create a builder for the given type. A {@link Type#STRUCT} builder starts with an empty set of fields.
     * @param type the schema type; must not be null
     * @throws SchemaBuilderException if {@code type} is null
     */
    public SchemaBuilder(Type type) {
        if (null == type)
            throw new SchemaBuilderException("type cannot be null");
        this.type = type;
        if (type == Type.STRUCT) {
            fields = new LinkedHashMap<>();
        }
    }

    // Common/metadata fields

    /**
     * @return true only if {@link #optional()} has been called; false if the schema is required or if neither
     *         {@link #optional()} nor {@link #required()} has been called yet
     */
    @Override
    public boolean isOptional() {
        return optional != null && optional;
    }

    /**
     * Set this schema as optional.
     * @return the SchemaBuilder
     * @throws SchemaBuilderException if the schema was already marked as required
     */
    public SchemaBuilder optional() {
        checkCanSet(OPTIONAL_FIELD, optional, true);
        optional = true;
        return this;
    }

    /**
     * Set this schema as required. This is the default, but this method can be used to make this choice explicit.
     * @return the SchemaBuilder
     * @throws SchemaBuilderException if the schema was already marked as optional
     */
    public SchemaBuilder required() {
        checkCanSet(OPTIONAL_FIELD, optional, false);
        optional = false;
        return this;
    }

    /**
     * @return the default value set on this builder, or null if none has been set
     */
    @Override
    public Object defaultValue() {
        return defaultValue;
    }

    /**
     * Set the default value for this schema. The value is validated against the schema type, throwing a
     * {@link SchemaBuilderException} if it does not match.
     * @param value the default value
     * @return the SchemaBuilder
     * @throws SchemaBuilderException if a different default value was already set, or if validation of
     *         {@code value} against this builder fails
     */
    public SchemaBuilder defaultValue(Object value) {
        checkCanSet(DEFAULT_FIELD, defaultValue, value);
        checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
        try {
            // The builder itself is a Schema, so its current settings (type, optionality, ...) drive validation.
            ConnectSchema.validateValue(this, value);
        } catch (DataException e) {
            throw new SchemaBuilderException("Invalid default value", e);
        }
        defaultValue = value;
        return this;
    }

    /**
     * @return the schema name set on this builder, or null if none has been set
     */
    @Override
    public String name() {
        return name;
    }

    /**
     * Set the name of this schema.
     * @param name the schema name
     * @return the SchemaBuilder
     * @throws SchemaBuilderException if a different name was already set
     */
    public SchemaBuilder name(String name) {
        checkCanSet(NAME_FIELD, this.name, name);
        this.name = name;
        return this;
    }

    /**
     * @return the schema version set on this builder, or null if none has been set
     */
    @Override
    public Integer version() {
        return version;
    }

    /**
     * Set the version of this schema. Schema versions are integers which, if provided, must indicate which schema is
     * newer and which is older by their ordering.
     * @param version the schema version
     * @return the SchemaBuilder
     * @throws SchemaBuilderException if a different version was already set
     */
    public SchemaBuilder version(Integer version) {
        checkCanSet(VERSION_FIELD, this.version, version);
        this.version = version;
        return this;
    }

    /**
     * @return the documentation set on this builder, or null if none has been set
     */
    @Override
    public String doc() {
        return doc;
    }

    /**
     * Set the documentation for this schema.
     * @param doc the documentation
     * @return the SchemaBuilder
     * @throws SchemaBuilderException if different documentation was already set
     */
    public SchemaBuilder doc(String doc) {
        checkCanSet(DOC_FIELD, this.doc, doc);
        this.doc = doc;
        return this;
    }

    /**
     * @return an unmodifiable view of the parameters set so far, or null if no parameter has been set
     */
    @Override
    public Map<String, String> parameters() {
        return parameters == null ? null : Collections.unmodifiableMap(parameters);
    }

    /**
     * Set a schema parameter.
     * @param propertyName name of the schema property to define
     * @param propertyValue value of the schema property to define, as a String
     * @return the SchemaBuilder
     */
    public SchemaBuilder parameter(String propertyName, String propertyValue) {
        // Preserve order of insertion with a LinkedHashMap. This isn't strictly necessary, but is nice if logical types
        // can print their properties in a consistent order.
        if (parameters == null)
            parameters = new LinkedHashMap<>();
        parameters.put(propertyName, propertyValue);
        return this;
    }

    /**
     * Set schema parameters. This operation is additive; it does not remove existing parameters that do not appear in
     * the set of properties passed to this method. A null or empty map leaves the builder unchanged, so the result of
     * another schema's {@link Schema#parameters()} (which may be null) can be passed directly.
     * @param props Map of properties to set; may be null or empty
     * @return the SchemaBuilder
     */
    public SchemaBuilder parameters(Map<String, String> props) {
        // Avoid creating an empty set of properties so we never have an empty map
        if (props == null || props.isEmpty())
            return this;
        if (parameters == null)
            parameters = new LinkedHashMap<>();
        parameters.putAll(props);
        return this;
    }

    /**
     * @return the type this builder was created with
     */
    @Override
    public Type type() {
        return type;
    }

    /**
     * Create a SchemaBuilder for the specified type.
     *
     * Usually it will be simpler to use one of the variants like {@link #string()} or {@link #struct()}, but this form
     * can be useful when generating schemas dynamically.
     *
     * @param type the schema type
     * @return a new SchemaBuilder
     */
    public static SchemaBuilder type(Type type) {
        return new SchemaBuilder(type);
    }

    // Primitive types

    /**
     * @return a new {@link Schema.Type#INT8} SchemaBuilder
     */
    public static SchemaBuilder int8() {
        return new SchemaBuilder(Type.INT8);
    }

    /**
     * @return a new {@link Schema.Type#INT16} SchemaBuilder
     */
    public static SchemaBuilder int16() {
        return new SchemaBuilder(Type.INT16);
    }

    /**
     * @return a new {@link Schema.Type#INT32} SchemaBuilder
     */
    public static SchemaBuilder int32() {
        return new SchemaBuilder(Type.INT32);
    }

    /**
     * @return a new {@link Schema.Type#INT64} SchemaBuilder
     */
    public static SchemaBuilder int64() {
        return new SchemaBuilder(Type.INT64);
    }

    /**
     * @return a new {@link Schema.Type#FLOAT32} SchemaBuilder
     */
    public static SchemaBuilder float32() {
        return new SchemaBuilder(Type.FLOAT32);
    }

    /**
     * @return a new {@link Schema.Type#FLOAT64} SchemaBuilder
     */
    public static SchemaBuilder float64() {
        return new SchemaBuilder(Type.FLOAT64);
    }

    /**
     * @return a new {@link Schema.Type#BOOLEAN} SchemaBuilder
     */
    public static SchemaBuilder bool() {
        return new SchemaBuilder(Type.BOOLEAN);
    }

    /**
     * @return a new {@link Schema.Type#STRING} SchemaBuilder
     */
    public static SchemaBuilder string() {
        return new SchemaBuilder(Type.STRING);
    }

    /**
     * @return a new {@link Schema.Type#BYTES} SchemaBuilder
     */
    public static SchemaBuilder bytes() {
        return new SchemaBuilder(Type.BYTES);
    }

    // Structs

    /**
     * @return a new {@link Schema.Type#STRUCT} SchemaBuilder
     */
    public static SchemaBuilder struct() {
        return new SchemaBuilder(Type.STRUCT);
    }

    /**
     * Add a field to this struct schema. Throws a SchemaBuilderException if this is not a struct schema.
     * The field's index is the number of fields added before it.
     * @param fieldName the name of the field to add
     * @param fieldSchema the Schema for the field's value
     * @return the SchemaBuilder
     * @throws SchemaBuilderException if this is not a struct schema, if {@code fieldName} is null or empty,
     *         if {@code fieldSchema} is null, or if a field with the same name was already added
     */
    public SchemaBuilder field(String fieldName, Schema fieldSchema) {
        if (type != Type.STRUCT)
            throw new SchemaBuilderException("Cannot create fields on type " + type);
        if (null == fieldName || fieldName.isEmpty())
            throw new SchemaBuilderException("fieldName cannot be null.");
        if (null == fieldSchema)
            throw new SchemaBuilderException("fieldSchema for field " + fieldName + " cannot be null.");
        if (fields.containsKey(fieldName))
            throw new SchemaBuilderException("Cannot create field because of field name duplication " + fieldName);
        // Fields are only ever appended, so the current size is the next free index.
        int fieldIndex = fields.size();
        fields.put(fieldName, new Field(fieldName, fieldIndex, fieldSchema));
        return this;
    }

    /**
     * Get the list of fields for this Schema. Throws a DataException if this schema is not a struct.
     * @return a new list holding the fields added so far, in the order they were added
     */
    @Override
    public List<Field> fields() {
        if (type != Type.STRUCT)
            throw new DataException("Cannot list fields on non-struct type");
        return new ArrayList<>(fields.values());
    }

    /**
     * Look up a field of this struct schema by name. Throws a DataException if this schema is not a struct.
     * @param fieldName the name of the field to look up
     * @return the matching field, or null if no field with that name has been added
     */
    @Override
    public Field field(String fieldName) {
        if (type != Type.STRUCT)
            throw new DataException("Cannot look up fields on non-struct type");
        return fields.get(fieldName);
    }

    // Maps & Arrays

    /**
     * @param valueSchema the schema for elements of the array
     * @return a new {@link Schema.Type#ARRAY} SchemaBuilder
     * @throws SchemaBuilderException if {@code valueSchema} is null
     */
    public static SchemaBuilder array(Schema valueSchema) {
        if (null == valueSchema)
            throw new SchemaBuilderException("valueSchema cannot be null.");
        SchemaBuilder builder = new SchemaBuilder(Type.ARRAY);
        builder.valueSchema = valueSchema;
        return builder;
    }

    /**
     * @param keySchema the schema for keys in the map
     * @param valueSchema the schema for values in the map
     * @return a new {@link Schema.Type#MAP} SchemaBuilder
     * @throws SchemaBuilderException if {@code keySchema} or {@code valueSchema} is null
     */
    public static SchemaBuilder map(Schema keySchema, Schema valueSchema) {
        if (null == keySchema)
            throw new SchemaBuilderException("keySchema cannot be null.");
        if (null == valueSchema)
            throw new SchemaBuilderException("valueSchema cannot be null.");
        SchemaBuilder builder = new SchemaBuilder(Type.MAP);
        builder.keySchema = keySchema;
        builder.valueSchema = valueSchema;
        return builder;
    }

    /**
     * @return a new {@link Schema.Type#ARRAY} SchemaBuilder whose value schema is left null
     */
    static SchemaBuilder arrayOfNull() {
        return new SchemaBuilder(Type.ARRAY);
    }

    /**
     * @return a new {@link Schema.Type#MAP} SchemaBuilder whose key and value schemas are both left null
     */
    static SchemaBuilder mapOfNull() {
        return new SchemaBuilder(Type.MAP);
    }

    /**
     * @param valueSchema the schema for values in the map; not checked for null
     * @return a new {@link Schema.Type#MAP} SchemaBuilder whose key schema is left null
     */
    static SchemaBuilder mapWithNullKeys(Schema valueSchema) {
        SchemaBuilder result = new SchemaBuilder(Type.MAP);
        result.valueSchema = valueSchema;
        return result;
    }

    /**
     * @param keySchema the schema for keys in the map; not checked for null
     * @return a new {@link Schema.Type#MAP} SchemaBuilder whose value schema is left null
     */
    static SchemaBuilder mapWithNullValues(Schema keySchema) {
        SchemaBuilder result = new SchemaBuilder(Type.MAP);
        result.keySchema = keySchema;
        return result;
    }

    /**
     * @return the key schema of this builder, or null if none was set
     */
    @Override
    public Schema keySchema() {
        return keySchema;
    }

    /**
     * @return the value schema of this builder, or null if none was set
     */
    @Override
    public Schema valueSchema() {
        return valueSchema;
    }

    /**
     * Build the Schema using the current settings. The parameters and fields handed to the resulting schema are
     * wrapped as unmodifiable; the field list is a copy taken at the time of the call.
     * @return the {@link Schema}
     */
    public Schema build() {
        return new ConnectSchema(type, isOptional(), defaultValue, name, version, doc,
                parameters == null ? null : Collections.unmodifiableMap(parameters),
                fields == null ? null : Collections.unmodifiableList(new ArrayList<>(fields.values())), keySchema, valueSchema);
    }

    /**
     * Return a concrete instance of the {@link Schema} specified by this builder
     * @return the {@link Schema}
     */
    @Override
    public Schema schema() {
        return build();
    }

    /**
     * Reject an attempt to overwrite a property that already holds a different value. Setting a property that is
     * still unset, or re-setting it to an equal value, is allowed.
     * @param fieldName the property name used in the error message
     * @param fieldVal the current value of the property, or null if unset
     * @param val the value about to be assigned
     * @throws SchemaBuilderException if {@code fieldVal} is non-null and not equal to {@code val}
     */
    private static void checkCanSet(String fieldName, Object fieldVal, Object val) {
        // Compare by value, not identity: boxed Integers outside the boxing cache and non-interned Strings are
        // distinct objects even when equal, and must not be reported as a conflicting overwrite.
        if (fieldVal != null && !fieldVal.equals(val))
            throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " has already been set.");
    }

    /**
     * Require that a prerequisite property has been set before a dependent property is set.
     * @param fieldName the name of the prerequisite property, used in the error message
     * @param val the current value of the prerequisite property
     * @param fieldToSet the name of the property being set, used in the error message
     * @throws SchemaBuilderException if {@code val} is null
     */
    private static void checkNotNull(String fieldName, Object val, String fieldToSet) {
        if (val == null)
            throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " must be specified to set " + fieldToSet);
    }
}
相关信息
相关文章
                        
                            0
                        
                        
                             赞
                        
                    
                    
                热门推荐
- 
                        2、 - 优质文章
- 
                        3、 gate.io
- 
                        8、 openharmony
- 
                        9、 golang