Транзакционность Spring Cloud Stream Kafka Producer в реактивном приложении Spring Webflux
Я работаю с Spring Cloud Stream Binder Kafka 3.0.0 в приложениях Spring Webflux, которые предоставляют API, который получает некоторые данные и публикует их в теме Kafka, используя @Output
:
@Autowired
private lateinit var producer: Producer
@PostMapping
@ResponseStatus(CREATED)
fun create(@RequestBody metrics: SomeMetric): Mono<Void> {
producer.send(metrics)
return Mono.empty()
}
@Component
class Producer(private val producerSources: ProducerSources) {
@Transactional
fun send(metrics: SomeMetric) {
producerSources.metrics().send(MessageBuilder.withPayload(metrics) .build())
}
}
interface ProducerSources {
@Output("metrics")
fun metrics(): MessageChannel
//Other...
}
Я настроил и Kafka, и приложение Spring Boot для использования транзакционных производителей (обратите внимание на @Transactional
аннотацию к методу отправки выше):
@Configuration
class KafkaProducerConfiguration {
@Bean
fun transactionManager(binders: BinderFactory): PlatformTransactionManager {
val pf = (binders.getBinder("kafka",
MessageChannel::class.java) as KafkaMessageChannelBinder).transactionalProducerFactory
return KafkaTransactionManager<ByteArray, ByteArray>(pf)
}
@EnableTransactionManagement
@EnableBinding(value = [ProducerSources::class])
@SpringBootApplication
class MyApplication
fun main(args: Array<String>) {
runApplication<MyApplication>(*args)
}
Дело в том, что, будучи приложением Spring Webflux, я не должен блокировать поток http, поэтому я должен обернуть (блокирующего) производителя в блок fromCallable и выполнить его в другом пуле потоков, например:
@Component
class Producer(private val producerSources: ProducerSources) {
@Transactional
fun send(metrics: SomeMetric) : Mono<Unit> {
Mono.fromCallable {
producerSources.metrics().send(MessageBuilder.withPayload(metrics) .build())
}.subscribeOn(Schedulers.elastic())
}
}
@PostMapping
@ResponseStatus(CREATED)
fun create(@RequestBody metrics: SomeMetric): Mono<Void> {
return producer.send(metrics)
}
Мои вопросы:
- Есть ли
@Transactional
аннотации все еще работают с этим подходом? Думаю, не должно... - Какой рекомендуемый способ поддержки транзакций в реактивном контексте Spring Webflux + Cloud Stream Kafka?
- БОНУС: Поддерживается ли Reactor Kafka в Spring Cloud Stream? Если да, то как мы можем настроить его в этом случае?
@Output
, транзакционная поддержка...?