Создание и доступ к таблице обновлений в Kstreams
Я хочу получить доступ к потоку данных "A", проверить некоторые условия в таблице "B" и, наконец, обновить состояние таблицы "B".
(A,B) ====> check conditions in B ====> finally update B
Как я могу реализовать ту же логику в потоках kafka? Из документации я заметил, что для потоковой передачи данных "A" мы можем использовать Kstream, а для обновления таблицы "B" - Ktable. Но только операции, разрешенные между kstreams и ktable, являются объединениями, и он не обновляет ktable(вместо этого использует его только как поиск). Также он создает еще один новый Kstream(который я не хочу). Короче говоря, я хочу получить доступ к Ktable как к таблице обновления со ссылкой на входящий поток, так как же мы можем реализовать это в потоках kafka?
1 ответ
Содержание KTable
поддерживается API потоков, и вы не можете обновить KTable
непосредственно.
Я предполагаю, что вы прочитали KTable
из темы; для этого вы обновите таблицу, написав в тему, а Kafka Streams обновит таблицу, прочитав тему. KTable
то, что читается из темы, предназначено для содержания этой темы.
Конечно, запись в тему будет обновлять KTable
не мгновенно, но с некоторой задержкой (это асинхронное обновление). Если вам нужно обновление синхронизации, вы не можете использовать KTable
но вы можете использовать .transform()
с прикрепленным состоянием.