kafka RetryWithToleranceOperator 源码
kafka RetryWithToleranceOperator 代码
文件路径:/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
* Attempt to recover a failed operation with retries and tolerance limits.
* <p>
*
* A retry is attempted if the operation throws a {@link RetriableException}. Retries are accompanied by exponential backoffs, starting with
* {@link #RETRIES_DELAY_MIN_MS}, up to what is specified with {@link ConnectorConfig#errorMaxDelayInMillis()}.
* Including the first attempt and future retries, the total time taken to evaluate the operation should be within
* the configured error retry timeout ({@code errorRetryTimeout}) millis; a negative timeout means retries are not bounded by a deadline.
* <p>
*
* This executor will tolerate failures, as specified by {@link ConnectorConfig#errorToleranceType()}.
* For transformations and converters, all exceptions are tolerated. For other operations, only {@link RetriableException} are tolerated.
* <p>
*
* There are three outcomes to executing an operation. It might succeed, in which case the result is returned to the caller.
* If it fails, this class does one of these two things: (1) if the failure occurred due to a tolerable exception, then
* set appropriate error reason in the {@link ProcessingContext} and return null, or (2) if the exception is not tolerated,
* then it is wrapped into a ConnectException and rethrown to the caller.
* <p>
*
* Instances of this class are thread safe.
* <p>
*/
public class RetryWithToleranceOperator implements AutoCloseable {
// Logger used by this operator.
private static final Logger log = LoggerFactory.getLogger(RetryWithToleranceOperator.class);
// Base delay, in milliseconds, of the exponential backoff; backoff() shifts it left once per retry.
public static final long RETRIES_DELAY_MIN_MS = 300;
// Exception type that may be tolerated for a given stage. Stages without an entry fall back to
// RetriableException when execute() looks them up.
private static final Map<Stage, Class<? extends Exception>> TOLERABLE_EXCEPTIONS = new HashMap<>();
static {
// Transformations and converters tolerate any Exception.
TOLERABLE_EXCEPTIONS.put(Stage.TRANSFORMATION, Exception.class);
TOLERABLE_EXCEPTIONS.put(Stage.HEADER_CONVERTER, Exception.class);
TOLERABLE_EXCEPTIONS.put(Stage.KEY_CONVERTER, Exception.class);
TOLERABLE_EXCEPTIONS.put(Stage.VALUE_CONVERTER, Exception.class);
}
// Retry budget in milliseconds used by execAndRetry() to compute the retry deadline; a negative value means no deadline.
private final long errorRetryTimeout;
// Cap, in milliseconds, above which backoff() replaces the exponential delay with a random one below this value.
private final long errorMaxDelayInMillis;
// Tolerance policy consulted by withinToleranceLimits().
private final ToleranceType errorToleranceType;
// Number of failures recorded through markAsFailed().
private long totalFailures = 0;
// Clock used for the deadline and backoff calculations.
private final Time time;
// Receives the failure, retry, skip and error measurements made by this operator.
private final ErrorHandlingMetrics errorHandlingMetrics;
// Latch that backoff() waits on; triggerStop() counts it down to cut a backoff short.
private final CountDownLatch stopRequestedLatch;
private volatile boolean stopping; // indicates whether the operator has been asked to stop retrying
// Holds the record, stage, error and attempt count of the operation being processed.
protected final ProcessingContext context;
/**
 * Create an operator that uses a new {@link ProcessingContext} and a new stop latch with a count of one.
 *
 * @param errorRetryTimeout     retry budget in milliseconds; a negative value disables the retry deadline
 * @param errorMaxDelayInMillis cap in milliseconds for a single backoff delay
 * @param toleranceType         the tolerance policy applied to failures
 * @param time                  the clock used for deadline and backoff calculations
 * @param errorHandlingMetrics  the metrics object that failures, retries and skips are recorded on
 */
public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics) {
this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1));
}
/**
 * Package-private constructor that additionally accepts the processing context and the stop latch
 * instead of creating them.
 *
 * @param errorRetryTimeout     retry budget in milliseconds; a negative value disables the retry deadline
 * @param errorMaxDelayInMillis cap in milliseconds for a single backoff delay
 * @param toleranceType         the tolerance policy applied to failures
 * @param time                  the clock used for deadline and backoff calculations
 * @param errorHandlingMetrics  the metrics object that failures, retries and skips are recorded on
 * @param context               the processing context this operator reads and updates
 * @param stopRequestedLatch    the latch awaited during backoff and released by {@link #triggerStop()}
 */
RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
                           ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics,
                           ProcessingContext context, CountDownLatch stopRequestedLatch) {
    // Collaborators first.
    this.time = time;
    this.errorHandlingMetrics = errorHandlingMetrics;
    this.context = context;
    this.stopRequestedLatch = stopRequestedLatch;

    // Retry and tolerance settings.
    this.errorToleranceType = toleranceType;
    this.errorRetryTimeout = errorRetryTimeout;
    this.errorMaxDelayInMillis = errorMaxDelayInMillis;

    // No stop has been requested yet.
    this.stopping = false;
}
/**
 * Record a failure for a consumed record: count it, store the record, stage and error in the
 * processing context, and report it through the context.
 *
 * @param stage          the stage in which the failure happened
 * @param executingClass the class that was executing when the failure happened
 * @param consumerRecord the consumed record that failed
 * @param error          the failure
 * @return the future returned by the context's report
 * @throws ConnectException if the tolerance limits are exceeded after recording this failure
 */
public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
                                               ConsumerRecord<byte[], byte[]> consumerRecord,
                                               Throwable error) {
    markAsFailed();
    context.consumerRecord(consumerRecord);
    context.currentContext(stage, executingClass);
    context.error(error);
    errorHandlingMetrics.recordFailure();

    final Future<Void> reportResult = context.report();
    if (withinToleranceLimits()) {
        return reportResult;
    }

    errorHandlingMetrics.recordError();
    throw new ConnectException("Tolerance exceeded in error handler", error);
}
/**
 * Record a failure for a source record: count it, store the record, stage and error in the
 * processing context, and report it through the context.
 *
 * @param stage          the stage in which the failure happened
 * @param executingClass the class that was executing when the failure happened
 * @param sourceRecord   the source record that failed
 * @param error          the failure
 * @return the future returned by the context's report
 * @throws ConnectException if the tolerance limits are exceeded after recording this failure
 */
public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
                                               SourceRecord sourceRecord,
                                               Throwable error) {
    markAsFailed();
    context.sourceRecord(sourceRecord);
    context.currentContext(stage, executingClass);
    context.error(error);
    errorHandlingMetrics.recordFailure();

    final Future<Void> reportResult = context.report();
    if (withinToleranceLimits()) {
        return reportResult;
    }

    errorHandlingMetrics.recordError();
    throw new ConnectException("Tolerance exceeded in Source Worker error handler", error);
}
/**
 * Run a recoverable operation for the given stage. When the processing context is already marked
 * as failed, the operation is not run and {@code null} is returned.
 *
 * @param operation      the recoverable operation
 * @param stage          the stage the operation belongs to
 * @param executingClass the class executing the operation
 * @param <V>            return type of the result of the operation.
 * @return result of the operation, or {@code null} if it was skipped or its failure was tolerated
 */
public synchronized <V> V execute(Operation<V> operation, Stage stage, Class<?> executingClass) {
    context.currentContext(stage, executingClass);

    final boolean alreadyFailed = context.failed();
    if (alreadyFailed) {
        log.debug("ProcessingContext is already in failed state. Ignoring requested operation.");
        return null;
    }

    try {
        // Stages without an explicit entry only tolerate retriable exceptions.
        return execAndHandleError(
                operation,
                TOLERABLE_EXCEPTIONS.getOrDefault(context.stage(), RetriableException.class));
    } finally {
        // Runs on both normal return and exception: a failed context is recorded and reported.
        if (context.failed()) {
            errorHandlingMetrics.recordError();
            context.report();
        }
    }
}
/**
 * Run an operation, retrying while it raises {@link RetriableException}. Any other exception
 * propagates to the caller. Retrying ends when the deadline has passed or a stop has been
 * requested; in both cases the last retriable exception is stored in the context and
 * {@code null} is returned.
 *
 * @param operation the operation to be executed.
 * @param <V> the return type of the result of the operation.
 * @return the result of the operation.
 * @throws Exception rethrow if a non-retriable Exception is thrown by the operation
 */
