Создание и доступ к таблице обновлений в Kstreams

Я хочу получить доступ к потоку данных "A", проверить некоторые условия в таблице "B" и, наконец, обновить состояние таблицы "B".

(A,B) ====> check conditions in B ====> finally update B

Как я могу реализовать ту же логику в потоках kafka? Из документации я заметил, что для потоковой передачи данных "A" мы можем использовать Kstream, а для обновления таблицы "B" - Ktable. Но только операции, разрешенные между kstreams и ktable, являются объединениями, и он не обновляет ktable(вместо этого использует его только как поиск). Также он создает еще один новый Kstream(который я не хочу). Короче говоря, я хочу получить доступ к Ktable как к таблице обновления со ссылкой на входящий поток, так как же мы можем реализовать это в потоках kafka?

1 ответ

Содержание KTable поддерживается API потоков, и вы не можете обновить KTable непосредственно.

Я предполагаю, что вы прочитали KTable из темы; для этого вы обновите таблицу, написав в тему, а Kafka Streams обновит таблицу, прочитав тему. KTable то, что читается из темы, предназначено для содержания этой темы.

Конечно, запись в тему будет обновлять KTable не мгновенно, но с некоторой задержкой (это асинхронное обновление). Если вам нужно обновление синхронизации, вы не можете использовать KTable но вы можете использовать .transform() с прикрепленным состоянием.

Другие вопросы по тегам