Присоединение потоков Kafka: как подождать некоторое время перед отправкой записей?
В настоящее время у нас есть 2 темы потока Kafka, записи которых поступают постоянно. Мы изучаем возможность объединения двух потоков на основе ключа после ожидания окна продолжительностью 5 минут, но с моим текущим кодом я вижу, что записи отправляются немедленно, без «ожидания», чтобы увидеть, поступит ли соответствующая запись в другой поток. Моя текущая реализация:
KStream<String, String> streamA =
builder.stream(topicA, Consumed.with(Serdes.String(), Serdes.String()))
.peek((key, value) -> System.out.println("Stream A incoming record key " + key + " value " + value));
KStream<String, String> streamB =
builder.stream(topicB, Consumed.with(Serdes.String(), Serdes.String()))
.peek((key, value) -> System.out.println("Stream B incoming record key " + key + " value " + value));
ValueJoiner<String, String, String > recordJoiner =
(recordA, recordB) -> {
if(recordA != null) {
return recordA;
} else {
return recordB;
}
};
KStream<String, String > combinedStream =
streamA(
streamB,
recordJoiner,
JoinWindows
.of(Duration.ofMinutes(5)),
StreamJoined.with(
Serdes.String(),
Serdes.String(),
Serdes.String()))
.peek((key, value) -> System.out.println("Stream-Stream Join record key " + key + " value " + value));
combinedStream.to("test-topic"
Produced.with(
Serdes.String(),
Serdes.String()));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
Хотя у меня есть
JoinWindows.of(Duration.ofMinutes(5))
, Я вижу, что некоторые записи отправляются немедленно. Как мне убедиться, что это не так?
Кроме того, является ли это наиболее эффективным способом объединения двух потоков Kafka или лучше придумать нашу собственную потребительскую реализацию, которая считывает данные из двух потоков и т. Д.?