Как следует настроить этап, интенсивно использующий память, для параллельного выполнения через Apache Beam в Google Dataflow?
У меня есть конвейер потока данных, который обрабатывает некоторые спутниковые снимки. Этот конвейер работает должным образом с большим количеством небольших тестовых изображений, а также работает при обработке одного "большого" производственного изображения. Он не работает, когда конвейер просят обработать более одного "большого" изображения. Логика функций работает как локально, так и в потоке данных, поэтому проблема заключается не в синтаксисе / использовании, а в том, как правильно развернуть работу.
Псевдокод для участка конвейера, интенсивно использующего память, выглядит так:
List images in bucket -> Group images by date -> do stuff with images
Хотя конвейер для этого выглядит так:
(pipeline
| 'Get unpacked scenes' >> beam.Create(get_bucket_contents())
| 'Generate file names' >> beam.Map(create_scene_id)
| 'Group into global scenes' >> beam.GroupByKey()
| 'GenerateImg' >> (beam.ParDo(ImageGenerator(some_setup_args)))
)
Мой вопрос в том, как масштабировать "что-то делать с изображениями", также известное как
ImageGenerator
к рабочим узлам в потоке данных, чтобы задание не упало? Одна группа изображений, входные данные передаются в
ImageGenerator
, максимум составляет ~28 ГБ системной памяти. В
--machine-type
Аргумент указывает конвейеру использовать машину 32G highmem. Запуск конвейера с этими настройками для одной группы изображений на одной машине highmem32 работает нормально.
Запуск этого конвейера с двумя группами изображений на двух или трех машинах highmem32 каждый раз дает сбой. Зарегистрированные отказы выдают сообщение "Недостаточно памяти: завершить процесс XYZ". Если у меня есть два набора изображений, также называемых элементами, для обработки и несколько запущенных рабочих процессов, кажется, что поток данных назначает оба из них одному рабочему, которому затем не хватает памяти. Затем он перемещает элементы к доступным узлам, которые иногда успешны, но чаще всего терпят неудачу. Это неизбежно превышает предел в 4 сбоя на задание, и конвейер завершается.
Моя текущая рабочая теория о том, как заставить это работать, заключается в том, чтобы ограничить функцию parDo обработкой только одного набора изображений, одного элемента в коллекции pcollection, на любом отдельном рабочем узле. Я не уверен, возможно ли это. Я пробовал добавить шаг перестановки, чтобы распределить работу. Это не изменило ничего из того, что я мог сказать. Назначение рабочих узлов с большим объемом памяти - это нормально, но, похоже, создает ту же проблему при развертывании с реальными рабочими нагрузками.
Есть ли способ настроить рабочий узел для обработки только одного элемента в коллекции за раз?