Динамические конвейеры Azure Data Factory v2

Итак, у нас есть фабрика с ~400 наборами данных и ~200 конвейерами, и она становится громоздкой. Сосредоточение на копировании из исходного кода SQL в приемник BLOB-объектов. Поскольку мы копируем в BLOB-объект, схема не оказывает влияния. Я хотел бы иметь один набор данных для каждого источника, один набор данных для каждой учетной записи BLOB-объекта и один конвейер для каждой комбинации учетной записи источника / BLOB-объекта, динамически передавая ему конфигурацию из поиска.

Мы успешно разработали конвейер, который использует фиктивные наборы данных для источника и приемника. Это работает, если вы передаете ему запрос, имя контейнера и имя папки.

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "DynamicCopy",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "SqlSource",
                        "sqlReaderQuery": "select 1 a"
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "dataIntegrationUnits": 0
                },
                "inputs": [
                    {
                        "referenceName": "AzureSql",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureBlob",
                        "type": "DatasetReference",
                        "parameters": {
                            "container": "raw-test",
                            "folder": "test"
                        }
                    }
                ]
            }
        ]
    }
}

Когда мы помещаем поиск перед ним и оборачиваем его в foreach, он перестает работать. С не очень полезно

"errorCode": "400",
"message": "Activity failed because an inner activity failed",
"failureType": "UserError",
"target": "ForEach"

Хранимая процедура поиска [dbo].[adfdynamic] фактически не упоминается в foreach еще:

create proc adfdynamic as
select 'raw-test' container, 'test_a' folder, 'select 1 a, 2 b' 
UNION ALL    
select 'raw-test' container, 'test_b' folder, 'select 3 c, 2 d' 

Итак, что я желал:

  • один блоб в raw-test @.. myblob... / test_a / out.dsv с контентом {'a,b','1,2'}
  • один блоб в raw-test @.. myblob... / test_b / out.dsv с контентом {'c,d','3,2'}

набор данных sql:

{
    "name": "AzureSql",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Dest",
            "type": "LinkedServiceReference"
        },
        "type": "AzureSqlTable",
        "structure": [
            {
                "name": "CustomerKey",
                "type": "Int32"
            },
            {
                "name": "Name",
                "type": "String"
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[DimCustomer]"
        }
    }
}

набор данных BLOB-объектов:

{
    "name": "AzureBlob",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureStorage1",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "container": {
                "type": "String"
            },
            "folder": {
                "type": "String"
            }
        },
        "type": "AzureBlob",
        "typeProperties": {
            "format": {
                "type": "TextFormat",
                "columnDelimiter": ",",
                "treatEmptyAsNull": false,
                "skipLineCount": 0,
                "firstRowAsHeader": false
            },
            "fileName": {
                "value": "@{dataset().folder}/out.dsv",
                "type": "Expression"
            },
            "folderPath": {
                "value": "@dataset().container",
                "type": "Expression"
            }
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

и нерабочий динамический конвейер:

{
    "name": "Copy",
    "properties": {
        "activities": [
            {
                "name": "ForEach",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Lookup",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Lookup').output.value",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "Copy",
                            "type": "Copy",
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "source": {
                                    "type": "SqlSource",
                                    "sqlReaderQuery": {
                                        "value": "select 1 a, 2 b from dest",
                                        "type": "Expression"
                                    }
                                },
                                "sink": {
                                    "type": "BlobSink"
                                },
                                "enableStaging": false,
                                "dataIntegrationUnits": 0
                            },
                            "inputs": [
                                {
                                    "referenceName": "AzureSql",
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "AzureBlob",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "container": {
                                            "value": "raw-test",
                                            "type": "Expression"
                                        },
                                        "folder": {
                                            "value": "folder",
                                            "type": "Expression"
                                        }
                                    }
                                }
                            ]
                        }
                    ]
                }
            },
            {
                "name": "Lookup",
                "type": "Lookup",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                }
            }
        ]
    }
}

Извиняюсь за форматирование. слишком много кода в одном сообщении?

2 ответа

  1. В поиске активности, пожалуйста, проверьте, является ли ваше свойство firstRowOnly. Это ложь или правда? По умолчанию это правда.
  2. В пользовательском интерфейсе вы можете установить точку останова для отладки вашей активности поиска. Тогда вы можете увидеть, является ли вывод тем, что вы хотите.

Не совсем ответ на ваш вопрос, но я упростила жизнь, создав набор данных под названием GenericBlob. Это было 2 параметра контейнера и путь. Это может помочь упростить то, что вы делаете. У меня тоже было 20 наборов данных BLOB-объектов, теперь у меня есть один... (предполагается, что BLOB-объекты находятся в одной учетной записи хранения).

Другие вопросы по тегам