kafka SaslClientAuthenticator 源码
kafka SaslClientAuthenticator 代码
文件路径:/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.security.authenticator;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.ByteBufferSend;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.ReauthenticationContext;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.requests.SaslAuthenticateResponse;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import org.apache.kafka.common.security.kerberos.KerberosError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
public class SaslClientAuthenticator implements Authenticator {
    /**
     * States of the client-side SASL authentication state machine.
     * <p>
     * The internal state transitions for initial authentication of a channel are
     * declared in order, starting with {@link #SEND_APIVERSIONS_REQUEST} and ending
     * in either {@link #COMPLETE} or {@link #FAILED}. When the handshake request is
     * disabled, the constructor starts the authenticator directly in {@link #INITIAL}.
     * <p>
     * Re-authentication of a channel starts with the state
     * {@link #REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE} and then flows to
     * {@link #REAUTH_SEND_HANDSHAKE_REQUEST} followed by
     * {@link #REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE} and then
     * {@link #REAUTH_INITIAL}; after that the flow joins the authentication flow
     * at the {@link #INTERMEDIATE} state and ends at either {@link #COMPLETE} or
     * {@link #FAILED}.
     */
    public enum SaslState {
        SEND_APIVERSIONS_REQUEST,                   // Initial state for authentication: client sends ApiVersionsRequest in this state when authenticating
        RECEIVE_APIVERSIONS_RESPONSE,               // Awaiting ApiVersionsResponse from server
        SEND_HANDSHAKE_REQUEST,                     // Received ApiVersionsResponse, send SaslHandshake request
        RECEIVE_HANDSHAKE_RESPONSE,                 // Awaiting SaslHandshake response from server when authenticating
        INITIAL,                                    // Initial authentication state starting SASL token exchange for configured mechanism, send first token
        INTERMEDIATE,                               // Intermediate state during SASL token exchange, process challenges and send responses
        CLIENT_COMPLETE,                            // Sent response to last challenge. If using SaslAuthenticate, wait for authentication status from server, else COMPLETE
        COMPLETE,                                   // Authentication sequence complete. If using SaslAuthenticate, this state implies successful authentication.
        FAILED,                                     // Failed authentication due to an error at some stage
        REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE,   // Initial state for re-authentication: process ApiVersionsResponse from original authentication
        REAUTH_SEND_HANDSHAKE_REQUEST,              // Processed original ApiVersionsResponse, send SaslHandshake request as part of re-authentication
        REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE, // Awaiting SaslHandshake response from server when re-authenticating, and may receive other, in-flight responses sent prior to start of re-authentication as well
        REAUTH_INITIAL,                             // Initial re-authentication state starting SASL token exchange for configured mechanism, send first token
    }
    // Sentinel value of saslAuthenticateVersion: when set, SASL tokens are exchanged as raw size-prefixed
    // bytes instead of being wrapped in SaslAuthenticate requests/responses.
    private static final short DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER = -1;
    // NOTE(review): not referenced in the part of the file visible here; presumably used further down - confirm.
    private static final Random RNG = new Random();
    /**
     * The upper bound (inclusive) of the correlation id range reserved for SASL requests.
     *
     * Note: there is a story behind the reserved range. The response of LIST_OFFSET is compatible with the
     * response of SASL_HANDSHAKE. Hence, we could miss the schema error when using the schema of SASL_HANDSHAKE
     * to parse a response of LIST_OFFSET.
     * For example: the IllegalStateException caused by a mismatched correlation id is thrown if the following
     * steps happen.
     * 1) sent LIST_OFFSET
     * 2) sent SASL_HANDSHAKE
     * 3) receive response of LIST_OFFSET
     * 4) succeed to use schema of SASL_HANDSHAKE to parse response of LIST_OFFSET
     * 5) throw IllegalStateException due to mismatched correlation id
     * As a simple approach, we force SASL requests to use a reserved correlation id range which is separated
     * from the ids used in NetworkClient for Kafka requests. Hence, we can guarantee that every SASL request
     * will throw SchemaException due to correlation id mismatch during reauthentication.
     */
    public static final int MAX_RESERVED_CORRELATION_ID = Integer.MAX_VALUE;
    /**
     * The lower bound (inclusive) of the correlation id range reserved for SASL requests.
     * We only expect one request in-flight at a time during authentication, so the small range is fine.
     */
    public static final int MIN_RESERVED_CORRELATION_ID = MAX_RESERVED_CORRELATION_ID - 7;
    /**
     * Tells whether a correlation id falls inside the range set aside for SASL requests.
     *
     * @param correlationId the correlation id to check
     * @return true if the correlation id is reserved for SASL requests, otherwise false
     */
    public static boolean isReserved(int correlationId) {
        // The reserved range ends at Integer.MAX_VALUE, so only the lower bound has to be checked.
        return MIN_RESERVED_CORRELATION_ID <= correlationId;
    }
    // Subject under which the SaslClient is created and server challenges are evaluated
    private final Subject subject;
    // Passed to Sasl.createSaslClient as the protocol argument
    private final String servicePrincipal;
    // Passed to Sasl.createSaslClient as the server name
    private final String host;
    // Identifier handed to every NetworkReceive created by this authenticator
    private final String node;
    // The single SASL mechanism this client offers
    private final String mechanism;
    private final TransportLayer transportLayer;
    private final SaslClient saslClient;
    // Client configuration; passed to Sasl.createSaslClient and used to look up the client id for request headers
    private final Map<String, ?> configs;
    // Authorization id for the SaslClient; only set for GSSAPI, null for every other mechanism
    private final String clientPrincipalName;
    private final AuthenticateCallbackHandler callbackHandler;
    private final Time time;
    private final Logger log;
    // Bookkeeping for re-authentication
    private final ReauthInfo reauthInfo;
    // buffers used in `authenticate`
    private NetworkReceive netInBuffer;
    private Send netOutBuffer;
    // Current SASL state
    private SaslState saslState;
    // Next SASL state to be set when outgoing writes associated with the current SASL state complete
    private SaslState pendingSaslState;
    // Correlation ID for the next request
    private int correlationId;
    // Request header for which response from the server is pending
    private RequestHeader currentRequestHeader;
    // Version of SaslAuthenticate request/responses
    private short saslAuthenticateVersion;
    // Version of SaslHandshake request/responses
    private short saslHandshakeVersion;
    /**
     * Creates a client authenticator for one connection and eagerly creates the underlying {@link SaslClient}.
     *
     * @param configs                client configuration, also handed to the SASL client factory
     * @param callbackHandler        callback handler handed to the SASL client factory
     * @param node                   identifier used for the network receives of this connection
     * @param subject                subject under which the SASL client is created and used
     * @param servicePrincipal       value passed as the protocol when creating the SASL client
     * @param host                   value passed as the server name when creating the SASL client
     * @param mechanism              the SASL mechanism to use
     * @param handshakeRequestEnable if true, start with an ApiVersions request followed by a SaslHandshake
     *                               request; if false, start the SASL token exchange directly
     * @param transportLayer         transport used for reading and writing
     * @param time                   clock used to timestamp the end of authentication
     * @param logContext             source of the logger for this instance
     * @throws SaslAuthenticationException if the initial state or the SASL client cannot be set up
     */
    public SaslClientAuthenticator(Map<String, ?> configs,
                                   AuthenticateCallbackHandler callbackHandler,
                                   String node,
                                   Subject subject,
                                   String servicePrincipal,
                                   String host,
                                   String mechanism,
                                   boolean handshakeRequestEnable,
                                   TransportLayer transportLayer,
                                   Time time,
                                   LogContext logContext) {
        // Identity of the connection and of the peer
        this.node = node;
        this.subject = subject;
        this.servicePrincipal = servicePrincipal;
        this.host = host;
        this.mechanism = mechanism;
        // Collaborators and configuration
        this.configs = configs;
        this.callbackHandler = callbackHandler;
        this.transportLayer = transportLayer;
        this.time = time;
        this.log = logContext.logger(getClass());
        // Mutable protocol state
        this.correlationId = 0;
        this.saslAuthenticateVersion = DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER;
        this.reauthInfo = new ReauthInfo();
        try {
            SaslState startState = handshakeRequestEnable ? SaslState.SEND_APIVERSIONS_REQUEST : SaslState.INITIAL;
            setSaslState(startState);
            // Only Kerberos takes the client principal from the subject as authorization id for the SaslClient.
            // Other mechanisms use the authenticated principal (username for PLAIN and SCRAM) as authorization
            // id, so no principal is specified when creating the SaslClient for them.
            this.clientPrincipalName = mechanism.equals(SaslConfigs.GSSAPI_MECHANISM) ? firstPrincipal(subject) : null;
            this.saslClient = createSaslClient();
        } catch (Exception e) {
            throw new SaslAuthenticationException("Failed to configure SaslClientAuthenticator", e);
        }
    }
    /**
     * Creates the {@link SaslClient} for the configured mechanism, running the factory call as the subject.
     * Visible for testing.
     *
     * @return the newly created SASL client, never null
     * @throws SaslAuthenticationException if no client could be created for the mechanism
     */
    SaslClient createSaslClient() {
        PrivilegedExceptionAction<SaslClient> createAction = () -> {
            String[] mechs = {mechanism};
            log.debug("Creating SaslClient: client={};service={};serviceHostname={};mechs={}",
                clientPrincipalName, servicePrincipal, host, Arrays.toString(mechs));
            SaslClient created = Sasl.createSaslClient(mechs, clientPrincipalName, servicePrincipal, host, configs, callbackHandler);
            if (created != null)
                return created;
            // The factory returns null when no provider supports the requested mechanism
            throw new SaslAuthenticationException("Failed to create SaslClient with mechanism " + mechanism);
        };
        try {
            return Subject.doAs(subject, createAction);
        } catch (PrivilegedActionException e) {
            throw new SaslAuthenticationException("Failed to create SaslClient with mechanism " + mechanism, e.getCause());
        }
    }
    /**
     * Drives the client side of the SASL authentication state machine.
     * <p>
     * Each invocation first tries to finish any partially written outbound message and returns immediately
     * if that message still cannot be flushed completely. Otherwise it performs the action of the current
     * {@link SaslState}: sending a request or token, or trying to read a response or challenge. Several cases
     * deliberately fall through to the next one so that the follow-up request is sent in the same invocation.
     * Server challenges are evaluated via `SaslClient.evaluateChallenge` and client responses are returned
     * until authentication succeeds or fails.
     * <p>
     * The messages are sent and received as size delimited bytes that consist of a 4 byte network-ordered size N
     * followed by N bytes representing the opaque payload.
     *
     * @throws IOException if reading from or writing to the transport layer fails
     * @throws IllegalStateException if invoked while the state is {@link SaslState#FAILED}
     */
    @SuppressWarnings("fallthrough")
    public void authenticate() throws IOException {
        // A previous send is still in flight; do nothing else until it has been written out completely
        if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
            return;
        switch (saslState) {
            case SEND_APIVERSIONS_REQUEST:
                // Always use version 0 request since brokers treat requests with schema exceptions as GSSAPI tokens
                ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest.Builder().build((short) 0);
                send(apiVersionsRequest.toSend(nextRequestHeader(ApiKeys.API_VERSIONS, apiVersionsRequest.version())));
                setSaslState(SaslState.RECEIVE_APIVERSIONS_RESPONSE);
                break;
            case RECEIVE_APIVERSIONS_RESPONSE:
                ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) receiveKafkaResponse();
                // null means the response has not been received completely yet; try again on the next invocation
                if (apiVersionsResponse == null)
                    break;
                else {
                    setSaslAuthenticateAndHandshakeVersions(apiVersionsResponse);
                    // Remember the response so that it is available via reauthInfo.apiVersionsResponse()
                    reauthInfo.apiVersionsResponseReceivedFromBroker = apiVersionsResponse;
                    setSaslState(SaslState.SEND_HANDSHAKE_REQUEST);
                    // Fall through to send handshake request with the latest supported version
                }
            case SEND_HANDSHAKE_REQUEST:
                sendHandshakeRequest(saslHandshakeVersion);
                setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
                break;
            case RECEIVE_HANDSHAKE_RESPONSE:
                SaslHandshakeResponse handshakeResponse = (SaslHandshakeResponse) receiveKafkaResponse();
                if (handshakeResponse == null)
                    break;
                else {
                    handleSaslHandshakeResponse(handshakeResponse);
                    setSaslState(SaslState.INITIAL);
                    // Fall through and start SASL authentication using the configured client mechanism
                }
            case INITIAL:
                sendInitialToken();
                setSaslState(SaslState.INTERMEDIATE);
                break;
            case REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE:
                // No ApiVersions round trip during re-authentication: reuse the response from the original authentication
                setSaslAuthenticateAndHandshakeVersions(reauthInfo.apiVersionsResponseFromOriginalAuthentication);
                setSaslState(SaslState.REAUTH_SEND_HANDSHAKE_REQUEST); // Will set immediately
                // Fall through to send handshake request with the latest supported version
            case REAUTH_SEND_HANDSHAKE_REQUEST:
                sendHandshakeRequest(saslHandshakeVersion);
                setSaslState(SaslState.REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE);
                break;
            case REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE:
                // handshakeResponse is the local declared in the RECEIVE_HANDSHAKE_RESPONSE case (same switch scope)
                handshakeResponse = (SaslHandshakeResponse) receiveKafkaResponse();
                if (handshakeResponse == null)
                    break;
                handleSaslHandshakeResponse(handshakeResponse);
                setSaslState(SaslState.REAUTH_INITIAL); // Will set immediately
                /*
                 * Fall through and start SASL authentication using the configured client
                 * mechanism. Note that we have to either fall through or add a loop to enter
                 * the switch statement again. We will fall through to avoid adding the loop and
                 * therefore minimize the changes to authentication-related code due to the
                 * changes related to re-authentication.
                 */
            case REAUTH_INITIAL:
                sendInitialToken();
                setSaslState(SaslState.INTERMEDIATE);
                break;
            case INTERMEDIATE:
                byte[] serverToken = receiveToken();
                // True only if a challenge was received and no token had to be sent in reply to it
                boolean noResponsesPending = serverToken != null && !sendSaslClientToken(serverToken, false);
                // For versions without SASL_AUTHENTICATE header, SASL exchange may be complete after a token is sent to server.
                // For versions with SASL_AUTHENTICATE header, server always sends a response to each SASL_AUTHENTICATE request.
                if (saslClient.isComplete()) {
                    if (saslAuthenticateVersion == DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER || noResponsesPending)
                        setSaslState(SaslState.COMPLETE);
                    else
                        setSaslState(SaslState.CLIENT_COMPLETE);
                }
                break;
            case CLIENT_COMPLETE:
                // The client side is done; wait for the server's response to the last token that was sent
                byte[] serverResponse = receiveToken();
                if (serverResponse != null)
                    setSaslState(SaslState.COMPLETE);
                break;
            case COMPLETE:
                break;
            case FAILED:
                // Should never get here since exception would have been propagated earlier
                throw new IllegalStateException("SASL handshake has already failed");
        }
    }
    /**
     * Builds a SaslHandshake request for the configured mechanism and sends it with a fresh request header.
     *
     * @param version the SaslHandshake version used to build the request
     * @throws IOException if writing the request fails
     */
    private void sendHandshakeRequest(short version) throws IOException {
        SaslHandshakeRequest request = createSaslHandshakeRequest(version);
        RequestHeader header = nextRequestHeader(ApiKeys.SASL_HANDSHAKE, request.version());
        send(request.toSend(header));
    }
    /**
     * Starts the SASL token exchange by sending the client's first token, using an empty
     * byte array as the initial challenge.
     *
     * @throws IOException if writing the token fails
     */
    private void sendInitialToken() throws IOException {
        byte[] emptyChallenge = new byte[0];
        sendSaslClientToken(emptyChallenge, true);
    }
    /**
     * Starts re-authentication on a channel that was authenticated by a previous
     * {@code SaslClientAuthenticator}. The ApiVersions response known to the previous authenticator is
     * carried over, the previous authenticator is closed, the channel's in-progress receive is adopted
     * and the first re-authentication step is executed right away.
     *
     * @param reauthenticationContext context of the re-authentication; must not be null
     * @throws IOException if the first authentication step fails with an I/O error
     */
    @Override
    public void reauthenticate(ReauthenticationContext reauthenticationContext) throws IOException {
        Objects.requireNonNull(reauthenticationContext);
        SaslClientAuthenticator previous =
                (SaslClientAuthenticator) reauthenticationContext.previousAuthenticator();
        ApiVersionsResponse originalApiVersions = previous.reauthInfo.apiVersionsResponse();
        previous.close();
        reauthInfo.reauthenticating(originalApiVersions, reauthenticationContext.reauthenticationBeginNanos());
        // Continue with whatever receive the channel had in progress
        netInBuffer = reauthenticationContext.networkReceive();
        setSaslState(SaslState.REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE); // Will set immediately
        authenticate();
    }
    /**
     * Hands out a response that arrived during re-authentication but is unrelated to it, if there is one.
     * Delegates to the re-authentication bookkeeping.
     *
     * @return the saved response, or an empty Optional
     */
    @Override
    public Optional<NetworkReceive> pollResponseReceivedDuringReauthentication() {
        Optional<NetworkReceive> savedResponse = reauthInfo.pollResponseReceivedDuringReauthentication();
        return savedResponse;
    }
    /**
     * Exposes the client session re-authentication time recorded in the re-authentication bookkeeping.
     *
     * @return the recorded value in nanoseconds; may be null if none has been recorded
     */
    @Override
    public Long clientSessionReauthenticationTimeNanos() {
        Long reauthenticationTimeNanos = reauthInfo.clientSessionReauthenticationTimeNanos;
        return reauthenticationTimeNanos;
    }
    /**
     * Exposes the re-authentication latency computed by the re-authentication bookkeeping.
     *
     * @return the latency in milliseconds as reported by {@code ReauthInfo}
     */
    @Override
    public Long reauthenticationLatencyMs() {
        Long latencyMs = reauthInfo.reauthenticationLatencyMs();
        return latencyMs;
    }
    /**
     * Returns the correlation id for the next SASL request and advances the counter. Visible for testing.
     * <p>
     * Whenever the counter is outside the reserved range (initially, or after wrapping past
     * {@code Integer.MAX_VALUE}) it is reset to the start of the reserved range first.
     *
     * @return a correlation id within the reserved range
     */
    int nextCorrelationId() {
        if (!isReserved(correlationId))
            correlationId = MIN_RESERVED_CORRELATION_ID;
        int assigned = correlationId;
        // May overflow to a negative value; the check above repairs that on the next call
        correlationId = assigned + 1;
        return assigned;
    }
    /**
     * Creates the header for the next request and remembers it as the header whose response is pending.
     *
     * @param apiKey  API of the request
     * @param version version of the request
     * @return the new header, which is also stored in {@code currentRequestHeader}
     */
    private RequestHeader nextRequestHeader(ApiKeys apiKey, short version) {
        String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
        RequestHeaderData headerData = new RequestHeaderData()
            .setRequestApiKey(apiKey.id)
            .setRequestApiVersion(version)
            .setClientId(clientId)
            .setCorrelationId(nextCorrelationId());
        currentRequestHeader = new RequestHeader(headerData, apiKey.requestHeaderVersion(version));
        return currentRequestHeader;
    }
    /**
     * Creates the SaslHandshake request announcing the configured mechanism.
     * Visible to override for testing.
     *
     * @param version the SaslHandshake version to build
     * @return the handshake request
     */
    protected SaslHandshakeRequest createSaslHandshakeRequest(short version) {
        SaslHandshakeRequestData data = new SaslHandshakeRequestData().setMechanism(mechanism);
        SaslHandshakeRequest.Builder builder = new SaslHandshakeRequest.Builder(data);
        return builder.build(version);
    }
    /**
     * Picks the SaslAuthenticate and SaslHandshake versions to use: the smaller of the highest version
     * advertised in the given response and the latest version known to this client. A version is left
     * unchanged when the response does not list the corresponding API.
     * Visible to override for testing.
     *
     * @param apiVersionsResponse the ApiVersions response to take the advertised versions from
     */
    protected void setSaslAuthenticateAndHandshakeVersions(ApiVersionsResponse apiVersionsResponse) {
        ApiVersion advertisedAuthenticate = apiVersionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id);
        if (advertisedAuthenticate != null) {
            int advertisedMax = advertisedAuthenticate.maxVersion();
            int clientMax = ApiKeys.SASL_AUTHENTICATE.latestVersion();
            this.saslAuthenticateVersion = (short) Math.min(advertisedMax, clientMax);
        }
        ApiVersion advertisedHandshake = apiVersionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id);
        if (advertisedHandshake != null) {
            int advertisedMax = advertisedHandshake.maxVersion();
            int clientMax = ApiKeys.SASL_HANDSHAKE.latestVersion();
            this.saslHandshakeVersion = (short) Math.min(advertisedMax, clientMax);
        }
    }
    /**
     * Moves the state machine to the given state, or defers the transition while an outbound message
     * has not been written completely. On reaching {@link SaslState#COMPLETE} the completion time is
     * recorded and the write interest of the transport layer is adjusted.
     *
     * @param saslState the state to switch to
     */
    private void setSaslState(SaslState saslState) {
        boolean writeInProgress = netOutBuffer != null && !netOutBuffer.completed();
        if (writeInProgress) {
            // Applied later, once the outstanding write has been flushed
            pendingSaslState = saslState;
            return;
        }
        this.pendingSaslState = null;
        this.saslState = saslState;
        log.debug("Set SASL client state to {}", saslState);
        if (saslState != SaslState.COMPLETE)
            return;
        reauthInfo.setAuthenticationEndAndSessionReauthenticationTimes(time.nanoseconds());
        if (reauthInfo.reauthenticating()) {
            /*
             * Re-authentication is triggered by a write, so we have to make sure that
             * pending write is actually sent.
             */
            transportLayer.addInterestOps(SelectionKey.OP_WRITE);
        } else {
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
        }
    }
    /**
     * Sends a SASL client token to the server if one is required. The token is either the initial token
     * that starts the exchange or the response to a server challenge. Depending on whether the
     * SaslAuthenticate header is in use, the token is wrapped in a SaslAuthenticate request or sent as
     * plain size-prefixed bytes.
     *
     * @param serverToken the challenge received from the server, or an empty array for the initial token
     * @param isInitial   true if this is the first token of the exchange
     * @return true if a token was sent to the server
     * @throws IOException if creating or writing the token fails
     */
    private boolean sendSaslClientToken(byte[] serverToken, boolean isInitial) throws IOException {
        if (saslClient.isComplete())
            return false;
        byte[] saslToken = createSaslToken(serverToken, isInitial);
        if (saslToken == null)
            return false;
        Send tokenSend;
        if (saslAuthenticateVersion != DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER) {
            SaslAuthenticateRequestData data = new SaslAuthenticateRequestData()
                    .setAuthBytes(saslToken);
            SaslAuthenticateRequest request = new SaslAuthenticateRequest.Builder(data).build(saslAuthenticateVersion);
            tokenSend = request.toSend(nextRequestHeader(ApiKeys.SASL_AUTHENTICATE, saslAuthenticateVersion));
        } else {
            tokenSend = ByteBufferSend.sizePrefixed(ByteBuffer.wrap(saslToken));
        }
        send(tokenSend);
        return true;
    }
    /**
     * Makes the given message the current outbound message and tries to write it out immediately.
     *
     * @param send the message to write
     * @throws IOException if the write fails; a transition to {@link SaslState#FAILED} is requested
     *                     before the exception is rethrown
     */
    private void send(Send send) throws IOException {
        netOutBuffer = send;
        try {
            flushNetOutBufferAndUpdateInterestOps();
        } catch (IOException e) {
            setSaslState(SaslState.FAILED);
            throw e;
        }
    }
    /**
     * Tries to write out the current outbound message and keeps the write interest of the transport
     * layer in sync: interest is added while data remains and removed once everything is written.
     * A deferred state transition is applied as soon as the message has been flushed.
     *
     * @return true if the outbound message has been written completely
     * @throws IOException if writing fails
     */
    private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
        if (!flushNetOutBuffer()) {
            // More to write: ask to be notified when the channel is writable again
            transportLayer.addInterestOps(SelectionKey.OP_WRITE);
            return false;
        }
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
        if (pendingSaslState != null)
            setSaslState(pendingSaslState);
        return true;
    }
    /**
     * Reads from the transport layer into the current receive and, once the receive is complete,
     * returns a copy of its payload and resets the receive.
     *
     * @return the payload bytes of a completed receive, or null if more data is needed
     * @throws IOException if reading fails
     */
    private byte[] receiveResponseOrToken() throws IOException {
        if (netInBuffer == null)
            netInBuffer = new NetworkReceive(node);
        netInBuffer.readFrom(transportLayer);
        if (!netInBuffer.complete())
            return null;
        ByteBuffer payload = netInBuffer.payload();
        payload.rewind();
        byte[] serverPacket = new byte[payload.remaining()];
        payload.get(serverPacket, 0, serverPacket.length);
        // All data has been read; the next read starts with a fresh NetworkReceive
        netInBuffer = null;
        return serverPacket;
    }
    /**
     * Returns a user principal named after the client principal.
     * <p>
     * The client principal name is only populated by the constructor for GSSAPI; for every other
     * mechanism it is null. NOTE(review): confirm that KafkaPrincipal tolerates a null name.
     *
     * @return a new principal of type {@code KafkaPrincipal.USER_TYPE}
     */
    public KafkaPrincipal principal() {
        KafkaPrincipal userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, clientPrincipalName);
        return userPrincipal;
    }
    /**
     * The client authenticator provides no principal serde.
     *
     * @return always an empty Optional
     */
    @Override
    public Optional<KafkaPrincipalSerde> principalSerde() {
        Optional<KafkaPrincipalSerde> noSerde = Optional.empty();
        return noSerde;
    }
    /**
     * Tells whether authentication has finished successfully.
     *
     * @return true if the current state is {@link SaslState#COMPLETE}
     */
    public boolean complete() {
        return SaslState.COMPLETE == saslState;
    }
    /**
     * Disposes of the SASL client, if there is one.
     *
     * @throws IOException if disposing of the SASL client fails
     */
    public void close() throws IOException {
        if (saslClient == null)
            return;
        saslClient.dispose();
    }
    /**
     * Receives the next SASL token from the server. Without the SaslAuthenticate header the raw payload
     * is returned. Otherwise the payload is parsed as a SaslAuthenticate response: an error in the
     * response fails the authentication, and a positive session lifetime is recorded for re-authentication.
     *
     * @return the token bytes, or null if no complete message is available yet
     * @throws IOException if reading fails
     */
    private byte[] receiveToken() throws IOException {
        if (saslAuthenticateVersion == DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER)
            return receiveResponseOrToken();
        SaslAuthenticateResponse response = (SaslAuthenticateResponse) receiveKafkaResponse();
        if (response == null)
            return null;
        Errors error = response.error();
        if (error != Errors.NONE) {
            setSaslState(SaslState.FAILED);
            String errMsg = response.errorMessage();
            // Prefer the server-provided message when there is one
            if (errMsg == null)
                throw error.exception();
            throw error.exception(errMsg);
        }
        long sessionLifetimeMs = response.sessionLifetimeMs();
        if (sessionLifetimeMs > 0L)
            reauthInfo.positiveSessionLifetimeMs = sessionLifetimeMs;
        return Utils.copyArray(response.saslAuthBytes());
    }
    /**
     * Produces the next client token for the given server challenge.
     * <p>
     * For the initial token of a mechanism without an initial response, the given (empty) token is
     * returned as is. In all other cases the challenge is evaluated by the SaslClient while running as
     * the subject.
     *
     * @param saslToken the challenge from the server; must not be null
     * @param isInitial true if the first token of the exchange is being created
     * @return the token to send to the server as returned by the SaslClient, which may be null
     * @throws IllegalSaslStateException if {@code saslToken} is null
     * @throws SaslException if evaluating the challenge failed with an error classified as retriable
     * @throws SaslAuthenticationException if evaluating the challenge failed with any other error
     */
    private byte[] createSaslToken(final byte[] saslToken, boolean isInitial) throws SaslException {
        if (saslToken == null)
            throw new IllegalSaslStateException("Error authenticating with the Kafka Broker: received a `null` saslToken.");
        try {
            if (isInitial && !saslClient.hasInitialResponse())
                return saslToken;
            else
                return Subject.doAs(subject, (PrivilegedExceptionAction<byte[]>) () -> saslClient.evaluateChallenge(saslToken));
        } catch (PrivilegedActionException e) {
            String error = "An error: (" + e + ") occurred when evaluating SASL token received from the Kafka Broker.";
            KerberosError kerberosError = KerberosError.fromException(e);
            // Try to provide hints to users about what went wrong so they can fix their configuration.
            if (kerberosError == KerberosError.SERVER_NOT_FOUND) {
                error += " This may be caused by Java's being unable to resolve the Kafka Broker's" +
                    " hostname correctly. You may want to try to adding" +
                    " '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment." +
                    " Users must configure FQDN of kafka brokers when authenticating using SASL and" +
                    " `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm`";
            }
            // Unwrap the SaslException inside `PrivilegedActionException`
            Throwable cause = e.getCause();
            // Treat transient Kerberos errors as non-fatal SaslExceptions that are processed as I/O exceptions
            // and all other failures as fatal SaslAuthenticationException.
            if ((kerberosError != null && kerberosError.retriable()) || (kerberosError == null && KerberosError.isRetriableClientGssException(e))) {
                error += " Kafka Client will retry.";
                throw new SaslException(error, cause);
            } else {
                error += " Kafka Client will go to AUTHENTICATION_FAILED state.";
                throw new SaslAuthenticationException(error, cause);
            }
        }
    }
    /**
     * Writes the outbound message to the transport layer unless it has been written completely already.
     *
     * @return true if the outbound message is completely written
     * @throws IOException if writing fails
     */
    private boolean flushNetOutBuffer() throws IOException {
        if (netOutBuffer.completed())
            return true;
        netOutBuffer.writeTo(transportLayer);
        return netOutBuffer.completed();
    }
    /**
     * Reads the next message from the server and parses it as the response to the request described
     * by {@code currentRequestHeader}.
     * <p>
     * If the bytes cannot be parsed as that response and re-authentication is in progress, the receive
     * is saved in {@code reauthInfo.pendingAuthenticatedReceives} for later processing and null is
     * returned. Outside of re-authentication a parse failure fails the authentication.
     *
     * @return the parsed response, or null if no complete message is available yet or the message was
     *         set aside during re-authentication
     * @throws IOException if reading fails
     * @throws IllegalSaslStateException if the message cannot be parsed and no re-authentication is in progress
     */
    private AbstractResponse receiveKafkaResponse() throws IOException {
        if (netInBuffer == null)
            netInBuffer = new NetworkReceive(node);
        // Keep a reference: receiveResponseOrToken() resets netInBuffer to null once the receive is complete
        NetworkReceive receive = netInBuffer;
        try {
            byte[] responseBytes = receiveResponseOrToken();
            if (responseBytes == null)
                return null;
            else {
                AbstractResponse response = NetworkClient.parseResponse(ByteBuffer.wrap(responseBytes), currentRequestHeader);
                // The pending request has been answered
                currentRequestHeader = null;
                return response;
            }
        } catch (BufferUnderflowException | SchemaException | IllegalArgumentException e) {
            /*
             * Account for the fact that during re-authentication there may be responses
             * arriving for requests that were sent in the past.
             */
            if (reauthInfo.reauthenticating()) {
                /*
                 * It didn't match the current request header, so it must be unrelated to
                 * re-authentication. Save it so it can be processed later.
                 */
                receive.payload().rewind();
                reauthInfo.pendingAuthenticatedReceives.add(receive);
                return null;
            }
            log.debug("Invalid SASL mechanism response, server may be expecting only GSSAPI tokens");
            setSaslState(SaslState.FAILED);
            throw new IllegalSaslStateException("Invalid SASL mechanism response, server may be expecting a different protocol", e);
        }
    }
    /**
     * Checks the outcome of the SaslHandshake. A response without error is accepted silently; any error
     * fails the authentication and is reported as an exception that lists the mechanisms enabled in the server.
     *
     * @param response the handshake response from the server
     * @throws UnsupportedSaslMechanismException if the server does not have the client mechanism enabled
     * @throws IllegalSaslStateException for every other error code
     */
    private void handleSaslHandshakeResponse(SaslHandshakeResponse response) {
        Errors error = response.error();
        if (error == Errors.NONE)
            return;
        setSaslState(SaslState.FAILED);
        switch (error) {
            case UNSUPPORTED_SASL_MECHANISM:
                throw new UnsupportedSaslMechanismException(String.format("Client SASL mechanism '%s' not enabled in the server, enabled mechanisms are %s",
                    mechanism, response.enabledMechanisms()));
            case ILLEGAL_SASL_STATE:
                throw new IllegalSaslStateException(String.format("Unexpected handshake request with client mechanism %s, enabled mechanisms are %s",
                    mechanism, response.enabledMechanisms()));
            default:
                throw new IllegalSaslStateException(String.format("Unknown error code %s, client mechanism is %s, enabled mechanisms are %s",
                    response.error(), mechanism, response.enabledMechanisms()));
        }
    }
    /**
     * Looks up the name of the first Principal of a Subject.
     *
     * @param subject the subject whose principals are inspected
     * @return the name of the first principal returned by the subject's principal set
     * @throws KafkaException if there are no Principals in the Subject.
     *     During Kerberos re-login, principal is reset on Subject. An exception is
     *     thrown so that the connection is retried after any configured backoff.
     */
    public static String firstPrincipal(Subject subject) {
        Set<Principal> principals = subject.getPrincipals();
        // Iterate while holding the set's monitor
        synchronized (principals) {
            Iterator<Principal> it = principals.iterator();
            if (!it.hasNext())
                throw new KafkaException("Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login");
            return it.next().getName();
        }
    }
    /**
     * Holder for the bookkeeping that accompanies authentication and
     * re-authentication: the ApiVersions responses involved, timing marks, the
     * session lifetime, and receives that were queued up in the meantime.
     */
    private class ReauthInfo {
        // Non-null only once reauthenticating(...) has been called; doubles as the re-authentication flag.
        public ApiVersionsResponse apiVersionsResponseFromOriginalAuthentication;
        public long reauthenticationBeginNanos;
        public List<NetworkReceive> pendingAuthenticatedReceives = new ArrayList<>();
        public ApiVersionsResponse apiVersionsResponseReceivedFromBroker;
        // When null, setAuthenticationEndAndSessionReauthenticationTimes schedules no re-authentication.
        public Long positiveSessionLifetimeMs;
        public long authenticationEndNanos;
        public Long clientSessionReauthenticationTimeNanos;

        /**
         * Marks this instance as describing a re-authentication.
         *
         * @param apiVersionsResponseFromOriginalAuthentication the ApiVersions response kept from
         *        the original authentication; must not be null
         * @param reauthenticationBeginNanos the time, in nanoseconds, at which re-authentication began
         * @throws NullPointerException if the given response is null
         */
        public void reauthenticating(ApiVersionsResponse apiVersionsResponseFromOriginalAuthentication,
                                     long reauthenticationBeginNanos) {
            // Validate before touching any field so a null argument leaves this instance unchanged.
            Objects.requireNonNull(apiVersionsResponseFromOriginalAuthentication);
            this.apiVersionsResponseFromOriginalAuthentication = apiVersionsResponseFromOriginalAuthentication;
            this.reauthenticationBeginNanos = reauthenticationBeginNanos;
        }

        /**
         * @return true if an ApiVersions response from the original authentication has been recorded
         */
        public boolean reauthenticating() {
            return apiVersionsResponseFromOriginalAuthentication != null;
        }

        /**
         * @return the ApiVersions response from the original authentication when
         *         re-authenticating, otherwise the one received from the broker
         */
        public ApiVersionsResponse apiVersionsResponse() {
            if (reauthenticating())
                return apiVersionsResponseFromOriginalAuthentication;
            return apiVersionsResponseReceivedFromBroker;
        }

        /**
         * Removes and returns the oldest queued NetworkReceive, if there is one.
         * Such a receive arrived during re-authentication but is unrelated to it:
         * it answers a request sent before re-authentication began, while the
         * channel was successfully authenticated.
         *
         * @return the first pending receive, or an empty Optional if none is
         *         queued; never null
         */
        public Optional<NetworkReceive> pollResponseReceivedDuringReauthentication() {
            if (!pendingAuthenticatedReceives.isEmpty())
                return Optional.of(pendingAuthenticatedReceives.remove(0));
            return Optional.empty();
        }

        /**
         * Records when authentication ended and, if a positive session lifetime is
         * known, computes the time at or after which the client should re-authenticate.
         *
         * @param nowNanos the time, in nanoseconds, at which authentication ended
         */
        public void setAuthenticationEndAndSessionReauthenticationTimes(long nowNanos) {
            authenticationEndNanos = nowNanos;
            if (positiveSessionLifetimeMs == null) {
                log.debug("Finished {} with no session expiration and no session re-authentication",
                        authenticationOrReauthenticationText());
                return;
            }

            // Re-authenticate at a random point between 85% and 95% of the session lifetime.
            // The 85% floor takes network latency and clock drift into account; the 10% of jitter
            // avoids a re-authentication storm across many channels simultaneously.
            final double baseFraction = 0.85;
            final double jitterFraction = 0.10;
            final double chosenFraction = baseFraction + RNG.nextDouble() * jitterFraction;
            final long reauthDelayMs = (long) (positiveSessionLifetimeMs * chosenFraction);

            // Convert the millisecond delay to nanoseconds and offset it from the end of authentication.
            clientSessionReauthenticationTimeNanos = authenticationEndNanos + 1000 * 1000 * reauthDelayMs;
            log.debug(
                    "Finished {} with session expiration in {} ms and session re-authentication on or after {} ms",
                    authenticationOrReauthenticationText(), positiveSessionLifetimeMs, reauthDelayMs);
        }

        /**
         * @return the rounded number of milliseconds between the beginning of
         *         re-authentication and the end of authentication, or null when
         *         not re-authenticating
         */
        public Long reauthenticationLatencyMs() {
            if (!reauthenticating())
                return null;
            long elapsedNanos = authenticationEndNanos - reauthenticationBeginNanos;
            return Math.round(elapsedNanos / 1000.0 / 1000.0);
        }

        /**
         * @return the word used in log messages for the current phase
         */
        private String authenticationOrReauthenticationText() {
            if (reauthenticating())
                return "re-authentication";
            return "authentication";
        }
    }
}
相关信息
相关文章
kafka DefaultKafkaPrincipalBuilder 源码
kafka SaslClientCallbackHandler 源码
                        
                            0
                        
                        
                             赞
                        
                    
                    
                热门推荐
- 
                        2、 - 优质文章
- 
                        3、 gate.io
- 
                        8、 openharmony
- 
                        9、 golang