Apache Beam: ReadFromText против ReadAllFromText

Я запускаю конвейер Apache Beam, считывающий текстовые файлы из Google Cloud Storage, выполняю некоторый анализ этих файлов и записываю проанализированные данные в Bigquery.

Игнорируя синтаксический анализ и google_cloud_options здесь ради краткости, мой код выглядит следующим образом: (apache-beam 2.5.0 с надстройками GCP и Dataflow в качестве бегуна)

p = Pipeline(options=options)

lines = p | 'read from file' >> 
beam.io.ReadFromText('some_gcs_bucket_path*')  |  \
    'parse xml to dict' >> beam.ParDo(
        beam.io.WriteToBigQuery(
            'my_table',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    p.run()

Это работает нормально и успешно добавляет соответствующие данные в мою таблицу Bigquery для небольшого количества входных файлов. Однако, когда я увеличиваю количество входных файлов до +- 800k, я получаю сообщение об ошибке:

"Общий размер объектов BoundedSource, возвращаемых операцией BoundedSource.split(), превышает допустимый предел".

Я обнаружил устранение ошибок импорта конвейера луча apache [объекты BoundedSource превышают допустимый предел], который рекомендует использовать ReadAllFromText вместо ReadFromText.
Однако, когда я выменяюсь, я получаю следующую ошибку:

Traceback (most recent call last):
  File "/Users/richardtbenade/Repos/de_020/main_isolated.py", line 240, in <module>
    xmltobigquery.run_dataflow()
  File "/Users/richardtbenade/Repos/de_020/main_isolated.py", line 220, in run_dataflow
    'parse xml to dict' >> beam.ParDo(XmlToDictFn(), job_spec=self.job_spec) | \
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 831, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 488, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
    return self.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/io/textio.py", line 470, in expand
    return pvalue | 'ReadAllFiles' >> self._read_all_files
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 454, in apply
    label or transform.label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
    return self.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/io/filebasedsource.py", line 416, in expand
    | 'ReadRange' >> ParDo(_ReadRange(self._source_from_file)))
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 454, in apply
    label or transform.label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
    return self.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 568, in expand
    | 'RemoveRandomKeys' >> Map(lambda t: t[1]))
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 494, in expand
    windowing_saved = pcoll.windowing
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 130, in windowing
    self.producer.inputs)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 443, in get_windowing
    return inputs[0].windowing
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 130, in windowing
    self.producer.inputs)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 443, in get_windowing
    return inputs[0].windowing
AttributeError: 'PBegin' object has no attribute 'windowing'. 

Какие-либо предложения?

2 ответа

Я столкнулся с той же проблемой чтения файлов Avro. Как отметил Ричардт beam.Create должен быть вызван явно. Следующий шаблон должен работать для обоих, ReadAllFromAvro а также ReadAllFromText:

class AvroTransformOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            help='Path of the avro file to read from')
        parser.add_value_provider_argument(
            '--output',
            help='Output avro file to write results to')

def run():
    pipeline_options = PipelineOptions()
    pipeline = beam.Pipeline(options=pipeline_options)
    options = pipeline_options.view_as(AvroTransformOptions)

    steps = (
        pipeline
        | 'Create' >> beam.Create(['%s*' % options.input])
        | 'ReadData' >> beam.io.ReadAllFromAvro()
        | 'Transaform' >> beam.ParDo(YourDoFn())
        | 'WriteData' >> beam.io.WriteToAvro(options.output, TARGET_SCHEMA))

Надеюсь, этот ответ полезен.

Чтобы ответить на исходный вопрос:ReadFromTextпринимает аргумент шаблона файла, в то время какReadAllFromTextпринимает свои шаблоны файлов в качестве входных данных конвейера:

      # ReadFromText
(p
 | beam.io.ReadFromText("myfile.csv"))

# ReadAllFromText
(p
 | beam.Create(["myfile1.csv", "myfile2.csv", "myfile3.csv"])
 | beam.io.ReadAllFromText())
Другие вопросы по тегам