diff --git a/README.md b/README.md index 02519d300e..a3addbe7ce 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,7 @@ Kotlin version 2.1.+ The following module(s) require JDK 17 or newer: * `spring-transaction` - depends on Spring Framework 6 * `spring7-transaction` - depends on Spring Framework 7 +* `spring7-reactive-transaction` - depends on Spring Framework 7 * `exposed-spring-boot-starter` - depends on Spring Boot 3 * `exposed-spring-boot4-starter` - depends on Spring Boot 4 * `exposed-crypt` - depends on Spring Security 7 diff --git a/build.gradle.kts b/build.gradle.kts index e86d1648cf..c73cc3ac34 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -38,6 +38,7 @@ dependencies { dokka(projects.exposed.exposedSpringBoot4Starter) dokka(projects.exposed.springTransaction) dokka(projects.exposed.spring7Transaction) + dokka(projects.exposed.spring7ReactiveTransaction) // Kover aggregated coverage dependencies // Include all source modules for coverage aggregation @@ -47,6 +48,7 @@ dependencies { kover(project(":exposed-java-time")) kover(project(":spring-transaction")) kover(project(":spring7-transaction")) + kover(project(":spring7-reactive-transaction")) kover(project(":exposed-spring-boot-starter")) kover(project(":exposed-spring-boot4-starter")) kover(project(":exposed-jdbc")) diff --git a/buildSrc/src/main/kotlin/org/jetbrains/exposed/gradle/TestDbDsl.kt b/buildSrc/src/main/kotlin/org/jetbrains/exposed/gradle/TestDbDsl.kt index dcc582db30..79d91c8f31 100644 --- a/buildSrc/src/main/kotlin/org/jetbrains/exposed/gradle/TestDbDsl.kt +++ b/buildSrc/src/main/kotlin/org/jetbrains/exposed/gradle/TestDbDsl.kt @@ -102,7 +102,7 @@ private fun Project.createDbTestTaskByDialect(db: TestDb, taskName: String, dial if (db.ignoresSpringTests(dialect)) { filter { // exclude all test classes in Spring modules: - // spring-transaction, spring7-transaction, + // spring-transaction, spring7-transaction, spring7-reactive-transaction // exposed-spring-boot-starter, exposed-spring-boot4-starter exclude( "org/jetbrains/exposed/v1/spring/*", diff --git a/exposed-r2dbc/api/exposed-r2dbc.api b/exposed-r2dbc/api/exposed-r2dbc.api index 4440391a9e..17787922c6 100644 --- a/exposed-r2dbc/api/exposed-r2dbc.api +++ b/exposed-r2dbc/api/exposed-r2dbc.api @@ -713,6 +713,7 @@ public class org/jetbrains/exposed/v1/r2dbc/statements/MergeSuspendExecutable : public final class org/jetbrains/exposed/v1/r2dbc/statements/R2dbcConnectionImpl : org/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcExposedConnection { public fun (Lorg/reactivestreams/Publisher;Ljava/lang/String;Lorg/jetbrains/exposed/v1/r2dbc/mappers/R2dbcTypeMapping;)V + public fun activeConnection (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun commit (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun executeInBatch (Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -865,6 +866,7 @@ public final class org/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcDatabaseMe } public abstract interface class org/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcExposedConnection { + public fun activeConnection (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun commit (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun executeInBatch (Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -889,6 +891,10 @@ public abstract interface class org/jetbrains/exposed/v1/r2dbc/statements/api/R2 public abstract fun setTransactionIsolation (Lio/r2dbc/spi/IsolationLevel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } +public final class org/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcExposedConnection$DefaultImpls { + public static fun activeConnection (Lorg/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcExposedConnection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + public abstract class org/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcExposedDatabaseMetadata : org/jetbrains/exposed/v1/core/statements/api/ExposedDatabaseMetadata { public fun (Ljava/lang/String;)V public abstract fun columns ([Lorg/jetbrains/exposed/v1/core/Table;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/R2dbcDatabase.kt b/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/R2dbcDatabase.kt index 7bd24e3e70..ecd2b8d543 100644 --- a/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/R2dbcDatabase.kt +++ b/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/R2dbcDatabase.kt @@ -173,8 +173,6 @@ class R2dbcDatabase private constructor( connectionUrl = options.urlString connectionUrlMode = options.urlMode TransactionManager.registerManager(this, manager(this)) - // ABOVE should be replaced with BELOW when ThreadLocalTransactionManager is fully deprecated - // TransactionManager.registerManager(this, manager(this)) } } diff --git a/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/statements/R2dbcConnectionImpl.kt b/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/statements/R2dbcConnectionImpl.kt index f2a2a5f3a4..e09d2c5d42 100644 --- a/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/statements/R2dbcConnectionImpl.kt +++ b/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/statements/R2dbcConnectionImpl.kt @@ -1,14 +1,10 @@ package org.jetbrains.exposed.v1.r2dbc.statements -import io.r2dbc.spi.Connection -import io.r2dbc.spi.IsolationLevel -import io.r2dbc.spi.Row -import io.r2dbc.spi.RowMetadata -import io.r2dbc.spi.Statement -import io.r2dbc.spi.TransactionDefinition -import io.r2dbc.spi.ValidationDepth +import io.r2dbc.spi.* import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.reactive.asPublisher import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitLast import kotlinx.coroutines.reactive.awaitSingle @@ -43,12 +39,30 @@ import java.util.* */ @Suppress("UnusedPrivateMember", "SpreadOperator") class R2dbcConnectionImpl( + /** The publisher of underlying database [Connection] instances contained by this wrapper. */ override val connection: Publisher, private val vendorDialect: String, private val typeMapping: R2dbcTypeMapping ) : R2dbcExposedConnection> { private val metadataProvider: MetadataProvider = MetadataProvider.getProvider(vendorDialect) + /** + * Retrieves a publisher that provides only the single current underlying database [Connection] instance in use. + * + * If none is active, it awaits a value from the [connection] publisher and internally stores the result. Retrieval + * of a new value will invoke `Connection.beginTransaction()`, so this method should preferably be called when an active + * transaction is underway, in order to retrieve the accurate connection object instance. + * + * If this method is invoked intentionally to trigger the start of a transaction, then [setTransactionDefinition] + * should ideally be manually called first with the appropriate [TransactionDefinition], as well as any other + * [Connection] configuration methods. + */ + override suspend fun activeConnection(): Publisher { + // retrieves localConnection if not null, otherwise awaits value from connection publisher + val acquiredConnection: Connection = withConnection { this } + return flowOf(acquiredConnection).asPublisher() + } + override suspend fun getCatalog(): String = withConnection { getCurrentCatalog(metadataProvider) ?: executeSQL(metadataProvider.getUsername()) { row, _ -> @@ -271,10 +285,6 @@ internal fun IsolationLevel.asInt(): Int = isolationLevelMapping.getOrElse(this) error("Unsupported IsolationLevel as Int: ${this.asSql()}") } -internal fun Int.asIsolationLevel(): IsolationLevel = isolationLevelMapping.entries - .firstOrNull { it.value == this }?.key - ?: error("Unsupported Int as IsolationLevel: $this") - internal suspend fun Connection.executeSQL(sqlQuery: String) { if (sqlQuery.isEmpty()) return diff --git a/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcExposedConnection.kt b/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcExposedConnection.kt index 883abe3c85..3e94ec4446 100644 --- a/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcExposedConnection.kt +++ b/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcExposedConnection.kt @@ -11,6 +11,9 @@ interface R2dbcExposedConnection { /** The underlying database connection object contained by this wrapper. */ val connection: OriginalConnection + /** Retrieves the current underlying database connection object in use. */ + suspend fun activeConnection(): OriginalConnection = connection + /** Retrieves the name of the connection's catalog. */ suspend fun getCatalog(): String diff --git a/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/transactions/R2dbcTransactionManager.kt b/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/transactions/R2dbcTransactionManager.kt index d49b0e3f0f..c5def3846f 100644 --- a/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/transactions/R2dbcTransactionManager.kt +++ b/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/transactions/R2dbcTransactionManager.kt @@ -1,7 +1,6 @@ package org.jetbrains.exposed.v1.r2dbc.transactions import io.r2dbc.spi.IsolationLevel -import kotlinx.coroutines.currentCoroutineContext import org.jetbrains.exposed.v1.core.InternalApi import org.jetbrains.exposed.v1.core.Transaction import org.jetbrains.exposed.v1.core.transactions.ThreadLocalTransactionsStack @@ -65,23 +64,23 @@ internal fun R2dbcTransactionManager.createTransactionContext(transaction: Trans } /** - * Returns the current R2DBC transaction from the coroutine context, or null if none exists. + * Returns the current R2DBC transaction from the stack, or null if none exists. * - * This method performs type checking to ensure the transaction in the context is actually - * an [R2dbcTransaction]. If a non-R2DBC transaction is found in the context, an error is thrown + * This method performs type checking to ensure the transaction in the stack is actually + * an [R2dbcTransaction]. If a non-R2DBC transaction is found in the stack, an error is thrown * to prevent type confusion between JDBC and R2DBC transactions. * - * @return The current [R2dbcTransaction] from the coroutine context, or null if no transaction exists - * @throws [IllegalStateException] If the transaction in the context is not an [R2dbcTransaction] + * @return The current [R2dbcTransaction] from the stack, or null if no transaction exists + * @throws [IllegalStateException] If the transaction in the stack is not an [R2dbcTransaction] */ -internal suspend fun R2dbcTransactionManager.getCurrentContextTransaction(): R2dbcTransaction? { +internal fun R2dbcTransactionManager.getCurrentStackTransaction(): R2dbcTransaction? { @OptIn(InternalApi::class) - val transaction = currentCoroutineContext()[contextKey]?.transaction + val transaction = ThreadLocalTransactionsStack.getTransactionOrNull(db) return when { transaction == null -> null transaction is R2dbcTransaction -> transaction else -> error( - "Expected R2dbcTransaction in coroutine context but found ${transaction::class.simpleName}. " + + "Expected R2dbcTransaction in stack but found ${transaction::class.simpleName}. " + "This may indicate mixing JDBC and R2DBC transactions incorrectly." ) } diff --git a/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/transactions/Transactions.kt b/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/transactions/Transactions.kt index b1ebab1e4a..208c1da7d8 100644 --- a/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/transactions/Transactions.kt +++ b/exposed-r2dbc/src/main/kotlin/org/jetbrains/exposed/v1/r2dbc/transactions/Transactions.kt @@ -113,7 +113,7 @@ suspend fun suspendTransaction( statement: suspend R2dbcTransaction.() -> T ): T { val databaseToUse = resolveR2dbcDatabaseOrThrow(db) - val outer = databaseToUse.transactionManager.getCurrentContextTransaction() + val outer = databaseToUse.transactionManager.getCurrentStackTransaction() return if (outer != null) { val transaction = outer.transactionManager.newTransaction( diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index bceca8f572..85c746ba90 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -59,6 +59,7 @@ kotlinx-coroutines = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutin kotlinx-coroutines-debug = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-debug", version.ref = "kotlinCoroutines" } kotlinx-coroutines-test = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-test", version.ref = "kotlinCoroutines" } kotlinx-coroutines-reactive = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-reactive", version.ref = "kotlinCoroutines" } +kotlinx-coroutines-reactor = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-reactor", version.ref = "kotlinCoroutines" } kotlinx-jvm-datetime = { group = "org.jetbrains.kotlinx", name = "kotlinx-datetime-jvm", version.ref = "kotlinx-datetime" } kotlinx-serialization = { group = "org.jetbrains.kotlinx", name = "kotlinx-serialization-json", version.ref = "kotlinxSerialization" } @@ -85,6 +86,7 @@ spring6-jdbc = { group = "org.springframework", name = "spring-jdbc", version.re spring6-context = { group = "org.springframework", name = "spring-context", version.ref = "springFramework6" } spring6-test = { group = "org.springframework", name = "spring-test", version.ref = "springFramework6" } spring7-jdbc = { group = "org.springframework", name = "spring-jdbc", version.ref = "springFramework7" } +spring7-r2dbc = { group = "org.springframework", name = "spring-r2dbc", version.ref = "springFramework7" } spring7-context = { group = "org.springframework", name = "spring-context", version.ref = "springFramework7" } spring7-test = { group = "org.springframework", name = "spring-test", version.ref = "springFramework7" } diff --git a/settings.gradle.kts b/settings.gradle.kts index f4f68dcae6..f24ae2fe89 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -7,6 +7,7 @@ include("exposed-jodatime") include("exposed-java-time") include("spring-transaction") include("spring7-transaction") +include("spring7-reactive-transaction") include("exposed-spring-boot-starter") include("exposed-spring-boot4-starter") include("exposed-jdbc") diff --git a/spring7-reactive-transaction/api/spring7-reactive-transaction.api b/spring7-reactive-transaction/api/spring7-reactive-transaction.api new file mode 100644 index 0000000000..81fcb4da1d --- /dev/null +++ b/spring7-reactive-transaction/api/spring7-reactive-transaction.api @@ -0,0 +1,12 @@ +public final class org/jetbrains/exposed/v1/spring7/reactive/transaction/ExposedSpringReactiveTransactionAttributeSource : org/springframework/transaction/interceptor/TransactionAttributeSource { + public fun ()V + public fun (Lorg/springframework/transaction/interceptor/TransactionAttributeSource;Ljava/util/List;)V + public synthetic fun (Lorg/springframework/transaction/interceptor/TransactionAttributeSource;Ljava/util/List;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun getTransactionAttribute (Ljava/lang/reflect/Method;Ljava/lang/Class;)Lorg/springframework/transaction/interceptor/TransactionAttribute; +} + +public final class org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringReactiveTransactionManager : org/springframework/transaction/reactive/AbstractReactiveTransactionManager { + public fun (Lio/r2dbc/spi/ConnectionFactory;Lorg/jetbrains/exposed/v1/r2dbc/R2dbcDatabaseConfig$Builder;Z)V + public synthetic fun (Lio/r2dbc/spi/ConnectionFactory;Lorg/jetbrains/exposed/v1/r2dbc/R2dbcDatabaseConfig$Builder;ZILkotlin/jvm/internal/DefaultConstructorMarker;)V +} + diff --git a/spring7-reactive-transaction/build.gradle.kts b/spring7-reactive-transaction/build.gradle.kts new file mode 100644 index 0000000000..ea89328c24 --- /dev/null +++ b/spring7-reactive-transaction/build.gradle.kts @@ -0,0 +1,59 @@ +import org.gradle.api.tasks.testing.logging.* +import org.jetbrains.kotlin.gradle.dsl.JvmTarget +import org.jetbrains.kotlin.gradle.tasks.* + +plugins { + kotlin("jvm") + + alias(libs.plugins.dokka) +} + +repositories { + mavenCentral() +} + +kotlin { + jvmToolchain(17) +} + +dependencies { + api(project(":exposed-core")) + api(project(":exposed-r2dbc")) + api(libs.spring7.r2dbc) + api(libs.spring7.context) + implementation(libs.kotlinx.coroutines.reactor) + + testImplementation(project(":exposed-r2dbc-tests")) + testImplementation(kotlin("test")) + testImplementation(libs.junit6) + testRuntimeOnly(libs.junit.platform.launcher) + testImplementation(libs.kotlinx.coroutines.debug) + testImplementation(libs.kotlinx.coroutines.test) + testImplementation(libs.spring7.test) + testImplementation(libs.slf4j) + testImplementation(libs.log4j.slf4j.impl) + testImplementation(libs.log4j.api) + testImplementation(libs.log4j.core) + testImplementation(libs.r2dbc.h2) { + exclude(group = "com.h2database", module = "h2") + } +} + +tasks.withType().configureEach { + compilerOptions { + jvmTarget.set(JvmTarget.JVM_17) + } +} + +tasks.withType().configureEach { + if (JavaVersion.VERSION_1_8 > JavaVersion.current()) { + jvmArgs = listOf("-XX:MaxPermSize=256m") + } + testLogging { + events.addAll(listOf(TestLogEvent.PASSED, TestLogEvent.FAILED, TestLogEvent.SKIPPED)) + showStandardStreams = true + exceptionFormat = TestExceptionFormat.FULL + } + + useJUnitPlatform() +} diff --git a/spring7-reactive-transaction/src/main/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/ExposedSpringReactiveTransactionAttributeSource.kt b/spring7-reactive-transaction/src/main/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/ExposedSpringReactiveTransactionAttributeSource.kt new file mode 100644 index 0000000000..05efd3822b --- /dev/null +++ b/spring7-reactive-transaction/src/main/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/ExposedSpringReactiveTransactionAttributeSource.kt @@ -0,0 +1,38 @@ +package org.jetbrains.exposed.v1.spring7.reactive.transaction + +import io.r2dbc.spi.R2dbcException +import org.springframework.transaction.annotation.AnnotationTransactionAttributeSource +import org.springframework.transaction.interceptor.RollbackRuleAttribute +import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute +import org.springframework.transaction.interceptor.TransactionAttribute +import org.springframework.transaction.interceptor.TransactionAttributeSource +import java.lang.reflect.Method + +/** + * A [TransactionAttributeSource] that adds `ExposedR2dbcException` to the rollback rules of the delegate. + * + * @property delegate The delegate [TransactionAttributeSource] to use. Defaults to [AnnotationTransactionAttributeSource]. + * If you use a custom [TransactionAttributeSource], you can pass it here. + */ +class ExposedSpringReactiveTransactionAttributeSource( + private val delegate: TransactionAttributeSource = AnnotationTransactionAttributeSource(), + private val rollbackExceptions: List> = listOf(R2dbcException::class.java) +) : TransactionAttributeSource { + + override fun getTransactionAttribute(method: Method, targetClass: Class<*>?): TransactionAttribute? { + val attr = delegate.getTransactionAttribute(method, targetClass) + if (attr is RuleBasedTransactionAttribute) { + val rules = attr.rollbackRules.toMutableList() + rollbackExceptions.forEach { exception -> + val containsException = rules.any { + it is RollbackRuleAttribute && it.exceptionName == exception.name + } + if (!containsException) { + rules.add(RollbackRuleAttribute(exception)) + } + } + attr.rollbackRules = rules + } + return attr + } +} diff --git a/spring7-reactive-transaction/src/main/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringReactiveTransactionManager.kt b/spring7-reactive-transaction/src/main/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringReactiveTransactionManager.kt new file mode 100644 index 0000000000..f87ef1b161 --- /dev/null +++ b/spring7-reactive-transaction/src/main/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringReactiveTransactionManager.kt @@ -0,0 +1,356 @@ +package org.jetbrains.exposed.v1.spring7.reactive.transaction + +import io.r2dbc.spi.Connection +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.IsolationLevel +import io.r2dbc.spi.R2dbcException +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.reactor.mono +import org.jetbrains.exposed.v1.core.InternalApi +import org.jetbrains.exposed.v1.core.StdOutSqlLogger +import org.jetbrains.exposed.v1.core.exposedLogger +import org.jetbrains.exposed.v1.core.transactions.ThreadLocalTransactionsStack +import org.jetbrains.exposed.v1.core.transactions.currentTransactionOrNull +import org.jetbrains.exposed.v1.core.transactions.transactionScope +import org.jetbrains.exposed.v1.r2dbc.R2dbcDatabase +import org.jetbrains.exposed.v1.r2dbc.R2dbcDatabaseConfig +import org.jetbrains.exposed.v1.r2dbc.R2dbcTransaction +import org.jetbrains.exposed.v1.r2dbc.transactions.currentOrNull +import org.jetbrains.exposed.v1.r2dbc.transactions.transactionManager +import org.reactivestreams.Publisher +import org.springframework.r2dbc.UncategorizedR2dbcException +import org.springframework.r2dbc.connection.ConnectionHolder +import org.springframework.transaction.CannotCreateTransactionException +import org.springframework.transaction.TransactionDefinition +import org.springframework.transaction.UnexpectedRollbackException +import org.springframework.transaction.reactive.AbstractReactiveTransactionManager +import org.springframework.transaction.reactive.GenericReactiveTransaction +import org.springframework.transaction.reactive.TransactionSynchronizationManager +import reactor.core.publisher.Mono + +/** + * Transaction Manager implementation that builds on top of Spring's standard reactive transaction workflow. + * + * @param connectionFactory The [ConnectionFactory] entry point for an R2DBC driver when getting a connection. + * @param databaseConfig The configuration that defines custom properties to be used with connections. + * At minimum, a configuration must be provided that specifies `R2dbcDatabaseConfig.explicitDialect`. + * @property showSql Whether transaction queries should be logged. Defaults to `false`. + */ +class SpringReactiveTransactionManager( + private val connectionFactory: ConnectionFactory, + databaseConfig: R2dbcDatabaseConfig.Builder, + private val showSql: Boolean = false, +) : AbstractReactiveTransactionManager() { + + private val database: R2dbcDatabase = R2dbcDatabase.connect( + connectionFactory = connectionFactory, + databaseConfig = databaseConfig, + ) + + override fun doGetTransaction( + synchronizationManager: TransactionSynchronizationManager + ): Any { + return ExposedTransactionObject( + database = database, + ).apply { + connectionHolder = synchronizationManager.getResourceHolderOrNull() + } + } + + override fun doSuspend( + synchronizationManager: TransactionSynchronizationManager, + transaction: Any + ): Mono { + return Mono.defer { + val trxObject = transaction as ExposedTransactionObject + + synchronizationManager.getResourceOrNull() ?: error("No synchronized transaction to suspend") + + Mono + .justOrEmpty( + synchronizationManager.unbindResource(connectionFactory) as ExposedHolderObject + ) + .doOnSuccess { + trxObject.connectionHolder = null + + @OptIn(InternalApi::class) + ThreadLocalTransactionsStack.popTransaction() + } + } + } + + override fun doResume( + synchronizationManager: TransactionSynchronizationManager, + transaction: Any?, + suspendedResources: Any + ): Mono { + return Mono.defer { + val suspendedObject = suspendedResources as ExposedHolderObject + + synchronizationManager.bindResource(connectionFactory, suspendedObject) + + @OptIn(InternalApi::class) + ThreadLocalTransactionsStack.pushTransaction(suspendedObject.transaction) + + Mono.empty() + } + } + + override fun isExistingTransaction(transaction: Any): Boolean { + val trxObject = transaction as ExposedTransactionObject + + return trxObject.getCurrentTransaction() != null + } + + override fun doBegin( + synchronizationManager: TransactionSynchronizationManager, + transaction: Any, + definition: TransactionDefinition + ): Mono { + return Mono.defer { + val trxObject = transaction as ExposedTransactionObject + + @OptIn(InternalApi::class) + val currentTransaction = currentTransactionOrNull() as? R2dbcTransaction + val outerTransactionToUse = if (currentTransaction?.db == database) { + currentTransaction + } else { + null + } + + val newTransaction = trxObject.database.transactionManager.newTransaction( + isolation = definition.isolationLevel.resolveIsolationLevel(), + readOnly = definition.isReadOnly, + outerTransaction = outerTransactionToUse + ).apply { + if (definition.timeout != TransactionDefinition.TIMEOUT_DEFAULT) { + queryTimeout = definition.timeout + } + + if (showSql) { + addLogger(StdOutSqlLogger) + } + } + + // force new coroutine to start in current thread so that potential callbacks can access correct stack + val newConnectionMono: Mono = mono(Dispatchers.Unconfined) { + if (trxObject.connectionHolder == null || trxObject.isExposedNestedTransactionAllowed) { + @Suppress("UNCHECKED_CAST") + val actualConnection = (newTransaction.connection().activeConnection() as Publisher).awaitSingle() + trxObject.connectionHolder = ExposedHolderObject(actualConnection, newTransaction) + trxObject.isNewConnectionHolder = true + } + trxObject.connectionHolder?.isSynchronizedWithTransaction = true + + // Reactor Mono.doOnSuccess() fails if completed result of chained mono is null/empty + true + } + + Mono + .just(newTransaction) + .doOnSuccess { + @OptIn(InternalApi::class) + ThreadLocalTransactionsStack.pushTransaction(newTransaction) + } + .then(newConnectionMono) + .doOnSuccess { + if (definition.timeout != TransactionDefinition.TIMEOUT_DEFAULT) { + trxObject.connectionHolder?.setTimeoutInSeconds(definition.timeout) + } + + if (trxObject.isNewConnectionHolder) { + // otherwise a PROPAGATION_NESTED transaction would incorrectly have the context of its outer + // transaction used when doCommit() or doRollback() is invoked + // https://youtrack.jetbrains.com/issue/EXPOSED-996/Reassess-support-for-Spring-PROPAGATIONNESTED + synchronizationManager.unbindResourceIfPossible(connectionFactory) as? ExposedHolderObject + + synchronizationManager.bindResource(connectionFactory, trxObject.connectionHolder!!) + } + } + .doOnError { ex -> + trxObject.connectionHolder = null + + @OptIn(InternalApi::class) + ThreadLocalTransactionsStack.popTransaction() + + throw CannotCreateTransactionException("Could not open R2DBC Connection for transaction", ex) + } + } + .then() + } + + override fun doCommit( + synchronizationManager: TransactionSynchronizationManager, + status: GenericReactiveTransaction + ): Mono { + val trxObject = status.transaction as ExposedTransactionObject + + synchronizationManager.getResourceOrNull() ?: error("No synchronized transaction to commit") + + // force new coroutine to start in current thread so that doCleanupOnCompletion can access correct stack + return mono(Dispatchers.Unconfined) { + trxObject.commit() + + null + } + } + + override fun doRollback( + synchronizationManager: TransactionSynchronizationManager, + status: GenericReactiveTransaction + ): Mono { + val trxObject = status.transaction as ExposedTransactionObject + + synchronizationManager.getResourceOrNull() ?: error("No synchronized transaction to rollback") + + // force new coroutine to start in current thread so that doCleanupOnCompletion can access correct stack + return mono(Dispatchers.Unconfined) { + trxObject.rollback() + + null + } + } + + override fun doCleanupAfterCompletion( + synchronizationManager: TransactionSynchronizationManager, + transaction: Any + ): Mono { + return Mono.defer { + val trxObject = transaction as ExposedTransactionObject + + val completedTransaction = synchronizationManager.getResourceOrNull()?.also { + clearStatements(it) + } + + @OptIn(InternalApi::class) + ThreadLocalTransactionsStack.popTransaction() + + if (trxObject.isNewConnectionHolder) { + val previous = synchronizationManager.unbindResource(connectionFactory) as ExposedHolderObject + + // otherwise a PROPAGATION_NESTED transaction would incorrectly have the context of its + // now closed inner transaction used when doCommit() or doRollback() is later invoked + // https://youtrack.jetbrains.com/issue/EXPOSED-996/Reassess-support-for-Spring-PROPAGATIONNESTED + completedTransaction?.outerTransaction?.let { outer -> + synchronizationManager.bindResource(connectionFactory, ExposedHolderObject(previous.connection, outer)) + } + } + + // force new coroutine to start in current thread so that future callbacks can access correct stack + mono(Dispatchers.Unconfined) { + completedTransaction?.close() + + null + } + .doOnEach { + if (trxObject.isNewConnectionHolder) { + trxObject.connectionHolder?.released() + } + trxObject.connectionHolder?.clear() + } + } + } + + private fun clearStatements(transaction: R2dbcTransaction) { + val currentStatement = transaction.currentStatement + currentStatement?.let { + // No Statement.close() in R2DBC + transaction.currentStatement = null + } + transaction.clearExecutedStatements() + } + + override fun doSetRollbackOnly( + synchronizationManager: TransactionSynchronizationManager, + status: GenericReactiveTransaction + ): Mono { + return Mono.fromRunnable { + val trxObject = status.transaction as ExposedTransactionObject + + if (status.isDebug) { + exposedLogger.debug("Exposed transaction [${status.transactionName}] set rollback-only") + } + + trxObject.setRollbackOnly() + } + } + + /** + * This can be bound to the Spring R2DBC synchronization manager, which makes Spring R2DBC see the same + * connection as is currently held and managed by the Exposed [R2dbcTransaction]. + * + * When installed using [TransactionSynchronizationManager.bindResource], Spring R2DBC constructs like + * DatabaseClient will see the same connection as Exposed and partake in the same transaction with the + * same underlying autocommit-disabled connection. + * + * It additionally stores the active transaction that is using the held connection, so that the stack can be + * synchronized accurately when binding/unbinding the resource. + */ + private class ExposedHolderObject( + connection: Connection, + val transaction: R2dbcTransaction, + ) : ConnectionHolder(connection) + + private data class ExposedTransactionObject( + val database: R2dbcDatabase, + ) { + var isNewConnectionHolder: Boolean = false + var connectionHolder: ExposedHolderObject? = null + val isExposedNestedTransactionAllowed: Boolean = database.config.useNestedTransactions + + @Suppress("TooGenericExceptionCaught") + suspend fun commit() { + try { + if (connectionHolder?.isRollbackOnly == true) { + throw UnexpectedRollbackException("Attempting to commit a transaction that is only set for rollback") + } + + getCurrentTransaction()?.commit() + } catch (error: R2dbcException) { + throw UncategorizedR2dbcException(error.message.orEmpty(), null, error) + } + } + + @Suppress("TooGenericExceptionCaught") + suspend fun rollback() { + try { + getCurrentTransaction()?.rollback() + } catch (error: R2dbcException) { + throw UncategorizedR2dbcException(error.message.orEmpty(), null, error) + } + } + + fun getCurrentTransaction(): R2dbcTransaction? { + return connectionHolder?.transaction + ?: database.transactionManager.currentOrNull() + } + + fun setRollbackOnly() { + getCurrentTransaction()?.isRollback = true + connectionHolder?.setRollbackOnly() + } + } + + private fun TransactionSynchronizationManager.getResourceHolderOrNull(): ExposedHolderObject? { + return this.getResource(connectionFactory) as? ExposedHolderObject + } + + private fun TransactionSynchronizationManager.getResourceOrNull(): R2dbcTransaction? { + return this.getResourceHolderOrNull()?.transaction + } +} + +private var R2dbcTransaction.isRollback: Boolean by transactionScope { false } + +/** Returns the rollback status of the current [R2dbcTransaction]. */ +internal fun R2dbcTransaction.isMarkedRollback(): Boolean = isRollback + +internal fun Int.resolveIsolationLevel(): IsolationLevel? = when (this) { + TransactionDefinition.ISOLATION_READ_UNCOMMITTED -> IsolationLevel.READ_UNCOMMITTED + TransactionDefinition.ISOLATION_READ_COMMITTED -> IsolationLevel.READ_COMMITTED + TransactionDefinition.ISOLATION_REPEATABLE_READ -> IsolationLevel.REPEATABLE_READ + TransactionDefinition.ISOLATION_SERIALIZABLE -> IsolationLevel.SERIALIZABLE + TransactionDefinition.ISOLATION_DEFAULT -> null + else -> error("Unsupported Int as IsolationLevel: $this") +} diff --git a/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/ConnectionFactorySpy.kt b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/ConnectionFactorySpy.kt new file mode 100644 index 0000000000..179c6b2242 --- /dev/null +++ b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/ConnectionFactorySpy.kt @@ -0,0 +1,31 @@ +package org.jetbrains.exposed.v1.spring7.reactive.transaction + +import io.r2dbc.spi.Connection +import io.r2dbc.spi.ConnectionFactories +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.ConnectionFactoryMetadata +import kotlinx.coroutines.reactive.awaitFirst +import org.reactivestreams.Publisher +import org.springframework.r2dbc.connection.ConnectionFactoryUtils +import reactor.core.publisher.Mono + +class ConnectionFactorySpy( + private val connectionSpy: (Connection) -> Connection +) : ConnectionFactory { + private val connectionPublisher = ConnectionFactoryUtils.getConnection( + ConnectionFactories.get("r2dbc:h2:mem:///test") + ) + + private lateinit var con: Connection + + suspend fun getCon(): Connection { + if (::con.isInitialized.not()) { + con = connectionSpy(connectionPublisher.awaitFirst()) + } + return con + } + + override fun create(): Publisher = Mono.just(con) + + override fun getMetadata(): ConnectionFactoryMetadata = throw NotImplementedError() +} diff --git a/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/ConnectionSpy.kt b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/ConnectionSpy.kt new file mode 100644 index 0000000000..8d0fd2a2c1 --- /dev/null +++ b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/ConnectionSpy.kt @@ -0,0 +1,109 @@ +package org.jetbrains.exposed.v1.spring7.reactive.transaction + +import io.r2dbc.spi.Connection +import io.r2dbc.spi.IsolationLevel +import io.r2dbc.spi.TransactionDefinition +import org.reactivestreams.Publisher +import reactor.core.publisher.Mono + +class ConnectionSpy(private val connection: Connection) : Connection by connection { + // some mocks from JDBC tests are excluded as they have no relevance here, like mockIsClosed + var commitCallCount: Int = 0 + var rollbackCallCount: Int = 0 + var closeCallCount: Int = 0 + var releaseSavepointCallCount: Int = 0 + var mockReadOnly: Boolean = false + var mockAutoCommit: Boolean = false + var mockTransactionIsolation: IsolationLevel = IsolationLevel.READ_COMMITTED + var mockCommit: () -> Unit = {} + var mockRollback: () -> Unit = {} + private val callOrder = mutableListOf() + + fun verifyCallOrder(vararg functions: String): Boolean { + val indices = functions.map { callOrder.indexOf(it) } + return indices.none { it == -1 } && indices == indices.sorted() + } + + fun clearMock() { + commitCallCount = 0 + rollbackCallCount = 0 + closeCallCount = 0 + releaseSavepointCallCount = 0 + mockReadOnly = false + mockAutoCommit = false + mockTransactionIsolation = IsolationLevel.READ_COMMITTED + mockCommit = {} + mockRollback = {} + callOrder.clear() + } + + override fun close(): Publisher = Mono.defer { + callOrder.add("close") + closeCallCount++ + + Mono.empty() + } as Publisher + + override fun setAutoCommit(autoCommit: Boolean): Publisher = Mono.defer { + callOrder.add("setAutoCommit") + mockAutoCommit = autoCommit + + Mono.empty() + } as Publisher + + override fun beginTransaction(): Publisher = Mono.defer { + callOrder.add("setAutoCommit") + mockAutoCommit = false + + Mono.empty() + } as Publisher + + override fun beginTransaction(definition: TransactionDefinition): Publisher = Mono.defer { + callOrder.add("setAutoCommit") + mockAutoCommit = false + + definition + .getAttribute(TransactionDefinition.READ_ONLY) + ?.let { + callOrder.add("setReadOnly") + mockReadOnly = it + } + + Mono.empty() + } as Publisher + + override fun isAutoCommit(): Boolean = mockAutoCommit + + override fun commitTransaction(): Publisher = Mono.defer { + callOrder.add("commit") + commitCallCount++ + mockCommit() + + Mono.empty() + } as Publisher + + override fun rollbackTransaction(): Publisher = Mono.defer { + callOrder.add("rollback") + rollbackCallCount++ + mockRollback() + + Mono.empty() + } as Publisher + + override fun rollbackTransactionToSavepoint(p0: String): Publisher = Mono.defer { + callOrder.add("rollback") + rollbackCallCount++ + mockRollback() + + Mono.empty() + } as Publisher + + override fun releaseSavepoint(p0: String): Publisher = Mono.defer { + callOrder.add("releaseSavepoint") + releaseSavepointCallCount++ + + Mono.empty() + } as Publisher + + override fun getTransactionIsolationLevel(): IsolationLevel = mockTransactionIsolation +} diff --git a/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/ExposedTransactionManagerTest.kt b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/ExposedTransactionManagerTest.kt new file mode 100644 index 0000000000..3e524f5ffa --- /dev/null +++ b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/ExposedTransactionManagerTest.kt @@ -0,0 +1,274 @@ +package org.jetbrains.exposed.v1.spring7.reactive.transaction + +import kotlinx.coroutines.flow.single +import kotlinx.coroutines.test.runTest +import org.jetbrains.exposed.v1.core.Table +import org.jetbrains.exposed.v1.r2dbc.SchemaUtils +import org.jetbrains.exposed.v1.r2dbc.insert +import org.jetbrains.exposed.v1.r2dbc.selectAll +import org.jetbrains.exposed.v1.r2dbc.transactions.suspendTransaction +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.RepeatedTest +import org.springframework.test.annotation.Commit +import org.springframework.transaction.IllegalTransactionStateException +import org.springframework.transaction.TransactionDefinition +import java.util.* +import kotlin.test.assertFailsWith + +open class ExposedTransactionManagerTest : SpringReactiveTransactionTestBase() { + + object T1 : Table() { + val c1 = varchar("c1", Int.MIN_VALUE.toString().length) + } + + private suspend fun T1.insertRandom() { + insert { + it[c1] = Random().nextInt().toString() + } + } + + @BeforeEach + fun beforeTest() = runTest { + transactionManager.execute { + SchemaUtils.create(T1) + } + } + + @AfterEach + fun afterTest() = runTest { + transactionManager.execute { + SchemaUtils.drop(T1) + } + } + + @RepeatedTest(5) + // @Transactional // see [runTestWithMockTransactional] + @Commit + open fun testConnection() = runTestWithMockTransactional { + T1.insertRandom() + assertEquals(1, T1.selectAll().count()) + } + + @RepeatedTest(5) + // @Transactional // see [runTestWithMockTransactional] + @Commit + open fun testConnection2() = runTestWithMockTransactional { + val rnd = Random().nextInt().toString() + T1.insert { + it[c1] = rnd + } + assertEquals(rnd, T1.selectAll().single()[T1.c1]) + } + + @RepeatedTest(5) + @Commit + open fun testConnectionCombineWithExposedTransaction() = runTest { + suspendTransaction { + val rnd = Random().nextInt().toString() + T1.insert { + it[c1] = rnd + } + assertEquals(rnd, T1.selectAll().single()[T1.c1]) + + this@ExposedTransactionManagerTest.transactionManager.execute { + T1.insertRandom() + assertEquals(2, T1.selectAll().count()) + } + } + } + + @RepeatedTest(5) + @Commit +// @Transactional // see [runTestWithMockTransactional] + open fun testConnectionCombineWithExposedTransaction2() = runTestWithMockTransactional { + val rnd = Random().nextInt().toString() + T1.insert { + it[c1] = rnd + } + assertEquals(rnd, T1.selectAll().single()[T1.c1]) + + suspendTransaction { + T1.insertRandom() + assertEquals(2, T1.selectAll().count()) + } + } + + /** + * Test for Propagation.NESTED + * Execute within a nested transaction if a current transaction exists, behave like REQUIRED otherwise. + */ + @RepeatedTest(5) +// @Transactional // see [runTestWithMockTransactional] + open fun testConnectionWithNestedTransactionCommit() = runTestWithMockTransactional { + T1.insertRandom() + assertEquals(1, T1.selectAll().count()) + transactionManager.execute(TransactionDefinition.PROPAGATION_NESTED) { + T1.insertRandom() + assertEquals(2, T1.selectAll().count()) + } + assertEquals(2, T1.selectAll().count()) + } + + /** + * Test for Propagation.NESTED with inner roll-back + * The nested transaction will be roll-back only inner transaction when the transaction marks as rollback. + */ + @RepeatedTest(5) +// @Transactional // see [runTestWithMockTransactional] + open fun testConnectionWithNestedTransactionInnerRollback() = runTestWithMockTransactional { + T1.insertRandom() + assertEquals(1, T1.selectAll().count()) + transactionManager.execute(TransactionDefinition.PROPAGATION_NESTED) { status -> + T1.insertRandom() + assertEquals(2, T1.selectAll().count()) + status.setRollbackOnly() + } + assertEquals(1, T1.selectAll().count()) + } + + /** + * Test for Propagation.NESTED with outer roll-back + * The nested transaction will be roll-back entire transaction when the transaction marks as rollback. + */ + @RepeatedTest(5) + fun testConnectionWithNestedTransactionOuterRollback() = runTest { + transactionManager.execute { + T1.insertRandom() + assertEquals(1, T1.selectAll().count()) + it.setRollbackOnly() + + transactionManager.execute(TransactionDefinition.PROPAGATION_NESTED) { + T1.insertRandom() + assertEquals(2, T1.selectAll().count()) + } + assertEquals(2, T1.selectAll().count()) + } + + transactionManager.execute { + assertEquals(0, T1.selectAll().count()) + } + } + + /** + * Test for Propagation.REQUIRES_NEW + * Create a new transaction, and suspend the current transaction if one exists. + */ + @RepeatedTest(5) + // @Transactional // see [runTestWithMockTransactional] + open fun testConnectionWithRequiresNew() = runTestWithMockTransactional { + T1.insertRandom() + assertEquals(1, T1.selectAll().count()) + transactionManager.execute(TransactionDefinition.PROPAGATION_REQUIRES_NEW) { + assertEquals(0, T1.selectAll().count()) + T1.insertRandom() + assertEquals(1, T1.selectAll().count()) + } + assertEquals(2, T1.selectAll().count()) + } + + /** + * Test for Propagation.REQUIRES_NEW with inner transaction roll-back + * The inner transaction will be roll-back only inner transaction when the transaction marks as rollback. + * And since isolation level is READ_COMMITTED, the inner transaction can't see the changes of outer transaction. + */ + @RepeatedTest(5) + fun testConnectionWithRequiresNewWithInnerTransactionRollback() = runTest { + transactionManager.execute { + T1.insertRandom() + assertEquals(1, T1.selectAll().count()) + transactionManager.execute(TransactionDefinition.PROPAGATION_REQUIRES_NEW) { + T1.insertRandom() + assertEquals(1, T1.selectAll().count()) + it.setRollbackOnly() + } + assertEquals(1, T1.selectAll().count()) + } + + transactionManager.execute { + assertEquals(1, T1.selectAll().count()) + } + } + + /** + * Test for Propagation.NEVER + * Execute non-transactionally, throw an exception if a transaction exists. + */ + @RepeatedTest(5) +// @Transactional(propagation = Propagation.NEVER) // see [runTestWithMockTransactional] + open fun testPropagationNever() = runTestWithMockTransactional( + propagationBehavior = TransactionDefinition.PROPAGATION_NEVER + ) { + assertFailsWith { // Should Be "No transaction exists" + T1.insertRandom() + } + } + + /** + * Test for Propagation.NEVER + * Throw an exception cause outer transaction exists. + */ + @RepeatedTest(5) +// @Transactional // see [runTestWithMockTransactional] + open fun testPropagationNeverWithExistingTransaction() = runTestWithMockTransactional { + assertFailsWith { + T1.insertRandom() + transactionManager.execute(TransactionDefinition.PROPAGATION_NEVER) { + T1.insertRandom() + } + } + } + + /** + * Test for Propagation.MANDATORY + * Support a current transaction, throw an exception if none exists. + */ + @RepeatedTest(5) +// @Transactional // see [runTestWithMockTransactional] + open fun testPropagationMandatoryWithTransaction() = runTestWithMockTransactional { + T1.insertRandom() + transactionManager.execute(TransactionDefinition.PROPAGATION_MANDATORY) { + T1.insertRandom() + } + } + + /** + * Test for Propagation.MANDATORY + * Throw an exception cause no transaction exists. + */ + @RepeatedTest(5) + open fun testPropagationMandatoryWithoutTransaction() = runTest { + assertFailsWith { + transactionManager.execute(TransactionDefinition.PROPAGATION_MANDATORY) { + T1.insertRandom() + } + } + } + + /** + * Test for Propagation.SUPPORTS + * Support a current transaction, execute non-transactionally if none exists. + */ + @RepeatedTest(5) +// @Transactional // see [runTestWithMockTransactional] + open fun testPropagationSupportWithTransaction() = runTestWithMockTransactional { + T1.insertRandom() + transactionManager.execute(TransactionDefinition.PROPAGATION_SUPPORTS) { + T1.insertRandom() + } + } + + /** + * Test for Propagation.SUPPORTS + * Execute non-transactionally if none exists. + */ + @RepeatedTest(5) + open fun testPropagationSupportWithoutTransaction() = runTest { + transactionManager.execute(TransactionDefinition.PROPAGATION_SUPPORTS) { + assertFailsWith { // Should Be "No transaction exists" + T1.insertRandom() + } + } + } +} diff --git a/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/MixedExposedR2dbcTransactionTest.kt b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/MixedExposedR2dbcTransactionTest.kt new file mode 100644 index 0000000000..ee46377d55 --- /dev/null +++ b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/MixedExposedR2dbcTransactionTest.kt @@ -0,0 +1,170 @@ +package org.jetbrains.exposed.v1.spring7.reactive.transaction + +import io.r2dbc.spi.ConnectionFactory +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.runTest +import org.jetbrains.exposed.v1.core.dao.id.java.UUIDTable +import org.jetbrains.exposed.v1.r2dbc.SchemaUtils +import org.jetbrains.exposed.v1.r2dbc.insert +import org.jetbrains.exposed.v1.r2dbc.selectAll +import org.jetbrains.exposed.v1.r2dbc.transactions.suspendTransaction +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.r2dbc.core.DatabaseClient +import org.springframework.r2dbc.core.awaitRowsUpdated +import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Propagation +import org.springframework.transaction.annotation.Transactional +import java.util.* +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +class MixedExposedR2dbcTransactionTest : SpringReactiveTransactionTestBase() { + + @Autowired + private lateinit var mixedTransactionService: MixedTransactionService + + @BeforeEach + fun setUp() = runTest { + suspendTransaction { + SchemaUtils.create(CustomerTable) + } + } + + @AfterEach + fun tearDown() = runTest { + suspendTransaction { + SchemaUtils.drop(CustomerTable) + } + } + + @Test + fun testSuccessfulMixedTransaction() = runTest { + mixedTransactionService.saveTwoThingsSpringTransactional(fail = false) + + val customers = suspendTransaction { CustomerTable.selectAll().toList() } + + assertEquals(2, customers.size) + } + + @Test + fun testFailedMixedTransaction() = runTest { + assertFailsWith { + mixedTransactionService.saveTwoThingsSpringTransactional(fail = true) + } + + val customers = suspendTransaction { CustomerTable.selectAll().toList() } + + assertEquals(0, customers.size) + } + + @Test + fun testSuccessfulRequiresNewTransactions() = runTest { + mixedTransactionService.withNewTransaction { + mixedTransactionService.saveTwoThingsSpringTransactional(fail = false) + mixedTransactionService.withNewTransaction { + mixedTransactionService.saveTwoThingsSpringTransactional(fail = false) + } + } + + val customers = suspendTransaction { CustomerTable.selectAll().toList() } + + assertEquals(4, customers.size) + } + + @Test + fun testFailedRequiresNewTransactions() = runTest { + mixedTransactionService.withNewTransaction { + mixedTransactionService.saveTwoThingsSpringTransactional(fail = false) + assertFailsWith { + mixedTransactionService.withNewTransaction { + mixedTransactionService.saveTwoThingsSpringTransactional(fail = true) + } + } + } + + val customers = suspendTransaction { CustomerTable.selectAll().toList() } + + assertEquals(2, customers.size) + } + + @Test + fun testSuccessfulNestedTransactions() = runTest { + mixedTransactionService.withNewTransaction { + mixedTransactionService.saveTwoThingsSpringTransactional(fail = false) + mixedTransactionService.withNestedTransaction { + mixedTransactionService.saveTwoThingsSpringTransactional(fail = false) + } + } + + val customers = suspendTransaction { CustomerTable.selectAll().toList() } + + assertEquals(4, customers.size) + } + + @Test + fun testFailedNestedTransactions() = runTest { + mixedTransactionService.withNewTransaction { + mixedTransactionService.saveTwoThingsSpringTransactional(fail = false) + assertFailsWith { + mixedTransactionService.withNestedTransaction { + mixedTransactionService.saveTwoThingsSpringTransactional(fail = true) + } + } + } + + val customers = suspendTransaction { CustomerTable.selectAll().toList() } + + assertEquals(2, customers.size) + } +} + +@Service +open class MixedTransactionService { + + @Autowired + private lateinit var connectionFactory: ConnectionFactory + + private val client: DatabaseClient by lazy { DatabaseClient.create(connectionFactory) } + private var nextNameIndex: Int = 0 + + @Transactional + open suspend fun saveTwoThingsSpringTransactional(fail: Boolean) { + saveTwoThings(fail) + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) + open suspend fun withNewTransaction(block: suspend () -> T): T { + return block() + } + + @Transactional(propagation = Propagation.NESTED) + open suspend fun withNestedTransaction(block: suspend () -> T): T { + return block() + } + + private suspend fun saveTwoThings(fail: Boolean) { + CustomerTable.insert { + it[id] = UUID.randomUUID() + it[name] = "Test${nextNameIndex++}" + } + client + .sql("INSERT INTO customer VALUES (:id, :name)") + .bind("id", UUID.randomUUID()) + .bind("name", "Test${nextNameIndex++}") + .fetch() + .awaitRowsUpdated() + + @Suppress("UseCheckOrError") + if (fail) { + throw IllegalStateException("Fail") + } + } +} + +// originally should be in SpringTransactionEntityTest (but this does not exist for R2DBC) +object CustomerTable : UUIDTable(name = "customer") { + val name = varchar(name = "name", length = 255).uniqueIndex() +} diff --git a/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/R2dbcExposedTransactionManagerTest.kt b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/R2dbcExposedTransactionManagerTest.kt new file mode 100644 index 0000000000..6ace42d898 --- /dev/null +++ b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/R2dbcExposedTransactionManagerTest.kt @@ -0,0 +1,286 @@ +package org.jetbrains.exposed.v1.spring7.reactive.transaction + +import io.r2dbc.spi.ConnectionFactory +import kotlinx.coroutines.test.runTest +import org.jetbrains.exposed.v1.core.Table +import org.jetbrains.exposed.v1.r2dbc.SchemaUtils +import org.jetbrains.exposed.v1.r2dbc.transactions.suspendTransaction +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.RepeatedTest +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.r2dbc.core.DatabaseClient +import org.springframework.r2dbc.core.awaitRowsUpdated +import org.springframework.r2dbc.core.awaitSingle +import org.springframework.test.annotation.Commit +import org.springframework.transaction.IllegalTransactionStateException +import org.springframework.transaction.TransactionDefinition +import java.util.* +import kotlin.test.assertFailsWith + +/** + * Similar to [ExposedTransactionManagerTest] but working with Spring R2DBC + * constructs like [DatabaseClient] instead of Exposed APIs to verify that it + * works like expected. + */ +open class R2dbcExposedTransactionManagerTest : SpringReactiveTransactionTestBase() { + object T1 : Table() { + val c1 = varchar("c1", Int.MIN_VALUE.toString().length) + } + + @Autowired + private lateinit var connectionFactory: ConnectionFactory + + private val r2dbc: DatabaseClient by lazy { DatabaseClient.create(connectionFactory) } + + private suspend fun insertRandom() { + r2dbc.sql( + "INSERT INTO ${T1.tableName} VALUES (:value)" + ).bind( + "value", Random().nextInt().toString() + ).fetch().awaitRowsUpdated() + } + + private suspend fun insert(value: String) { + r2dbc.sql( + "INSERT INTO ${T1.tableName} VALUES (:value)" + ).bind( + "value", value + ).fetch().awaitRowsUpdated() + } + + private suspend fun getCount(): Int = r2dbc + .sql("SELECT count(*) FROM ${T1.tableName}") + .map { row, _ -> + row.get(0, java.lang.Long::class.java)?.toInt() ?: 0 + } + .awaitSingle() + + private suspend fun getSingleValue(): String = r2dbc + .sql("SELECT * FROM ${T1.tableName}") + .map { row, _ -> row.get("c1", String::class.java) ?: "" } + .awaitSingle() + + @BeforeEach + fun beforeTest() = runTest { + transactionManager.execute { + SchemaUtils.create(T1) + } + } + + @AfterEach + fun afterTest() = runTest { + transactionManager.execute { + SchemaUtils.drop(T1) + } + } + + @RepeatedTest(5) + // @Transactional // see [runTestWithMockTransactional] + @Commit + open fun testConnection() = runTestWithMockTransactional { + insertRandom() + assertEquals(1, getCount()) + } + + @RepeatedTest(5) + // @Transactional // see [runTestWithMockTransactional] + @Commit + open fun testConnection2() = runTestWithMockTransactional { + val rnd = Random().nextInt().toString() + insert(rnd) + assertEquals(rnd, getSingleValue()) + } + + @RepeatedTest(5) + @Commit + open fun testConnectionCombineWithExposedTransaction() = runTest { + suspendTransaction { + val rnd = Random().nextInt().toString() + insert(rnd) + assertEquals(rnd, getSingleValue()) + + this@R2dbcExposedTransactionManagerTest.transactionManager.execute { + insertRandom() + assertEquals(2, getCount()) + } + } + } + + @RepeatedTest(5) + @Commit +// @Transactional // see [runTestWithMockTransactional] + open fun testConnectionCombineWithExposedTransaction2() = runTestWithMockTransactional { + val rnd = Random().nextInt().toString() + insert(rnd) + assertEquals(rnd, getSingleValue()) + + suspendTransaction { + insertRandom() + assertEquals(2, getCount()) + } + } + + /** + * Test for Propagation.NESTED + * Execute within a nested transaction if a current transaction exists, behave like REQUIRED otherwise. + */ + @RepeatedTest(5) +// @Transactional // see [runTestWithMockTransactional] + open fun testConnectionWithNestedTransactionCommit() = runTestWithMockTransactional { + insertRandom() + assertEquals(1, getCount()) + transactionManager.execute(TransactionDefinition.PROPAGATION_NESTED) { + insertRandom() + assertEquals(2, getCount()) + } + assertEquals(2, getCount()) + } + + /** + * Test for Propagation.NESTED with inner roll-back + * The nested transaction will be roll-back only inner transaction when the transaction marks as rollback. + */ + @RepeatedTest(5) +// @Transactional // see [runTestWithMockTransactional] + open fun testConnectionWithNestedTransactionInnerRollback() = runTestWithMockTransactional { + insertRandom() + assertEquals(1, getCount()) + transactionManager.execute(TransactionDefinition.PROPAGATION_NESTED) { status -> + insertRandom() + assertEquals(2, getCount()) + status.setRollbackOnly() + } + assertEquals(1, getCount()) + } + + /** + * Test for Propagation.NESTED with outer roll-back + * The nested transaction will be roll-back entire transaction when the transaction marks as rollback. + */ + @RepeatedTest(5) + fun testConnectionWithNestedTransactionOuterRollback() = runTest { + transactionManager.execute { + insertRandom() + assertEquals(1, getCount()) + it.setRollbackOnly() + + transactionManager.execute(TransactionDefinition.PROPAGATION_NESTED) { + insertRandom() + assertEquals(2, getCount()) + } + assertEquals(2, getCount()) + } + + transactionManager.execute { + assertEquals(0, getCount()) + } + } + + /** + * Test for Propagation.REQUIRES_NEW + * Create a new transaction, and suspend the current transaction if one exists. + */ + @RepeatedTest(5) + // @Transactional // see [runTestWithMockTransactional] + open fun testConnectionWithRequiresNew() = runTestWithMockTransactional { + insertRandom() + assertEquals(1, getCount()) + transactionManager.execute(TransactionDefinition.PROPAGATION_REQUIRES_NEW) { + assertEquals(0, getCount()) + insertRandom() + assertEquals(1, getCount()) + } + assertEquals(2, getCount()) + } + + /** + * Test for Propagation.REQUIRES_NEW with inner transaction roll-back + * The inner transaction will be roll-back only inner transaction when the transaction marks as rollback. + * And since isolation level is READ_COMMITTED, the inner transaction can't see the changes of outer transaction. + */ + @RepeatedTest(5) + fun testConnectionWithRequiresNewWithInnerTransactionRollback() = runTest { + transactionManager.execute { + insertRandom() + assertEquals(1, getCount()) + transactionManager.execute(TransactionDefinition.PROPAGATION_REQUIRES_NEW) { + insertRandom() + assertEquals(1, getCount()) + it.setRollbackOnly() + } + assertEquals(1, getCount()) + } + + transactionManager.execute { + assertEquals(1, getCount()) + } + } + + /** + * Test for Propagation.NEVER + * Throw an exception cause outer transaction exists. + */ + @RepeatedTest(5) +// @Transactional // see [runTestWithMockTransactional] + open fun testPropagationNeverWithExistingTransaction() = runTestWithMockTransactional { + assertFailsWith { + insertRandom() + transactionManager.execute(TransactionDefinition.PROPAGATION_NEVER) { + insertRandom() + } + } + } + + /** + * Test for Propagation.MANDATORY + * Support a current transaction, throw an exception if none exists. + */ + @RepeatedTest(5) +// @Transactional // see [runTestWithMockTransactional] + open fun testPropagationMandatoryWithTransaction() = runTestWithMockTransactional { + insertRandom() + transactionManager.execute(TransactionDefinition.PROPAGATION_MANDATORY) { + insertRandom() + } + } + + /** + * Test for Propagation.MANDATORY + * Throw an exception cause no transaction exists. + */ + @RepeatedTest(5) + open fun testPropagationMandatoryWithoutTransaction() = runTest { + assertFailsWith { + transactionManager.execute(TransactionDefinition.PROPAGATION_MANDATORY) { + insertRandom() + } + } + } + + /** + * Test for Propagation.SUPPORTS + * Support a current transaction, execute non-transactionally if none exists. + */ + @RepeatedTest(5) +// @Transactional // see [runTestWithMockTransactional] + open fun testPropagationSupportWithTransaction() = runTestWithMockTransactional { + insertRandom() + transactionManager.execute(TransactionDefinition.PROPAGATION_SUPPORTS) { + insertRandom() + } + } + + /** + * Test for Propagation.SUPPORTS + * Execute non-transactionally if none exists. + */ + @RepeatedTest(5) + open fun testPropagationSupportWithoutTransaction() = runTest { + transactionManager.execute(TransactionDefinition.PROPAGATION_SUPPORTS) { + insertRandom() + } + assertEquals(1, getCount()) + } +} diff --git a/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringCoroutineTest.kt b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringCoroutineTest.kt new file mode 100644 index 0000000000..c7e94b7b0e --- /dev/null +++ b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringCoroutineTest.kt @@ -0,0 +1,52 @@ +package org.jetbrains.exposed.v1.spring7.reactive.transaction + +import kotlinx.coroutines.* +import org.jetbrains.exposed.v1.core.Table +import org.jetbrains.exposed.v1.r2dbc.SchemaUtils +import org.jetbrains.exposed.v1.r2dbc.insert +import org.jetbrains.exposed.v1.r2dbc.selectAll +import org.jetbrains.exposed.v1.r2dbc.transactions.suspendTransaction +import org.junit.jupiter.api.RepeatedTest +import org.springframework.test.annotation.Commit +import kotlin.test.assertEquals + +open class SpringCoroutineTest : SpringReactiveTransactionTestBase() { + object Testing : Table("COROUTINE_TESTING") { + val id = integer("id").autoIncrement() + + override val primaryKey = PrimaryKey(id) + } + + @OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class) + @RepeatedTest(5) +// @Transactional // see [runTestWithMockTransactional] + @Commit + open fun testNestedCoroutineTransaction() = runTestWithMockTransactional { + try { + SchemaUtils.create(Testing) + + val mainJob = GlobalScope.async { + // @CoroutinesTimeout is not compatible with @Transactional + val results = withTimeout(1000) { + (1..5).map { indx -> + async(Dispatchers.IO) { + suspendTransaction { + Testing.insert { } + indx + } + } + }.awaitAll() + } + + assertEquals(15, results.sum()) + } + + while (!mainJob.isCompleted) Thread.sleep(100) + mainJob.getCompletionExceptionOrNull()?.let { throw it } + + assertEquals(5L, Testing.selectAll().count()) + } finally { + SchemaUtils.drop(Testing) + } + } +} diff --git a/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringMultiContainerTransactionTest.kt b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringMultiContainerTransactionTest.kt new file mode 100644 index 0000000000..51b41636be --- /dev/null +++ b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringMultiContainerTransactionTest.kt @@ -0,0 +1,236 @@ +package org.jetbrains.exposed.v1.spring7.reactive.transaction + +import io.r2dbc.spi.ConnectionFactories +import io.r2dbc.spi.ConnectionFactory +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.runTest +import org.jetbrains.exposed.v1.core.ResultRow +import org.jetbrains.exposed.v1.core.dao.id.LongIdTable +import org.jetbrains.exposed.v1.core.vendors.H2Dialect +import org.jetbrains.exposed.v1.r2dbc.R2dbcDatabaseConfig +import org.jetbrains.exposed.v1.r2dbc.SchemaUtils +import org.jetbrains.exposed.v1.r2dbc.deleteAll +import org.jetbrains.exposed.v1.r2dbc.insertAndGetId +import org.jetbrains.exposed.v1.r2dbc.selectAll +import org.jetbrains.exposed.v1.r2dbc.transactions.suspendTransaction +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.springframework.context.annotation.AnnotationConfigApplicationContext +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.transaction.annotation.EnableTransactionManagement +import org.springframework.transaction.annotation.Transactional + +open class SpringMultiContainerTransactionTest { + + val orderContainer = AnnotationConfigApplicationContext(OrderConfig::class.java) + val paymentContainer = AnnotationConfigApplicationContext(PaymentConfig::class.java) + + val orders: Orders = orderContainer.getBean(Orders::class.java) + val payments: Payments = paymentContainer.getBean(Payments::class.java) + + @BeforeEach + open fun beforeTest() = runTest { + orders.init() + payments.init() + } + + @Test + open fun test1() = runTest { + Assertions.assertEquals(0, orders.findAll().size) + Assertions.assertEquals(0, payments.findAll().size) + } + + @Test + open fun test2() = runTest { + orders.create() + Assertions.assertEquals(1, orders.findAll().size) + payments.create() + Assertions.assertEquals(1, payments.findAll().size) + } + + @Test + open fun test3() = runTest { + orders.suspendTransaction { + payments.create() + orders.create() + payments.create() + } + Assertions.assertEquals(1, orders.findAll().size) + Assertions.assertEquals(2, payments.findAll().size) + } + + @Test + open fun test4() = runTest { + kotlin.runCatching { + orders.suspendTransaction { + orders.create() + payments.create() + throw SpringTransactionTestException() + } + } + Assertions.assertEquals(0, orders.findAll().size) + Assertions.assertEquals(1, payments.findAll().size) + } + + @Test + open fun test5() = runTest { + kotlin.runCatching { + orders.suspendTransaction { + orders.create() + payments.databaseTemplate { + payments.create() + throw SpringTransactionTestException() + } + } + } + Assertions.assertEquals(0, orders.findAll().size) + Assertions.assertEquals(0, payments.findAll().size) + } + + @Test + open fun test6() = runTest { + Assertions.assertEquals(0, orders.findAllWithExposedTrxBlock().size) + Assertions.assertEquals(0, payments.findAllWithExposedTrxBlock().size) + } + + @Test + open fun test7() = runTest { + orders.createWithExposedTrxBlock() + Assertions.assertEquals(1, orders.findAllWithExposedTrxBlock().size) + payments.createWithExposedTrxBlock() + Assertions.assertEquals(1, payments.findAllWithExposedTrxBlock().size) + } + + @Test + open fun test8() = runTest { + orders.suspendTransaction { + payments.createWithExposedTrxBlock() + orders.createWithExposedTrxBlock() + payments.createWithExposedTrxBlock() + } + Assertions.assertEquals(1, orders.findAllWithExposedTrxBlock().size) + Assertions.assertEquals(2, payments.findAllWithExposedTrxBlock().size) + } + + @Test + open fun test9() = runTest { + kotlin.runCatching { + orders.suspendTransaction { + orders.createWithExposedTrxBlock() + payments.createWithExposedTrxBlock() + throw SpringTransactionTestException() + } + } + Assertions.assertEquals(0, orders.findAllWithExposedTrxBlock().size) + Assertions.assertEquals(1, payments.findAllWithExposedTrxBlock().size) + } + + @Test + open fun test10() = runTest { + kotlin.runCatching { + orders.suspendTransaction { + orders.createWithExposedTrxBlock() + payments.databaseTemplate { + payments.createWithExposedTrxBlock() + throw SpringTransactionTestException() + } + } + } + Assertions.assertEquals(0, orders.findAllWithExposedTrxBlock().size) + Assertions.assertEquals(0, payments.findAllWithExposedTrxBlock().size) + } +} + +@Configuration +@EnableTransactionManagement(proxyTargetClass = true) +open class OrderConfig { + + @Bean + open fun cxFactory(): ConnectionFactory = ConnectionFactories.get("r2dbc:h2:mem:///embeddedTest1;DB_CLOSE_DELAY=-1;") + + @Bean + open fun transactionManager(connectionFactory: ConnectionFactory) = SpringReactiveTransactionManager( + connectionFactory, + R2dbcDatabaseConfig { explicitDialect = H2Dialect() } + ) + + @Bean + open fun orders() = Orders() +} + +@Transactional +open class Orders { + + open suspend fun findAll(): List = Order.selectAll().toList() + + // NOTE: qualifier names must be left in + open suspend fun findAllWithExposedTrxBlock() = org.jetbrains.exposed.v1.r2dbc.transactions.suspendTransaction { findAll() } + + open suspend fun create() = Order.insertAndGetId { + it[buyer] = 123 + }.value + + // NOTE: qualifier names must be left in + open suspend fun createWithExposedTrxBlock() = org.jetbrains.exposed.v1.r2dbc.transactions.suspendTransaction { create() } + + open suspend fun init() { + SchemaUtils.create(Order) + Order.deleteAll() + } + + open suspend fun suspendTransaction(block: suspend () -> Unit) { + block() + } +} + +object Order : LongIdTable("orders") { + val buyer = long("buyer_id") +} + +@Configuration +@EnableTransactionManagement(proxyTargetClass = true) +open class PaymentConfig { + + @Bean + open fun cxFactory(): ConnectionFactory = ConnectionFactories.get("r2dbc:h2:mem:///embeddedTest2;DB_CLOSE_DELAY=-1;") + + @Bean + open fun transactionManager(connectionFactory: ConnectionFactory) = SpringReactiveTransactionManager( + connectionFactory, + R2dbcDatabaseConfig { explicitDialect = H2Dialect() } + ) + + @Bean + open fun payments() = Payments() +} + +@Transactional +open class Payments { + + open suspend fun findAll(): List = Payment.selectAll().toList() + + open suspend fun findAllWithExposedTrxBlock() = suspendTransaction { findAll() } + + open suspend fun create() = Payment.insertAndGetId { + it[state] = "state" + }.value + + open suspend fun createWithExposedTrxBlock() = suspendTransaction { create() } + + open suspend fun init() { + SchemaUtils.create(Payment) + Payment.deleteAll() + } + + open suspend fun databaseTemplate(block: suspend () -> Unit) { + block() + } +} + +object Payment : LongIdTable("payments") { + val state = varchar("state", 50) +} + +private class SpringTransactionTestException : Error() diff --git a/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringReactiveTransactionManagerTest.kt b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringReactiveTransactionManagerTest.kt new file mode 100644 index 0000000000..2d661bb4de --- /dev/null +++ b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringReactiveTransactionManagerTest.kt @@ -0,0 +1,409 @@ +package org.jetbrains.exposed.v1.spring7.reactive.transaction + +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.R2dbcRollbackException +import kotlinx.coroutines.test.runTest +import org.jetbrains.exposed.v1.core.vendors.H2Dialect +import org.jetbrains.exposed.v1.r2dbc.R2dbcDatabase +import org.jetbrains.exposed.v1.r2dbc.R2dbcDatabaseConfig +import org.jetbrains.exposed.v1.r2dbc.transactions.TransactionManager +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.springframework.r2dbc.UncategorizedR2dbcException +import org.springframework.r2dbc.connection.TransactionAwareConnectionFactoryProxy +import org.springframework.transaction.IllegalTransactionStateException +import org.springframework.transaction.ReactiveTransaction +import org.springframework.transaction.TransactionDefinition +import org.springframework.transaction.reactive.TransactionalOperator +import org.springframework.transaction.reactive.executeAndAwait +import org.springframework.transaction.support.DefaultTransactionDefinition +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue + +class SpringReactiveTransactionManagerTest { + + companion object { + val cf1 = ConnectionFactorySpy(::ConnectionSpy) + lateinit var con1: ConnectionSpy + val cf2 = ConnectionFactorySpy(::ConnectionSpy) + lateinit var con2: ConnectionSpy + + /** + * This test has a teardown that unregisters databases. The intention + * is to only tear down databases registered by this test - but + * an earlier implementation accidentally unregistered the main + * database registered by the [SpringReactiveTransactionTestBase]. + * + * To avoid doing such, we try to only unregister until we hit the + * one that was there before the start of this test. + */ + private var databaseBeforeTestStart: R2dbcDatabase? = null + + @BeforeAll + @JvmStatic + fun init() = runTest { + con1 = cf1.getCon() as ConnectionSpy + con2 = cf2.getCon() as ConnectionSpy + } + } + + @BeforeEach + fun beforeTest() { + databaseBeforeTestStart = TransactionManager.primaryDatabase + con1.clearMock() + con2.clearMock() + } + + @BeforeEach + fun afterTest() { + while (TransactionManager.primaryDatabase != databaseBeforeTestStart) { + TransactionManager.primaryDatabase?.let { TransactionManager.closeAndUnregister(it) } + } + } + + @Test + fun `set manager when transaction start`() = runTest { + val tm = getDefaultManager(cf1) + tm.executeAssert(false) + } + + @Test + fun `set right transaction manager when two transaction manager exist`() = runTest { + val tm = getDefaultManager(cf1) + tm.executeAssert(false) + + val tm2 = getDefaultManager(cf2) + tm2.executeAssert(false) + } + + @Test + fun `set right transaction manager when two transaction manager with nested transaction template`() = runTest { + val tm = getDefaultManager(cf1) + val tm2 = getDefaultManager(cf2) + + tm2.executeAssert(false) { + tm.executeAssert(false) + + assertEquals( + TransactionManager.currentOrNull()?.db?.let { TransactionManager.managerFor(it) }, + TransactionManager.current().transactionManager + ) + } + } + + @Test + fun `connection commit and close when transaction success`() = runTest { + val tm = getDefaultManager(cf1) + tm.executeAssert() + + assertTrue(con1.verifyCallOrder("setAutoCommit", "commit", "close")) + assertEquals(1, con1.commitCallCount) + assertEquals(1, con1.closeCallCount) + } + + @Test + fun `connection rollback and close when transaction fail`() = runTest { + val tm = getDefaultManager(cf1) + val ex = RuntimeException("Application exception") + try { + tm.executeAssert { + throw ex + } + } catch (e: Exception) { + assertEquals(e::class.java, ex::class.java) + assertEquals(e.message, ex.message) + } + assertEquals(1, con1.rollbackCallCount) + assertEquals(1, con1.closeCallCount) + } + + @Test + fun `connection commit and closed when nested transaction success`() = runTest { + val tm = getDefaultManager(cf1) + tm.executeAssert { + tm.executeAssert() + } + + assertEquals(1, con1.commitCallCount) + assertEquals(1, con1.closeCallCount) + } + + @Test + fun `connection commit and closed when two different transaction manager with nested transaction success`() = runTest { + val tm1 = getDefaultManager(cf1) + val tm2 = getDefaultManager(cf2) + + tm1.executeAssert { + tm2.executeAssert() + assertEquals( + TransactionManager.currentOrNull()?.db?.let { TransactionManager.managerFor(it) }, + TransactionManager.current().transactionManager + ) + } + + assertEquals(1, con2.commitCallCount) + assertEquals(1, con2.closeCallCount) + assertEquals(1, con1.commitCallCount) + assertEquals(1, con1.closeCallCount) + } + + @Test + fun `connection rollback and closed when two different transaction manager with nested transaction failed`() = runTest { + val tm1 = getDefaultManager(cf1) + val tm2 = getDefaultManager(cf2) + val ex = RuntimeException("Application exception") + try { + tm1.executeAssert { + tm2.executeAssert { + throw ex + } + assertEquals( + TransactionManager.currentOrNull()?.db?.let { TransactionManager.managerFor(it) }, + TransactionManager.current().transactionManager + ) + } + } catch (e: Exception) { + assertEquals(e::class.java, ex::class.java) + assertEquals(e.message, ex.message) + } + + assertEquals(0, con2.commitCallCount) + assertEquals(1, con2.rollbackCallCount) + assertEquals(1, con2.closeCallCount) + assertEquals(0, con1.commitCallCount) + assertEquals(1, con1.rollbackCallCount) + assertEquals(1, con1.closeCallCount) + } + + @Test + fun `transaction commit with transaction aware connection factory proxy`() = runTest { + val transactionAwareCf = TransactionAwareConnectionFactoryProxy(cf1) + val tm = getDefaultManager(transactionAwareCf) + tm.executeAssert() + + assertTrue(con1.verifyCallOrder("setAutoCommit", "commit")) + assertEquals(1, con1.commitCallCount) + assertTrue(con1.closeCallCount > 0) + } + + @Test + fun `transaction rollback with transaction aware connection factory proxy`() = runTest { + val transactionAwareCf = TransactionAwareConnectionFactoryProxy(cf1) + val tm = getDefaultManager(transactionAwareCf) + val ex = RuntimeException("Application exception") + try { + tm.executeAssert { + throw ex + } + } catch (e: Exception) { + assertEquals(e::class.java, ex::class.java) + assertEquals(e.message, ex.message) + } + + assertTrue(con1.verifyCallOrder("setAutoCommit", "rollback")) + assertEquals(1, con1.rollbackCallCount) + assertTrue(con1.closeCallCount > 0) + } + + @Test + fun `transaction with exception on rollback`() = runTest { + con1.mockRollback = { throw R2dbcRollbackException("Rollback failure") } + + val tm = getDefaultManager(cf1) + assertFailsWith { + tm.executeAssert { + assertEquals(false, it.isRollbackOnly) + it.setRollbackOnly() + assertEquals(true, it.isRollbackOnly) + } + } + + assertTrue(con1.verifyCallOrder("setAutoCommit", "rollback", "close")) + assertEquals(1, con1.rollbackCallCount) + assertEquals(1, con1.closeCallCount) + } + + @Test + fun `nested transaction with commit`() = runTest { + val tm = getDefaultManager(cf1, R2dbcDatabaseConfig { useNestedTransactions = true }) + + tm.executeAssert(propagationBehavior = TransactionDefinition.PROPAGATION_NESTED) { + assertTrue(it.isNewTransaction) + tm.executeAssert(propagationBehavior = TransactionDefinition.PROPAGATION_NESTED) + assertTrue(it.isNewTransaction) + } + + assertEquals(1, con1.commitCallCount) + assertEquals(1, con1.closeCallCount) + } + + @Test + fun `nested transaction with rollback`() = runTest { + val tm = getDefaultManager(cf1, R2dbcDatabaseConfig { useNestedTransactions = true }) + tm.executeAssert(propagationBehavior = TransactionDefinition.PROPAGATION_NESTED) { + assertTrue(it.isNewTransaction) + tm.executeAssert(propagationBehavior = TransactionDefinition.PROPAGATION_NESTED) { status -> + status.setRollbackOnly() + } + assertTrue(it.isNewTransaction) + } + + assertEquals(1, con1.rollbackCallCount) + assertEquals(1, con1.releaseSavepointCallCount) + assertEquals(1, con1.commitCallCount) + assertEquals(1, con1.closeCallCount) + } + + @Test + fun `requires new with commit`() = runTest { + val tm = getDefaultManager(cf1) + tm.executeAssert { + assertTrue(it.isNewTransaction) + tm.executeAssert(propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRES_NEW) { status -> + assertTrue(status.isNewTransaction) + } + assertTrue(it.isNewTransaction) + } + + assertEquals(2, con1.commitCallCount) + assertEquals(2, con1.closeCallCount) + } + + @Test + fun `requires new with inner rollback`() = runTest { + val tm = getDefaultManager(cf1) + tm.executeAssert { + assertTrue(it.isNewTransaction) + tm.executeAssert(propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRES_NEW) { status -> + assertTrue(status.isNewTransaction) + status.setRollbackOnly() + } + assertTrue(it.isNewTransaction) + } + + assertEquals(1, con1.commitCallCount) + assertEquals(1, con1.rollbackCallCount) + assertEquals(2, con1.closeCallCount) + } + + @Test + fun `not support with required transaction`() = runTest { + val tm = getDefaultManager(cf1) + tm.executeAssert { + assertTrue(it.isNewTransaction) + tm.executeAssert( + initializeConnection = false, + propagationBehavior = TransactionDefinition.PROPAGATION_NOT_SUPPORTED + ) { + assertFailsWith { + TransactionManager.current().connection() + } + } + assertTrue(it.isNewTransaction) + TransactionManager.current().connection() + } + + assertEquals(1, con1.commitCallCount) + assertEquals(1, con1.closeCallCount) + } + + @Test + fun `mandatory with transaction`() = runTest { + val tm = getDefaultManager(cf1) + tm.executeAssert { + assertTrue(it.isNewTransaction) + tm.executeAssert(propagationBehavior = TransactionDefinition.PROPAGATION_MANDATORY) + assertTrue(it.isNewTransaction) + TransactionManager.current().connection() + } + + assertEquals(1, con1.commitCallCount) + assertEquals(1, con1.closeCallCount) + } + + @Test + fun `mandatory without transaction`() = runTest { + val tm = getDefaultManager(cf1) + assertFailsWith { + tm.executeAssert(propagationBehavior = TransactionDefinition.PROPAGATION_MANDATORY) + } + } + + @Test + fun `support with transaction`() = runTest { + val tm = getDefaultManager(cf1) + tm.executeAssert { + assertTrue(it.isNewTransaction) + tm.executeAssert(propagationBehavior = TransactionDefinition.PROPAGATION_SUPPORTS) + assertTrue(it.isNewTransaction) + TransactionManager.current().connection() + } + + assertEquals(1, con1.commitCallCount) + assertEquals(1, con1.closeCallCount) + } + + @Test + fun `support without transaction`() = runTest { + val tm = getDefaultManager(cf1) + assertFailsWith { + tm.executeAssert(propagationBehavior = TransactionDefinition.PROPAGATION_SUPPORTS) + } + tm.executeAssert(initializeConnection = false, propagationBehavior = TransactionDefinition.PROPAGATION_SUPPORTS) + assertEquals(0, con1.commitCallCount) + assertEquals(0, con1.rollbackCallCount) + assertEquals(0, con1.closeCallCount) + } + + @Test + fun `transaction timeout`() = runTest { + val tm = getDefaultManager(cf1) + tm.executeAssert(initializeConnection = true, timeout = 1) { + assertEquals(1, TransactionManager.current().queryTimeout) + } + } + + @Test + fun `transaction timeout propagation`() = runTest { + val tm = getDefaultManager(cf1) + tm.executeAssert(initializeConnection = true, timeout = 1) { + tm.executeAssert(initializeConnection = true, timeout = 2) { + assertEquals(1, TransactionManager.current().queryTimeout) + } + assertEquals(1, TransactionManager.current().queryTimeout) + } + } + + private fun getDefaultManager( + connectionFactory: ConnectionFactory, + databaseConfig: R2dbcDatabaseConfig.Builder = R2dbcDatabaseConfig.Builder() + ): SpringReactiveTransactionManager = SpringReactiveTransactionManager( + connectionFactory = connectionFactory, + databaseConfig = databaseConfig.apply { explicitDialect = H2Dialect() } + ) + + private suspend fun SpringReactiveTransactionManager.executeAssert( + initializeConnection: Boolean = true, + propagationBehavior: Int = TransactionDefinition.PROPAGATION_REQUIRED, + timeout: Int? = null, + body: suspend (ReactiveTransaction) -> Unit = {} + ) { + val trxDef = DefaultTransactionDefinition(propagationBehavior).apply { + if (timeout != null) this.timeout = timeout + } + val trxOp = TransactionalOperator.create(this, trxDef) + trxOp.executeAndAwait { + TransactionManager.currentOrNull()?.db?.let { db -> + assertEquals( + TransactionManager.managerFor(db), + TransactionManager.current().transactionManager + ) + } + + if (initializeConnection) TransactionManager.current().connection() + body(it) + } + } +} diff --git a/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringReactiveTransactionTestBase.kt b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringReactiveTransactionTestBase.kt new file mode 100644 index 0000000000..455348fad9 --- /dev/null +++ b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringReactiveTransactionTestBase.kt @@ -0,0 +1,107 @@ +package org.jetbrains.exposed.v1.spring7.reactive.transaction + +import io.r2dbc.spi.ConnectionFactories +import io.r2dbc.spi.ConnectionFactory +import kotlinx.coroutines.test.TestScope +import kotlinx.coroutines.test.runTest +import org.jetbrains.exposed.v1.core.vendors.H2Dialect +import org.jetbrains.exposed.v1.r2dbc.R2dbcDatabaseConfig +import org.junit.jupiter.api.MethodOrderer +import org.junit.jupiter.api.TestMethodOrder +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.context.ApplicationContext +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.springframework.transaction.ReactiveTransaction +import org.springframework.transaction.ReactiveTransactionManager +import org.springframework.transaction.TransactionDefinition +import org.springframework.transaction.annotation.EnableTransactionManagement +import org.springframework.transaction.annotation.TransactionManagementConfigurer +import org.springframework.transaction.reactive.TransactionalOperator +import org.springframework.transaction.reactive.executeAndAwait +import org.springframework.transaction.support.DefaultTransactionDefinition + +@Configuration +@EnableTransactionManagement +/*(mode = AdviceMode.ASPECTJ, proxyTargetClass = true)*/ +open class TestConfig : TransactionManagementConfigurer { + + @Bean + open fun cxFactory(): ConnectionFactory = ConnectionFactories.get("r2dbc:h2:mem:///embeddedTest;DB_CLOSE_DELAY=-1;") + + @Bean + override fun annotationDrivenTransactionManager(): ReactiveTransactionManager = SpringReactiveTransactionManager( + cxFactory(), + R2dbcDatabaseConfig { + useNestedTransactions = true + explicitDialect = H2Dialect() + } + ) + + @Bean + open fun mixedTransactionService(): MixedTransactionService = MixedTransactionService() +} + +@ExtendWith(SpringExtension::class) +@ContextConfiguration(classes = [TestConfig::class]) +@TestMethodOrder(MethodOrderer.MethodName::class) +@Suppress("UnnecessaryAbstractClass") +abstract class SpringReactiveTransactionTestBase { + + @Autowired + lateinit var ctx: ApplicationContext + + @Autowired + lateinit var transactionManager: ReactiveTransactionManager + + /** + * Invokes [runTest] with the [testBody] executed by a [TransactionalOperator] that is set up to follow the same + * rollback rules as `@Transactional`. + * + * Currently, `@Transactional` in Spring's `TestContext` is only configured to find a `PlatformTransactionManager`, + * so it is completely unusable for Spring-R2dbc unit tests. + * + * [Open Issue](https://github.com/spring-projects/spring-framework/issues/24226) + */ + fun runTestWithMockTransactional( + propagationBehavior: Int = TransactionDefinition.PROPAGATION_REQUIRED, + isolationLevel: Int = TransactionDefinition.ISOLATION_DEFAULT, + testBody: suspend TestScope.(ReactiveTransaction) -> Unit + ) { + if (transactionManager !is SpringReactiveTransactionManager) error("Wrong txManager instance: ${this.javaClass.name}") + + val trxDef = DefaultTransactionDefinition(propagationBehavior).apply { + this.isolationLevel = isolationLevel + } + + runTest { + val trxOp = TransactionalOperator.create(transactionManager, trxDef) + trxOp.executeAndAwait { + testBody(it) + it.setRollbackOnly() + } + } + } +} + +suspend fun ReactiveTransactionManager.execute( + propagationBehavior: Int = TransactionDefinition.PROPAGATION_REQUIRED, + isolationLevel: Int = TransactionDefinition.ISOLATION_DEFAULT, + readOnly: Boolean = false, + timeout: Int? = null, + block: suspend (ReactiveTransaction) -> Unit +) { + if (this !is SpringReactiveTransactionManager) error("Wrong txManager instance: ${this.javaClass.name}") + val trxDef = DefaultTransactionDefinition(propagationBehavior).apply { + this.isolationLevel = isolationLevel + if (readOnly) this.isReadOnly = true + if (timeout != null) this.timeout = timeout + } + val trxOp = TransactionalOperator.create(this, trxDef) + trxOp.executeAndAwait { + block(it) + } +} diff --git a/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringTransactionRollbackTest.kt b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringTransactionRollbackTest.kt new file mode 100644 index 0000000000..1d2d5ef6de --- /dev/null +++ b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringTransactionRollbackTest.kt @@ -0,0 +1,375 @@ +package org.jetbrains.exposed.v1.spring7.reactive.transaction + +import io.r2dbc.spi.ConnectionFactories +import io.r2dbc.spi.ConnectionFactory +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.runTest +import org.jetbrains.exposed.v1.core.dao.id.EntityID +import org.jetbrains.exposed.v1.core.dao.id.LongIdTable +import org.jetbrains.exposed.v1.core.vendors.H2Dialect +import org.jetbrains.exposed.v1.r2dbc.R2dbcDatabaseConfig +import org.jetbrains.exposed.v1.r2dbc.SchemaUtils +import org.jetbrains.exposed.v1.r2dbc.deleteAll +import org.jetbrains.exposed.v1.r2dbc.insert +import org.jetbrains.exposed.v1.r2dbc.selectAll +import org.jetbrains.exposed.v1.r2dbc.transactions.TransactionManager +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertNotNull +import org.springframework.context.annotation.AnnotationConfigApplicationContext +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.transaction.IllegalTransactionStateException +import org.springframework.transaction.UnexpectedRollbackException +import org.springframework.transaction.annotation.EnableTransactionManagement +import org.springframework.transaction.annotation.Propagation +import org.springframework.transaction.annotation.Transactional +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class SpringTransactionRollbackTest { + + val container = AnnotationConfigApplicationContext(TransactionManagerAttributeSourceTestConfig::class.java) + + @BeforeEach + fun beforeTest() = runTest { + val testRollback = container.getBean(TestRollback::class.java) + testRollback.init() + } + + @AfterEach + fun afterTest() { + container.close() + } + + @Test + fun `test ExposedR2dbcException rollback`() = runTest { + val testRollback = container.getBean(TestRollback::class.java) + assertFailsWith { + testRollback.suspendTransaction { + insertOriginTable("1") + insertWrongTable("12345678901234567890") + } + } + + assertEquals(0, testRollback.entireTableSize()) + } + + @Test + fun `test RuntimeException rollback`() = runTest { + val testRollback = container.getBean(TestRollback::class.java) + assertFailsWith { + testRollback.suspendTransaction { + insertOriginTable("1") + @Suppress("TooGenericExceptionThrown") + throw RuntimeException() + } + } + + assertEquals(0, testRollback.entireTableSize()) + } + + @Test + fun `test check exception commit`() = runTest { + val testRollback = container.getBean(TestRollback::class.java) + assertFailsWith { + testRollback.suspendTransaction { + insertOriginTable("1") + @Suppress("TooGenericExceptionThrown") + throw Exception() + } + } + + assertEquals(1, testRollback.entireTableSize()) + } + + @Test + fun `exception in inner Tx causes rollback of outer Tx`() = runTest { + val testRollback = container.getBean(TestRollback::class.java) + + assertFailsWith { + testRollback.suspendTransaction { // outer logicTx start + testRollback.insertOriginTable("Tx1") + + try { + testRollback.suspendTransaction { // inner logicTx start + testRollback.insertOriginTable("Tx2") + + @Suppress("TooGenericExceptionThrown") + throw RuntimeException() + // isGlobalRollbackOnParticipationFailure == true -> doSetRollbackOnly() -> mark as globalRollBack + } + } catch (@Suppress("SwallowedException") _: RuntimeException) { + // Ignore exception + } + } // when outer logicTx commit() -> check globalRollBack mark -> throw UnexpectedRollbackException -> rollback + } + + assertEquals(0, testRollback.entireTableSize()) + } + + @Test + fun `isRollback is managed separately for different transactions`() = runTest { + val testRollback = container.getBean(TestRollback::class.java) + + assertFailsWith { + testRollback.suspendTransaction { // outer logicTx start + val outerTx = TransactionManager.currentOrNull() + assertNotNull(outerTx) + assertFalse(outerTx.isMarkedRollback()) + + try { + testRollback.suspendTransaction { // inner logicTx start; inner == outer + val innerTx = TransactionManager.currentOrNull() + assertNotNull(innerTx) + assertFalse(innerTx.isMarkedRollback()) + + @Suppress("TooGenericExceptionThrown") + throw RuntimeException() // mark as globalRollBack + } + } catch (@Suppress("SwallowedException") _: RuntimeException) { + // Ignore exception + } + + assertTrue(outerTx.isMarkedRollback()) + + testRollback.transactionWithRequiresNew { // separate transaction != outer + val newInnerTx = TransactionManager.currentOrNull() + assertNotNull(newInnerTx) + assertFalse(newInnerTx.isMarkedRollback()) + } + + testRollback.suspendTransaction { // inner == outer, so still marked for rollback + val innerTx = TransactionManager.currentOrNull() + assertNotNull(innerTx) + assertTrue(innerTx.isMarkedRollback()) + } + } + } + + testRollback.suspendTransaction { // new outer transaction not affected by previous transaction rollback status + val newTx = TransactionManager.currentOrNull() + assertNotNull(newTx) + assertFalse(newTx.isMarkedRollback()) + } + } + + @Test + fun `requiresNew should rollback innerTx without affecting outerTx`() = runTest { + val testRollback = container.getBean(TestRollback::class.java) + + testRollback.suspendTransaction { // outer logicTx start + testRollback.insertOriginTable("Tx1") + + try { + testRollback.transactionWithRequiresNew { // outer != this + testRollback.insertOriginTable("Tx2") + + @Suppress("TooGenericExceptionThrown") + throw RuntimeException() + } + } catch (@Suppress("SwallowedException") _: RuntimeException) { + // Ignore exception + } + } + + val entities = testRollback.selectAll() + assertEquals(1, entities.size) + assertEquals("Tx1", entities.first().name) + } + + @Test + fun `supports should participate in existing transaction but not rollback when none exists`() = runTest { + val testRollback = container.getBean(TestRollback::class.java) + + assertFailsWith { + // Execute without a transaction -> Should not be rolled back + testRollback.transactionWithSupports { + testRollback.insertOriginTable("No Tx") + + @Suppress("TooGenericExceptionThrown") + throw RuntimeException() // Non-transactional, so it should not be rolled back + } + } + + assertEquals(1, testRollback.entireTableSize()) // Data should remain + + // Execute within a transaction -> Should be rolled back + assertFailsWith { + testRollback.suspendTransaction { + testRollback.insertOriginTable("With Tx") + + testRollback.transactionWithSupports { // outer == this + testRollback.insertOriginTable("Supports Tx") + + @Suppress("TooGenericExceptionThrown") + throw RuntimeException() // Should trigger rollback + } + } + } + + val entities = testRollback.selectAll() + assertEquals(1, entities.size) // Only the first case's data should remain + assertEquals("No Tx", entities.first().name) + } + + @Test + fun `notSupported should suspend outer transaction and execute without transaction`() = runTest { + val testRollback = container.getBean(TestRollback::class.java) + + testRollback.suspendTransaction { + testRollback.insertOriginTable("Tx1") + + try { + testRollback.transactionWithNotSupported { + testRollback.insertOriginTable("No Tx") + + @Suppress("TooGenericExceptionThrown") + throw RuntimeException() // Since it's non-transactional, it won't be rolled back + } + } catch (@Suppress("SwallowedException") _: RuntimeException) { + // Ignore exception + } + } + + assertEquals(2, testRollback.entireTableSize()) // Both records should remain + } + + @Test + fun `mandatory should fail if no existing transaction but participate if one exists`() = runTest { + val testRollback = container.getBean(TestRollback::class.java) + + assertFailsWith { + testRollback.transactionWithMandatory { + testRollback.insertOriginTable("No Parent Tx") // Will trigger roll back + } + } + + testRollback.suspendTransaction { + testRollback.insertOriginTable("Tx 1") + testRollback.transactionWithMandatory { // outer == this + testRollback.insertOriginTable("Tx 2") + } + } + + assertEquals(2, testRollback.entireTableSize()) // Both records should remain + + assertFailsWith { + testRollback.suspendTransaction { + testRollback.insertOriginTable("Tx11") + + try { + testRollback.transactionWithMandatory { // outer == this + testRollback.insertOriginTable("Tx22") + + @Suppress("TooGenericExceptionThrown") + throw RuntimeException() + } + } catch (@Suppress("SwallowedException") _: RuntimeException) { + // Ignore exception + } + } + } + + val entities = testRollback.selectAll() + assertEquals(2, entities.size) // Only original records should remain + assertTrue { entities.none { it.name.startsWith("No ") || it.name.startsWith("New ") } } + } +} + +@Configuration +@EnableTransactionManagement(proxyTargetClass = true) +open class TransactionManagerAttributeSourceTestConfig { + + @Bean + open fun cxFactory(): ConnectionFactory = ConnectionFactories.get("r2dbc:h2:mem:///embeddedTest1;DB_CLOSE_DELAY=-1;") + + @Bean + open fun transactionManager(connectionFactory: ConnectionFactory) = SpringReactiveTransactionManager( + connectionFactory, + R2dbcDatabaseConfig { explicitDialect = H2Dialect() } + ) + + @Bean + open fun transactionAttributeSource() = ExposedSpringReactiveTransactionAttributeSource() + + @Bean + open fun testRollback() = TestRollback() +} + +@Transactional +open class TestRollback { + + open suspend fun init() { + SchemaUtils.create(RollbackTable) + RollbackTable.deleteAll() + } + + open suspend fun suspendTransaction(block: suspend TestRollback.() -> Unit) { + block() + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) + open suspend fun transactionWithRequiresNew(block: suspend TestRollback.() -> Unit) { + block() + } + + @Transactional(propagation = Propagation.NESTED) + open suspend fun transactionWithNested(block: suspend TestRollback.() -> Unit) { + block() + } + + @Transactional(propagation = Propagation.SUPPORTS) + open suspend fun transactionWithSupports(block: suspend TestRollback.() -> Unit) { + block() + } + + @Transactional(propagation = Propagation.NOT_SUPPORTED) + open suspend fun transactionWithNotSupported(block: suspend TestRollback.() -> Unit) { + block() + } + + @Transactional(propagation = Propagation.MANDATORY) + open suspend fun transactionWithMandatory(block: suspend TestRollback.() -> Unit) { + block() + } + + open suspend fun insertOriginTable(name: String) { + RollbackTable.insert { + it[RollbackTable.name] = name + } + } + + open suspend fun insertWrongTable(name: String) { + WrongDefinedRollbackTable.insert { + it[WrongDefinedRollbackTable.name] = name + } + } + + open suspend fun entireTableSize(): Long { + return RollbackTable.selectAll().count() + } + + open suspend fun selectAll(): List { + return RollbackTable.selectAll().map { + RollbackEntity( + id = it[RollbackTable.id], + name = it[RollbackTable.name] + ) + }.toList() + } +} + +object RollbackTable : LongIdTable("test_rollback") { + val name = varchar("name", 5) +} + +object WrongDefinedRollbackTable : LongIdTable("test_rollback") { + val name = varchar("name", 10) +} + +data class RollbackEntity(val id: EntityID, val name: String) diff --git a/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringTransactionSingleConnectionTest.kt b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringTransactionSingleConnectionTest.kt new file mode 100644 index 0000000000..fdf2e28d57 --- /dev/null +++ b/spring7-reactive-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringTransactionSingleConnectionTest.kt @@ -0,0 +1,109 @@ +package org.jetbrains.exposed.v1.spring7.reactive.transaction + +import io.r2dbc.spi.ConnectionFactory +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.test.runTest +import org.jetbrains.exposed.v1.core.Table +import org.jetbrains.exposed.v1.core.vendors.H2Dialect +import org.jetbrains.exposed.v1.r2dbc.R2dbcDatabaseConfig +import org.jetbrains.exposed.v1.r2dbc.SchemaUtils +import org.jetbrains.exposed.v1.r2dbc.selectAll +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.context.annotation.AnnotationConfigApplicationContext +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.r2dbc.connection.ConnectionFactoryUtils +import org.springframework.r2dbc.connection.SingleConnectionFactory +import org.springframework.transaction.ReactiveTransactionManager +import org.springframework.transaction.TransactionDefinition +import org.springframework.transaction.annotation.EnableTransactionManagement +import kotlin.test.assertEquals + +class SpringTransactionSingleConnectionTest { + object T1 : Table() { + val c1 = varchar("c1", Int.MIN_VALUE.toString().length) + } + + val singleConnectionH2TestContainer = AnnotationConfigApplicationContext(SingleConnectionH2TestConfig::class.java) + val transactionManager: ReactiveTransactionManager = singleConnectionH2TestContainer.getBean(ReactiveTransactionManager::class.java) + val connectionFactory: ConnectionFactory = singleConnectionH2TestContainer.getBean(ConnectionFactory::class.java) + + @BeforeEach + fun beforeTest() = runTest { + transactionManager.execute { + SchemaUtils.create(T1) + } + } + + @AfterEach + fun afterTest() = runTest { + transactionManager.execute { + SchemaUtils.drop(T1) + } + singleConnectionH2TestContainer.close() + } + + @Test + fun `start transaction with non default isolation level`() = runTest { + transactionManager.execute( + isolationLevel = TransactionDefinition.ISOLATION_SERIALIZABLE, + ) { + T1.selectAll().toList() + } + } + + @Test + fun `nested transaction with non default isolation level`() = runTest { + transactionManager.execute( + isolationLevel = TransactionDefinition.ISOLATION_SERIALIZABLE, + ) { + T1.selectAll().toList() + + // Nested transaction will inherit isolation level from parent transaction because it uses the same connection + transactionManager.execute( + isolationLevel = TransactionDefinition.ISOLATION_READ_UNCOMMITTED, + ) { + val cx = ConnectionFactoryUtils.getConnection(connectionFactory).awaitFirst() + val actualLevel = cx.transactionIsolationLevel + cx.close().awaitFirstOrNull() + assertEquals( + actualLevel, + TransactionDefinition.ISOLATION_SERIALIZABLE.resolveIsolationLevel() + ) + + T1.selectAll().toList() + } + T1.selectAll().toList() + } + } +} + +@Configuration +@EnableTransactionManagement(proxyTargetClass = true) +open class SingleConnectionH2TestConfig { + + @Bean + open fun singleConnectionH2Factory(): ConnectionFactory { + // args -> SingleConnectionFactory(url, suppressClose) + return SingleConnectionFactory( + "r2dbc:h2:mem:///regular;DB_CLOSE_DELAY=-1;", + true + ) + } + + @Bean + open fun singleConnectionH2TransactionManager( + @Qualifier("singleConnectionH2Factory") connectionFactory: ConnectionFactory + ): ReactiveTransactionManager = SpringReactiveTransactionManager( + connectionFactory, + R2dbcDatabaseConfig { + useNestedTransactions = true + explicitDialect = H2Dialect() + } + ) +} diff --git a/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/EntityUpdateTest.kt b/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/EntityUpdateTest.kt index 858e195a30..d20b5463f6 100644 --- a/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/EntityUpdateTest.kt +++ b/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/EntityUpdateTest.kt @@ -6,12 +6,15 @@ import org.jetbrains.exposed.v1.dao.IntEntity import org.jetbrains.exposed.v1.dao.IntEntityClass import org.jetbrains.exposed.v1.jdbc.SchemaUtils import org.jetbrains.exposed.v1.jdbc.insert +import org.jetbrains.exposed.v1.tests.MISSING_R2DBC_TEST import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.springframework.test.annotation.Commit import org.springframework.transaction.annotation.Transactional import kotlin.test.fail +@Tag(MISSING_R2DBC_TEST) open class EntityUpdateTest : SpringTransactionTestBase() { object T1 : IntIdTable() { diff --git a/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/ExposedTransactionManagerTest.kt b/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/ExposedTransactionManagerTest.kt index b7fd14575a..bc8b60176e 100644 --- a/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/ExposedTransactionManagerTest.kt +++ b/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/ExposedTransactionManagerTest.kt @@ -8,10 +8,12 @@ import org.jetbrains.exposed.v1.jdbc.insert import org.jetbrains.exposed.v1.jdbc.selectAll import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager import org.jetbrains.exposed.v1.jdbc.transactions.transaction +import org.jetbrains.exposed.v1.tests.NO_R2DBC_SUPPORT import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.RepeatedTest +import org.junit.jupiter.api.Tag import org.springframework.test.annotation.Commit import org.springframework.transaction.IllegalTransactionStateException import org.springframework.transaction.TransactionDefinition @@ -273,6 +275,7 @@ open class ExposedTransactionManagerTest : SpringTransactionTestBase() { /** * Test for Isolation Level */ + @Tag(NO_R2DBC_SUPPORT) // H2_R2DBC used in tests with restricted level support @RepeatedTest(5) @Transactional(isolation = Isolation.READ_COMMITTED) open fun testIsolationLevelReadUncommitted() { @@ -290,6 +293,7 @@ open class ExposedTransactionManagerTest : SpringTransactionTestBase() { * Test for Timeout * Execute with query timeout */ + @Tag(NO_R2DBC_SUPPORT) // H2_R2DBC used in tests with restricted timeout support @RepeatedTest(5) open fun testTimeout() { transactionManager.execute(timeout = 1) { diff --git a/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/JdbcExposedTransactionManagerTest.kt b/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/JdbcExposedTransactionManagerTest.kt index e262a28dde..5e99b4025d 100644 --- a/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/JdbcExposedTransactionManagerTest.kt +++ b/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/JdbcExposedTransactionManagerTest.kt @@ -4,10 +4,12 @@ import org.jetbrains.exposed.v1.core.Table import org.jetbrains.exposed.v1.jdbc.SchemaUtils import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager import org.jetbrains.exposed.v1.jdbc.transactions.transaction +import org.jetbrains.exposed.v1.tests.NO_R2DBC_SUPPORT import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.RepeatedTest +import org.junit.jupiter.api.Tag import org.springframework.beans.factory.annotation.Autowired import org.springframework.jdbc.core.simple.JdbcClient import org.springframework.test.annotation.Commit @@ -277,6 +279,7 @@ open class JdbcExposedTransactionManagerTest : SpringTransactionTestBase() { /** * Test for Isolation Level */ + @Tag(NO_R2DBC_SUPPORT) // H2_R2DBC used in tests with restricted level support @RepeatedTest(5) @Transactional(isolation = Isolation.READ_COMMITTED) open fun testIsolationLevelReadUncommitted() { diff --git a/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/SpringTransactionEntityTest.kt b/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/SpringTransactionEntityTest.kt index b17949cadc..71b8156de6 100644 --- a/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/SpringTransactionEntityTest.kt +++ b/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/SpringTransactionEntityTest.kt @@ -6,8 +6,10 @@ import org.jetbrains.exposed.v1.core.eq import org.jetbrains.exposed.v1.dao.java.UUIDEntity import org.jetbrains.exposed.v1.dao.java.UUIDEntityClass import org.jetbrains.exposed.v1.jdbc.SchemaUtils +import org.jetbrains.exposed.v1.tests.MISSING_R2DBC_TEST import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.annotation.Commit @@ -76,6 +78,7 @@ open class Service { } } +@Tag(MISSING_R2DBC_TEST) open class SpringTransactionEntityTest : SpringTransactionTestBase() { @Autowired diff --git a/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/SpringTransactionManagerTest.kt b/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/SpringTransactionManagerTest.kt index 8442305cd4..3dc130c627 100644 --- a/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/SpringTransactionManagerTest.kt +++ b/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/SpringTransactionManagerTest.kt @@ -3,9 +3,12 @@ package org.jetbrains.exposed.v1.spring7.transaction import org.jetbrains.exposed.v1.core.DatabaseConfig import org.jetbrains.exposed.v1.jdbc.Database import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager +import org.jetbrains.exposed.v1.tests.NOT_APPLICABLE_TO_R2DBC +import org.jetbrains.exposed.v1.tests.NO_R2DBC_SUPPORT import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy @@ -162,6 +165,10 @@ class SpringTransactionManagerTest { assertEquals(1, con1.closeCallCount) } + // LazyConnectionDataSourceProxy has no R2DBC equivalent + // https://github.com/spring-projects/spring-framework/issues/33897 + // https://github.com/spring-projects/spring-data-relational/issues/2026 + @Tag(NO_R2DBC_SUPPORT) @Test fun `transaction commit with lazy connection data source proxy`() { val lazyDs = LazyConnectionDataSourceProxy(ds1) @@ -171,6 +178,10 @@ class SpringTransactionManagerTest { assertEquals(1, con1.closeCallCount) } + // LazyConnectionDataSourceProxy has no R2DBC equivalent + // https://github.com/spring-projects/spring-framework/issues/33897 + // https://github.com/spring-projects/spring-data-relational/issues/2026 + @Tag(NO_R2DBC_SUPPORT) @Test fun `transaction rollback with lazy connection data source proxy`() { val lazyDs = LazyConnectionDataSourceProxy(ds1) @@ -215,6 +226,9 @@ class SpringTransactionManagerTest { assertTrue(con1.closeCallCount > 0) } + // Rollback following commit failure was purposefully removed from Spring R2DBC + // https://github.com/spring-projects/spring-framework/pull/27572 + @Tag(NOT_APPLICABLE_TO_R2DBC) @Test fun `transaction exception on commit and rollback on commit failure`() { con1.mockCommit = { throw SQLException("Commit failure") } diff --git a/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/SpringTransactionRollbackTest.kt b/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/SpringTransactionRollbackTest.kt index 47fbd13ab5..9bf3d025a8 100644 --- a/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/SpringTransactionRollbackTest.kt +++ b/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/SpringTransactionRollbackTest.kt @@ -8,8 +8,10 @@ import org.jetbrains.exposed.v1.jdbc.deleteAll import org.jetbrains.exposed.v1.jdbc.insert import org.jetbrains.exposed.v1.jdbc.selectAll import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager +import org.jetbrains.exposed.v1.tests.MISSING_R2DBC_TEST import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertNotNull import org.springframework.context.annotation.AnnotationConfigApplicationContext @@ -273,6 +275,11 @@ class SpringTransactionRollbackTest { assertTrue { entities.none { it.name.startsWith("No ") || it.name.startsWith("New ") } } } + // Left out because rollback involving Spring (partial) NESTED is not actually fully supported by Exposed; + // this test would fail with "Transaction manager does not allow nested transactions by default" if catch block removed, + // not the artificial RuntimeException as expected + // https://youtrack.jetbrains.com/issue/EXPOSED-996/Reassess-support-for-Spring-PROPAGATIONNESTED + @Tag(MISSING_R2DBC_TEST) @Test fun `nested should rollback innerTx without affecting outerTx`() { val testRollback = container.getBean(TestRollback::class.java) diff --git a/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/TransactionSynchronizationTest.kt b/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/TransactionSynchronizationTest.kt index f8eb22c27f..914eaa13f8 100644 --- a/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/TransactionSynchronizationTest.kt +++ b/spring7-transaction/src/test/kotlin/org/jetbrains/exposed/v1/spring7/transaction/TransactionSynchronizationTest.kt @@ -1,12 +1,16 @@ package org.jetbrains.exposed.v1.spring7.transaction +import org.jetbrains.exposed.v1.tests.NOT_APPLICABLE_TO_R2DBC import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.springframework.transaction.TransactionDefinition import org.springframework.transaction.support.TransactionSynchronization import org.springframework.transaction.support.TransactionSynchronizationManager +// Test interface use is only supported by AbstractPlatformTransactionManager +@Tag(NOT_APPLICABLE_TO_R2DBC) class TransactionSynchronizationTest : SpringTransactionTestBase() { private var synchronization: TestSynchronization = TestSynchronization()