Разница между KTable и местным магазином
Какая разница между этими сущностями?
Как мне кажется, KTable - простая тема с кафкой compaction
политика удаления. Кроме того, если ведение журнала включено для KTable, то есть также журнал изменений, а затем, политика удаления compaction,delete
,
Локальное хранилище - Кэш-память ключей в памяти на основе RockDB. Но в местном магазине также есть список изменений.
В обоих случаях мы получаем последнее значение для ключа за определенный период времени (?). Локальное хранилище используется для этапов агрегирования, объединений и т. Д. Но после него также была создана новая тема со стратегией уплотнения.
Например:
KStream<K, V> source = builder.stream(topic1);
KTable<K, V> table = builder.table(topic2); // what will happen here if i read data from topic with deletion policy delete and compaction? Will additional topic be created for store data or just a local store (cache) be used for it?
// or
KTable<K, V> table2 = builder.table(..., Materialized.as("key-value-store-name")) // what will happen here? As i think, i just specified a concrete name for local store and now i can query it as a regular key-value store
source.groupByKey().aggregate(initialValue, aggregationLogic, Materialized.as(...)) // Will new aggregation topic be created here with compaction deletion policy? Or only local store will be used?
Также я могу создать государственный магазин, используя строитель builder.addStateStore(...)
где я могу включить / отключить ведение журнала (changelog) и кэширование (???).
Я прочитал это: https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html, но некоторые детали все еще неясны для меня. Особенно тот случай, когда мы можем отключить StreamCache (но не RockDB-кеш) и мы получим полную копию системы CDC для реляционной базы данных.
1 ответ
KTable
является логической абстракцией таблицы, которая обновляется с течением времени. Кроме того, вы можете рассматривать ее не как материализованную таблицу, а как поток журнала изменений, который состоит из всех записей обновления таблицы. Сравните https://docs.confluent.io/current/streams/concepts.html. Следовательно, концептуально KTable
это что-то гибридное, если хотите, однако проще думать об этом, как о таблице, которая со временем обновляется.
Внутренне KTable
реализован с использованием RocksDB и топика в Kafka. RocksDB, хранит текущие данные таблицы (обратите внимание, что RocksDB не является хранилищем в памяти и может записывать на диск). В то же время каждое обновление KTable
(то есть в RocksDB) записывается в соответствующую тему Кафки. Раздел Kafka используется по причинам отказоустойчивости (обратите внимание, что сама RocksDB считается эфемерной, а запись на диск через RocksDB не обеспечивает отказоустойчивость, а используется раздел изменений), и она конфигурируется с включенным сжатием журнала, чтобы убедиться, что последнее состояние RocksDB можно восстановить, прочитав тему.
Если у тебя есть KTable
то, что создается оконной агрегацией, тема Kafka настроена с compact,delete
к устаревшим старым данным (т. е. старым окнам), чтобы избежать того, что таблица (т. е. RocksDB) станет неограниченной.
Вместо RocksDB вы также можете использовать хранилище в памяти для KTable
что не пишет на диск. В этом хранилище также есть тема журнала изменений, которая отслеживает все обновления хранилища по причинам отказоустойчивости.
Если вы добавляете магазин вручную через builder.addStateStore()
Вы также можете добавить RocksDB или хранилища в памяти. В этом случае вы можете включить журнал изменений для отказоустойчивости, аналогичной KTable
(обратите внимание, что когда создается KTable, внутри него используется тот же API - т. е. KTable
это абстракции более высокого уровня, скрывающие некоторые внутренние детали).
Для кэширования: это реализовано в потоках Kafka и поверх хранилища (либо RocksDB, либо в памяти), и вы можете включить / отключить это для "простых" хранилищ, которые вы добавляете вручную, или для KTables. Сравните https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html Таким образом, кэширование не зависит от кэширования RocksDB.