Динамические конвейеры Azure Data Factory v2
Итак, у нас есть фабрика с ~400 наборами данных и ~200 конвейерами, и она становится громоздкой. Сосредоточение на копировании из исходного кода SQL в приемник BLOB-объектов. Поскольку мы копируем в BLOB-объект, схема не оказывает влияния. Я хотел бы иметь один набор данных для каждого источника, один набор данных для каждой учетной записи BLOB-объекта и один конвейер для каждой комбинации учетной записи источника / BLOB-объекта, динамически передавая ему конфигурацию из поиска.
Мы успешно разработали конвейер, который использует фиктивные наборы данных для источника и приемника. Это работает, если вы передаете ему запрос, имя контейнера и имя папки.
{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "DynamicCopy",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": "select 1 a"
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"dataIntegrationUnits": 0
},
"inputs": [
{
"referenceName": "AzureSql",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureBlob",
"type": "DatasetReference",
"parameters": {
"container": "raw-test",
"folder": "test"
}
}
]
}
]
}
}
Когда мы помещаем поиск перед ним и оборачиваем его в foreach, он перестает работать. С не очень полезно
"errorCode": "400", "message": "Activity failed because an inner activity failed", "failureType": "UserError", "target": "ForEach"
Хранимая процедура поиска [dbo].[adfdynamic]
фактически не упоминается в foreach еще:
create proc adfdynamic as
select 'raw-test' container, 'test_a' folder, 'select 1 a, 2 b'
UNION ALL
select 'raw-test' container, 'test_b' folder, 'select 3 c, 2 d'
Итак, что я желал:
- один блоб в raw-test @.. myblob... / test_a / out.dsv с контентом
{'a,b','1,2'}
- один блоб в raw-test @.. myblob... / test_b / out.dsv с контентом
{'c,d','3,2'}
набор данных sql:
{
"name": "AzureSql",
"properties": {
"linkedServiceName": {
"referenceName": "Dest",
"type": "LinkedServiceReference"
},
"type": "AzureSqlTable",
"structure": [
{
"name": "CustomerKey",
"type": "Int32"
},
{
"name": "Name",
"type": "String"
}
],
"typeProperties": {
"tableName": "[dbo].[DimCustomer]"
}
}
}
набор данных BLOB-объектов:
{
"name": "AzureBlob",
"properties": {
"linkedServiceName": {
"referenceName": "AzureStorage1",
"type": "LinkedServiceReference"
},
"parameters": {
"container": {
"type": "String"
},
"folder": {
"type": "String"
}
},
"type": "AzureBlob",
"typeProperties": {
"format": {
"type": "TextFormat",
"columnDelimiter": ",",
"treatEmptyAsNull": false,
"skipLineCount": 0,
"firstRowAsHeader": false
},
"fileName": {
"value": "@{dataset().folder}/out.dsv",
"type": "Expression"
},
"folderPath": {
"value": "@dataset().container",
"type": "Expression"
}
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
и нерабочий динамический конвейер:
{
"name": "Copy",
"properties": {
"activities": [
{
"name": "ForEach",
"type": "ForEach",
"dependsOn": [
{
"activity": "Lookup",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@activity('Lookup').output.value",
"type": "Expression"
},
"activities": [
{
"name": "Copy",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": {
"value": "select 1 a, 2 b from dest",
"type": "Expression"
}
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"dataIntegrationUnits": 0
},
"inputs": [
{
"referenceName": "AzureSql",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureBlob",
"type": "DatasetReference",
"parameters": {
"container": {
"value": "raw-test",
"type": "Expression"
},
"folder": {
"value": "folder",
"type": "Expression"
}
}
}
]
}
]
}
},
{
"name": "Lookup",
"type": "Lookup",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
}
}
]
}
}
Извиняюсь за форматирование. слишком много кода в одном сообщении?
2 ответа
- В поиске активности, пожалуйста, проверьте, является ли ваше свойство firstRowOnly. Это ложь или правда? По умолчанию это правда.
- В пользовательском интерфейсе вы можете установить точку останова для отладки вашей активности поиска. Тогда вы можете увидеть, является ли вывод тем, что вы хотите.
Не совсем ответ на ваш вопрос, но я упростила жизнь, создав набор данных под названием GenericBlob. Это было 2 параметра контейнера и путь. Это может помочь упростить то, что вы делаете. У меня тоже было 20 наборов данных BLOB-объектов, теперь у меня есть один... (предполагается, что BLOB-объекты находятся в одной учетной записи хранения).