beam.Create() со списком dicts работает очень медленно по сравнению со списком строк

Я использую Dataflow для обработки шейп-файла с примерно 4 миллионами функций (всего около 2 ГБ) и загрузки геометрии в BigQuery, поэтому перед запуском конвейера я извлекаю функции шейп-файла в список и инициализирую конвейер, используя. Есть два способа создать исходный список функций:

  1. Экспортируйте каждую функцию как строку json, которая затем DoFns нужно будет разобрать в dict:
      features = [f.ExportToJson() for f in layer]
  1. Экспорт python dict, предварительно проанализированного из строки JSON
      features = [json.loads(f.ExportToJson()) for f in layer]

При использовании варианта 1 процесс продолжается примерно через минуту. Используя вариант 2, beam.Create(features) занимает около 3+ часов на 6-ядерном i7 и, кажется, проводит здесь много времени:

        File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/typehints/trivial_inference.py", line 88, in <listcomp>
    typehints.Union[[instance_to_type(v) for k, v in o.items()]],
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/typehints/trivial_inference.py", line 88, in instance_to_type
    typehints.Union[[instance_to_type(v) for k, v in o.items()]],
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/typehints/trivial_inference.py", line 88, in <listcomp>

Это trivial_inferenceчто тормозит при переходе в список диктовок? Могу ли я настроить beam.Create не делать то, что он пытается там сделать, или как-то иначе ускорить это, чтобы список dicts не был в 100 раз медленнее по сравнению со списком строк?

1 ответ

очень интересный результат!

Я предполагаю, что это происходит потому, что Createнеобходимо собрать все данные, которые он получает. Консервированный размер словарей может быть большим, потому что они маринованы как объекты Python, а строки - как строки Python.

Вы могли сделать:

      p
| beam.Create([f.ExportToJson() for f in layer])
| beam.Map(json.loads)

Чтобы избежать лишнего травления. Это поможет?

Другие вопросы по тегам