Настройка топологии потоков Kafka с помощью функции Spring bean
Я пытаюсь запустить среду тестирования топологии Kafka.
Производственный код действительно небольшой
@SpringBootApplication
public class ProcessApplication {
public static void main(String[] args) {
SpringApplication.run(ProcessApplication.class, args);
}
@Bean
public Function<Input, Output> process() {
return EventTransformer::transform;
}
}
Теперь я хотел протестировать код в рамках интеграционного теста. Если я правильно понимаю концепцию, я должен включить new ProcessApplication (). Process () в StreamsBuilder, но я не знаю, как добавить в него java.util.Function.
private Topology createTopology(SpecificAvroSerde specificAvroSerde) {
final Serde<String> stringSerde = new Serdes.StringSerde();
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Input> input = builder.stream(INPUT_TOPIC,
Consumed.with(stringSerde, specificAvroSerde));
new ProcessApplication().process();
// Compiles and runs but data are missing, assume to add new ProcessApplication().process() somehow
KStream<String, Output> output =
input.mapValues(value -> new Output());
output.to(OUTPUT_TOPIC, Produced.with(stringSerde, specificAvroSerde));
return builder.build();
}
Далее я нашел другое решение, как вместе построить топологию.
private Topology createTopology()
topology.addSource("mySource", INPUT_TOPIC)
new ProcessApplication().process()
topology.addProcessor("myProcessor", new ProcessorSupplier, "mySource")
topology.addSink(
"mySink",
OUTPUT_TOPIC,
Serdes.String().serializer(),
new SpecificAvroSerde().serializer(),
"myProcessor"
);
return topology
}
Даже здесь я не понимаю, что мне делать. Что еще хуже, у меня есть некоторые сомнения, что процессор, который я написал, действительно является процессом или я должен использовать ProcessorSupplier.
В среде Docker производственный код работает должным образом.
Спасибо, Маркус