Kafka KStream Связанные сообщения События в скользящем окне
У нас есть ситуация, я думаю, что Kafka Streams может помочь, но я не могу найти документацию / примеры, которые я уже видел, которые помогают.
Есть один похожий вопрос, который я нашел, но у него нет рекомендаций по реализации (где я в данный момент теряюсь): функция ожидания Kafka Streams с зависимыми объектами
Что я хочу сделать:
Я хотел бы сопоставить связанные записи из темы Кафки в один объект и создать этот новый объект. Например, может быть 5 записей сообщений, которые связаны друг с другом уникальным ключом - я хочу построить новый объект из этих связанных объектов и создать его в новой очереди.
Я хочу, чтобы все связанные события использовались в течение часа. Кафка описывает это как скользящее окно. Как только запись сообщения A с идентификатором "123" поступит к потребителю, приложение должно подождать как минимум один час, пока не поступят оставшиеся записи с идентификатором "123". После того, как все записи прибывают или через час у них истекает срок записи.
Наконец, все связанные сообщения, собранные за час, используются для создания нового объекта, а затем отправляются в другую очередь Kafka.
Проблемы, с которыми я столкнулся.
Скользящее окно в Кафке, кажется, работает только при объединении двух потоков. У нас будет только один поток, связанный с темой - я не знаю, зачем нужны два потока или как мы будем реализовывать это. Я не могу найти никаких примеров этого онлайн. Все функции потоков, которые я видел в Kafka, просто агрегируют / уменьшают до простого значения при сборе событий с одним и тем же ключом. Например, сколько раз появляется ключ или добавляется какое-то значение
Вот некоторый псевдокод для описания того, о чем я говорю. Имена / семантика функций будут разными, если функциональность существует.
KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
kstream.windowedBy(
// One hour sliding Window
)
.collectAllRelatedKeys(
// Collect all Records related to each key
// map == HashMap<Key, ArrayList<Value>>
map.get(key).add(value);
)
.transformAndProcess(
if(ALL_EVENTS_COLLECTED) {
// Create new Object from all related records
newObject =
createNewObjectFromRelatedRecordsFunction(map.get(key));
producer.send(newObject);
}
)
Вопросы (и спасибо за помощь):
- Как я могу использовать скользящее окно с одним потоком?
- Как настроить функции KStream/KTable для сбора всех связанных событий в окне и создания нового объекта в другой очереди?
- Как управление отслеживанием / смещением работает с потоками скользящего окна?
- Может ли это гарантировать точно один раз доставку? Для справки: https://www.confluent.io/blog/enabling-exactly-kafka-streams/
1 ответ
Поддержка скользящего окна для агрегирования была добавлена в Apache Kafka 2.7.