Транзакционность Spring Cloud Stream Kafka Producer в реактивном приложении Spring Webflux

Я работаю с Spring Cloud Stream Binder Kafka 3.0.0 в приложениях Spring Webflux, которые предоставляют API, который получает некоторые данные и публикует их в теме Kafka, используя @Output:

    @Autowired
    private lateinit var producer: Producer

    @PostMapping
    @ResponseStatus(CREATED)
    fun create(@RequestBody metrics: SomeMetric): Mono<Void> {
        producer.send(metrics)
        return Mono.empty()
    }
@Component
class Producer(private val producerSources: ProducerSources) {

    @Transactional
    fun send(metrics: SomeMetric) {
        producerSources.metrics().send(MessageBuilder.withPayload(metrics)                       .build())
    }            
}
interface ProducerSources {

    @Output("metrics")
    fun metrics(): MessageChannel

    //Other...

}

Я настроил и Kafka, и приложение Spring Boot для использования транзакционных производителей (обратите внимание на @Transactional аннотацию к методу отправки выше):

@Configuration
class KafkaProducerConfiguration {

    @Bean
    fun transactionManager(binders: BinderFactory): PlatformTransactionManager {
        val pf = (binders.getBinder("kafka",
                MessageChannel::class.java) as KafkaMessageChannelBinder).transactionalProducerFactory
        return KafkaTransactionManager<ByteArray, ByteArray>(pf)
    }
@EnableTransactionManagement
@EnableBinding(value = [ProducerSources::class])
@SpringBootApplication
class MyApplication

fun main(args: Array<String>) {
    runApplication<MyApplication>(*args)
}

Дело в том, что, будучи приложением Spring Webflux, я не должен блокировать поток http, поэтому я должен обернуть (блокирующего) производителя в блок fromCallable и выполнить его в другом пуле потоков, например:

@Component
class Producer(private val producerSources: ProducerSources) {

    @Transactional
    fun send(metrics: SomeMetric) : Mono<Unit> {
        Mono.fromCallable {
            producerSources.metrics().send(MessageBuilder.withPayload(metrics)                       .build())           
       }.subscribeOn(Schedulers.elastic())
    }

}
    @PostMapping
    @ResponseStatus(CREATED)
    fun create(@RequestBody metrics: SomeMetric): Mono<Void> {
        return producer.send(metrics)
    }

Мои вопросы:

  • Есть ли @Transactionalаннотации все еще работают с этим подходом? Думаю, не должно...
  • Какой рекомендуемый способ поддержки транзакций в реактивном контексте Spring Webflux + Cloud Stream Kafka?
  • БОНУС: Поддерживается ли Reactor Kafka в Spring Cloud Stream? Если да, то как мы можем настроить его в этом случае?@Output, транзакционная поддержка...?

0 ответов