Запись в тему из процессора в приложении Spring Cloud Streams Kafka Stream
Я использую Processor API для обработки данных на низком уровне в хранилище состояний. Дело в том, что мне тоже нужно писать в тему после сохранения в магазин. Как это можно сделать в приложениях Spring Cloud Streams Kafka?
@Bean
fun processEvent() = Consumer<KStream<EventId, EventValue>> { event ->
event.map{
...
}.process(ProcessorSupplier {
object : Processor<EventId, MappedEventValue> {
private lateinit var store: KeyValueStore<EventId, MappedEventValue>
override fun init(context: ProcessorContext) {
store = context.getStateStore("event-store") as KeyValueStore<EventId, MappedEventValue>
}
override fun process(key: EventId, value: MappedEventValue) {
...
store.put(key, processedMappedEventValue)
//TODO Write into a topic
}
}
}
}
1 ответ
Решение
Вы не можете. Вprocess()
Метод - это терминальная операция, которая не позволяет передавать данные в нисходящий поток. Вместо этого вы можете использоватьtransform()
хотя (это в основном то же самое process()
но позволяет передавать данные вниз по потоку); или в зависимости от вашего приложения,transformValues()
или flatTransform()
и т.п.
С помощью transform()
Вы получаете KStream
назад, что можно написать в теме.