spring ConnectionFactoryUtils 源码

  • 2022-08-08
  • 浏览 (525)

spring ConnectionFactoryUtils 代码

文件路径:/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/ConnectionFactoryUtils.java

/*
 * Copyright 2002-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.r2dbc.connection;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.R2dbcBadGrammarException;
import io.r2dbc.spi.R2dbcDataIntegrityViolationException;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.R2dbcNonTransientException;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import io.r2dbc.spi.R2dbcPermissionDeniedException;
import io.r2dbc.spi.R2dbcRollbackException;
import io.r2dbc.spi.R2dbcTimeoutException;
import io.r2dbc.spi.R2dbcTransientException;
import io.r2dbc.spi.R2dbcTransientResourceException;
import io.r2dbc.spi.Wrapped;
import reactor.core.publisher.Mono;

import org.springframework.core.Ordered;
import org.springframework.dao.ConcurrencyFailureException;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.dao.PermissionDeniedDataAccessException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.dao.TransientDataAccessResourceException;
import org.springframework.lang.Nullable;
import org.springframework.r2dbc.BadSqlGrammarException;
import org.springframework.r2dbc.UncategorizedR2dbcException;
import org.springframework.transaction.NoTransactionException;
import org.springframework.transaction.reactive.TransactionSynchronization;
import org.springframework.transaction.reactive.TransactionSynchronizationManager;
import org.springframework.util.Assert;

/**
 * Helper class that provides static methods for obtaining R2DBC Connections from
 * a {@link ConnectionFactory}.
 *
 * <p>Used internally by Spring's {@code DatabaseClient}, Spring's R2DBC operation
 * objects. Can also be used directly in application code.
 *
 * @author Mark Paluch
 * @author Christoph Strobl
 * @since 5.3
 * @see R2dbcTransactionManager
 * @see org.springframework.transaction.reactive.TransactionSynchronizationManager
 */
public abstract class ConnectionFactoryUtils {

	/**
	 * Order value for ReactiveTransactionSynchronization objects that clean up R2DBC Connections.
	 */
	public static final int CONNECTION_SYNCHRONIZATION_ORDER = 1000;


	/**
	 * Obtain a {@link Connection} from the given {@link ConnectionFactory}.
	 * Translates exceptions into the Spring hierarchy of unchecked generic
	 * data access exceptions, simplifying calling code and making any
	 * exception that is thrown more meaningful.
	 * <p>Is aware of a corresponding Connection bound to the current
	 * {@link TransactionSynchronizationManager}. Will bind a Connection to the
	 * {@link TransactionSynchronizationManager} if transaction synchronization is active.
	 * @param connectionFactory the {@link ConnectionFactory} to obtain
	 * {@link Connection Connections} from
	 * @return a R2DBC Connection from the given {@link ConnectionFactory}
	 * @throws DataAccessResourceFailureException if the attempt to get a
	 * {@link Connection} failed
	 * @see #releaseConnection
	 */
	public static Mono<Connection> getConnection(ConnectionFactory connectionFactory) {
		return doGetConnection(connectionFactory)
				.onErrorMap(e -> new DataAccessResourceFailureException("Failed to obtain R2DBC Connection", e));
	}