protected <V> V execAndRetry(Operation<V> operation) throws Exception {
    final long startedAt = time.milliseconds();
    // A negative timeout means there is no deadline.
    final long deadline = errorRetryTimeout < 0 ? Long.MAX_VALUE : startedAt + errorRetryTimeout;
    int attempt = 0;

    while (true) {
        try {
            attempt++;
            return operation.call();
        } catch (RetriableException retriable) {
            log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
            errorHandlingMetrics.recordFailure();

            if (time.milliseconds() >= deadline) {
                log.trace("Can't retry. start={}, attempt={}, deadline={}", startedAt, attempt, deadline);
                context.error(retriable);
                return null;
            }

            backoff(attempt, deadline);
            errorHandlingMetrics.recordRetry();

            // A stop may have been requested while backing off.
            if (stopping) {
                log.trace("Shutdown has been scheduled. Marking operation as failed.");
                context.error(retriable);
                return null;
            }
        } finally {
            // Publish the attempt count after every attempt, successful or not.
            context.attempt(attempt);
        }
    }
}
/**
 * Run an operation with retries and tolerate exceptions of the given class, subject to the
 * tolerance limits.
 * Visible for testing.
 *
 * @param operation the operation to be executed.
 * @param tolerated the class of exceptions which can be tolerated.
 * @param <V> The return type of the result of the operation.
 * @return the result of the operation, or {@code null} when a failure was tolerated
 * @throws ConnectException if the exception is not of the tolerated class, or the tolerance limits are exceeded
 */
protected <V> V execAndHandleError(Operation<V> operation, Class<? extends Exception> tolerated) {
    try {
        final V outcome = execAndRetry(operation);
        // execAndRetry stores an error in the context when it gives up retrying.
        if (context.failed()) {
            markAsFailed();
            errorHandlingMetrics.recordSkipped();
        }
        return outcome;
    } catch (Exception failure) {
        errorHandlingMetrics.recordFailure();
        markAsFailed();
        context.error(failure);

        final boolean tolerable = tolerated.isInstance(failure);
        if (!tolerable) {
            throw new ConnectException("Unhandled exception in error handler", failure);
        }
        if (!withinToleranceLimits()) {
            throw new ConnectException("Tolerance exceeded in error handler", failure);
        }

        errorHandlingMetrics.recordSkipped();
        return null;
    }
}
/**
 * Record the error timestamp on the metrics and count one more failure.
 * Visible for testing.
 */
void markAsFailed() {
    errorHandlingMetrics.recordErrorTimestamp();
    totalFailures += 1;
}
/**
 * Check whether the failures recorded so far are acceptable under the configured tolerance type.
 *
 * @return {@code true} for {@code ALL}; for {@code NONE}, {@code true} only while no failure has been recorded
 * @throws ConfigException if the tolerance type is neither {@code NONE} nor {@code ALL}
 */
public synchronized boolean withinToleranceLimits() {
    switch (errorToleranceType) {
        case NONE:
            return totalFailures == 0;
        case ALL:
            return true;
        default:
            // Use the single-argument constructor: the (name, value) constructor builds an
            // "Invalid value ... for configuration ..." message and does not expand "{}".
            throw new ConfigException("Unknown tolerance type: " + errorToleranceType);
    }
}
/**
 * Expose the configured tolerance type.
 * For source connectors that want to skip kafka producer errors: they cannot use
 * {@link #withinToleranceLimits()} as no failure may have actually occurred prior to the
 * producer failing to write to kafka.
 *
 * @return the tolerance type this operator was created with
 */
