Схема потока данных фабрики данных Azure в формате CSV перемещается в паркет статических столбцов назначения. Является ли это возможным?

Попытка написать поток данных фабрики данных Azure, который обрабатывал бы два похожих CSV-файла с версиями. Файл версии 1 имеет 48 столбцов. В файле версии 2 50 столбцов - те же 48 столбцов, что и в версии 1, но в конце добавлены 2 дополнительных столбца. Я хочу создать целевой файл паркета, содержащий все 50 столбцов, для загрузки в мой SQLDW через polybase. Исторически сложилось так, что у нас есть более 6 тысяч файлов в одном источнике больших двоичных объектов, и нет простого способа идентифицировать файлы с 48 столбцами против 50 столбцов. Ниже приведено самое близкое к решению решение.

  1. Исходный CSV-файл с включенным параметром "Разрешить смещение схемы". В наборе данных CSV не определена схема
  2. Производные столбцы MapDrifted - то есть toString(byName('Manufacturer')) все 50 столбцов
  3. Раковина - набор данных представляет собой паркет со схемой, определенной файлом шаблона паркета, который содержит все 50 столбцов. Раздел раковины задается именем исходного файла. Каждый входящий файл будет иметь на выходе файл parquet.

Это решение работает с набором из двух тестовых файлов. Один с 48 столбцами и один с 50 столбцами. Создаются два паркетных файла с 50 столбцами. Один файл заполняется до 48-го столбца, другой файл заполняется всеми 50 столбцами. Если я добавлю в тест больше исходных файлов с 48 столбцами. Файл с 50 столбцами теряет последние два столбца данных, и в нем остается только 48 столбцов? Я думал, что это обычная проблема, которую может решить ADF. т.е. версия файла со временем меняется. Какие-либо предложения? Ниже приведен сценарий моего ADF

source(allowSchemaDrift: true,
    validateSchema: false,
    rowUrlColumn: 'sourcefilename',
    inferDriftedColumnTypes: true,
    multiLineRow: true,
    wildcardPaths:['avail/archive_csv2/*.csv']) ~> SRCAvailCSV
SRCAvailCSV derive(Manufacturer = toString(byName('Manufacturer')),
        SKU = toString(byName('SKU')),
        {Partner Name} = toString(byName('Partner Name')),
        {Partner Part Number} = toString(byName('Partner Part Number')),
        {Search Date} = toString(byName('Search Date')),
        {Search Result Description} = toString(byName('Search Result Description')),
        {1st Line Description} = toString(byName('1st Line Description')),
        {2nd Line Description} = toString(byName('2nd Line Description')),
        {Product Category} = toString(byName('Product Category')),
        {Product Category 1} = toString(byName('Product Category 1')),
        {Product Category 2} = toString(byName('Product Category 2')),
        {Product Category 3} = toString(byName('Product Category 3')),
        {Product Category 4} = toString(byName('Product Category 4')),
        {UNSPSC Code} = toString(byName('UNSPSC Code')),
        Pricing = toString(byName('Pricing')),
        Currency = toString(byName('Currency')),
        {Availability Qty} = toString(byName('Availability Qty')),
        {Availability Status} = toString(byName('Availability Status')),
        {Average Rating} = toString(byName('Average Rating')),
        {Total Reviews} = toString(byName('Total Reviews')),
        Brand = toString(byName('Brand')),
        Model = toString(byName('Model')),
        {Product Line} = toString(byName('Product Line')),
        {Partner Site} = toString(byName('Partner Site')),
        {Product URL} = toString(byName('Product URL')),
        Warranty = toString(byName('Warranty')),
        {Product Length} = toString(byName('Product Length')),
        {Product Width} = toString(byName('Product Width')),
        {Product Height} = toString(byName('Product Height')),
        {Product Depth} = toString(byName('Product Depth')),
        {Product Weight} = toString(byName('Product Weight')),
        {Fullfilling Partner} = toString(byName('Fullfilling Partner')),
        {Date First Available} = toString(byName('Date First Available')),
        {Frequently Bought Together 1} = toString(byName('Frequently Bought Together 1')),
        {Frequently Bought Together 1 Part Number} = toString(byName('Frequently Bought Together 1 Part Number')),
        {Frequently Bought Together 2} = toString(byName('Frequently Bought Together 2')),
        {Frequently Bought Together 2 Part Number} = toString(byName('Frequently Bought Together 2 Part Number')),
        {Frequently Bought Together 3} = toString(byName('Frequently Bought Together 3')),
        {Frequently Bought Together 3 Part Number} = toString(byName('Frequently Bought Together 3 Part Number')),
        {Frequently Bought Together 4} = toString(byName('Frequently Bought Together 4')),
        {Frequently Bought Together 4 Part Number} = toString(byName('Frequently Bought Together 4 Part Number')),
        {From the Manufacturer} = toString(byName('From the Manufacturer')),
        {Bestesellers Rank 1} = toString(byName('Bestesellers Rank 1')),
        {Bestsellers Rank 2} = toString(byName('Bestsellers Rank 2')),
        {Bestsellers Rank 3} = toString(byName('Bestsellers Rank 3')),
        {Bestsellers Rank 4} = toString(byName('Bestsellers Rank 4')),
        Endpoint = toString(byName('Endpoint')),
        {Related StarTech.com SKU} = toString(byName('Related StarTech.com SKU')),
        {Search SKU} = toString(byName('Search SKU')),
        {Search Manufacturer} = toString(byName('Search Manufacturer')),
        sourcefilename = sourcefilename) ~> MapDrifted1
