Настройка топологии потоков Kafka с помощью функции Spring bean

Я пытаюсь запустить среду тестирования топологии Kafka.

Производственный код действительно небольшой

      @SpringBootApplication
public class ProcessApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProcessApplication.class, args);
    }

    @Bean
    public Function<Input, Output> process() {
        return EventTransformer::transform;
    }
}

Теперь я хотел протестировать код в рамках интеграционного теста. Если я правильно понимаю концепцию, я должен включить new ProcessApplication (). Process () в StreamsBuilder, но я не знаю, как добавить в него java.util.Function.

      private Topology createTopology(SpecificAvroSerde specificAvroSerde) { 
    final Serde<String> stringSerde = new Serdes.StringSerde();
    final StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Input> input = builder.stream(INPUT_TOPIC,
            Consumed.with(stringSerde, specificAvroSerde));

    new ProcessApplication().process();

    // Compiles and runs but data are missing, assume to add new ProcessApplication().process() somehow
    KStream<String, Output> output =
            input.mapValues(value -> new Output());
    output.to(OUTPUT_TOPIC, Produced.with(stringSerde, specificAvroSerde));
    
    return builder.build();
}

Далее я нашел другое решение, как вместе построить топологию.

      private Topology createTopology()
    topology.addSource("mySource", INPUT_TOPIC)
    new ProcessApplication().process()
    topology.addProcessor("myProcessor", new ProcessorSupplier, "mySource")
    topology.addSink(
            "mySink",
            OUTPUT_TOPIC,
            Serdes.String().serializer(),
            new SpecificAvroSerde().serializer(),
            "myProcessor"
    );
    
    return topology
}

Даже здесь я не понимаю, что мне делать. Что еще хуже, у меня есть некоторые сомнения, что процессор, который я написал, действительно является процессом или я должен использовать ProcessorSupplier.

В среде Docker производственный код работает должным образом.

Спасибо, Маркус

0 ответов

Другие вопросы по тегам