Как управлять Kafka KStream для оконного соединения Kstream?
На основе Apache Kafka документы KStream-to-KStream Joins are always windowed joins
, мой вопрос, как я могу контролировать размер окна? Это одинаковый размер для хранения данных по теме? Или, например, мы можем хранить данные в течение 1 месяца, но присоединяться к потоку только на прошлой неделе?
Есть ли хороший пример, чтобы показать оконное соединение KStream-to-kStream с окнами?
В моем случае, скажем, у меня есть 2 KStream, kstream1
а также kstream2
Я хочу иметь возможность присоединиться 10 дней kstream1
до 30 дней kstream2
,
2 ответа
Это абсолютно возможно. Когда вы определяете свой оператор Stream, вы явно указываете размер окна соединения.
KStream stream1 = ...;
KStream stream2 = ...;
long joinWindowSizeMs = 5L * 60L * 1000L; // 5 minutes
long windowRetentionTimeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days
stream1.leftJoin(stream2,
... // add ValueJoiner
JoinWindows.of(joinWindowSizeMs)
);
// or if you want to use retention time
stream1.leftJoin(stream2,
... // add ValueJoiner
(JoinWindows)JoinWindows.of(joinWindowSizeMs)
.until(windowRetentionTimeMs)
);
См. http://docs.confluent.io/current/streams/developer-guide.html для получения дополнительной информации.
Скользящее окно в основном определяет дополнительный предикат соединения. В SQL-подобном синтаксисе это будет что-то вроде:
SELECT * FROM stream1, stream2
WHERE
stream1.key = stream2.key
AND
stream1.ts - before <= stream2.ts
AND
stream2.ts <= stream1.ts + after
где before == after == joinWindowSizeMs
в этом примере. before
а также after
может также иметь разные значения, если вы используете JoinWindows#before()
а также JoinWindows#after()
установить эти значения явно.
Время хранения исходных тем полностью не зависит от указанного windowRetentionTimeMs
это применяется к теме журнала изменений, созданной самой Kafka Streams. Сохранение в окне позволяет объединять записи не по порядку друг с другом, т. Е. Записи, поступающие с опозданием (имейте в виду, что Kafka имеет гарантию заказа на основе смещения, но в отношении временных отметок запись может быть не в порядке),
В дополнение к тому, что сказал Матиас Дж. Сакс, есть пример соединения "поток к потоку" (оконный) по адресу: https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java
Это для Confluent 3.1.x с Apache Kafka 0.10.1, то есть для последних версий от января 2017 года. master
перейдите в репозиторий выше для примеров кода, которые используют более новые версии.
Вот ключевая часть приведенного выше примера кода (опять же для Kafka 0.10.1), немного адаптированная к вашему вопросу. Обратите внимание, что этот пример демонстрирует OUTER JOIN.
long joinWindowSizeMs = TimeUnit.MINUTES.toMillis(5);
long windowRetentionTimeMs = TimeUnit.DAYS.toMillis(30);
final Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> alerts = builder.stream(stringSerde, stringSerde, "adImpressionsTopic");
KStream<String, String> incidents = builder.stream(stringSerde, stringSerde, "adClicksTopic");
KStream<String, String> impressionsAndClicks = alerts.outerJoin(incidents,
(impressionValue, clickValue) -> impressionValue + "/" + clickValue,
// KStream-KStream joins are always windowed joins, hence we must provide a join window.
JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs),
stringSerde, stringSerde, stringSerde);
// Write the results to the output topic.
impressionsAndClicks.to(stringSerde, stringSerde, "outputTopic");