public ToleranceType getErrorToleranceType() {
    return this.errorToleranceType;
}
/**
 * Do an exponential backoff bounded by {@link #RETRIES_DELAY_MIN_MS} and {@link #errorMaxDelayInMillis}
 * which can be exited prematurely if {@link #triggerStop()} is called or if the thread is interrupted.
 * The delay never extends past {@code deadline}.
 * Visible for testing.
 * @param attempt the number indicating which backoff attempt it is (beginning with 1)
 * @param deadline the time in milliseconds until when retries can be attempted
 */
void backoff(int attempt, long deadline) {
    int numRetry = attempt - 1;

    // Shifting by the number of leading zeros or more would push a set bit into (or past) the sign
    // bit, yielding a negative or wrapped delay; treat that as "larger than any cap" instead.
    long delay = numRetry < Long.numberOfLeadingZeros(RETRIES_DELAY_MIN_MS)
            ? RETRIES_DELAY_MIN_MS << numRetry
            : Long.MAX_VALUE;

    if (delay > errorMaxDelayInMillis) {
        // nextLong(bound) rejects non-positive bounds, so a non-positive cap means no delay.
        delay = errorMaxDelayInMillis > 0
                ? ThreadLocalRandom.current().nextLong(errorMaxDelayInMillis)
                : 0;
    }

    // Compare against the remaining time rather than adding to the current time, which could overflow.
    long remaining = deadline - time.milliseconds();
    if (delay > remaining) {
        delay = Math.max(0, remaining);
    }

    log.debug("Sleeping for up to {} millis", delay);
    try {
        stopRequestedLatch.await(delay, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        // An interrupt simply ends the backoff early; the interrupt status is not restored here.
        return;
    }
}
/**
 * @return a description of this operator listing its retry settings, tolerance type, failure count,
 *         clock and processing context
 */
@Override
public String toString() {
    StringBuilder description = new StringBuilder("RetryWithToleranceOperator{");
    description.append("errorRetryTimeout=").append(errorRetryTimeout);
    description.append(", errorMaxDelayInMillis=").append(errorMaxDelayInMillis);
    description.append(", errorToleranceType=").append(errorToleranceType);
    description.append(", totalFailures=").append(totalFailures);
    description.append(", time=").append(time);
    description.append(", context=").append(context);
    description.append('}');
    return description.toString();
}
/**
 * Hand the error reporters for this connector to the processing context.
 *
 * @param reporters the error reporters (should not be null).
 */
public synchronized void reporters(List<ErrorReporter> reporters) {
    context.reporters(reporters);
}
/**
 * Store the source record being processed in the connect pipeline in the processing context.
 *
 * @param preTransformRecord the source record
 */
public synchronized void sourceRecord(SourceRecord preTransformRecord) {
    context.sourceRecord(preTransformRecord);
}
/**
 * Store the record consumed from Kafka in a sink connector in the processing context.
 *
 * @param consumedMessage the record
 */
public synchronized void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) {
    context.consumerRecord(consumedMessage);
}
/**
 * Report whether the processing context is currently in a failed state.
 *
 * @return true, if the last operation encountered an error; false otherwise
 */
public synchronized boolean failed() {
    return context.failed();
}
/**
 * Fetch the error held by the processing context for the current stage.
 *
 * @return the error encountered when processing the current stage
 */
public synchronized Throwable error() {
    return context.error();
}
/**
 * This will stop any further retries for operations.
 * This will also mark any ongoing operations that are currently backing off for retry as failed.
 * This can be called from a separate thread to break out of retry/backoff loops in
 * {@link #execAndRetry(Operation)}
 * <p>
 * Unlike most public methods of this class, this method is not {@code synchronized}.
 */
public void triggerStop() {
// Set the flag before releasing the latch: a thread woken from backoff() then sees
// stopping == true when execAndRetry checks it.
stopping = true;
stopRequestedLatch.countDown();
}
/**
 * Close the processing context held by this operator.
 */
@Override
public synchronized void close() {
    context.close();
}
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