Apache Beam: ReadFromText против ReadAllFromText
Я запускаю конвейер Apache Beam, считывающий текстовые файлы из Google Cloud Storage, выполняю некоторый анализ этих файлов и записываю проанализированные данные в Bigquery.
Игнорируя синтаксический анализ и google_cloud_options здесь ради краткости, мой код выглядит следующим образом: (apache-beam 2.5.0 с надстройками GCP и Dataflow в качестве бегуна)
p = Pipeline(options=options)
lines = p | 'read from file' >>
beam.io.ReadFromText('some_gcs_bucket_path*') | \
'parse xml to dict' >> beam.ParDo(
beam.io.WriteToBigQuery(
'my_table',
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
p.run()
Это работает нормально и успешно добавляет соответствующие данные в мою таблицу Bigquery для небольшого количества входных файлов. Однако, когда я увеличиваю количество входных файлов до +- 800k, я получаю сообщение об ошибке:
"Общий размер объектов BoundedSource, возвращаемых операцией BoundedSource.split(), превышает допустимый предел".
Я обнаружил устранение ошибок импорта конвейера луча apache [объекты BoundedSource превышают допустимый предел], который рекомендует использовать ReadAllFromText вместо ReadFromText.
Однако, когда я выменяюсь, я получаю следующую ошибку:
Traceback (most recent call last):
File "/Users/richardtbenade/Repos/de_020/main_isolated.py", line 240, in <module>
xmltobigquery.run_dataflow()
File "/Users/richardtbenade/Repos/de_020/main_isolated.py", line 220, in run_dataflow
'parse xml to dict' >> beam.ParDo(XmlToDictFn(), job_spec=self.job_spec) | \
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 831, in __ror__
return self.transform.__ror__(pvalueish, self.label)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 488, in __ror__
result = p.apply(self, pvalueish, label)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
return self.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
return m(transform, input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
return transform.expand(input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/io/textio.py", line 470, in expand
return pvalue | 'ReadAllFiles' >> self._read_all_files
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
return self.pipeline.apply(ptransform, self)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 454, in apply
label or transform.label)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
return self.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
return m(transform, input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
return transform.expand(input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/io/filebasedsource.py", line 416, in expand
| 'ReadRange' >> ParDo(_ReadRange(self._source_from_file)))
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
return self.pipeline.apply(ptransform, self)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 454, in apply
label or transform.label)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
return self.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
return m(transform, input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
return transform.expand(input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 568, in expand
| 'RemoveRandomKeys' >> Map(lambda t: t[1]))
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
return self.pipeline.apply(ptransform, self)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
return m(transform, input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
return transform.expand(input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 494, in expand
windowing_saved = pcoll.windowing
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 130, in windowing
self.producer.inputs)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 443, in get_windowing
return inputs[0].windowing
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 130, in windowing
self.producer.inputs)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 443, in get_windowing
return inputs[0].windowing
AttributeError: 'PBegin' object has no attribute 'windowing'.
Какие-либо предложения?
2 ответа
Я столкнулся с той же проблемой чтения файлов Avro. Как отметил Ричардт beam.Create
должен быть вызван явно. Следующий шаблон должен работать для обоих, ReadAllFromAvro
а также ReadAllFromText
:
class AvroTransformOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
help='Path of the avro file to read from')
parser.add_value_provider_argument(
'--output',
help='Output avro file to write results to')
def run():
pipeline_options = PipelineOptions()
pipeline = beam.Pipeline(options=pipeline_options)
options = pipeline_options.view_as(AvroTransformOptions)
steps = (
pipeline
| 'Create' >> beam.Create(['%s*' % options.input])
| 'ReadData' >> beam.io.ReadAllFromAvro()
| 'Transaform' >> beam.ParDo(YourDoFn())
| 'WriteData' >> beam.io.WriteToAvro(options.output, TARGET_SCHEMA))
Надеюсь, этот ответ полезен.
Чтобы ответить на исходный вопрос:ReadFromText
принимает аргумент шаблона файла, в то время какReadAllFromText
принимает свои шаблоны файлов в качестве входных данных конвейера:
# ReadFromText
(p
| beam.io.ReadFromText("myfile.csv"))
# ReadAllFromText
(p
| beam.Create(["myfile1.csv", "myfile2.csv", "myfile3.csv"])
| beam.io.ReadAllFromText())