kafka LogManagerBuilder 源码

  • 2022-10-20
  • 浏览 (193)

kafka LogManagerBuilder 代码

文件路径:/core/src/main/java/kafka/server/builders/LogManagerBuilder.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.server.builders;

import kafka.log.CleanerConfig;
import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.log.ProducerStateManagerConfig;
import kafka.server.BrokerTopicStats;
import kafka.server.LogDirFailureChannel;
import kafka.server.metadata.ConfigRepository;
import kafka.utils.Scheduler;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.MetadataVersion;
import scala.collection.JavaConverters;

import java.io.File;
import java.util.Collections;
import java.util.List;


/**
 * Fluent builder that collects the constructor arguments of {@link LogManager}
 * and creates the instance in {@link #build()}.
 *
 * <p>The following values have no default and must be supplied before calling
 * {@link #build()}: {@code logDirs}, {@code configRepository},
 * {@code initialDefaultConfig}, {@code cleanerConfig}, {@code scheduler},
 * {@code brokerTopicStats} and {@code logDirFailureChannel}. Every other value
 * starts with the default given by its field initializer below.
 *
 * <p>Each setter stores the given value as-is (no copy is made) and returns this
 * builder so calls can be chained.
 */
public class LogManagerBuilder {
    private List<File> logDirs = null;
    private List<File> initialOfflineDirs = Collections.emptyList();
    private ConfigRepository configRepository = null;
    private LogConfig initialDefaultConfig = null;
    private CleanerConfig cleanerConfig = null;
    private int recoveryThreadsPerDataDir = 1;
    private long flushCheckMs = 1000L;
    private long flushRecoveryOffsetCheckpointMs = 10000L;
    private long flushStartOffsetCheckpointMs = 10000L;
    private long retentionCheckMs = 1000L;
    private int maxTransactionTimeoutMs = 15 * 60 * 1000;
    private ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(60000);
    private int producerIdExpirationCheckIntervalMs = 600000;
    private MetadataVersion interBrokerProtocolVersion = MetadataVersion.latest();
    private Scheduler scheduler = null;
    private BrokerTopicStats brokerTopicStats = null;
    private LogDirFailureChannel logDirFailureChannel = null;
    private Time time = Time.SYSTEM;
    private boolean keepPartitionMetadataFile = true;

    /**
     * Sets the log directories handed to the {@link LogManager}. Required.
     *
     * @param logDirs the log directories; must be non-null when {@link #build()} is called
     * @return this builder
     */
    public LogManagerBuilder setLogDirs(List<File> logDirs) {
        this.logDirs = logDirs;
        return this;
    }

    /**
     * Sets the directories passed to the {@link LogManager} as initially offline.
     * Defaults to an empty list.
     *
     * @param initialOfflineDirs the initially offline directories; must be non-null
     *                           when {@link #build()} is called
     * @return this builder
     */
    public LogManagerBuilder setInitialOfflineDirs(List<File> initialOfflineDirs) {
        this.initialOfflineDirs = initialOfflineDirs;
        return this;
    }

    /**
     * Sets the {@link ConfigRepository} handed to the {@link LogManager}. Required.
     *
     * @param configRepository the config repository
     * @return this builder
     */
    public LogManagerBuilder setConfigRepository(ConfigRepository configRepository) {
        this.configRepository = configRepository;
        return this;
    }

    /**
     * Sets the initial default {@link LogConfig} handed to the {@link LogManager}. Required.
     *
     * @param initialDefaultConfig the initial default log config
     * @return this builder
     */
    public LogManagerBuilder setInitialDefaultConfig(LogConfig initialDefaultConfig) {
        this.initialDefaultConfig = initialDefaultConfig;
        return this;
    }

    /**
     * Sets the {@link CleanerConfig} handed to the {@link LogManager}. Required.
     *
     * @param cleanerConfig the cleaner config
     * @return this builder
     */
    public LogManagerBuilder setCleanerConfig(CleanerConfig cleanerConfig) {
        this.cleanerConfig = cleanerConfig;
        return this;
    }

    /**
     * Sets the number of recovery threads per data directory. Defaults to 1.
     *
     * @param recoveryThreadsPerDataDir the thread count
     * @return this builder
     */
    public LogManagerBuilder setRecoveryThreadsPerDataDir(int recoveryThreadsPerDataDir) {
        this.recoveryThreadsPerDataDir = recoveryThreadsPerDataDir;
        return this;
    }

    /**
     * Sets the flush check interval in milliseconds. Defaults to 1000.
     *
     * @param flushCheckMs the interval in milliseconds
     * @return this builder
     */
    public LogManagerBuilder setFlushCheckMs(long flushCheckMs) {
        this.flushCheckMs = flushCheckMs;
        return this;
    }

    /**
     * Sets the recovery-offset checkpoint flush interval in milliseconds. Defaults to 10000.
     *
     * @param flushRecoveryOffsetCheckpointMs the interval in milliseconds
     * @return this builder
     */
    public LogManagerBuilder setFlushRecoveryOffsetCheckpointMs(long flushRecoveryOffsetCheckpointMs) {
        this.flushRecoveryOffsetCheckpointMs = flushRecoveryOffsetCheckpointMs;
        return this;
    }

    /**
     * Sets the start-offset checkpoint flush interval in milliseconds. Defaults to 10000.
     *
     * @param flushStartOffsetCheckpointMs the interval in milliseconds
     * @return this builder
     */
    public LogManagerBuilder setFlushStartOffsetCheckpointMs(long flushStartOffsetCheckpointMs) {
        this.flushStartOffsetCheckpointMs = flushStartOffsetCheckpointMs;
        return this;
    }

