Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Kotlin version 2.1.+
The following module(s) require JDK 17 or newer:
* `spring-transaction` - depends on Spring Framework 6
* `spring7-transaction` - depends on Spring Framework 7
* `spring7-reactive-transaction` - depends on Spring Framework 7
* `exposed-spring-boot-starter` - depends on Spring Boot 3
* `exposed-spring-boot4-starter` - depends on Spring Boot 4
* `exposed-crypt` - depends on Spring Security 7
Expand Down
2 changes: 2 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies {
dokka(projects.exposed.exposedSpringBoot4Starter)
dokka(projects.exposed.springTransaction)
dokka(projects.exposed.spring7Transaction)
dokka(projects.exposed.spring7ReactiveTransaction)

// Kover aggregated coverage dependencies
// Include all source modules for coverage aggregation
Expand All @@ -47,6 +48,7 @@ dependencies {
kover(project(":exposed-java-time"))
kover(project(":spring-transaction"))
kover(project(":spring7-transaction"))
kover(project(":spring7-reactive-transaction"))
kover(project(":exposed-spring-boot-starter"))
kover(project(":exposed-spring-boot4-starter"))
kover(project(":exposed-jdbc"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private fun Project.createDbTestTaskByDialect(db: TestDb, taskName: String, dial
if (db.ignoresSpringTests(dialect)) {
filter {
// exclude all test classes in Spring modules:
// spring-transaction, spring7-transaction,
// spring-transaction, spring7-transaction, spring7-reactive-transaction
// exposed-spring-boot-starter, exposed-spring-boot4-starter
exclude(
"org/jetbrains/exposed/v1/spring/*",
Expand Down
6 changes: 6 additions & 0 deletions exposed-r2dbc/api/exposed-r2dbc.api
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ public class org/jetbrains/exposed/v1/r2dbc/statements/MergeSuspendExecutable :

public final class org/jetbrains/exposed/v1/r2dbc/statements/R2dbcConnectionImpl : org/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcExposedConnection {
public fun <init> (Lorg/reactivestreams/Publisher;Ljava/lang/String;Lorg/jetbrains/exposed/v1/r2dbc/mappers/R2dbcTypeMapping;)V
public fun activeConnection (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun commit (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun executeInBatch (Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down Expand Up @@ -865,6 +866,7 @@ public final class org/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcDatabaseMe
}

public abstract interface class org/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcExposedConnection {
public fun activeConnection (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun commit (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun executeInBatch (Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand All @@ -889,6 +891,10 @@ public abstract interface class org/jetbrains/exposed/v1/r2dbc/statements/api/R2
public abstract fun setTransactionIsolation (Lio/r2dbc/spi/IsolationLevel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class org/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcExposedConnection$DefaultImpls {
public static fun activeConnection (Lorg/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcExposedConnection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract class org/jetbrains/exposed/v1/r2dbc/statements/api/R2dbcExposedDatabaseMetadata : org/jetbrains/exposed/v1/core/statements/api/ExposedDatabaseMetadata {
public fun <init> (Ljava/lang/String;)V
public abstract fun columns ([Lorg/jetbrains/exposed/v1/core/Table;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,6 @@ class R2dbcDatabase private constructor(
connectionUrl = options.urlString
connectionUrlMode = options.urlMode
TransactionManager.registerManager(this, manager(this))
// ABOVE should be replaced with BELOW when ThreadLocalTransactionManager is fully deprecated
// TransactionManager.registerManager(this, manager(this))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package org.jetbrains.exposed.v1.r2dbc.statements

import io.r2dbc.spi.Connection
import io.r2dbc.spi.IsolationLevel
import io.r2dbc.spi.Row
import io.r2dbc.spi.RowMetadata
import io.r2dbc.spi.Statement
import io.r2dbc.spi.TransactionDefinition
import io.r2dbc.spi.ValidationDepth
import io.r2dbc.spi.*
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitLast
import kotlinx.coroutines.reactive.awaitSingle
Expand Down Expand Up @@ -43,12 +39,30 @@ import java.util.*
*/
@Suppress("UnusedPrivateMember", "SpreadOperator")
class R2dbcConnectionImpl(
/** The publisher of underlying database [Connection] instances contained by this wrapper. */
override val connection: Publisher<out Connection>,
private val vendorDialect: String,
private val typeMapping: R2dbcTypeMapping
) : R2dbcExposedConnection<Publisher<out Connection>> {
private val metadataProvider: MetadataProvider = MetadataProvider.getProvider(vendorDialect)

/**
* Retrieves a publisher that provides only the single current underlying database [Connection] instance in use.
*
* If none is active, it awaits a value from the [connection] publisher and internally stores the result. Retrieval
* of a new value will invoke `Connection.beginTransaction()`, so this method should preferably be called when an active
* transaction is underway, in order to retrieve the accurate connection object instance.
*
* If this method is invoked intentionally to trigger the start of a transaction, then [setTransactionDefinition]
* should ideally be manually called first with the appropriate [TransactionDefinition], as well as any other
* [Connection] configuration methods.
*/
Comment thread
bog-walk marked this conversation as resolved.
override suspend fun activeConnection(): Publisher<out Connection> {
// retrieves localConnection if not null, otherwise awaits value from connection publisher
val acquiredConnection: Connection = withConnection { this }
return flowOf(acquiredConnection).asPublisher()
}

override suspend fun getCatalog(): String = withConnection {
getCurrentCatalog(metadataProvider)
?: executeSQL(metadataProvider.getUsername()) { row, _ ->
Expand Down Expand Up @@ -271,10 +285,6 @@ internal fun IsolationLevel.asInt(): Int = isolationLevelMapping.getOrElse(this)
error("Unsupported IsolationLevel as Int: ${this.asSql()}")
}

internal fun Int.asIsolationLevel(): IsolationLevel = isolationLevelMapping.entries
.firstOrNull { it.value == this }?.key
?: error("Unsupported Int as IsolationLevel: $this")

internal suspend fun Connection.executeSQL(sqlQuery: String) {
if (sqlQuery.isEmpty()) return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ interface R2dbcExposedConnection<OriginalConnection : Any> {
/** The underlying database connection object contained by this wrapper. */
val connection: OriginalConnection

/** Retrieves the current underlying database connection object in use. */
suspend fun activeConnection(): OriginalConnection = connection

Comment on lines +14 to +16

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't particularly want to implement this, but it solves the issue of the exact active connection being passed to Spring ConnectionHolder versus an awaited value from the publisher.

With JDBC, the transaction simply holds a single Connection instance, so it is straightforward to get when synchronizing Exposed & Spring transaction, so that more low-level transactional operations like JdbcTemplate can use the same connection our transactions use.

I wanted to achieve the same functionality for R2DBC using its native DatabaseClient. But Exposed would occasionally retrieve the incorrect connection instance (when the operation happens outside a callback). This essentially will open up access to the private var localConnection in the implementation.

/** Retrieves the name of the connection's catalog. */
suspend fun getCatalog(): String

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.jetbrains.exposed.v1.r2dbc.transactions

import io.r2dbc.spi.IsolationLevel
import kotlinx.coroutines.currentCoroutineContext
import org.jetbrains.exposed.v1.core.InternalApi
import org.jetbrains.exposed.v1.core.Transaction
import org.jetbrains.exposed.v1.core.transactions.ThreadLocalTransactionsStack
Expand Down Expand Up @@ -65,23 +64,23 @@ internal fun R2dbcTransactionManager.createTransactionContext(transaction: Trans
}

/**
* Returns the current R2DBC transaction from the coroutine context, or null if none exists.
* Returns the current R2DBC transaction from the stack, or null if none exists.
*
* This method performs type checking to ensure the transaction in the context is actually
* an [R2dbcTransaction]. If a non-R2DBC transaction is found in the context, an error is thrown
* This method performs type checking to ensure the transaction in the stack is actually
* an [R2dbcTransaction]. If a non-R2DBC transaction is found in the stack, an error is thrown
* to prevent type confusion between JDBC and R2DBC transactions.
*
* @return The current [R2dbcTransaction] from the coroutine context, or null if no transaction exists
* @throws [IllegalStateException] If the transaction in the context is not an [R2dbcTransaction]
* @return The current [R2dbcTransaction] from the stack, or null if no transaction exists
* @throws [IllegalStateException] If the transaction in the stack is not an [R2dbcTransaction]
*/
internal suspend fun R2dbcTransactionManager.getCurrentContextTransaction(): R2dbcTransaction? {
internal fun R2dbcTransactionManager.getCurrentStackTransaction(): R2dbcTransaction? {
@OptIn(InternalApi::class)
val transaction = currentCoroutineContext()[contextKey]?.transaction
val transaction = ThreadLocalTransactionsStack.getTransactionOrNull(db)
return when {
transaction == null -> null
transaction is R2dbcTransaction -> transaction
else -> error(
"Expected R2dbcTransaction in coroutine context but found ${transaction::class.simpleName}. " +
"Expected R2dbcTransaction in stack but found ${transaction::class.simpleName}. " +
"This may indicate mixing JDBC and R2DBC transactions incorrectly."
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ suspend fun <T> suspendTransaction(
statement: suspend R2dbcTransaction.() -> T
): T {
val databaseToUse = resolveR2dbcDatabaseOrThrow(db)
val outer = databaseToUse.transactionManager.getCurrentContextTransaction()
val outer = databaseToUse.transactionManager.getCurrentStackTransaction()

return if (outer != null) {
val transaction = outer.transactionManager.newTransaction(
Expand Down
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ kotlinx-coroutines = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutin
kotlinx-coroutines-debug = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-debug", version.ref = "kotlinCoroutines" }
kotlinx-coroutines-test = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-test", version.ref = "kotlinCoroutines" }
kotlinx-coroutines-reactive = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-reactive", version.ref = "kotlinCoroutines" }
kotlinx-coroutines-reactor = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-reactor", version.ref = "kotlinCoroutines" }
kotlinx-jvm-datetime = { group = "org.jetbrains.kotlinx", name = "kotlinx-datetime-jvm", version.ref = "kotlinx-datetime" }
kotlinx-serialization = { group = "org.jetbrains.kotlinx", name = "kotlinx-serialization-json", version.ref = "kotlinxSerialization" }

Expand All @@ -85,6 +86,7 @@ spring6-jdbc = { group = "org.springframework", name = "spring-jdbc", version.re
spring6-context = { group = "org.springframework", name = "spring-context", version.ref = "springFramework6" }
spring6-test = { group = "org.springframework", name = "spring-test", version.ref = "springFramework6" }
spring7-jdbc = { group = "org.springframework", name = "spring-jdbc", version.ref = "springFramework7" }
spring7-r2dbc = { group = "org.springframework", name = "spring-r2dbc", version.ref = "springFramework7" }
spring7-context = { group = "org.springframework", name = "spring-context", version.ref = "springFramework7" }
spring7-test = { group = "org.springframework", name = "spring-test", version.ref = "springFramework7" }

Expand Down
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ include("exposed-jodatime")
include("exposed-java-time")
include("spring-transaction")
include("spring7-transaction")
include("spring7-reactive-transaction")
include("exposed-spring-boot-starter")
include("exposed-spring-boot4-starter")
include("exposed-jdbc")
Expand Down
12 changes: 12 additions & 0 deletions spring7-reactive-transaction/api/spring7-reactive-transaction.api
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
public final class org/jetbrains/exposed/v1/spring7/reactive/transaction/ExposedSpringReactiveTransactionAttributeSource : org/springframework/transaction/interceptor/TransactionAttributeSource {
public fun <init> ()V
public fun <init> (Lorg/springframework/transaction/interceptor/TransactionAttributeSource;Ljava/util/List;)V
public synthetic fun <init> (Lorg/springframework/transaction/interceptor/TransactionAttributeSource;Ljava/util/List;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun getTransactionAttribute (Ljava/lang/reflect/Method;Ljava/lang/Class;)Lorg/springframework/transaction/interceptor/TransactionAttribute;
}

public final class org/jetbrains/exposed/v1/spring7/reactive/transaction/SpringReactiveTransactionManager : org/springframework/transaction/reactive/AbstractReactiveTransactionManager {
public fun <init> (Lio/r2dbc/spi/ConnectionFactory;Lorg/jetbrains/exposed/v1/r2dbc/R2dbcDatabaseConfig$Builder;Z)V
public synthetic fun <init> (Lio/r2dbc/spi/ConnectionFactory;Lorg/jetbrains/exposed/v1/r2dbc/R2dbcDatabaseConfig$Builder;ZILkotlin/jvm/internal/DefaultConstructorMarker;)V
}

59 changes: 59 additions & 0 deletions spring7-reactive-transaction/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import org.gradle.api.tasks.testing.logging.*
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
import org.jetbrains.kotlin.gradle.tasks.*

plugins {
kotlin("jvm")

alias(libs.plugins.dokka)
}

repositories {
mavenCentral()
}

kotlin {
jvmToolchain(17)
}

dependencies {
api(project(":exposed-core"))
api(project(":exposed-r2dbc"))
api(libs.spring7.r2dbc)
api(libs.spring7.context)
implementation(libs.kotlinx.coroutines.reactor)

testImplementation(project(":exposed-r2dbc-tests"))
testImplementation(kotlin("test"))
testImplementation(libs.junit6)
testRuntimeOnly(libs.junit.platform.launcher)
testImplementation(libs.kotlinx.coroutines.debug)
testImplementation(libs.kotlinx.coroutines.test)
testImplementation(libs.spring7.test)
testImplementation(libs.slf4j)
testImplementation(libs.log4j.slf4j.impl)
testImplementation(libs.log4j.api)
testImplementation(libs.log4j.core)
testImplementation(libs.r2dbc.h2) {
exclude(group = "com.h2database", module = "h2")
}
}

tasks.withType<KotlinCompile>().configureEach {
compilerOptions {
jvmTarget.set(JvmTarget.JVM_17)
}
}

tasks.withType<Test>().configureEach {
if (JavaVersion.VERSION_1_8 > JavaVersion.current()) {
jvmArgs = listOf("-XX:MaxPermSize=256m")
}
testLogging {
events.addAll(listOf(TestLogEvent.PASSED, TestLogEvent.FAILED, TestLogEvent.SKIPPED))
showStandardStreams = true
exceptionFormat = TestExceptionFormat.FULL
}

useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.jetbrains.exposed.v1.spring7.reactive.transaction

import io.r2dbc.spi.R2dbcException
import org.springframework.transaction.annotation.AnnotationTransactionAttributeSource
import org.springframework.transaction.interceptor.RollbackRuleAttribute
import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute
import org.springframework.transaction.interceptor.TransactionAttribute
import org.springframework.transaction.interceptor.TransactionAttributeSource
import java.lang.reflect.Method

/**
* A [TransactionAttributeSource] that adds `ExposedR2dbcException` to the rollback rules of the delegate.
*
* @property delegate The delegate [TransactionAttributeSource] to use. Defaults to [AnnotationTransactionAttributeSource].
* If you use a custom [TransactionAttributeSource], you can pass it here.
*/
class ExposedSpringReactiveTransactionAttributeSource(
private val delegate: TransactionAttributeSource = AnnotationTransactionAttributeSource(),
private val rollbackExceptions: List<Class<out Throwable>> = listOf(R2dbcException::class.java)
) : TransactionAttributeSource {

override fun getTransactionAttribute(method: Method, targetClass: Class<*>?): TransactionAttribute? {
val attr = delegate.getTransactionAttribute(method, targetClass)
if (attr is RuleBasedTransactionAttribute) {
val rules = attr.rollbackRules.toMutableList()
rollbackExceptions.forEach { exception ->
val containsException = rules.any {
it is RollbackRuleAttribute && it.exceptionName == exception.name
}
if (!containsException) {
rules.add(RollbackRuleAttribute(exception))
}
}
attr.rollbackRules = rules
}
return attr
}
}
Loading
Loading