kafka RetryWithToleranceOperator source code

  • 2022-10-20

kafka RetryWithToleranceOperator code

File path: /connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.runtime.errors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * Attempt to recover a failed operation with retries and tolerance limits.
 * <p>
 *
 * A retry is attempted if the operation throws a {@link RetriableException}. Retries are accompanied by exponential backoffs, starting with
 * {@link #RETRIES_DELAY_MIN_MS}, up to what is specified with {@link ConnectorConfig#errorMaxDelayInMillis()}.
 * Including the first attempt and future retries, the total time taken to evaluate the operation should be within
 * {@link ConnectorConfig#errorRetryTimeout()} millis.
 * <p>
 *
 * This executor will tolerate failures, as specified by {@link ConnectorConfig#errorToleranceType()}.
 * For transformations and converters, all exceptions are tolerated. For other operations, only {@link RetriableException} is tolerated.
 * <p>
 *
 * There are three outcomes to executing an operation. It might succeed, in which case the result is returned to the caller.
 * If it fails, this class does one of these two things: (1) if the failure occurred due to a tolerable exception, then
 * set appropriate error reason in the {@link ProcessingContext} and return null, or (2) if the exception is not tolerated,
 * then it is wrapped into a ConnectException and rethrown to the caller.
 * <p>
 *
 * Instances of this class are thread safe.
 * <p>
 */
public class RetryWithToleranceOperator implements AutoCloseable {

    private static final Logger log = LoggerFactory.getLogger(RetryWithToleranceOperator.class);

    public static final long RETRIES_DELAY_MIN_MS = 300;

    private static final Map<Stage, Class<? extends Exception>> TOLERABLE_EXCEPTIONS = new HashMap<>();
    static {
        TOLERABLE_EXCEPTIONS.put(Stage.TRANSFORMATION, Exception.class);
        TOLERABLE_EXCEPTIONS.put(Stage.HEADER_CONVERTER, Exception.class);
        TOLERABLE_EXCEPTIONS.put(Stage.KEY_CONVERTER, Exception.class);
        TOLERABLE_EXCEPTIONS.put(Stage.VALUE_CONVERTER, Exception.class);
    }

    private final long errorRetryTimeout;
    private final long errorMaxDelayInMillis;
    private final ToleranceType errorToleranceType;

    private long totalFailures = 0;
    private final Time time;
    private final ErrorHandlingMetrics errorHandlingMetrics;
    private final CountDownLatch stopRequestedLatch;
    private volatile boolean stopping;   // indicates whether the operator has been asked to stop retrying

    protected final ProcessingContext context;

    public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
                                      ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics) {
        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1));
    }

    RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
                               ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics,
                               ProcessingContext context, CountDownLatch stopRequestedLatch) {
        this.errorRetryTimeout = errorRetryTimeout;
        this.errorMaxDelayInMillis = errorMaxDelayInMillis;
        this.errorToleranceType = toleranceType;
        this.time = time;
        this.errorHandlingMetrics = errorHandlingMetrics;
        this.context = context;
        this.stopRequestedLatch = stopRequestedLatch;
        this.stopping = false;
    }

    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
                                      ConsumerRecord<byte[], byte[]> consumerRecord,
                                      Throwable error) {
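        // A failure reported from outside execute() (e.g. an errant sink record): hand the record and
        // error to the configured reporters, and throw once the tolerance limit has been exceeded.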

        markAsFailed();
        context.consumerRecord(consumerRecord);
        context.currentContext(stage, executingClass);
        context.error(error);
        errorHandlingMetrics.recordFailure();
        Future<Void> errantRecordFuture = context.report();
        if (!withinToleranceLimits()) {
            errorHandlingMetrics.recordError();
            throw new ConnectException("Tolerance exceeded in error handler", error);
        }
        return errantRecordFuture;
    }

    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
                                                   SourceRecord sourceRecord,
                                                   Throwable error) {

        markAsFailed();
        context.sourceRecord(sourceRecord);
        context.currentContext(stage, executingClass);
        context.error(error);
        errorHandlingMetrics.recordFailure();
        Future<Void> errantRecordFuture = context.report();
        if (!withinToleranceLimits()) {
            errorHandlingMetrics.recordError();
            throw new ConnectException("Tolerance exceeded in Source Worker error handler", error);
        }
        return errantRecordFuture;
    }

    /**
     * Execute the recoverable operation. If the operation is already in a failed state, then simply return
     * with the existing failure.
     *
     * @param operation the recoverable operation
     * @param <V> return type of the result of the operation.
     * @return result of the operation
     */
    public synchronized <V> V execute(Operation<V> operation, Stage stage, Class<?> executingClass) {
        context.currentContext(stage, executingClass);

        if (context.failed()) {
            log.debug("ProcessingContext is already in failed state. Ignoring requested operation.");
            return null;
        }

        try {
            Class<? extends Exception> ex = TOLERABLE_EXCEPTIONS.getOrDefault(context.stage(), RetriableException.class);
            return execAndHandleError(operation, ex);
        } finally {
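            // whether the failure was tolerated or rethrown above, a failed context is counted as an
            // error and the record is handed to the reporters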
            if (context.failed()) {
                errorHandlingMetrics.recordError();
                context.report();
            }
        }
    }

    /**
     * Attempt to execute an operation. Retry if a {@link RetriableException} is raised. Re-throw everything else.
     *
     * @param operation the operation to be executed.
     * @param <V> the return type of the result of the operation.
     * @return the result of the operation.
     * @throws Exception rethrow if a non-retriable Exception is thrown by the operation
     */
    protected <V> V execAndRetry(Operation<V> operation) throws Exception {
        int attempt = 0;
        long startTime = time.milliseconds();
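        // a negative errorRetryTimeout disables the deadline: keep retrying until success or stop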
        long deadline = (errorRetryTimeout >= 0) ? startTime + errorRetryTimeout : Long.MAX_VALUE;
        do {
            try {
                attempt++;
                return operation.call();
            } catch (RetriableException e) {
                log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
                errorHandlingMetrics.recordFailure();
                if (time.milliseconds() < deadline) {
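                    // still inside the retry deadline: back off, then loop and try again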
                    backoff(attempt, deadline);
                    errorHandlingMetrics.recordRetry();
                } else {
                    log.trace("Can't retry. start={}, attempt={}, deadline={}", startTime, attempt, deadline);
                    context.error(e);
                    return null;
                }
                if (stopping) {
                    log.trace("Shutdown has been scheduled. Marking operation as failed.");
                    context.error(e);
                    return null;
                }
            } finally {
                context.attempt(attempt);
            }
        } while (true);
    }

    /**
     * Execute a given operation multiple times (if needed), and tolerate certain exceptions.
     * Visible for testing.
     *
     * @param operation the operation to be executed.
     * @param tolerated the class of exceptions which can be tolerated.
     * @param <V> The return type of the result of the operation.
     * @return the result of the operation
     */
    protected <V> V execAndHandleError(Operation<V> operation, Class<? extends Exception> tolerated) {
        try {
            V result = execAndRetry(operation);
            if (context.failed()) {
                markAsFailed();
                errorHandlingMetrics.recordSkipped();
            }
            return result;
        } catch (Exception e) {
            errorHandlingMetrics.recordFailure();
            markAsFailed();
            context.error(e);

            if (!tolerated.isAssignableFrom(e.getClass())) {
                throw new ConnectException("Unhandled exception in error handler", e);
            }

            if (!withinToleranceLimits()) {
                throw new ConnectException("Tolerance exceeded in error handler", e);
            }

            errorHandlingMetrics.recordSkipped();
            return null;
        }
    }

    // Visible for testing
    void markAsFailed() {
        errorHandlingMetrics.recordErrorTimestamp();
        totalFailures++;
    }

    @SuppressWarnings("fallthrough")
    public synchronized boolean withinToleranceLimits() {
        switch (errorToleranceType) {
            case NONE:
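                // NONE tolerates no failures; if none have been recorded yet, fall through and return true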
                if (totalFailures > 0) return false;
            case ALL:
                return true;
            default:
                throw new ConfigException("Unknown tolerance type: {}", errorToleranceType);
        }
    }

    // For source connectors that want to skip kafka producer errors.
    // They cannot use withinToleranceLimits() as no failure may have actually occurred prior to the producer failing
    // to write to kafka.
    public ToleranceType getErrorToleranceType() {
        return errorToleranceType;
    }

    /**
     * Do an exponential backoff bounded by {@link #RETRIES_DELAY_MIN_MS} and {@link #errorMaxDelayInMillis}
     * which can be exited prematurely if {@link #triggerStop()} is called or if the thread is interrupted.
     * Visible for testing.
     * @param attempt the number indicating which backoff attempt it is (beginning with 1)
     * @param deadline the time in milliseconds until when retries can be attempted
     */
    void backoff(int attempt, long deadline) {
        int numRetry = attempt - 1;
        long delay = RETRIES_DELAY_MIN_MS << numRetry;
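        // once the doubled delay exceeds the configured cap, use a random (jittered) delay below errorMaxDelayInMillis instead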
        if (delay > errorMaxDelayInMillis) {
            delay = ThreadLocalRandom.current().nextLong(errorMaxDelayInMillis);
        }
        long currentTime = time.milliseconds();
        if (delay + currentTime > deadline) {
            delay = Math.max(0, deadline - currentTime);
        }
        log.debug("Sleeping for up to {} millis", delay);
        try {
            stopRequestedLatch.await(delay, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            return;
        }
    }

    @Override
    public String toString() {
        return "RetryWithToleranceOperator{" +
                "errorRetryTimeout=" + errorRetryTimeout +
                ", errorMaxDelayInMillis=" + errorMaxDelayInMillis +
                ", errorToleranceType=" + errorToleranceType +
                ", totalFailures=" + totalFailures +
                ", time=" + time +
                ", context=" + context +
                '}';
    }

    /**
     * Set the error reporters for this connector.
     *
     * @param reporters the error reporters (should not be null).
     */
    public synchronized void reporters(List<ErrorReporter> reporters) {
        this.context.reporters(reporters);
    }

    /**
     * Set the source record being processed in the connect pipeline.
     *
     * @param preTransformRecord the source record
     */
    public synchronized void sourceRecord(SourceRecord preTransformRecord) {
        this.context.sourceRecord(preTransformRecord);
    }

    /**
     * Set the record consumed from Kafka in a sink connector.
     *
     * @param consumedMessage the record
     */
    public synchronized void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) {
        this.context.consumerRecord(consumedMessage);
    }

    /**
     * @return true, if the last operation encountered an error; false otherwise
     */
    public synchronized boolean failed() {
        return this.context.failed();
    }

    /**
     * Returns the error encountered when processing the current stage.
     *
     * @return the error encountered when processing the current stage
     */
    public synchronized Throwable error() {
        return this.context.error();
    }

    /**
     * This will stop any further retries for operations.
     * This will also mark any ongoing operations that are currently backing off for retry as failed.
     * This can be called from a separate thread to break out of retry/backoff loops in
     * {@link #execAndRetry(Operation)}
     */
    public void triggerStop() {
        stopping = true;
        stopRequestedLatch.countDown();
    }

    @Override
    public synchronized void close() {
        this.context.close();
    }
}
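
For context, here is a minimal usage sketch. It is not part of the Kafka source: the class name RetryOperatorUsageSketch, the convertKey helper, the 60 s / 5 s values and the null schema are illustrative assumptions. It shows how worker-style code might wrap a converter call with execute(); the three constructor arguments correspond to the errors.retry.timeout, errors.retry.delay.max.ms and errors.tolerance connector properties.

import java.util.Collections;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.storage.Converter;

public class RetryOperatorUsageSketch {

    // Hypothetical helper: serialize a record key with retries and tolerance, the way worker code
    // wraps each stage of the connector pipeline. Returns null if the record was tolerated (skipped).
    public static byte[] convertKey(Converter keyConverter, String topic, Object key,
                                    ErrorHandlingMetrics metrics, ErrorReporter reporter) {
        RetryWithToleranceOperator operator = new RetryWithToleranceOperator(
                60_000L,            // errorRetryTimeout: total retry budget per failing operation
                5_000L,             // errorMaxDelayInMillis: cap on a single backoff
                ToleranceType.ALL,  // tolerate (skip) records that still fail after retries
                Time.SYSTEM,
                metrics);
        operator.reporters(Collections.singletonList(reporter));

        // Operation<V> has a single call() method, so a lambda works. KEY_CONVERTER is in
        // TOLERABLE_EXCEPTIONS, so with errors.tolerance=all a persistent failure is skipped, not rethrown.
        byte[] serialized = operator.execute(
                () -> keyConverter.fromConnectData(topic, null, key),
                Stage.KEY_CONVERTER,
                keyConverter.getClass());

        if (operator.failed()) {
            // the error was stored in the ProcessingContext and handed to the reporters
            System.err.println("Record skipped after error: " + operator.error());
            return null;
        }
        return serialized;
    }
}

With these example numbers, backoff() waits roughly 300, 600, 1200, 2400 and 4800 ms for the first five retries; once the doubled value exceeds the 5 000 ms cap, each further wait is a random value below that cap, and retrying stops when the 60 s deadline passes or triggerStop() is called.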

Related information

kafka source code directory

Related articles

kafka DeadLetterQueueReporter source code

kafka ErrorHandlingMetrics source code

kafka ErrorReporter source code

kafka LogReporter source code

kafka Operation source code

kafka ProcessingContext source code

kafka Stage source code

kafka ToleranceType source code

kafka WorkerErrantRecordReporter source code