	/**
	 * Actually obtain a R2DBC Connection from the given {@link ConnectionFactory}.
	 * Same as {@link #getConnection}, but preserving the original exceptions.
	 * <p>Is aware of a corresponding Connection bound to the current
	 * {@link TransactionSynchronizationManager}. Will bind a Connection to the
	 * {@link TransactionSynchronizationManager} if transaction synchronization is active
	 * @param connectionFactory the {@link ConnectionFactory} to obtain Connections from
	 * @return a R2DBC {@link Connection} from the given {@link ConnectionFactory}.
	 */
	public static Mono<Connection> doGetConnection(ConnectionFactory connectionFactory) {
		Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
		return TransactionSynchronizationManager.forCurrentTransaction().flatMap(synchronizationManager -> {

			ConnectionHolder conHolder = (ConnectionHolder) synchronizationManager.getResource(connectionFactory);
			if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
				conHolder.requested();
				if (!conHolder.hasConnection()) {
					return fetchConnection(connectionFactory).doOnNext(conHolder::setConnection);
				}
				return Mono.just(conHolder.getConnection());
			}
			// Else we either got no holder or an empty thread-bound holder here.

			Mono<Connection> con = fetchConnection(connectionFactory);
			if (synchronizationManager.isSynchronizationActive()) {
				return con.flatMap(connection -> Mono.just(connection).doOnNext(conn -> {
					// Use same Connection for further R2DBC actions within the transaction.
					// Thread-bound object will get removed by synchronization at transaction completion.
					ConnectionHolder holderToUse = conHolder;
					if (holderToUse == null) {
						holderToUse = new ConnectionHolder(conn);
					}
					else {
						holderToUse.setConnection(conn);
					}
					holderToUse.requested();
					synchronizationManager
							.registerSynchronization(new ConnectionSynchronization(holderToUse, connectionFactory));
					holderToUse.setSynchronizedWithTransaction(true);
					if (holderToUse != conHolder) {
						synchronizationManager.bindResource(connectionFactory, holderToUse);
					}
				})      // Unexpected exception from external delegation call -> close Connection and rethrow.
				.onErrorResume(e -> releaseConnection(connection, connectionFactory).then(Mono.error(e))));
			}
			return con;
		}).onErrorResume(NoTransactionException.class, e -> Mono.from(connectionFactory.create()));
	}

	/**
	 * Actually fetch a {@link Connection} from the given {@link ConnectionFactory}.
	 * @param connectionFactory the {@link ConnectionFactory} to obtain
	 * {@link Connection}s from
	 * @return a R2DBC {@link Connection} from the given {@link ConnectionFactory}
	 * (never {@code null}).
	 * @throws IllegalStateException if the {@link ConnectionFactory} returned a {@code null} value.
	 * @see ConnectionFactory#create()
	 */
	private static Mono<Connection> fetchConnection(ConnectionFactory connectionFactory) {
		return Mono.from(connectionFactory.create());
	}

	/**
	 * Close the given {@link Connection}, obtained from the given {@link ConnectionFactory}, if
	 * it is not managed externally (that is, not bound to the subscription).
	 * @param con the {@link Connection} to close if necessary
	 * @param connectionFactory the {@link ConnectionFactory} that the Connection was obtained from
	 * @see #getConnection
	 */
	public static Mono<Void> releaseConnection(Connection con, ConnectionFactory connectionFactory) {
		return doReleaseConnection(con, connectionFactory)
				.onErrorMap(e -> new DataAccessResourceFailureException("Failed to close R2DBC Connection", e));
	}

	/**
	 * Actually close the given {@link Connection}, obtained from the given
	 * {@link ConnectionFactory}. Same as {@link #releaseConnection},
	 * but preserving the original exception.
	 * @param connection the {@link Connection} to close if necessary
	 * @param connectionFactory the {@link ConnectionFactory} that the Connection was obtained from
	 * @see #doGetConnection
	 */
	public static Mono<Void> doReleaseConnection(Connection connection, ConnectionFactory connectionFactory) {
		return TransactionSynchronizationManager.forCurrentTransaction()
				.flatMap(synchronizationManager -> {
			ConnectionHolder conHolder = (ConnectionHolder) synchronizationManager.getResource(connectionFactory);
			if (conHolder != null && connectionEquals(conHolder, connection)) {
				// It's the transactional Connection: Don't close it.
				conHolder.released();
			}
			return Mono.from(connection.close());
		}).onErrorResume(NoTransactionException.class, e -> Mono.from(connection.close()));
	}

	/**
	 * Obtain the {@link ConnectionFactory} from the current {@link TransactionSynchronizationManager}.
	 * @param connectionFactory the {@link ConnectionFactory} that the Connection was obtained from
	 * @see TransactionSynchronizationManager
	 */
	public static Mono<ConnectionFactory> currentConnectionFactory(ConnectionFactory connectionFactory) {
		return TransactionSynchronizationManager.forCurrentTransaction()
				.filter(TransactionSynchronizationManager::isSynchronizationActive)
				.filter(synchronizationManager -> {
					ConnectionHolder conHolder = (ConnectionHolder) synchronizationManager.getResource(connectionFactory);
					return conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction());
				}).map(synchronizationManager -> connectionFactory);
	}

	/**
	 * Translate the given {@link R2dbcException} into a generic {@link DataAccessException}.
	 * <p>The returned DataAccessException is supposed to contain the original
	 * {@link R2dbcException} as root cause. However, client code may not generally
	 * rely on this due to DataAccessExceptions possibly being caused by other resource
	 * APIs as well. That said, a {@code getRootCause() instanceof R2dbcException}
	 * check (and subsequent cast) is considered reliable when expecting R2DBC-based
	 * access to have happened.
	 * @param task readable text describing the task being attempted
	 * @param sql the SQL query or update that caused the problem (if known)
	 * @param ex the offending {@link R2dbcException}
	 * @return the corresponding DataAccessException instance
	 */
	public static DataAccessException convertR2dbcException(String task, @Nullable String sql, R2dbcException ex) {
		if (ex instanceof R2dbcTransientException) {
			if (ex instanceof R2dbcTransientResourceException) {
				return new TransientDataAccessResourceException(buildMessage(task, sql, ex), ex);
			}
			if (ex instanceof R2dbcRollbackException) {
				return new ConcurrencyFailureException(buildMessage(task, sql, ex), ex);
			}
			if (ex instanceof R2dbcTimeoutException) {
				return new QueryTimeoutException(buildMessage(task, sql, ex), ex);
			}
		}
		if (ex instanceof R2dbcNonTransientException) {
			if (ex instanceof R2dbcNonTransientResourceException) {
				return new DataAccessResourceFailureException(buildMessage(task, sql, ex), ex);
			}
			if (ex instanceof R2dbcDataIntegrityViolationException) {
				return new DataIntegrityViolationException(buildMessage(task, sql, ex), ex);
			}
			if (ex instanceof R2dbcPermissionDeniedException) {
				return new PermissionDeniedDataAccessException(buildMessage(task, sql, ex), ex);
			}
			if (ex instanceof R2dbcBadGrammarException) {
				return new BadSqlGrammarException(task, (sql != null ? sql : ""), ex);
			}
		}
		return new UncategorizedR2dbcException(buildMessage(task, sql, ex), sql, ex);
	}

	/**
	 * Build a message {@code String} for the given {@link R2dbcException}.
	 * <p>To be called by translator subclasses when creating an instance of a generic
	 * {@link org.springframework.dao.DataAccessException} class.
	 * @param task readable text describing the task being attempted
	 * @param sql the SQL statement that caused the problem
	 * @param ex the offending {@code R2dbcException}
	 * @return the message {@code String} to use
	 */
	private static String buildMessage(String task, @Nullable String sql, R2dbcException ex) {
		return task + "; " + (sql != null ? ("SQL [" + sql + "]; ") : "") + ex.getMessage();
	}

	/**
	 * Determine whether the given two {@link Connection}s are equal, asking the target
	 * {@link Connection} in case of a proxy. Used to detect equality even if the user
	 * passed in a raw target Connection while the held one is a proxy.
	 * @param conHolder the {@link ConnectionHolder} for the held {@link Connection} (potentially a proxy)
	 * @param passedInCon the {@link Connection} passed-in by the user (potentially
	 * a target {@link Connection} without proxy).
	 * @return whether the given Connections are equal
	 * @see #getTargetConnection
	 */
	private static boolean connectionEquals(ConnectionHolder conHolder, Connection passedInCon) {
		if (!conHolder.hasConnection()) {
			return false;
		}
		Connection heldCon = conHolder.getConnection();
		// Explicitly check for identity too: for Connection handles that do not implement
		// "equals" properly).
		return (heldCon == passedInCon || heldCon.equals(passedInCon) || getTargetConnection(heldCon).equals(passedInCon));
	}

	/**
	 * Return the innermost target {@link Connection} of the given {@link Connection}.
	 * If the given {@link Connection} is wrapped, it will be unwrapped until a
	 * plain {@link Connection} is found. Otherwise, the passed-in Connection
	 * will be returned as-is.
	 * @param con the {@link Connection} wrapper to unwrap
	 * @return the innermost target Connection, or the passed-in one if not wrapped
	 * @see Wrapped#unwrap()
	 */
	@SuppressWarnings("unchecked")
	public static Connection getTargetConnection(Connection con) {
		Connection conToUse = con;
		while (conToUse instanceof Wrapped<?>) {
			conToUse = ((Wrapped<Connection>) conToUse).unwrap();
		}
		return conToUse;
	}

	/**
	 * Determine the connection synchronization order to use for the given {@link ConnectionFactory}.
	 * Decreased for every level of nesting that a {@link ConnectionFactory} has,
	 * checked through the level of {@link DelegatingConnectionFactory} nesting.
	 * @param connectionFactory the {@link ConnectionFactory} to check
	 * @return the connection synchronization order to use
	 * @see #CONNECTION_SYNCHRONIZATION_ORDER
	 */
	private static int getConnectionSynchronizationOrder(ConnectionFactory connectionFactory) {
		int order = CONNECTION_SYNCHRONIZATION_ORDER;
		ConnectionFactory current = connectionFactory;
		while (current instanceof DelegatingConnectionFactory) {
			order--;
			current = ((DelegatingConnectionFactory) current).getTargetConnectionFactory();
		}
		return order;
	}


	/**
	 * Callback for resource cleanup at the end of a non-native R2DBC transaction.
	 */
	private static class ConnectionSynchronization implements TransactionSynchronization, Ordered {

		private final ConnectionHolder connectionHolder;

		private final ConnectionFactory connectionFactory;

		private final int order;

		private boolean holderActive = true;

		ConnectionSynchronization(ConnectionHolder connectionHolder, ConnectionFactory connectionFactory) {
			this.connectionHolder = connectionHolder;
			this.connectionFactory = connectionFactory;
			this.order = getConnectionSynchronizationOrder(connectionFactory);
		}

		@Override
		public int getOrder() {
			return this.order;
		}

		@Override
		public Mono<Void> suspend() {
			if (this.holderActive) {
				return TransactionSynchronizationManager.forCurrentTransaction().flatMap(synchronizationManager -> {
					synchronizationManager.unbindResource(this.connectionFactory);
					if (this.connectionHolder.hasConnection() && !this.connectionHolder.isOpen()) {
						// Release Connection on suspend if the application doesn't keep
						// a handle to it anymore. We will fetch a fresh Connection if the
						// application accesses the ConnectionHolder again after resume,
						// assuming that it will participate in the same transaction.
						return releaseConnection(this.connectionHolder.getConnection(), this.connectionFactory)
								.doOnTerminate(() -> this.connectionHolder.setConnection(null));
					}
					return Mono.empty();
				});
			}
			return Mono.empty();
		}

		@Override
		public Mono<Void> resume() {
			if (this.holderActive) {
				return TransactionSynchronizationManager.forCurrentTransaction()
						.doOnNext(synchronizationManager ->
								synchronizationManager.bindResource(this.connectionFactory, this.connectionHolder))
						.then();
			}
			return Mono.empty();
		}

		@Override
		public Mono<Void> beforeCompletion() {
			// Release Connection early if the holder is not open anymore (that is,
			// not used by another resource that has its own cleanup via transaction
			// synchronization), to avoid issues with strict transaction implementations
			// that expect the close call before transaction completion.
			if (!this.connectionHolder.isOpen()) {
				return TransactionSynchronizationManager.forCurrentTransaction().flatMap(synchronizationManager -> {
					synchronizationManager.unbindResource(this.connectionFactory);
					this.holderActive = false;
					if (this.connectionHolder.hasConnection()) {
						return releaseConnection(this.connectionHolder.getConnection(), this.connectionFactory);
					}
					return Mono.empty();
				});
			}

			return Mono.empty();
		}

		@Override
		public Mono<Void> afterCompletion(int status) {
			// If we haven't closed the Connection in beforeCompletion,
			// close it now.
			if (this.holderActive) {
				// The bound ConnectionHolder might not be available anymore,
				// since afterCompletion might get called from a different thread.
				return TransactionSynchronizationManager.forCurrentTransaction().flatMap(synchronizationManager -> {
					synchronizationManager.unbindResourceIfPossible(this.connectionFactory);
					this.holderActive = false;
					if (this.connectionHolder.hasConnection()) {
						return releaseConnection(this.connectionHolder.getConnection(), this.connectionFactory)
								// Reset the ConnectionHolder: It might remain bound to the context.
								.doOnTerminate(() -> this.connectionHolder.setConnection(null));
					}
					return Mono.empty();
				});
			}
			this.connectionHolder.reset();
			return Mono.empty();
		}
	}

}

相关信息

spring 源码目录

相关文章

spring ConnectionHolder 源码

spring DelegatingConnectionFactory 源码

spring R2dbcTransactionManager 源码

spring SingleConnectionFactory 源码

spring TransactionAwareConnectionFactoryProxy 源码

spring package-info 源码

0  赞