Параметры экспортированного шаблона потока данных неизвестны

Я экспортировал шаблон Cloud Dataflow из Dataprep, как показано здесь:

https://cloud.google.com/dataprep/docs/html/Export-Basics_57344556

В Dataprep поток извлекает текстовые файлы через подстановочный знак из Google Cloud Storage, преобразует данные и добавляет их в существующую таблицу BigQuery. Все работает как задумано.

Однако, пытаясь запустить задание потока данных из экспортированного шаблона, я не могу правильно определить параметры запуска. Сообщения об ошибках не слишком специфичны, но ясно, что, с одной стороны, я неправильно понимаю расположение (ввод и вывод).

Единственный предоставленный Google шаблон для этого варианта использования (находится по адресу https://cloud.google.com/dataflow/docs/guides/templates/provided-templates) не применяется как он использует UDF и также работает в пакетном режиме, перезаписывая любую существующую таблицу BigQuery, а не добавляя.

Изучение исходных данных задания потока данных из Dataprep показывает ряд параметров (найденных в файле метаданных), но я не смог заставить их работать в моем коде. Вот пример одной такой неудачной конфигурации:

import time
from google.cloud import storage
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

def dummy(event, context):
    pass

def process_data(event, context):
    credentials = GoogleCredentials.get_application_default()
    service = build('dataflow', 'v1b3', credentials=credentials)

    data = event
    gsclient = storage.Client()
    file_name = data['name']

    time_stamp = time.time()

    GCSPATH="gs://[path to template]
    BODY = {
        "jobName": "GCS2BigQuery_{tstamp}".format(tstamp=time_stamp),
        "parameters": {
            "inputLocations" : '{{\"location1\":\"[my bucket]/{filename}\"}}'.format(filename=file_name),
            "outputLocations": '{{\"location1\":\"[project]:[dataset].[table]\", [... other locations]"}}',
            "customGcsTempLocation": "gs://[my bucket]/dataflow"
         },
         "environment": {
            "zone": "us-east1-b"
         }
    }

    print(BODY["parameters"])

    request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
    response = request.execute()

    print(response)

Приведенный выше пример указывает на недопустимое поле ("location1", которое я извлек из завершенного задания Dataflow. Я знаю, что мне нужно указать местоположение GCS, расположение шаблона и таблицу BigQuery, но нигде не нашел правильный синтаксис. Как уже упоминалось выше я нашел имена полей и примеры значений в сгенерированном файле метаданных задания.

Я понимаю, что этот конкретный вариант использования может не звонить ни в какие звонки, но в целом, если кто-то успешно определил и использовал правильные параметры запуска для задания потока данных, экспортированного из Dataprep, я был бы очень признателен за дополнительную информацию об этом. Спасибо.

0 ответов

Я думаю, что вам нужно просмотреть этот документ, он объясняет точно синтаксис, необходимый для передачи различных доступных параметров конвейера, включая необходимые параметры местоположения... 1

В частности, с вашим фрагментом кода следующий синтаксис не соответствует правильному синтаксису

""inputLocations": '{{\"location1\":\"[my bucket]/{filename}\"}}'.format(filename=file_name)" В дополнение к document1, вы также должны просмотреть доступные параметры конвейера и их правильный синтаксис 2

Пожалуйста, используйте ссылки... Это официальные ссылки на документацию от Google. Эти ссылки никогда не устареют и не будут удалены, они активно контролируются и поддерживаются специальной командой

Другие вопросы по тегам