spring R2dbcTransactionManager 源码
spring R2dbcTransactionManager 代码
文件路径:/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.r2dbc.connection;
import java.time.Duration;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.Option;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.Result;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.lang.Nullable;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.reactive.AbstractReactiveTransactionManager;
import org.springframework.transaction.reactive.GenericReactiveTransaction;
import org.springframework.transaction.reactive.TransactionSynchronizationManager;
import org.springframework.util.Assert;
/**
* {@link org.springframework.transaction.ReactiveTransactionManager}
* implementation for a single R2DBC {@link ConnectionFactory}. This class is
* capable of working in any environment with any R2DBC driver, as long as the
* setup uses a {@code ConnectionFactory} as its {@link Connection} factory
* mechanism. Binds a R2DBC {@code Connection} from the specified
* {@code ConnectionFactory} to the current subscriber context, potentially
* allowing for one context-bound {@code Connection} per {@code ConnectionFactory}.
*
* <p><b>Note: The {@code ConnectionFactory} that this transaction manager
* operates on needs to return independent {@code Connection}s.</b>
* The {@code Connection}s may come from a pool (the typical case), but the
* {@code ConnectionFactory} must not return scoped {@code Connection}s
* or the like. This transaction manager will associate {@code Connection}
* with context-bound transactions itself, according to the specified propagation
* behavior. It assumes that a separate, independent {@code Connection} can
* be obtained even during an ongoing transaction.
*
* <p>Application code is required to retrieve the R2DBC Connection via
* {@link ConnectionFactoryUtils#getConnection(ConnectionFactory)}
* instead of a standard R2DBC-style {@link ConnectionFactory#create()} call.
* Spring classes such as {@code DatabaseClient} use this strategy implicitly.
* If not used in combination with this transaction manager, the
* {@link ConnectionFactoryUtils} lookup strategy behaves exactly like the
* native {@code ConnectionFactory} lookup; it can thus be used in a portable fashion.
*
* <p>Alternatively, you can allow application code to work with the standard
* R2DBC lookup pattern {@link ConnectionFactory#create()}, for example for code
* that is not aware of Spring at all. In that case, define a
* {@link TransactionAwareConnectionFactoryProxy} for your target {@code ConnectionFactory},
* and pass that proxy {@code ConnectionFactory} to your DAOs, which will automatically
* participate in Spring-managed transactions when accessing it.
*
* <p>This transaction manager triggers flush callbacks on registered transaction
* synchronizations (if synchronization is generally active), assuming resources
* operating on the underlying R2DBC {@code Connection}.
*
* <p>Spring's {@code TransactionDefinition} attributes are carried forward to R2DBC drivers
* using extensible R2DBC {@link io.r2dbc.spi.TransactionDefinition}. Subclasses may
* override {@link #createTransactionDefinition(TransactionDefinition)} to customize
* transaction definitions for vendor-specific attributes.
*
* @author Mark Paluch
* @author Juergen Hoeller
* @since 5.3
* @see ConnectionFactoryUtils#getConnection(ConnectionFactory)
* @see ConnectionFactoryUtils#releaseConnection
* @see TransactionAwareConnectionFactoryProxy
*/
@SuppressWarnings("serial")
public class R2dbcTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean {
@Nullable
private ConnectionFactory connectionFactory;
private boolean enforceReadOnly = false;
/**
* Create a new {@code R2dbcTransactionManager} instance.
* A ConnectionFactory has to be set to be able to use it.
* @see #setConnectionFactory
*/
public R2dbcTransactionManager() {}
/**
* Create a new {@code R2dbcTransactionManager} instance.
* @param connectionFactory the R2DBC ConnectionFactory to manage transactions for
*/
public R2dbcTransactionManager(ConnectionFactory connectionFactory) {
this();
setConnectionFactory(connectionFactory);
afterPropertiesSet();
}
/**
* Set the R2DBC {@link ConnectionFactory} that this instance should manage transactions for.
* <p>This will typically be a locally defined {@code ConnectionFactory}, for example a connection pool.
* <p><b>The {@code ConnectionFactory} passed in here needs to return independent {@link Connection}s.</b>
* The {@code Connection}s may come from a pool (the typical case), but the {@code ConnectionFactory}
* must not return scoped {@code Connection}s or the like.
* @see TransactionAwareConnectionFactoryProxy
*/
public void setConnectionFactory(@Nullable ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
/**
* Return the R2DBC {@link ConnectionFactory} that this instance manages transactions for.
*/
@Nullable
public ConnectionFactory getConnectionFactory() {
return this.connectionFactory;
}
/**
* Obtain the {@link ConnectionFactory} for actual use.
* @return the {@code ConnectionFactory} (never {@code null})
* @throws IllegalStateException in case of no ConnectionFactory set
*/
protected ConnectionFactory obtainConnectionFactory() {
ConnectionFactory connectionFactory = getConnectionFactory();
Assert.state(connectionFactory != null, "No ConnectionFactory set");
return connectionFactory;
}
/**
* Specify whether to enforce the read-only nature of a transaction (as indicated by
* {@link TransactionDefinition#isReadOnly()}) through an explicit statement on the
* transactional connection: "SET TRANSACTION READ ONLY" as understood by Oracle,
* MySQL and Postgres.
* <p>The exact treatment, including any SQL statement executed on the connection,
* can be customized through {@link #prepareTransactionalConnection}.
* @see #prepareTransactionalConnection
*/
public void setEnforceReadOnly(boolean enforceReadOnly) {
this.enforceReadOnly = enforceReadOnly;
}
/**
* Return whether to enforce the read-only nature of a transaction through an
* explicit statement on the transactional connection.
* @see #setEnforceReadOnly
*/
public boolean isEnforceReadOnly() {
return this.enforceReadOnly;
}
@Override
public void afterPropertiesSet() {
if (getConnectionFactory() == null) {
throw new IllegalArgumentException("Property 'connectionFactory' is required");
}
}
@Override
protected Object doGetTransaction(TransactionSynchronizationManager synchronizationManager) throws TransactionException {
ConnectionFactoryTransactionObject txObject = new ConnectionFactoryTransactionObject();
ConnectionHolder conHolder = (ConnectionHolder) synchronizationManager.getResource(obtainConnectionFactory());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
@Override
protected boolean isExistingTransaction(Object transaction) {
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
@Override
protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationManager, Object transaction,
TransactionDefinition definition) throws TransactionException {
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
return Mono.defer(() -> {
Mono<Connection> connectionMono;
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Mono<Connection> newCon = Mono.from(obtainConnectionFactory().create());
connectionMono = newCon.doOnNext(connection -> {
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for R2DBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(connection), true);
});
}
else {
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
connectionMono = Mono.just(txObject.getConnectionHolder().getConnection());
}
return connectionMono.flatMap(con -> switchAutoCommitIfNecessary(con, transaction)
.then(Mono.from(doBegin(definition, con)))
.then(prepareTransactionalConnection(con, definition))
.doOnSuccess(v -> {
txObject.getConnectionHolder().setTransactionActive(true);
Duration timeout = determineTimeout(definition);
if (!timeout.isNegative() && !timeout.isZero()) {
txObject.getConnectionHolder().setTimeoutInMillis(timeout.toMillis());
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
synchronizationManager.bindResource(obtainConnectionFactory(), txObject.getConnectionHolder());
}
}).thenReturn(con).onErrorResume(e -> {
if (txObject.isNewConnectionHolder()) {
return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory())
.doOnTerminate(() -> txObject.setConnectionHolder(null, false))
.then(Mono.error(e));
}
return Mono.error(e);
})).onErrorResume(e -> {
CannotCreateTransactionException ex = new CannotCreateTransactionException(
"Could not open R2DBC Connection for transaction", e);
return Mono.error(ex);
});
}).then();
}
private Publisher<Void> doBegin(TransactionDefinition definition, Connection con) {
io.r2dbc.spi.TransactionDefinition transactionDefinition = createTransactionDefinition(definition);
if (logger.isDebugEnabled()) {
logger.debug("Starting R2DBC transaction on Connection [" + con + "] using [" + transactionDefinition + "]");
}
return con.beginTransaction(transactionDefinition);
}
/**
* Determine the transaction definition from our {@code TransactionDefinition}.
* Can be overridden to wrap the R2DBC {@code TransactionDefinition} to adjust or
* enhance transaction attributes.
* @param definition the transaction definition
* @return the actual transaction definition to use
* @since 6.0
* @see io.r2dbc.spi.TransactionDefinition
*/
protected io.r2dbc.spi.TransactionDefinition createTransactionDefinition(TransactionDefinition definition) {
// Apply specific isolation level, if any.
IsolationLevel isolationLevelToUse = resolveIsolationLevel(definition.getIsolationLevel());
return new ExtendedTransactionDefinition(definition.getName(), definition.isReadOnly(),
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? isolationLevelToUse : null,
determineTimeout(definition));
}
/**
* Determine the actual timeout to use for the given definition.
* Will fall back to this manager's default timeout if the
* transaction definition doesn't specify a non-default value.
* @param definition the transaction definition
* @return the actual timeout to use
* @see org.springframework.transaction.TransactionDefinition#getTimeout()
*/
protected Duration determineTimeout(TransactionDefinition definition) {
if (definition.getTimeout() != TransactionDefinition.TIMEOUT_DEFAULT) {
return Duration.ofSeconds(definition.getTimeout());
}
return Duration.ZERO;
}
@Override
protected Mono<Object> doSuspend(TransactionSynchronizationManager synchronizationManager, Object transaction)
throws TransactionException {
return Mono.defer(() -> {
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
txObject.setConnectionHolder(null);
return Mono.justOrEmpty(synchronizationManager.unbindResource(obtainConnectionFactory()));
});
}
@Override
protected Mono<Void> doResume(TransactionSynchronizationManager synchronizationManager,
@Nullable Object transaction, Object suspendedResources) throws TransactionException {
return Mono.defer(() -> {
synchronizationManager.bindResource(obtainConnectionFactory(), suspendedResources);
return Mono.empty();
});
}
@Override
protected Mono<Void> doCommit(TransactionSynchronizationManager TransactionSynchronizationManager,
GenericReactiveTransaction status) throws TransactionException {
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) status.getTransaction();
Connection connection = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing R2DBC transaction on Connection [" + connection + "]");
}
return Mono.from(connection.commitTransaction())
.onErrorMap(R2dbcException.class, ex -> translateException("R2DBC commit", ex));
}
@Override
protected Mono<Void> doRollback(TransactionSynchronizationManager TransactionSynchronizationManager,
GenericReactiveTransaction status) throws TransactionException {
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) status.getTransaction();
Connection connection = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Rolling back R2DBC transaction on Connection [" + connection + "]");
}
return Mono.from(connection.rollbackTransaction())
.onErrorMap(R2dbcException.class, ex -> translateException("R2DBC rollback", ex));
}
@Override
protected Mono<Void> doSetRollbackOnly(TransactionSynchronizationManager synchronizationManager,
GenericReactiveTransaction status) throws TransactionException {
return Mono.fromRunnable(() -> {
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) status.getTransaction();
if (status.isDebug()) {
logger.debug("Setting R2DBC transaction [" + txObject.getConnectionHolder().getConnection() +
"] rollback-only");
}
txObject.setRollbackOnly();
});
}
@Override
protected Mono<Void> doCleanupAfterCompletion(TransactionSynchronizationManager synchronizationManager,
Object transaction) {
return Mono.defer(() -> {
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
// Remove the connection holder from the context, if exposed.
if (txObject.isNewConnectionHolder()) {
synchronizationManager.unbindResource(obtainConnectionFactory());
}
// Reset connection.
Connection con = txObject.getConnectionHolder().getConnection();
Mono<Void> afterCleanup = Mono.empty();
if (txObject.isMustRestoreAutoCommit()) {
afterCleanup = afterCleanup.then(Mono.from(con.setAutoCommit(true)));
}
return afterCleanup.then(Mono.defer(() -> {
try {
if (txObject.isNewConnectionHolder()) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing R2DBC Connection [" + con + "] after transaction");
}
return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory());
}
}
finally {
txObject.getConnectionHolder().clear();
}
return Mono.empty();
}));
});
}
private Mono<Void> switchAutoCommitIfNecessary(Connection con, Object transaction) {
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
Mono<Void> prepare = Mono.empty();
// Switch to manual commit if necessary. This is very expensive in some R2DBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.isAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching R2DBC Connection [" + con + "] to manual commit");
}
prepare = prepare.then(Mono.from(con.setAutoCommit(false)));
}
return prepare;
}
/**
* Prepare the transactional {@link Connection} right after transaction begin.
* <p>The default implementation executes a "SET TRANSACTION READ ONLY" statement if the
* {@link #setEnforceReadOnly "enforceReadOnly"} flag is set to {@code true} and the
* transaction definition indicates a read-only transaction.
* <p>The "SET TRANSACTION READ ONLY" is understood by Oracle, MySQL and Postgres
* and may work with other databases as well. If you'd like to adapt this treatment,
* override this method accordingly.
* @param con the transactional R2DBC Connection
* @param definition the current transaction definition
* @since 5.3.22
* @see #setEnforceReadOnly
*/
protected Mono<Void> prepareTransactionalConnection(Connection con, TransactionDefinition definition) {
Mono<Void> prepare = Mono.empty();
if (isEnforceReadOnly() && definition.isReadOnly()) {
prepare = Mono.from(con.createStatement("SET TRANSACTION READ ONLY").execute())
.flatMapMany(Result::getRowsUpdated)
.then();
}
return prepare;
}
/**
* Resolve the {@linkplain TransactionDefinition#getIsolationLevel() isolation level constant} to a R2DBC
* {@link IsolationLevel}. If you'd like to extend isolation level translation for vendor-specific
* {@code IsolationLevel}s, override this method accordingly.
* @param isolationLevel the isolation level to translate.
* @return the resolved isolation level. Can be {@code null} if not resolvable or the isolation level
* should remain {@link TransactionDefinition#ISOLATION_DEFAULT default}.
* @see TransactionDefinition#getIsolationLevel()
*/
@Nullable
protected IsolationLevel resolveIsolationLevel(int isolationLevel) {
return switch (isolationLevel) {
case TransactionDefinition.ISOLATION_READ_COMMITTED -> IsolationLevel.READ_COMMITTED;
case TransactionDefinition.ISOLATION_READ_UNCOMMITTED -> IsolationLevel.READ_UNCOMMITTED;
case TransactionDefinition.ISOLATION_REPEATABLE_READ -> IsolationLevel.REPEATABLE_READ;
case TransactionDefinition.ISOLATION_SERIALIZABLE -> IsolationLevel.SERIALIZABLE;
default -> null;
};
}
/**
* Translate the given R2DBC commit/rollback exception to a common Spring exception to propagate
* from the {@link #commit}/{@link #rollback} call.
* @param task the task description (commit or rollback).
* @param ex the SQLException thrown from commit/rollback.
* @return the translated exception to emit
*/
protected RuntimeException translateException(String task, R2dbcException ex) {
return ConnectionFactoryUtils.convertR2dbcException(task, null, ex);
}
/**
* Extended R2DBC transaction definition object providing transaction attributes
* to R2DBC drivers when starting a transaction.
*/
private record ExtendedTransactionDefinition(@Nullable String transactionName,
boolean readOnly, @Nullable IsolationLevel isolationLevel, Duration lockWaitTimeout)
implements io.r2dbc.spi.TransactionDefinition {
private ExtendedTransactionDefinition(@Nullable String transactionName, boolean readOnly,
@Nullable IsolationLevel isolationLevel, Duration lockWaitTimeout) {
this.transactionName = transactionName;
this.readOnly = readOnly;
this.isolationLevel = isolationLevel;
this.lockWaitTimeout = lockWaitTimeout;
}
@SuppressWarnings("unchecked")
@Override
public <T> T getAttribute(Option<T> option) {
return (T) doGetValue(option);
}
@Nullable
private Object doGetValue(Option<?> option) {
if (io.r2dbc.spi.TransactionDefinition.ISOLATION_LEVEL.equals(option)) {
return this.isolationLevel;
}
if (io.r2dbc.spi.TransactionDefinition.NAME.equals(option)) {
return this.transactionName;
}
if (io.r2dbc.spi.TransactionDefinition.READ_ONLY.equals(option)) {
return this.readOnly;
}
if (io.r2dbc.spi.TransactionDefinition.LOCK_WAIT_TIMEOUT.equals(option)
&& !this.lockWaitTimeout.isZero()) {
return this.lockWaitTimeout;
}
return null;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(getClass().getSimpleName());
sb.append(" [transactionName='").append(this.transactionName).append('\'');
sb.append(", readOnly=").append(this.readOnly);
sb.append(", isolationLevel=").append(this.isolationLevel);
sb.append(", lockWaitTimeout=").append(this.lockWaitTimeout);
sb.append(']');
return sb.toString();
}
}
/**
* ConnectionFactory transaction object, representing a ConnectionHolder.
* Used as transaction object by R2dbcTransactionManager.
*/
private static class ConnectionFactoryTransactionObject {
@Nullable
private ConnectionHolder connectionHolder;
private boolean newConnectionHolder;
private boolean mustRestoreAutoCommit;
void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) {
setConnectionHolder(connectionHolder);
this.newConnectionHolder = newConnectionHolder;
}
boolean isNewConnectionHolder() {
return this.newConnectionHolder;
}
void setRollbackOnly() {
getConnectionHolder().setRollbackOnly();
}
public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder) {
this.connectionHolder = connectionHolder;
}
public ConnectionHolder getConnectionHolder() {
Assert.state(this.connectionHolder != null, "No ConnectionHolder available");
return this.connectionHolder;
}
public boolean hasConnectionHolder() {
return (this.connectionHolder != null);
}
public void setMustRestoreAutoCommit(boolean mustRestoreAutoCommit) {
this.mustRestoreAutoCommit = mustRestoreAutoCommit;
}
public boolean isMustRestoreAutoCommit() {
return this.mustRestoreAutoCommit;
}
}
}
相关信息
相关文章
spring ConnectionFactoryUtils 源码
spring DelegatingConnectionFactory 源码
spring SingleConnectionFactory 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