Функция ожидания потоков Кафки с зависимыми объектами

Я создаю приложение Kafka Streams, которое получает разные объекты JSON из разных тем, и я хочу реализовать какую-то функцию ожидания, но я не уверен, как лучше ее реализовать.

Чтобы упростить проблему, я буду использовать упрощенные объекты в следующем разделе, я надеюсь, что проблему можно описать очень хорошо с ней. Таким образом, в одном из моих потоков я получаю автомобильные объекты, и у каждой машины есть идентификатор. Во втором потоке я получаю личные объекты, и у каждого человека также есть идентификатор автомобиля, и ему присваивается автомобиль с этим идентификатором.

Я хочу читать с помощью приложения Kafka Streams оба входных потока (темы) и обогатить объект автомобиля четырьмя людьми, которые имеют одинаковый идентификатор автомобиля. Автомобильные объекты должны быть переданы следующему процессору ниже по потоку, когда все четыре человека включены в автомобильный объект.

Я планировал создать входной поток для автомобиля и один для объектов person, проанализировать данные JSON во внутреннем представлении объекта, объединить оба потока и применить функцию "selectKey" к объединенному потоку, чтобы извлечь ключи из юридические лица. После этого я поместил бы данные в пользовательскую функцию преобразования, в которую было включено хранилище состояний. Внутри этой функции преобразования я буду хранить каждый прибывающий автомобильный объект с его идентификатором в хранилище состояний. Как только появляются новые объекты, я добавляю их к соответствующему объекту в государственном магазине (не обращайте внимания на случай позднего прибытия автомобилей). Как только четыре человека находятся в автомобильном объекте, я пересылаю объект в следующую функцию потока и удаляю автомобильный объект из хранилища состояний.

Будет ли это подходящим подходом для этого? Я не уверен в масштабируемости, потому что я должен убедиться, что при запуске нескольких экземпляров объекты car и person с одинаковым идентификатором будут обрабатываться одним и тем же экземпляром приложения. Я бы использовал функцию selectKey для этого, это будет работать?

Спасибо!

1 ответ

Решение

Базовый дизайн для меня звучит хорошо.

Тем не мение, selectKey() само по себе не будет достаточно, потому что transform() (в отличие от операторов DSL) не вызывает автобалансировки. Таким образом, вам нужно вручную перебалансировать через through(),

stream.selectKey(...)
      .through("user-created-topic")
      .transform(...);

https://docs.confluent.io/current/streams/upgrade-guide.html

Другие вопросы по тегам