Конвейер Apache Beam принимает «Большой» входной файл (более 1 ГБ) не создает никаких выходных файлов
Я совершенно новичок в модели вычислений потока данных, и я делаю POC, чтобы проверить несколько концепций, использующих луч apache с прямым бегуном (и java sdk). У меня возникли проблемы с созданием конвейера, который считывает "большой" файл csv (около 1,25 ГБ) и выгружает его в выходной файл без какого-либо конкретного преобразования, как в следующем коде (я в основном озабочен тестированием узких мест ввода-вывода с использованием этого потока данных / лучевая модель, потому что это для меня главное):
Pipeline pipeline = Pipeline.create();
PCollection<String> output = pipeline.apply(TextIO.read().from("BIG_CSV_FILE"));
output.apply(TextIO.write().to("BIG_OUTPUT").withSuffix("csv").withNumShards(1));
pipeline.run();
Проблема, с которой я сталкиваюсь, заключается в том, что работают только файлы меньшего размера, но когда используется большой файл, выходной файл не создается (но также не отображается ошибка / исключение, что затрудняет отладку).
Я знаю, что на странице бегунов проекта apache-beam (https://beam.apache.org/documentation/runners/direct/) это явно указано в пункте соображений памяти:
Локальное выполнение ограничено памятью, доступной в вашей локальной среде. Настоятельно рекомендуется запускать конвейер с наборами данных, достаточно маленькими, чтобы поместиться в локальной памяти. Вы можете создать небольшой набор данных в памяти, используя преобразование Create, или вы можете использовать преобразование Read для работы с небольшими локальными или удаленными файлами.
Это говорит о том, что у меня проблема с памятью (но, к сожалению, это явно не указано на консоли, поэтому мне просто интересно здесь). Меня также беспокоит их предложение о том, что набор данных должен уместиться в памяти (почему он не читает из файла по частям, а не помещает весь файл / набор данных в память?)
Второе соображение, которое я хотел бы добавить в этот разговор (если это действительно проблема с памятью): Насколько проста реализация прямого бегуна? Я имею в виду, нетрудно реализовать фрагмент кода, который читает из большого файла по частям, а также выводит в новый файл (также по частям), так что ни в какой момент времени использование памяти не становится проблемой ( потому что ни один файл не загружается полностью в память - только текущий «кусок»). Даже если «непосредственный исполнитель» - это скорее средство для создания прототипов для тестирования семантики, не будет ли слишком много ожидать, что он будет хорошо справляться с огромными файлами? - учитывая, что это унифицированная модель, созданная для работы с потоковой передачей, где размер окна является произвольным, а накопление / агрегация огромных данных до их погружения является стандартным вариантом использования.
Поэтому больше, чем вопрос, я бы глубоко признателен за ваши отзывы / комментарии по любому из этих пунктов: заметили ли вы ограничения ввода-вывода при использовании прямого бегуна? Я упускаю из виду какой-то аспект или действительно так наивно реализован прямой бегун? Вы проверили, что при использовании надлежащего рабочего средства запуска, такого как поток данных flink/spark/google cloud, это ограничение исчезает?
В конечном итоге я буду тестировать с другими бегунами, такими как flink или spark, но меня не впечатляет, что у прямого бегуна (даже если он предназначен только для целей прототипирования) возникают проблемы с этим первым тестом, над которым я работаю - учитывая Вся идея потока данных основана на приеме, обработке и распределении огромных объемов данных в рамках единой модели пакетной / потоковой передачи.
2 ответа
Есть несколько проблем или возможностей. Отвечу в приоритетном порядке.
- Прямой запуск предназначен для тестирования с очень маленькими данными. Он разработан для обеспечения максимальной гарантии качества, при этом производительность не имеет особого значения. Например:
- он случайным образом перемешивает данные, чтобы убедиться, что вы не зависите от порядка, который не будет существовать в производстве
- он сериализует и десериализует данные после каждого шага, чтобы убедиться, что данные будут переданы правильно (производственные исполнители будут избегать сериализации в максимально возможной степени)
- он проверяет, изменили ли вы элементы запрещенными способами, что может привести к потере данных при производстве
Описываемые вами данные не очень большие, и DirectRunner может со временем их обработать при нормальных обстоятельствах.
Вы указали
numShards(1)
что явно исключает весь параллелизм. Это приведет к тому, что все данные будут объединены и обработаны в одном потоке, поэтому он будет медленнее, чем мог бы, даже в DirectRunner. В общем, вам следует избегать искусственного ограничения параллелизма.Если есть какая-либо ошибка нехватки памяти или другая ошибка, препятствующая обработке, вы должны увидеть большое сообщение. В противном случае будет полезно посмотреть на профилирование и использование ЦП, чтобы определить, активна ли обработка.
На этот вопрос косвенно ответил Кенн Ноулз выше. Прямой бегун не предназначен для тестирования производительности, он предназначен только для создания прототипов и корректности данных. У него нет механизма разделения и разделения работы по разделам, и он обрабатывает каждый набор данных в памяти. Тестирование производительности следует проводить с использованием других бегунов (например, Flink Runner) - они обеспечат разделение данных и тип инфраструктуры, необходимый для работы с узкими местами с большим количеством операций ввода-вывода.