Как объединить базовые и множественные дельта-структурированные потоки в Azure DataLake
У меня есть основной поток, и у меня есть несколько дельта-потоков для разных последовательных дат. Я хочу объединить их, чтобы получить последний поток. Как я могу сделать это в Azure Datalake. Например, предположим, что это поток. Мне нужно объединить эти потоки, чтобы получить окончательный поток. Слияние заменит исходное значение новым значением. Количество дельта-потока на данный момент составляет более 100.
Базовый поток:
1022918 300.00 300.00 2 7 5 100
1022918 400.00 400.00 2 170 5 100
1022919 1000.00 1000.00 2 7 6 100
1022920 2000.00 2000.00 2 170 6 100
1022921 3000.00 3000.00 2 123 7 100
1022922 100.00 100.00 2 162 7 100
1022922 200.00 200.00 2 123 9 100
1022922 300.00 300.00 2 162 9 100
Delta Stream 1:
1022918 400.00 300.00 2 7 5 100
1022919 2000.00 1000.00 2 7 6 100
1022920 3000.00 2000.00 2 170 6 100
1022922 400.00 300.00 2 162 9 100
Delta Stream 2:
1022919 2500.00 1000.00 2 7 6 100
1022920 3500.00 2000.00 2 170 6 100
Ожидаемый результат
1022918 400.00 300.00 2 7 5 100
1022918 400.00 400.00 2 170 5 100
1022919 2500.00 1000.00 2 7 6 100
1022920 3500.00 2000.00 2 170 6 100
1022921 3000.00 3000.00 2 123 7 100
1022922 100.00 100.00 2 162 7 100
1022922 200.00 200.00 2 123 9 100
1022922 400.00 300.00 2 162 9 100
2 ответа
Я разработал пример в U-SQL
используя ваши тестовые данные. Он использует подход, описанный выше, и получает последнюю запись на основе уникального ключа, описанного вами, и какого-то компонента данных.
Я использовал эти файлы:
Есть несколько предположений:
- файлы разделены пробелом
- имя файла (или может быть папкой) включает компонент даты
Код:
// Assumptions:
// col1, 5 and 6 is the intented unique key
// each file includes a date in the filename
DECLARE @baseFilesLocation string = "input/base_{filedate}.txt";
DECLARE @deltaFilesLocation string = "input/delta_{filedate}.txt";
// Get the base files
@baseFiles =
EXTRACT col1 int,
col2 decimal,
col3 decimal,
col4 int,
col5 int,
col6 int,
col7 int,
filedate string
FROM @baseFilesLocation
USING Extractors.Text(delimiter : ' ');
// Get the delta files
@deltaFiles =
EXTRACT col1 int,
col2 decimal,
col3 decimal,
col4 int,
col5 int,
col6 int,
col7 int,
filedate string
FROM @deltaFilesLocation
USING Extractors.Text(delimiter : ' ', silent: true);
@working =
SELECT *
FROM @baseFiles
UNION ALL
SELECT *
FROM @deltaFiles;
// Work out the (col1, 5 and 6) and max filedate combination
@maxDates =
SELECT col1, col5, col6, MAX(filedate) AS filedate
FROM @working
GROUP BY col1, col5, col6;
// Join the original set (all base data, all delta files) to the max dates rowset, to get last record for each col1
@output = SELECT w.*
FROM @working AS w
SEMIJOIN @maxDates AS d ON w.col1 == d.col1
AND w.col5 == d.col5
AND w.col6 == d.col6
AND w.filedate == d.filedate;
OUTPUT @output
TO "/output/output.csv"
ORDER BY col1, col2, col3
USING Outputters.Csv();
Мои результаты (которые соответствуют вашим ожидаемым результатам:
НТН
Azure Data Lake Store - это файловая система только для добавления. Это означает, что записи могут быть добавлены только в конец файла. Это мало чем отличается от многих других систем только для добавления. Различные аналитические приложения, такие как Azure Data Lake Analytics и Hive, могут использоваться для логического объединения этих базовых и дельта-потоков.
Обычно для этого есть 4 шага.
Загрузить базовые данные - это просто. При загрузке базовых данных вам необходимо добавить идентификатор. Например, базовая дата или базовая версия. Для этого обсуждения допустим, вы используете номер версии. Скажем, база данных версии 0
Загружать дельта-данные в их собственную таблицу / файл. При загрузке их также необходимо иметь идентификатор, который можно использовать для сравнения с базовыми данными для определения последних записей. Скажем, у нас также есть номер версии. Таким образом, новые записи будут больше версии № 1, 2, 3 и так далее.
Построить представление слияния - это запрос, который объединяет базовые данные с дельта-данными по идентификатору, где версия является наибольшей. Теперь, когда вы получите это представление, это все последние записи.
Создайте новые базовые данные с записями из представления слияния.
Вот статья, которая объясняет, как вы можете использовать Hive для достижения этой цели. Этот использует дату и время, чтобы идентифицировать последние записи.
https://hortonworks.com/blog/four-step-strategy-incremental-updates-hive/