Как управлять Kafka KStream для оконного соединения Kstream?

На основе Apache Kafka документы KStream-to-KStream Joins are always windowed joins, мой вопрос, как я могу контролировать размер окна? Это одинаковый размер для хранения данных по теме? Или, например, мы можем хранить данные в течение 1 месяца, но присоединяться к потоку только на прошлой неделе?

Есть ли хороший пример, чтобы показать оконное соединение KStream-to-kStream с окнами?

В моем случае, скажем, у меня есть 2 KStream, kstream1 а также kstream2 Я хочу иметь возможность присоединиться 10 дней kstream1 до 30 дней kstream2,

2 ответа

Решение

Это абсолютно возможно. Когда вы определяете свой оператор Stream, вы явно указываете размер окна соединения.

KStream stream1 = ...;
KStream stream2 = ...;
long joinWindowSizeMs = 5L * 60L * 1000L; // 5 minutes
long windowRetentionTimeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days

stream1.leftJoin(stream2,
                 ... // add ValueJoiner
                 JoinWindows.of(joinWindowSizeMs)
);

// or if you want to use retention time

stream1.leftJoin(stream2,
                 ... // add ValueJoiner
                 (JoinWindows)JoinWindows.of(joinWindowSizeMs)
                                         .until(windowRetentionTimeMs)
);

См. http://docs.confluent.io/current/streams/developer-guide.html для получения дополнительной информации.

Скользящее окно в основном определяет дополнительный предикат соединения. В SQL-подобном синтаксисе это будет что-то вроде:

SELECT * FROM stream1, stream2
WHERE
   stream1.key = stream2.key
   AND
   stream1.ts - before <= stream2.ts
   AND
   stream2.ts <= stream1.ts + after

где before == after == joinWindowSizeMs в этом примере. before а также after может также иметь разные значения, если вы используете JoinWindows#before() а также JoinWindows#after() установить эти значения явно.

Время хранения исходных тем полностью не зависит от указанного windowRetentionTimeMs это применяется к теме журнала изменений, созданной самой Kafka Streams. Сохранение в окне позволяет объединять записи не по порядку друг с другом, т. Е. Записи, поступающие с опозданием (имейте в виду, что Kafka имеет гарантию заказа на основе смещения, но в отношении временных отметок запись может быть не в порядке),

В дополнение к тому, что сказал Матиас Дж. Сакс, есть пример соединения "поток к потоку" (оконный) по адресу: https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java

Это для Confluent 3.1.x с Apache Kafka 0.10.1, то есть для последних версий от января 2017 года. master перейдите в репозиторий выше для примеров кода, которые используют более новые версии.

Вот ключевая часть приведенного выше примера кода (опять же для Kafka 0.10.1), немного адаптированная к вашему вопросу. Обратите внимание, что этот пример демонстрирует OUTER JOIN.

long joinWindowSizeMs = TimeUnit.MINUTES.toMillis(5);
long windowRetentionTimeMs = TimeUnit.DAYS.toMillis(30);

final Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> alerts = builder.stream(stringSerde, stringSerde, "adImpressionsTopic");
KStream<String, String> incidents = builder.stream(stringSerde, stringSerde, "adClicksTopic");

KStream<String, String> impressionsAndClicks = alerts.outerJoin(incidents,
    (impressionValue, clickValue) -> impressionValue + "/" + clickValue,
    // KStream-KStream joins are always windowed joins, hence we must provide a join window.
    JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs),
    stringSerde, stringSerde, stringSerde);

// Write the results to the output topic.
impressionsAndClicks.to(stringSerde, stringSerde, "outputTopic");
Другие вопросы по тегам