Azure Data Factory v2: не удается создать операцию копирования с помощью кода C#
Проблема в том, что моя внутренняя копия из foreach не публикуется в ADF. Я не вижу этой активности в интерфейсе ADF и даже после того, как добавляю ее вручную, я не могу не отлаживать, публиковать и даже просматривать код активности =(
Когда я нажимаю "Опубликовать", я вижу ошибку "Публикация: ошибка при публикации". При попытке отладки:
Failed
{"__zone_symbol__currentTask":{"type":"microTask","state":"notScheduled","source":"Promise.then","zone":"angular","cancelFn":null,"runCount":0}}
Нажатие на кнопку "Код" не выполняет никаких действий.
Что-то мне не хватает? Должен ли я добавить копирование деятельности в другом месте?
Работа с foreach / copy в коде C# выглядит следующим образом:
private Activity CreateDocumentIteratorActivity(
string lookupDocumentsActivityName,
string sourceDatasetName,
string destinationDatasetName,
string logsServiceName)
{
return new ForEachActivity()
{
Name = "ForEachDocument",
IsSequential = false,
BatchCount = BackupSettings.Value.ParallelThreads,
Items = new Expression { Value = "@activity('" + lookupDocumentsActivityName + "').output.value" },
DependsOn = new List<ActivityDependency>() {
new ActivityDependency() {
Activity = lookupDocumentsActivityName,
DependencyConditions = new List<string>() {
DependencyCondition.Succeeded
}
}
},
Activities = new[] {
CreateCopyDocToFileActivity(sourceDatasetName, destinationDatasetName, logsServiceName, BackupSettings.Value.BackupLogsFolder)
}
};
}
private CopyActivity CreateCopyDocToFileActivity(string sourceDatasetName, string destinationDatasetName, string logsServiceName, string logsPath)
{
return new CopyActivity()
{
Name = "CopyDocToFile",
Policy = new ActivityPolicy()
{
Retry = BackupSettings.Value.RetryPolicyConfiguration.RetryCount,
RetryIntervalInSeconds = (int)BackupSettings.Value.RetryPolicyConfiguration.DeltaBackOff.TotalSeconds,
Timeout = BackupSettings.Value.RetryPolicyConfiguration.MaxBackOff
},
Source = new DocumentDbCollectionSource()
{
Query = new Expression(@"select value c from c where c.id = '@{item().id}'")
},
Inputs = new[] { new DatasetReference() {
ReferenceName = sourceDatasetName,
Parameters = new Dictionary<string, object>
{
{ "collectionName", new Expression("@pipeline().parameters.collectionName") }
}
}},
Outputs = new[] { new DatasetReference() {
ReferenceName = destinationDatasetName,
Parameters = new Dictionary<string, object> {
{ "fileName", new Expression("@concat(pipeline().parameters.collectionName, '/', item().PartitionKey, '/', item().id)") },
{ "backupDateStr", new Expression("@pipeline().TriggerTime") }
}
}},
Sink = new BlobSink()
{
CopyBehavior = CopyBehaviorType.PreserveHierarchy,
},
ParallelCopies = BackupSettings.Value.ParallelThreads,
EnableSkipIncompatibleRow = true,
RedirectIncompatibleRowSettings = new RedirectIncompatibleRowSettings()
{
LinkedServiceName = logsServiceName,
Path = logsPath
}
};
}
Ответ от / трубопровода API-вызова:
{
"value": [
{
"id": "/subscriptions/d2259601-012b-4253-895b-02916ef0f7f7/resourceGroups/Test-Data/providers/Microsoft.DataFactory/factories/backup-data-factory/pipelines/cosmosBackup",
"name": "cosmosBackup",
"type": "Microsoft.DataFactory/factories/pipelines",
"properties":
{
"activities": [
{
"type": "Lookup",
"typeProperties":
{
"source":
{
"type": "DocumentDbCollectionSource",
"query":
{
"value": "select root.id, root.PartitionKey from root",
"type": "Expression"
}
},
"dataset":
{
"referenceName": "source_cosmosdb_collection",
"parameters":
{
"collectionName": "@pipeline().parameters.collectionName"
},
"type": "DatasetReference"
},
"firstRowOnly": false
},
"policy":
{
"timeout": "02:00:00",
"retry": 3,
"retryIntervalInSeconds": 30
},
"name": "GetDocumentsIds"
},
{
"type": "ForEach",
"typeProperties":
{
"isSequential": false,
"batchCount": 4,
"items":
{
"value": "@activity('GetDocumentsIds').output.value",
"type": "Expression"
},
"activities": [
{
"type": "Copy",
"typeProperties":
{
"source":
{
"type": "DocumentDbCollectionSource",
"query":
{
"value": "select value c from c where c.id = '@{item().id}'",
"type": "Expression"
}
},
"sink":
{
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
},
"parallelCopies": 4,
"enableSkipIncompatibleRow": true,
"redirectIncompatibleRowSettings":
{
"linkedServiceName": "destination_cosmosdb_collection_service",
"path": "backup/logs"
}
},
"inputs": [
{
"referenceName": "source_cosmosdb_collection",
"parameters":
{
"collectionName":
{
"value": "@pipeline().parameters.collectionName",
"type": "Expression"
}
},
"type": "DatasetReference"
}],
"outputs": [
{
"referenceName": "destination_cosmosdb_collection",
"parameters":
{
"fileName":
{
"value": "@concat(pipeline().parameters.collectionName, '/', item().PartitionKey, '/', item().id)",
"type": "Expression"
},
"backupDateStr":
{
"value": "@pipeline().TriggerTime",
"type": "Expression"
}
},
"type": "DatasetReference"
}],
"policy":
{
"timeout": "02:00:00",
"retry": 3,
"retryIntervalInSeconds": 30
},
"name": "CopyDocToFile"
}]
},
"name": "ForEachDocument",
"dependsOn": [
{
"activity": "GetDocumentsIds",
"dependencyConditions": ["Succeeded"]
}]
}],
"parameters":
{
"collectionName":
{
"type": "String"
}
}
},
"etag": "01006ed4-0000-0000-0000-5b3b38450000"
}]
}
1 ответ
Решение
Выяснил проблему. Эта часть не права.
"enableSkipIncompatibleRow": true,
"redirectIncompatibleRowSettings": {
"linkedServiceName": "destination_cosmosdb_collection_service",
"path": "backup/logs"
}
Должно быть
"enableSkipIncompatibleRow": true,
"redirectIncompatibleRowSettings": {
"linkedServiceName": {
"referenceName": "destination_cosmosdb_collection_service",
"type": "LinkedServiceReference"
},
"path": "backup/logs"
},