Python API для запуска шаблона неизвестное имя не может найти поле
Я создал и запустил задание DataPrep и пытаюсь использовать шаблон из python в движке приложения. Я могу успешно начать работу, используя
gcloud dataflow jobs run
--parameters "inputLocations={\"location1\":\"gs://bucket/folder/*\"},
outputLocations={\"location1\":\"project:dataset.table\"},
customGcsTempLocation=gs://bucket/DataPrep-beta/temp"
--gcs-location gs://bucket/DataPrep-beta/temp/cloud-dataprep-templatename_template
Однако пытаясь использовать Python на движке приложения;
service = build('dataflow', 'v1b3', credentials=credentials)
input1 = {"location1": "{i1}".format(i1=input)}
output1 = {"location1": "{o1}".format(o1=output)}
print('input location: {}'.format(input1))
GCSPATH="gs://{bucket}/{template}".format(bucket=BUCKET, template=template)
BODY = {
"jobName": "{jobname}".format(jobname=JOBNAME),
"parameters": {
"inputLocations": input1,
"outputLocations": output1,
"customGcsTempLocation": "gs://{}/DataPrep-beta/temp".format(BUCKET)
}
}
print("dataflow request body: {}".format(BODY))
request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
response = request.execute()
Я вернусь;
"Invalid JSON payload received. Unknown name "location1" at
'launch_parameters.parameters[1].value': Cannot find field.
Invalid JSON payload received. Unknown name "location1" at
'launch_parameters.parameters[2].value': Cannot find field."
Ничто из того, что я пробовал, похоже, не поддерживает передачу dict или json.dumps() или str() в "inputLocations" или "outputLocations".
2 ответа
Проблема с форматом, который вы передаете input1
а также output1
, Они должны быть в кавычках:
input1 = '{"location1":"' + input + '" }'
output1 = '{"location1":"' + output + '" }'
Я попытался отправить запрос с тем же подходом, что и вы, и это не удалось. Он также завершится неудачно, если я позже проанализирую его обратно в строку или json, потому что он не анализирует кавычки правильно.
Конечно, формат как-то связан с вашей проблемой. У меня был тот же вариант использования для решения, но на выходе были бы файлы, а не набор данных Google BigQuery. и для меня код со следующим параметром BODY инициирует конвейер потока данных Google:
BODY = {
"jobName": "{jobname}".format(jobname=JOBNAME),
"parameters": {
"inputLocations" : "{{\"location1\":\"gs://{bucket}/employee/input/patient.json\"}}".format(bucket=BUCKET),
"outputLocations": "{{\"location1\":\"gs://{bucket}/employee/employees.json/file\",\"location2\":\"gs://{bucket}/jobrun/employees_314804/.profiler/profilerTypeCheckHistograms.json/file\",\"location3\":\"gs://{bucket}/jobrun/employees_314804/.profiler/profilerValidValueHistograms.json/file\"}}".format(bucket=BUCKET)
},
"environment": {
"tempLocation": "gs://{bucket}/employee/temp".format(bucket=BUCKET),
"zone": "us-central1-f"
}
}