Как сделать ReadAllFromText не блочным конвейером?

Я хотел бы реализовать очень простой лучевой конвейер:

read google storage links to text files from PubSub topic->read each text line by line->write to BigQuery.

Apache Beam имеет предварительно реализованный PTransform для каждого процесса.

Таким образом, трубопровод будет:

Pipeline | ReadFromPubSub("topic_name") | ReadAllFromText() | WriteToBigQuery("table_name")

Тем не мение, ReadAllFromText() как-то блокирует конвейер. Создание пользовательского PTransform, который возвращает случайную строку после чтения из PubSub и записи ее в таблицу BigQuery, работает нормально (без блокировки). Добавление фиксированного окна в 3 секунды или запуск каждого элемента также не решает проблему.

Каждый файл занимает около 10 МБ и 23K строк.

К сожалению, я не могу найти документацию о том, как ReadAllFromText должен работать. Было бы очень странно, если бы он попытался заблокировать конвейер, пока не прочитает все файлы. И я ожидаю, что функция будет выдвигать каждую строку в конвейер, как только она прочитает строку.

Есть ли известная причина такого поведения? Это ошибка или я что-то не так делаю?

Код трубопровода:

pipeline_options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | ReadFromPubSub(subscription=source_dict["input"]) \
                | 'window' >> beam.WindowInto(window.FixedWindows(3, 0)) \
                | ReadAllFromText(skip_header_lines=1)

            elements = lines | beam.ParDo(SplitPayload())

            elements | WriteToBigQuery(source_dict["output"], write_disposition=BigQueryDisposition.WRITE_APPEND)
    .
    .
    .
    class SplitPayload(beam.DoFn):
        def process(self, element, *args, **kwargs):

            timestamp, id, payload = element.split(";")

            return [{
                'timestamp': timestamp,
                'id': id,
                'payload': payload
            }]

0 ответов

Другие вопросы по тегам