kafka HostInfo 源码

  • 2022-10-20
  • 浏览 (262)

kafka HostInfo 代码

文件路径:/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.state;

import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;

/**
 * Represents a user defined endpoint in a {@link org.apache.kafka.streams.KafkaStreams} application.
 * Instances of this class can be obtained by calling one of:
 *  {@link KafkaStreams#metadataForAllStreamsClients()}
 *  {@link KafkaStreams#streamsMetadataForStore(String)}
 *
 *  The HostInfo is constructed during Partition Assignment
 *  see {@link StreamsPartitionAssignor}
 *  It is extracted from the config {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG}
 *
 *  If developers wish to expose an endpoint in their KafkaStreams applications they should provide the above
 *  config.
 *
 *  Two instances are equal when they are of the same class and carry the same host and port.
 */
public class HostInfo {
    private final String host;
    private final int port;

    /**
     * Creates an endpoint description from an already separated host and port.
     * No validation is performed on either argument.
     *
     * @param host the host name or address of the endpoint
     * @param port the port of the endpoint
     */
    public HostInfo(final String host,
                    final int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Parses an endpoint string of the form {@code host:port} into a {@code HostInfo}.
     *
     * @param endPoint the endpoint string to parse
     * @return a new HostInfo, or {@code null} if {@code endPoint} is blank according to {@link Utils#isBlank(String)}
     * @throws ConfigException if the host or port cannot be parsed from the given endpoint string
     */
    public static HostInfo buildFromEndpoint(final String endPoint) {
        if (Utils.isBlank(endPoint)) {
            return null;
        }

        final String host = getHost(endPoint);
        final Integer port = getPort(endPoint);

        // Either helper yielding null means the string did not match the expected host:port shape.
        if (host == null || port == null) {
            throw new ConfigException(
                String.format("Error parsing host address %s. Expected format host:port.", endPoint)
            );
        }
        return new HostInfo(host, port);
    }

    /**
     * @return a sentinel for cases where the host metadata is currently unavailable, eg during rebalance operations.
     */
    public static HostInfo unavailable() {
        return new HostInfo("unavailable", -1);
    }

    /**
     * Compares this endpoint with another object for equality.
     *
     * @param o the object to compare against
     * @return {@code true} if {@code o} is of the same class and has the same host and port
     */
    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        final HostInfo hostInfo = (HostInfo) o;
        if (port != hostInfo.port) {
            return false;
        }
        // The constructor does not reject a null host, so the comparison must tolerate one.
        return host == null ? hostInfo.host == null : host.equals(hostInfo.host);
    }

    /**
     * @return a hash code derived from the host and the port, consistent with {@link #equals(Object)}
     */
    @Override
    public int hashCode() {
        // A null host contributes 0; for non-null hosts the value is unchanged from host.hashCode().
        int result = host == null ? 0 : host.hashCode();
        result = 31 * result + port;
        return result;
    }

    /**
     * @return the host of this endpoint, as passed to the constructor
     */
    public String host() {
        return host;
    }

    /**
     * @return the port of this endpoint, as passed to the constructor
     */
    public int port() {
        return port;
    }

    /**
     * @return a string of the form {@code HostInfo{host='<host>', port=<port>}}
     */
    @Override
    public String toString() {
        return "HostInfo{" +
                "host='" + host + '\'' +
                ", port=" + port +
                '}';
    }
}

相关信息

kafka 源码目录

相关文章

kafka KeyValueBytesStoreSupplier 源码

kafka KeyValueIterator 源码

kafka KeyValueStore 源码

kafka QueryableStoreType 源码

kafka QueryableStoreTypes 源码

kafka ReadOnlyKeyValueStore 源码

kafka ReadOnlySessionStore 源码

kafka ReadOnlyWindowStore 源码

kafka RocksDBConfigSetter 源码

kafka SessionBytesStoreSupplier 源码

0  赞