MapDrifted1 sink(input(
        FileName as string,
        Manufacturer as string,
        SKU as string,
        PartnerName as string,
        PartnerPartNumber as string,
        SearchDate as string,
        SearchResultDescription as string,
        {1stLineDescription} as string,
        {2ndLineDescription} as string,
        ProductCategory as string,
        ProductCategory1 as string,
        ProductCategory2 as string,
        ProductCategory3 as string,
        ProductCategory4 as string,
        UNSPSCCode as string,
        Pricing as string,
        Currency as string,
        AvailabilityQty as string,
        AvailabilityStatus as string,
        AverageRating as string,
        TotalReviews as string,
        Brand as string,
        Model as string,
        ProductLine as string,
        PartnerSite as string,
        ProductURL as string,
        Warranty as string,
        ProductLength as string,
        ProductWidth as string,
        ProductHeight as string,
        ProductDepth as string,
        ProductWeight as string,
        FullfillingPartner as string,
        DateFirstAvailable as string,
        FrequentlyBoughtTogether1 as string,
        FrequentlyBoughtTogether1PartNumber as string,
        FrequentlyBoughtTogether2 as string,
        FrequentlyBoughtTogether2PartNumber as string,
        FrequentlyBoughtTogether3 as string,
        FrequentlyBoughtTogether3PartNumber as string,
        FrequentlyBoughtTogether4 as string,
        FrequentlyBoughtTogether4PartNumber as string,
        FromtheManufacturer as string,
        BestesellersRank1 as string,
        BestsellersRank2 as string,
        BestsellersRank3 as string,
        BestsellersRank4 as string,
        Endpoint as string,
        RelatedStarTechcomSKU as string,
        SearchSKU as string,
        SearchManufacturer as string
    ),
    allowSchemaDrift: false,
    validateSchema: false,
    format: 'parquet',
    rowUrlColumn:'sourcefilename',
    mapColumn(
        FileName = sourcefilename,
        Manufacturer,
        SKU,
        PartnerName = {Partner Name},
        PartnerPartNumber = {Partner Part Number},
        SearchDate = {Search Date},
        SearchResultDescription = {Search Result Description},
        {1stLineDescription} = {1st Line Description},
        {2ndLineDescription} = {2nd Line Description},
        ProductCategory = {Product Category},
        ProductCategory1 = {Product Category 1},
        ProductCategory2 = {Product Category 2},
        ProductCategory3 = {Product Category 3},
        ProductCategory4 = {Product Category 4},
        UNSPSCCode = {UNSPSC Code},
        Pricing,
        Currency,
        AvailabilityQty = {Availability Qty},
        AvailabilityStatus = {Availability Status},
        AverageRating = {Average Rating},
        TotalReviews = {Total Reviews},
        Brand,
        Model,
        ProductLine = {Product Line},
        PartnerSite = {Partner Site},
        ProductURL = {Product URL},
        Warranty,
        ProductLength = {Product Length},
        ProductWidth = {Product Width},
        ProductHeight = {Product Height},
        ProductDepth = {Product Depth},
        ProductWeight = {Product Weight},
        FullfillingPartner = {Fullfilling Partner},
        DateFirstAvailable = {Date First Available},
        FrequentlyBoughtTogether1 = {Frequently Bought Together 1},
        FrequentlyBoughtTogether1PartNumber = {Frequently Bought Together 1 Part Number},
        FrequentlyBoughtTogether2 = {Frequently Bought Together 2},
        FrequentlyBoughtTogether2PartNumber = {Frequently Bought Together 2 Part Number},
        FrequentlyBoughtTogether3 = {Frequently Bought Together 3},
        FrequentlyBoughtTogether3PartNumber = {Frequently Bought Together 3 Part Number},
        FrequentlyBoughtTogether4 = {Frequently Bought Together 4},
        FrequentlyBoughtTogether4PartNumber = {Frequently Bought Together 4 Part Number},
        FromtheManufacturer = {From the Manufacturer},
        BestesellersRank1 = {Bestesellers Rank 1},
        BestsellersRank2 = {Bestsellers Rank 2},
        BestsellersRank3 = {Bestsellers Rank 3},
        BestsellersRank4 = {Bestsellers Rank 4},
        Endpoint,
        RelatedStarTechcomSKU = {Related StarTech.com SKU},
        SearchSKU = {Search SKU},
        SearchManufacturer = {Search Manufacturer}
    )) ~> sink1

2 ответа

Вы всегда хотите выводить файл Parquet с одинаковой схемой? Т.е. 50 столбцов вне зависимости от схемы входящего файла?

Если да, то вы можете создать поток данных с "канонической моделью", которая определяет эту структуру из 50 столбцов.

Вы должны построить определение целевой схемы, используя производный столбец, и сопоставить туда входящие исходные столбцы. Если у вас нет подходящего столбца, вы можете просто установить значение NULL.

С помощью этого метода вам не нужно будет определять формат набора данных в приемнике. Вы можете просто использовать Auto Map с пустым набором данных и вывести файлы Parquet.

Схема вывода файла Parquet будет соответствовать вашей модели производного столбца, которая будет определять понятные псевдонимы, которые вы использовали в приведенном выше сопоставлении приемника.

Вот видео, которое я сделал, чтобы объяснить этот метод: https://www.youtube.com/watch?v=K5tgzLjEE9Q.

Я надеюсь, что это помогает.

Я собрал здесь очень быструю и очень простую демонстрацию: http://youtu.be/DhscTC6VwwI?hd=1.

Я добавил его в нашу коллекцию видеороликов о потоках данных ADF, потому что это общий шаблон, использующий дрейф схемы и гибкую обработку схемы.

Ключом к этой работе является построение вашей выходной модели внутри потока данных с производным столбцом. Держите свои наборы данных без схемы.

Дайте знать, если у вас появятся вопросы.

Другие вопросы по тегам