Как сравнить (1 миллиард записей) данных между двумя потоками кафки или таблицами базы данных
Мы отправляем данные из DB2 (таблица-1) через CDC в разделы Kafka (раздел-1). нам нужно согласовать данные DB2 и темы Kafka. у нас есть два варианта -
а) перенести все данные темы кафки в DB2 (как таблица-1-копия), а затем выполнить внешнее левое соединение (между таблицей-1 и таблицей-1-копия), чтобы увидеть несоответствующие записи, создать дельту и нажать ее обратно в кафку.проблема: масштабируемость - наш набор данных насчитывает около миллиарда записей, и я не уверен, что DB2 DBA позволит нам выполнить такую огромную операцию соединения (которая может легко длиться более 15-20 минут).
б) снова вставьте DB2 обратно в параллельную тему kafka (topic-1-copy), а затем выполните какое-либо решение на основе потоков kafka, чтобы выполнить левое внешнее соединение между kafka topic-1 и topic-1-copy. Я все еще оборачиваюсь головой вокруг ручьев Кафки и оставляю внешние соединения. Я не уверен, смогу ли я (используя систему управления окнами в потоках kafka) сравнить ВСЕ содержание темы-1 с темой-1-копией.
Что еще хуже, тема-1 в kafka - это компактная тема, поэтому, когда мы отправляем данные из DB2 обратно в Kafka-тему-1-копию, мы не можем детерминистически запустить цикл уплотнения темы-kafka, чтобы убедиться, что обе темы- 1 и topic-1-copy полностью сжимаются перед выполнением любой операции сравнения на них.
в) есть ли какой-либо другой вариант структуры, который мы можем рассмотреть для этого?
Идеальное решение должно масштабироваться для данных любого размера.
0 ответов
Я не вижу причин, по которым вы не могли бы сделать это ни в Kafka Streams, ни в KSQL. Оба поддерживают соединения таблицы и таблицы. Это при условии, что формат данных поддерживается.
Сжатие ключей не повлияет на результаты, поскольку и Streams, и KSQL создадут правильное окончательное состояние соединения двух таблиц. Если уплотнение выполнено, объем данных, требующих обработки, может быть меньше, но результат будет таким же.
Например, в ksqlDB вы можете импортировать обе темы в виде таблиц и выполнить соединение, а затем отфильтровать по topic-1
стол будучи null
чтобы найти список недостающих строк.
-- example using 0.9 ksqlDB, assuming a INT primary key:
-- create table from main topic:
CREATE TABLE_1
(ROWKEY INT PRIMARY KEY, <other column defs>)
WITH (kafka_topic='topic-1', value_format='?');
-- create table from second topic:
CREATE TABLE_2
(ROWKEY INT PRIMARY KEY, <other column defs>)
WITH (kafka_topic='topic-1-copy', value_format='?');
-- create a table containing only the missing keys:
CREATE MISSING AS
SELECT T2.* FROM TABLE_2 T2 LEFT JOIN TABLE_1 T1
WHERE T1.ROWKEY = null;
Преимущество этого подхода в том, что MISSING
таблица отсутствующих строк будет автоматически обновляться: когда вы извлечете недостающие строки из исходного экземпляра DB2 и создадите их для topic-1
тогда строки в таблице "MISSING" будут удалены, т.е. вы увидите, что надгробия производятся в MISSING
тема.
Вы даже можете расширить этот подход, чтобы найти строки, существующие в topic-1
которых больше нет в исходной базе данных:
-- using the same DDL statements for TABLE_1 and TABLE_2 from above
-- perform the join:
CREATE JOINED AS
SELECT * FROM TABLE_2 T2 FULL OUTER JOIN TABLE_1 T1;
-- detect rows in the DB that aren't in the topic:
CREATE MISSING AS
SELECT * FROM JOINED
WHERE T1_ROWKEY = null;
-- detect rows in the topic that aren't in the DB:
CREATE EXTRA AS
SELECT * FROM JOINED
WHERE T2_ROWKEY = null;
Конечно, вам нужно будет соответствующим образом изменить размер кластера. Чем больше ваш кластер ksqlDB, тем быстрее он обработает данные. Для материализации стола также потребуется емкость диска.
Максимальный объем распараллеливания, который вы можете получить, зависит от количества разделов по темам. Если у вас всего 1 раздел, то данные будут обрабатываться последовательно. Если вы работаете со 100 разделами, вы можете обрабатывать данные, используя 100 ядер ЦП, при условии, что у вас запущено достаточно экземпляров ksqlDB. (По умолчанию каждый узел ksqlDB создает 4 потока обработки потока на запрос (хотя вы можете увеличить это значение, если на сервере больше ядер!)).