Как объединить базовые и множественные дельта-структурированные потоки в Azure DataLake

У меня есть основной поток, и у меня есть несколько дельта-потоков для разных последовательных дат. Я хочу объединить их, чтобы получить последний поток. Как я могу сделать это в Azure Datalake. Например, предположим, что это поток. Мне нужно объединить эти потоки, чтобы получить окончательный поток. Слияние заменит исходное значение новым значением. Количество дельта-потока на данный момент составляет более 100.

Базовый поток:

1022918 300.00  300.00  2   7   5   100
1022918 400.00  400.00  2   170 5   100
1022919 1000.00 1000.00 2   7   6   100
1022920 2000.00 2000.00 2   170 6   100
1022921 3000.00 3000.00 2   123 7   100
1022922 100.00  100.00  2   162 7   100
1022922 200.00  200.00  2   123 9   100
1022922 300.00  300.00  2   162 9   100

Delta Stream 1:

1022918 400.00  300.00  2   7   5   100
1022919 2000.00 1000.00 2   7   6   100
1022920 3000.00 2000.00 2   170 6   100
1022922 400.00  300.00  2   162 9   100

Delta Stream 2:

1022919 2500.00 1000.00 2   7   6   100
1022920 3500.00 2000.00 2   170 6   100

Ожидаемый результат

1022918 400.00  300.00  2   7   5   100
1022918 400.00  400.00  2   170 5   100
1022919 2500.00 1000.00 2   7   6   100
1022920 3500.00 2000.00 2   170 6   100
1022921 3000.00 3000.00 2   123 7   100
1022922 100.00  100.00  2   162 7   100
1022922 200.00  200.00  2   123 9   100
1022922 400.00  300.00  2   162 9   100

2 ответа

Решение

Я разработал пример в U-SQL используя ваши тестовые данные. Он использует подход, описанный выше, и получает последнюю запись на основе уникального ключа, описанного вами, и какого-то компонента данных.

Я использовал эти файлы:

Есть несколько предположений:

  • файлы разделены пробелом
  • имя файла (или может быть папкой) включает компонент даты

Код:

// Assumptions: 
//      col1, 5 and 6 is the intented unique key
//      each file includes a date in the filename

DECLARE @baseFilesLocation string = "input/base_{filedate}.txt";
DECLARE @deltaFilesLocation string = "input/delta_{filedate}.txt";


// Get the base files
@baseFiles =
    EXTRACT col1 int,
            col2 decimal,
            col3 decimal,
            col4 int,
            col5 int,
            col6 int,
            col7 int,
            filedate string

    FROM @baseFilesLocation
    USING Extractors.Text(delimiter : ' ');


// Get the delta files
@deltaFiles =
    EXTRACT col1 int,
            col2 decimal,
            col3 decimal,
            col4 int,
            col5 int,
            col6 int,
            col7 int,
            filedate string

    FROM @deltaFilesLocation
    USING Extractors.Text(delimiter : ' ', silent: true);


@working = 
    SELECT *
    FROM @baseFiles
    UNION ALL
    SELECT *
    FROM @deltaFiles;


// Work out the (col1, 5 and 6) and max filedate combination
@maxDates = 
    SELECT col1, col5, col6, MAX(filedate) AS filedate
    FROM @working
    GROUP BY col1, col5, col6;


// Join the original set (all base data, all delta files) to the max dates rowset, to get last record for each col1
@output = SELECT w.*
    FROM @working AS w
        SEMIJOIN @maxDates AS d ON w.col1 == d.col1
            AND w.col5 == d.col5
            AND w.col6 == d.col6
            AND w.filedate == d.filedate;


OUTPUT @output
    TO "/output/output.csv"
ORDER BY col1, col2, col3
USING Outputters.Csv();

Мои результаты (которые соответствуют вашим ожидаемым результатам:

Мои результаты U-SQL

НТН

Azure Data Lake Store - это файловая система только для добавления. Это означает, что записи могут быть добавлены только в конец файла. Это мало чем отличается от многих других систем только для добавления. Различные аналитические приложения, такие как Azure Data Lake Analytics и Hive, могут использоваться для логического объединения этих базовых и дельта-потоков.

Обычно для этого есть 4 шага.

  1. Загрузить базовые данные - это просто. При загрузке базовых данных вам необходимо добавить идентификатор. Например, базовая дата или базовая версия. Для этого обсуждения допустим, вы используете номер версии. Скажем, база данных версии 0

  2. Загружать дельта-данные в их собственную таблицу / файл. При загрузке их также необходимо иметь идентификатор, который можно использовать для сравнения с базовыми данными для определения последних записей. Скажем, у нас также есть номер версии. Таким образом, новые записи будут больше версии № 1, 2, 3 и так далее.

  3. Построить представление слияния - это запрос, который объединяет базовые данные с дельта-данными по идентификатору, где версия является наибольшей. Теперь, когда вы получите это представление, это все последние записи.

  4. Создайте новые базовые данные с записями из представления слияния.

Вот статья, которая объясняет, как вы можете использовать Hive для достижения этой цели. Этот использует дату и время, чтобы идентифицировать последние записи.

https://hortonworks.com/blog/four-step-strategy-incremental-updates-hive/

Другие вопросы по тегам