Транзакции с ReactiveCrudRepository с spring-data-r2dbc
Я пытаюсь реализовать транзакции с репозиториями spring-data-r2dbc в сочетании с TransactionalDatabaseClient как таковой:
class SongService(
private val songRepo: SongRepo,
private val databaseClient: DatabaseClient
){
private val tdbc = databaseClient as TransactionalDatabaseClient
...
...
fun save(song: Song){
return tdbc.inTransaction{
songRepo
.save(mapRow(song, albumId)) //Mapping to a row representation
.delayUntil { savedSong -> tdbc.execute.sql(...).fetch.rowsUpdated() } //saving a many to many relation
.map(::mapSong) //Mapping back to actual song and retrieve the relationship data.
}
}
}
В настоящее время у меня есть класс конфигурации (с пометкой @Configuration
а также @EnableR2dbcRepositories
) который простирается от AbstractR2dbcConfiguration
, Здесь я перекрываю databaseClient
способ вернуть TransactionalDatabaseClient
, Это должен быть тот же экземпляр, что и в классе SongService.
При запуске кода в тесте с подпиской и печатью, я получаю org.springframework.transaction.NoTransactionException: ReactiveTransactionSynchronization not active
и данные об отношениях не возвращаются.
При использовании проекта Reactors stepverifier, хотя, я получаю java.lang.IllegalStateException: Connection is closed
, Также в этом случае данные отношения не возвращаются.
Просто для записи, я видел https://github.com/spring-projects/spring-data-r2dbc/issues/44
0 ответов
Here is a working Java example:
@Autowired TransactionalDatabaseClient txClient;
@Autowired Mono<Connection> connection;
//You Can also use: @Autowired Mono<? extends Publisher> connectionPublisher;
public Flux<Void> example {
txClient.enableTransactionSynchronization(connection);
// Or, txClient.enableTransactionSynchronization(connectionPublisher);
Flux<AuditConfigByClub> audits = txClient.inTransaction(tx -> {
txClient.beginTransaction();
return tx.execute().sql("SELECT * FROM audit.items")
.as(Item.class)
.fetch()
.all();
}).doOnTerminate(() -> {
txClient.commitTransaction();
});
txClient.commitTransaction();
audits.subscribe(item -> System.out.println("anItem: " + item));
return Flux.empty()
}
I just started reactive so not too sure what I'm doing with my callbacks haha. But I decided to go with TransactionalDatabaseClient
over DatabaseClient
or Connection
since I'll take all the utility I can get while R2dbc is in its current state.
In your code did you actually instantiate a Connection object? If so I think you would have done it in your configuration. It can be utilized throughout the app the same as DatabaseClient, but it is slightly more intricate.
If not:
@Bean
@Override // I also used abstract config
public ConnectionFactory connectionFactory() {
...
}
@Bean
TransactionalDatabaseClient txClient() {
...
}
//TransactionalDatabaseClient will take either of these as arg in
//#enableTransactionSynchronization method
@Bean
public Publisher<? extends Connection> connectionPublisher() {
return connectionFactory().create();
}
@Bean
public Mono<Connection> connection() {
return = Mono.from(connectionFactory().create());
}
If you are having problems translating to Kotlin, there is an alternative way to enable synchronization that could work:
// From what I understand, this is a useful way to move between
// transactions within a single subscription
TransactionResources resources = TransactionResources.create();
resources.registerResource(Resource.class, resource);
ConnectionFactoryUtils
.currentReactiveTransactionSynchronization()
.subscribe(currentTx -> sync.registerTransaction(Tx));
Hope this translates well for Kotlin.