Разница между KTable и местным магазином

Какая разница между этими сущностями?

Как мне кажется, KTable - простая тема с кафкой compaction политика удаления. Кроме того, если ведение журнала включено для KTable, то есть также журнал изменений, а затем, политика удаления compaction,delete,

Локальное хранилище - Кэш-память ключей в памяти на основе RockDB. Но в местном магазине также есть список изменений.

В обоих случаях мы получаем последнее значение для ключа за определенный период времени (?). Локальное хранилище используется для этапов агрегирования, объединений и т. Д. Но после него также была создана новая тема со стратегией уплотнения.

Например:

KStream<K, V> source = builder.stream(topic1);
KTable<K, V> table = builder.table(topic2); // what will happen here if i read data from topic with deletion policy delete and compaction? Will additional topic be created for store data or just a local store (cache) be used for it?

// or
KTable<K, V> table2 = builder.table(..., Materialized.as("key-value-store-name")) // what will happen here? As i think, i just specified a concrete name for local store and now i can query it as a regular key-value store

source.groupByKey().aggregate(initialValue, aggregationLogic, Materialized.as(...)) // Will new aggregation topic be created here with compaction deletion policy? Or only local store will be used?

Также я могу создать государственный магазин, используя строитель builder.addStateStore(...) где я могу включить / отключить ведение журнала (changelog) и кэширование (???).

Я прочитал это: https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html, но некоторые детали все еще неясны для меня. Особенно тот случай, когда мы можем отключить StreamCache (но не RockDB-кеш) и мы получим полную копию системы CDC для реляционной базы данных.

1 ответ

Решение

KTable является логической абстракцией таблицы, которая обновляется с течением времени. Кроме того, вы можете рассматривать ее не как материализованную таблицу, а как поток журнала изменений, который состоит из всех записей обновления таблицы. Сравните https://docs.confluent.io/current/streams/concepts.html. Следовательно, концептуально KTable это что-то гибридное, если хотите, однако проще думать об этом, как о таблице, которая со временем обновляется.

Внутренне KTable реализован с использованием RocksDB и топика в Kafka. RocksDB, хранит текущие данные таблицы (обратите внимание, что RocksDB не является хранилищем в памяти и может записывать на диск). В то же время каждое обновление KTable (то есть в RocksDB) записывается в соответствующую тему Кафки. Раздел Kafka используется по причинам отказоустойчивости (обратите внимание, что сама RocksDB считается эфемерной, а запись на диск через RocksDB не обеспечивает отказоустойчивость, а используется раздел изменений), и она конфигурируется с включенным сжатием журнала, чтобы убедиться, что последнее состояние RocksDB можно восстановить, прочитав тему.

Если у тебя есть KTable то, что создается оконной агрегацией, тема Kafka настроена с compact,delete к устаревшим старым данным (т. е. старым окнам), чтобы избежать того, что таблица (т. е. RocksDB) станет неограниченной.

Вместо RocksDB вы также можете использовать хранилище в памяти для KTable что не пишет на диск. В этом хранилище также есть тема журнала изменений, которая отслеживает все обновления хранилища по причинам отказоустойчивости.

Если вы добавляете магазин вручную через builder.addStateStore() Вы также можете добавить RocksDB или хранилища в памяти. В этом случае вы можете включить журнал изменений для отказоустойчивости, аналогичной KTable (обратите внимание, что когда создается KTable, внутри него используется тот же API - т. е. KTable это абстракции более высокого уровня, скрывающие некоторые внутренние детали).

Для кэширования: это реализовано в потоках Kafka и поверх хранилища (либо RocksDB, либо в памяти), и вы можете включить / отключить это для "простых" хранилищ, которые вы добавляете вручную, или для KTables. Сравните https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html Таким образом, кэширование не зависит от кэширования RocksDB.

Другие вопросы по тегам