Параметры экспортированного шаблона потока данных неизвестны
Я экспортировал шаблон Cloud Dataflow из Dataprep, как показано здесь:
https://cloud.google.com/dataprep/docs/html/Export-Basics_57344556
В Dataprep поток извлекает текстовые файлы через подстановочный знак из Google Cloud Storage, преобразует данные и добавляет их в существующую таблицу BigQuery. Все работает как задумано.
Однако, пытаясь запустить задание потока данных из экспортированного шаблона, я не могу правильно определить параметры запуска. Сообщения об ошибках не слишком специфичны, но ясно, что, с одной стороны, я неправильно понимаю расположение (ввод и вывод).
Единственный предоставленный Google шаблон для этого варианта использования (находится по адресу https://cloud.google.com/dataflow/docs/guides/templates/provided-templates) не применяется как он использует UDF и также работает в пакетном режиме, перезаписывая любую существующую таблицу BigQuery, а не добавляя.
Изучение исходных данных задания потока данных из Dataprep показывает ряд параметров (найденных в файле метаданных), но я не смог заставить их работать в моем коде. Вот пример одной такой неудачной конфигурации:
import time
from google.cloud import storage
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
def dummy(event, context):
pass
def process_data(event, context):
credentials = GoogleCredentials.get_application_default()
service = build('dataflow', 'v1b3', credentials=credentials)
data = event
gsclient = storage.Client()
file_name = data['name']
time_stamp = time.time()
GCSPATH="gs://[path to template]
BODY = {
"jobName": "GCS2BigQuery_{tstamp}".format(tstamp=time_stamp),
"parameters": {
"inputLocations" : '{{\"location1\":\"[my bucket]/{filename}\"}}'.format(filename=file_name),
"outputLocations": '{{\"location1\":\"[project]:[dataset].[table]\", [... other locations]"}}',
"customGcsTempLocation": "gs://[my bucket]/dataflow"
},
"environment": {
"zone": "us-east1-b"
}
}
print(BODY["parameters"])
request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
response = request.execute()
print(response)
Приведенный выше пример указывает на недопустимое поле ("location1", которое я извлек из завершенного задания Dataflow. Я знаю, что мне нужно указать местоположение GCS, расположение шаблона и таблицу BigQuery, но нигде не нашел правильный синтаксис. Как уже упоминалось выше я нашел имена полей и примеры значений в сгенерированном файле метаданных задания.
Я понимаю, что этот конкретный вариант использования может не звонить ни в какие звонки, но в целом, если кто-то успешно определил и использовал правильные параметры запуска для задания потока данных, экспортированного из Dataprep, я был бы очень признателен за дополнительную информацию об этом. Спасибо.
0 ответов
Я думаю, что вам нужно просмотреть этот документ, он объясняет точно синтаксис, необходимый для передачи различных доступных параметров конвейера, включая необходимые параметры местоположения... 1
В частности, с вашим фрагментом кода следующий синтаксис не соответствует правильному синтаксису
""inputLocations": '{{\"location1\":\"[my bucket]/{filename}\"}}'.format(filename=file_name)" В дополнение к document1, вы также должны просмотреть доступные параметры конвейера и их правильный синтаксис 2
Пожалуйста, используйте ссылки... Это официальные ссылки на документацию от Google. Эти ссылки никогда не устареют и не будут удалены, они активно контролируются и поддерживаются специальной командой