    /**
     * Sets the retention check interval in milliseconds. Defaults to 1000.
     *
     * @param retentionCheckMs the interval in milliseconds
     * @return this builder
     */
    public LogManagerBuilder setRetentionCheckMs(long retentionCheckMs) {
        this.retentionCheckMs = retentionCheckMs;
        return this;
    }

    /**
     * Sets the maximum transaction timeout in milliseconds. Defaults to 15 minutes.
     *
     * @param maxTransactionTimeoutMs the timeout in milliseconds
     * @return this builder
     */
    public LogManagerBuilder setMaxTransactionTimeoutMs(int maxTransactionTimeoutMs) {
        this.maxTransactionTimeoutMs = maxTransactionTimeoutMs;
        return this;
    }

    /**
     * Replaces the {@link ProducerStateManagerConfig} with a new one constructed from
     * the given value. The default config is constructed from 60000.
     *
     * @param maxProducerIdExpirationMs the value passed to the
     *                                  {@link ProducerStateManagerConfig} constructor
     * @return this builder
     */
    public LogManagerBuilder setMaxProducerIdExpirationMs(int maxProducerIdExpirationMs) {
        this.producerStateManagerConfig = new ProducerStateManagerConfig(maxProducerIdExpirationMs);
        return this;
    }

    /**
     * Sets the producer-id expiration check interval in milliseconds. Defaults to 600000.
     *
     * @param producerIdExpirationCheckIntervalMs the interval in milliseconds
     * @return this builder
     */
    public LogManagerBuilder setProducerIdExpirationCheckIntervalMs(int producerIdExpirationCheckIntervalMs) {
        this.producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs;
        return this;
    }

    /**
     * Sets the inter-broker protocol version. Defaults to {@link MetadataVersion#latest()}.
     *
     * @param interBrokerProtocolVersion the protocol version
     * @return this builder
     */
    public LogManagerBuilder setInterBrokerProtocolVersion(MetadataVersion interBrokerProtocolVersion) {
        this.interBrokerProtocolVersion = interBrokerProtocolVersion;
        return this;
    }

    /**
     * Sets the {@link Scheduler} handed to the {@link LogManager}. Required.
     *
     * @param scheduler the scheduler
     * @return this builder
     */
    public LogManagerBuilder setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
        return this;
    }

    /**
     * Sets the {@link BrokerTopicStats} handed to the {@link LogManager}. Required.
     *
     * @param brokerTopicStats the broker topic stats
     * @return this builder
     */
    public LogManagerBuilder setBrokerTopicStats(BrokerTopicStats brokerTopicStats) {
        this.brokerTopicStats = brokerTopicStats;
        return this;
    }

    /**
     * Sets the {@link LogDirFailureChannel} handed to the {@link LogManager}. Required.
     *
     * @param logDirFailureChannel the log directory failure channel
     * @return this builder
     */
    public LogManagerBuilder setLogDirFailureChannel(LogDirFailureChannel logDirFailureChannel) {
        this.logDirFailureChannel = logDirFailureChannel;
        return this;
    }

    /**
     * Sets the {@link Time} handed to the {@link LogManager}. Defaults to {@link Time#SYSTEM}.
     *
     * @param time the time source
     * @return this builder
     */
    public LogManagerBuilder setTime(Time time) {
        this.time = time;
        return this;
    }

    /**
     * Sets the {@code keepPartitionMetadataFile} flag handed to the {@link LogManager}.
     * Defaults to {@code true}.
     *
     * @param keepPartitionMetadataFile the flag value
     * @return this builder
     */
    public LogManagerBuilder setKeepPartitionMetadataFile(boolean keepPartitionMetadataFile) {
        this.keepPartitionMetadataFile = keepPartitionMetadataFile;
        return this;
    }

    /**
     * Creates a {@link LogManager} from the values collected so far.
     *
     * @return a new {@link LogManager}
     * @throws RuntimeException if any required value has not been set, or if
     *                          {@code initialOfflineDirs} was set to {@code null}
     */
    public LogManager build() {
        requireSet(logDirs, "logDirs");
        // Has a non-null default, but a caller may have overwritten it with null;
        // fail with a descriptive message instead of an NPE in the conversion below.
        requireSet(initialOfflineDirs, "initialOfflineDirs");
        requireSet(configRepository, "configRepository");
        requireSet(initialDefaultConfig, "initialDefaultConfig");
        requireSet(cleanerConfig, "cleanerConfig");
        requireSet(scheduler, "scheduler");
        requireSet(brokerTopicStats, "brokerTopicStats");
        requireSet(logDirFailureChannel, "logDirFailureChannel");

        // The directory lists are converted from Java lists to Scala sequences,
        // which is what the LogManager constructor is given here.
        return new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
                              JavaConverters.asScalaIteratorConverter(initialOfflineDirs.iterator()).asScala().toSeq(),
                              configRepository,
                              initialDefaultConfig,
                              cleanerConfig,
                              recoveryThreadsPerDataDir,
                              flushCheckMs,
                              flushRecoveryOffsetCheckpointMs,
                              flushStartOffsetCheckpointMs,
                              retentionCheckMs,
                              maxTransactionTimeoutMs,
                              producerStateManagerConfig,
                              producerIdExpirationCheckIntervalMs,
                              interBrokerProtocolVersion,
                              scheduler,
                              brokerTopicStats,
                              logDirFailureChannel,
                              time,
                              keepPartitionMetadataFile);
    }

    /**
     * Throws a {@link RuntimeException} with the message {@code "you must set <name>"}
     * when {@code value} is {@code null}.
     */
    private static void requireSet(Object value, String name) {
        if (value == null) {
            throw new RuntimeException("you must set " + name);
        }
    }
}

相关信息

kafka 源码目录

相关文章

kafka KafkaApisBuilder 源码

kafka ReplicaManagerBuilder 源码

0  赞