Apache Spark: эффективное использование mapPartitions в Java
В учебнике, выпущенном ранее, под названием High Performance Spark, разработчики Spark отмечают, что:
Чтобы обеспечить Spark гибкость при выводе некоторых записей на диск, важно представить свои функции внутри
mapPartitions
таким образом, что ваши функции не заставляют загружать весь раздел в памяти (например, неявное преобразование в список). У итераторов есть много методов, с помощью которых мы можем написать преобразования функционального стиля, или вы можете создать свой собственный итератор. Когда преобразование непосредственно принимает и возвращает итератор, не проталкивая его через другую коллекцию, мы вызываем эти преобразования итератор-итератор.
Однако в учебнике отсутствуют хорошие примеры использования mapPartitions
или аналогичные варианты метода. И в сети есть несколько хороших примеров кода, большинство из которых - Scala. Например, мы видим этот код Scala, используя mapPartitions
Автор: zero323, Как добавить столбцы в org.apache.spark.sql.Row внутри mapPartitions.
def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show
К сожалению, Java не предоставляет ничего хорошего iter.map(...)
для итераторов. Поэтому возникает вопрос: как эффективно использовать преобразования итератор-итератор с mapPartitions
не проливая полностью RDD
на диск в виде списка?
JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
ArrayList<OutObj> out = new ArrayList<>();
while(iter.hasNext()) {
InObj current = iter.next();
out.add(someChange(current));
}
return out.iterator();
});
Это, кажется, общий синтаксис для использования mapPartitions
в примерах Java, но я не вижу, как это будет наиболее эффективным, если у вас есть JavaRDD
с десятками тысяч записей (или даже больше... так как, Spark для больших данных). В конечном итоге вы получите список всех объектов в итераторе, просто чтобы превратить его обратно в итератор (что позволяет сказать, что какая-то функция карты была бы гораздо более эффективной здесь).
Примечание: в то время как эти 8 строк кода используют mapPartitions
может быть записан в виде 1 строки с map
или же flatMap
Я намеренно использую mapPartitions
использовать тот факт, что он работает над каждым разделом, а не над каждым элементом в RDD
,
Есть идеи, пожалуйста?
1 ответ
Одним из способов предотвращения форсирования "материализации" всего раздела является преобразование Iterator
в поток, а затем с помощью Stream
функциональный API (например, map
функция).
Как преобразовать итератор в поток? предлагает несколько хороших способов конвертировать Iterator
в Stream
Таким образом, взяв один из предложенных вариантов, мы можем получить:
rdd.mapPartitions((Iterator<InObj> iter) -> {
Iterable<InObj> iterable = () -> iter;
return StreamSupport.stream(iterable.spliterator(), false)
.map(s -> transformRow(s)) // or whatever transformation
.iterator();
});
Что должно быть преобразованием "итератор-итератор", потому что все промежуточные API используются (Iterable
, Stream
) лениво оцениваются.
РЕДАКТИРОВАТЬ: Я не проверял это сам, но OP прокомментировал, и я цитирую, что "нет никакого повышения эффективности при использовании потока через список". Я не знаю, почему это так, и я не знаю, так ли это в целом, но стоит упомянуть.