Почему моя топология Kafka Streams неправильно воспроизводит / воспроизводит?

У меня есть топология, которая выглядит так:

KTable<ByteString, User> users = topology.table(USERS);

KStream<ByteString, JoinRequest> joinRequests = topology.stream(JOIN_REQUESTS)
    .mapValues(entityTopologyProcessor::userNew)
    .to(USERS);

topology.stream(SETTINGS_CONFIRM_REQUESTS)
    .join(users, entityTopologyProcessor::userSettingsConfirm)
    .to(USERS);

topology.stream(SETTINGS_UPDATE_REQUESTS)
    .join(users, entityTopologyProcessor::userSettingsUpdate)
    .to(USERS);

Во время выполнения эта топология работает нормально. Пользователи создаются с помощью запросов на присоединение. Они подтверждают свои настройки с помощью настроек подтверждения запросов. Они обновляют свои настройки с помощью запросов на обновление настроек.

Однако повторная обработка этой топологии не дает исходных результатов. В частности, средство объединения обновлений настроек не видит пользователя, полученного в результате присоединения подтверждения настроек, хотя с точки зрения временных отметок проходит много секунд с момента создания пользователя до момента подтверждения пользователя до момента обновления пользователем их настройки.

Я в недоумении. Я попытался отключить кеширование / запись в журнал для пользовательской таблицы. Понятия не имею, что нужно сделать, чтобы правильно выполнить эту обработку.

2 ответа

Соединение KStream-KTable не является на 100% детерминированным (и может никогда не стать на 100% детерминированным). Мы знаем о проблеме и обсуждаем решения, чтобы хотя бы смягчить проблему.

Одна из проблем заключается в том, что, если потребитель получает данные от брокеров, мы не можем легко контролировать, по каким темам и / или разделам брокер возвращает данные. И в зависимости от порядка, в котором мы получаем данные от брокера, результат может немного отличаться.

Одна связанная проблема: https://issues.apache.org/jira/browse/KAFKA-3514

Этот пост в блоге тоже может помочь: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

Я смог частично решить мою проблему, заменив код в вопросе на:

KTable<ByteString, User> users = topology.table(JOIN_REQUESTS)
    .mapValue(entityTopologyProcessor::user)
    .leftJoin(topology
                 .stream(CONFIRM_SETTINGS_REQUESTS)
                 .groupByKey()
                 .reduce((a, b) -> b),
              entityTopologyProcessor::confirmSettings)
    .leftJoin(topology
                 .stream(SETTINGS_UPDATE_REQUESTS)
                 .groupByKey()
                 .reduce(entityTopologyProcessor::settingsUpdateReduce),
              entityTopologyProcessor::settingsUpdate);

Это решение использует тот факт, что все объединения таблиц и таблиц являются детерминированными. Во время повторной обработки результирующее состояние может временно быть неправильным, но как только топология будет обнаружена, окончательное значение будет правильным (конечная временная метка для данного результата все еще не будет детерминированной). Вообще говоря, этот подход группирует все события (в этом примере: запросы на соединение, запросы на подтверждение настроек, запросы на обновление настроек) для данного объекта (в данном примере: пользователя) в одну задачу и объединяет их накопления в один продукт., Этот пример можно дополнить событиями удаления, присоединив в конце другой поток, который обнуляет результат.

Помимо этого подхода, как правило, написание топологии с возможностью повторной обработки требует обдумывания топологии в двух измерениях: в реальном времени и во время повторной обработки. Начиная с Kafka Streams 1.0.0, это что-то вроде искусства для разработчика.

Другие вопросы по тегам