Kafka KStream Связанные сообщения События в скользящем окне

У нас есть ситуация, я думаю, что Kafka Streams может помочь, но я не могу найти документацию / примеры, которые я уже видел, которые помогают.

Есть один похожий вопрос, который я нашел, но у него нет рекомендаций по реализации (где я в данный момент теряюсь): функция ожидания Kafka Streams с зависимыми объектами

Что я хочу сделать:

Я хотел бы сопоставить связанные записи из темы Кафки в один объект и создать этот новый объект. Например, может быть 5 записей сообщений, которые связаны друг с другом уникальным ключом - я хочу построить новый объект из этих связанных объектов и создать его в новой очереди.

Я хочу, чтобы все связанные события использовались в течение часа. Кафка описывает это как скользящее окно. Как только запись сообщения A с идентификатором "123" поступит к потребителю, приложение должно подождать как минимум один час, пока не поступят оставшиеся записи с идентификатором "123". После того, как все записи прибывают или через час у них истекает срок записи.

Наконец, все связанные сообщения, собранные за час, используются для создания нового объекта, а затем отправляются в другую очередь Kafka.

Проблемы, с которыми я столкнулся.

Скользящее окно в Кафке, кажется, работает только при объединении двух потоков. У нас будет только один поток, связанный с темой - я не знаю, зачем нужны два потока или как мы будем реализовывать это. Я не могу найти никаких примеров этого онлайн. Все функции потоков, которые я видел в Kafka, просто агрегируют / уменьшают до простого значения при сборе событий с одним и тем же ключом. Например, сколько раз появляется ключ или добавляется какое-то значение

Вот некоторый псевдокод для описания того, о чем я говорю. Имена / семантика функций будут разными, если функциональность существует.

    KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
    kstream.windowedBy(
    // One hour sliding Window
    )
    .collectAllRelatedKeys(
    // Collect all Records related to each key
    // map == HashMap<Key, ArrayList<Value>>
       map.get(key).add(value);
    )
    .transformAndProcess(
        if(ALL_EVENTS_COLLECTED) {
        // Create new Object from all related records
            newObject = 
            createNewObjectFromRelatedRecordsFunction(map.get(key));
            producer.send(newObject);   
        }
    )

Вопросы (и спасибо за помощь):

  1. Как я могу использовать скользящее окно с одним потоком?
  2. Как настроить функции KStream/KTable для сбора всех связанных событий в окне и создания нового объекта в другой очереди?
  3. Как управление отслеживанием / смещением работает с потоками скользящего окна?
  4. Может ли это гарантировать точно один раз доставку? Для справки: https://www.confluent.io/blog/enabling-exactly-kafka-streams/

1 ответ

Поддержка скользящего окна для агрегирования была добавлена ​​в Apache Kafka 2.7.

См. Https://issues.apache.org/jira/browse/KAFKA-5636

Другие вопросы по тегам