kafka TopicBasedRemoteLogMetadataManagerConfig source code
File path: /storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
/**
 * This class defines the configuration of topic based {@link org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager} implementation.
 */
public final class TopicBasedRemoteLogMetadataManagerConfig {
    public static final String REMOTE_LOG_METADATA_TOPIC_NAME = "__remote_log_metadata";
    public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP = "remote.log.metadata.topic.replication.factor";
    public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP = "remote.log.metadata.topic.num.partitions";
    public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP = "remote.log.metadata.topic.retention.ms";
    public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP = "remote.log.metadata.consume.wait.ms";
    public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP = "remote.log.metadata.initialization.retry.max.timeout.ms";
    public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP = "remote.log.metadata.initialization.retry.interval.ms";
    public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS = 50;
    public static final long DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS = -1L;
    public static final short DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3;
    public static final long DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS = 2 * 60 * 1000L;
    public static final long DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS = 2 * 60 * 1000L;
    public static final long DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS = 5 * 1000L;
    public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_DOC = "Replication factor of remote log metadata Topic.";
    public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC = "The number of partitions for remote log metadata Topic.";
    public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_DOC = "Remote log metadata topic log retention in milli seconds." +
            "Default: -1, that means unlimited. Users can configure this value based on their use cases. " +
            "To avoid any data loss, this value should be more than the maximum retention period of any topic enabled with " +
            "tiered storage in the cluster.";
    public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC = "The amount of time in milli seconds to wait for the local consumer to " +
            "receive the published event.";
    public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_DOC = "The retry interval in milli seconds for " +
            " retrying RemoteLogMetadataManager resources initialization again.";
    public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_DOC = "The maximum amount of time in milli seconds " +
            " for retrying RemoteLogMetadataManager resources initialization. When total retry intervals reach this timeout, initialization" +
            " is considered as failed and broker starts shutting down.";
    public static final String REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX = "remote.log.metadata.common.client.";
    public static final String REMOTE_LOG_METADATA_PRODUCER_PREFIX = "remote.log.metadata.producer.";
    public static final String REMOTE_LOG_METADATA_CONSUMER_PREFIX = "remote.log.metadata.consumer.";
    public static final String BROKER_ID = "broker.id";
    public static final String LOG_DIR = "log.dir";
    private static final String REMOTE_LOG_METADATA_CLIENT_PREFIX = "__remote_log_metadata_client";
    private static final ConfigDef CONFIG = new ConfigDef();
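    // The static block below registers each remote log metadata config with its type, default value,
    // validation range, and doc string in the shared ConfigDef instance.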
    static {
        CONFIG.define(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, SHORT, DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR, atLeast(1), LOW,
                      REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_DOC)
              .define(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, INT, DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS, atLeast(1), LOW,
                      REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC)
              .define(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, LONG, DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS, LOW,
                      REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_DOC)
              .define(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP, LONG, DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS, atLeast(0), LOW,
                      REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC)
              .define(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP, LONG,
                      DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS, atLeast(0), LOW,
                      REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_DOC)
              .define(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP, LONG,
                      DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS, atLeast(0), LOW,
                      REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_DOC);
    }
    private final String clientIdPrefix;
    private final int metadataTopicPartitionsCount;
    private final String logDir;
    private final long consumeWaitMs;
    private final long metadataTopicRetentionMs;
    private final short metadataTopicReplicationFactor;
    private final long initializationRetryMaxTimeoutMs;
    private final long initializationRetryIntervalMs;
    private Map<String, Object> consumerProps;
    private Map<String, Object> producerProps;
    public TopicBasedRemoteLogMetadataManagerConfig(Map<String, ?> props) {
        Objects.requireNonNull(props, "props can not be null");
        Map<String, Object> parsedConfigs = CONFIG.parse(props);
        logDir = (String) props.get(LOG_DIR);
        if (logDir == null || logDir.isEmpty()) {
            throw new IllegalArgumentException(LOG_DIR + " config must not be null or empty.");
        }
        metadataTopicPartitionsCount = (int) parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP);
        metadataTopicReplicationFactor = (short) parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP);
        metadataTopicRetentionMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP);
        if (metadataTopicRetentionMs != -1 && metadataTopicRetentionMs <= 0) {
            throw new IllegalArgumentException("Invalid metadata topic retention in millis: " + metadataTopicRetentionMs);
        }
        consumeWaitMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP);
        initializationRetryIntervalMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP);
        initializationRetryMaxTimeoutMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP);
        clientIdPrefix = REMOTE_LOG_METADATA_CLIENT_PREFIX + "_" + props.get(BROKER_ID);
        initializeProducerConsumerProperties(props);
    }
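    // Splits the supplied configs by prefix: "remote.log.metadata.common.client." entries feed both the
    // producer and consumer maps, while the producer/consumer prefixes apply only to the respective client
    // and override the common values.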
    private void initializeProducerConsumerProperties(Map<String, ?> configs) {
        Map<String, Object> commonClientConfigs = new HashMap<>();
        Map<String, Object> producerOnlyConfigs = new HashMap<>();
        Map<String, Object> consumerOnlyConfigs = new HashMap<>();
        for (Map.Entry<String, ?> entry : configs.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX)) {
                commonClientConfigs.put(key.substring(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX.length()), entry.getValue());
            } else if (key.startsWith(REMOTE_LOG_METADATA_PRODUCER_PREFIX)) {
                producerOnlyConfigs.put(key.substring(REMOTE_LOG_METADATA_PRODUCER_PREFIX.length()), entry.getValue());
            } else if (key.startsWith(REMOTE_LOG_METADATA_CONSUMER_PREFIX)) {
                consumerOnlyConfigs.put(key.substring(REMOTE_LOG_METADATA_CONSUMER_PREFIX.length()), entry.getValue());
            }
        }
        HashMap<String, Object> allProducerConfigs = new HashMap<>(commonClientConfigs);
        allProducerConfigs.putAll(producerOnlyConfigs);
        producerProps = createProducerProps(allProducerConfigs);
        HashMap<String, Object> allConsumerConfigs = new HashMap<>(commonClientConfigs);
        allConsumerConfigs.putAll(consumerOnlyConfigs);
        consumerProps = createConsumerProps(allConsumerConfigs);
    }
    public String remoteLogMetadataTopicName() {
        return REMOTE_LOG_METADATA_TOPIC_NAME;
    }
    public int metadataTopicPartitionsCount() {
        return metadataTopicPartitionsCount;
    }
    public short metadataTopicReplicationFactor() {
        return metadataTopicReplicationFactor;
    }
    public long metadataTopicRetentionMs() {
        return metadataTopicRetentionMs;
    }
    public long consumeWaitMs() {
        return consumeWaitMs;
    }
    public long initializationRetryMaxTimeoutMs() {
        return initializationRetryMaxTimeoutMs;
    }
    public long initializationRetryIntervalMs() {
        return initializationRetryIntervalMs;
    }
    public String logDir() {
        return logDir;
    }
    public Map<String, Object> consumerProperties() {
        return consumerProps;
    }
    public Map<String, Object> producerProperties() {
        return producerProps;
    }
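    // The metadata consumer always starts from the earliest offset with auto-commit disabled, uses byte-array
    // deserializers, and does not exclude internal topics so it can subscribe to __remote_log_metadata.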
    private Map<String, Object> createConsumerProps(HashMap<String, Object> allConsumerConfigs) {
        Map<String, Object> props = new HashMap<>(allConsumerConfigs);
        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientIdPrefix + "_consumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        return props;
    }
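    // The metadata producer enforces acks=all and a single in-flight request per connection so that published
    // metadata events are durable and preserve ordering.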
    private Map<String, Object> createProducerProps(HashMap<String, Object> allProducerConfigs) {
        Map<String, Object> props = new HashMap<>(allProducerConfigs);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientIdPrefix + "_producer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        return Collections.unmodifiableMap(props);
    }
    @Override
    public String toString() {
        return "TopicBasedRemoteLogMetadataManagerConfig{" +
                "clientIdPrefix='" + clientIdPrefix + '\'' +
                ", metadataTopicPartitionsCount=" + metadataTopicPartitionsCount +
                ", consumeWaitMs=" + consumeWaitMs +
                ", metadataTopicRetentionMs=" + metadataTopicRetentionMs +
                ", metadataTopicReplicationFactor=" + metadataTopicReplicationFactor +
                ", initializationRetryMaxTimeoutMs=" + initializationRetryMaxTimeoutMs +
                ", initializationRetryIntervalMs=" + initializationRetryIntervalMs +
                ", consumerProps=" + consumerProps +
                ", producerProps=" + producerProps +
                '}';
    }
}
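
As a quick illustration of how the class above behaves, the sketch below builds a plain config map and passes it to the constructor. The broker id, log dir, and client override values are made-up examples (they are not from the original article); only the config keys and accessor methods come from the class itself. It shows how the "remote.log.metadata.common.client." prefix feeds both client maps while the producer/consumer prefixes only affect their own map.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;

public class RemoteLogMetadataConfigExample {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // Required by the constructor: log.dir must be non-empty, broker.id becomes part of the client id prefix.
        props.put("broker.id", 0);
        props.put("log.dir", "/tmp/kafka-logs"); // example path
        // Topic-level settings parsed through the ConfigDef defined in the static block.
        props.put("remote.log.metadata.topic.num.partitions", 5);
        props.put("remote.log.metadata.topic.replication.factor", (short) 1);
        // Prefixed client settings: the prefix is stripped and the remainder becomes a plain client config.
        props.put("remote.log.metadata.common.client.bootstrap.servers", "localhost:9092"); // shared by both clients
        props.put("remote.log.metadata.producer.linger.ms", 50);                            // producer-only override
        props.put("remote.log.metadata.consumer.fetch.max.bytes", 1048576);                 // consumer-only override

        TopicBasedRemoteLogMetadataManagerConfig config = new TopicBasedRemoteLogMetadataManagerConfig(props);
        System.out.println(config.metadataTopicPartitionsCount()); // 5
        System.out.println(config.producerProperties()); // bootstrap.servers, linger.ms, acks=all, client.id=..._producer, ...
        System.out.println(config.consumerProperties()); // bootstrap.servers, fetch.max.bytes, enable.auto.commit=false, ...
    }
}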
Related articles
kafka FileBasedRemoteLogMetadataCache source code
kafka RemoteLogLeaderEpochState source code
kafka RemoteLogMetadataCache source code
kafka RemoteLogMetadataSnapshotFile source code