Apache Spark: эффективное использование mapPartitions в Java

В учебнике, выпущенном ранее, под названием High Performance Spark, разработчики Spark отмечают, что:

Чтобы обеспечить Spark гибкость при выводе некоторых записей на диск, важно представить свои функции внутри mapPartitions таким образом, что ваши функции не заставляют загружать весь раздел в памяти (например, неявное преобразование в список). У итераторов есть много методов, с помощью которых мы можем написать преобразования функционального стиля, или вы можете создать свой собственный итератор. Когда преобразование непосредственно принимает и возвращает итератор, не проталкивая его через другую коллекцию, мы вызываем эти преобразования итератор-итератор.

Однако в учебнике отсутствуют хорошие примеры использования mapPartitions или аналогичные варианты метода. И в сети есть несколько хороших примеров кода, большинство из которых - Scala. Например, мы видим этот код Scala, используя mapPartitions Автор: zero323, Как добавить столбцы в org.apache.spark.sql.Row внутри mapPartitions.

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show

К сожалению, Java не предоставляет ничего хорошего iter.map(...) для итераторов. Поэтому возникает вопрос: как эффективно использовать преобразования итератор-итератор с mapPartitions не проливая полностью RDD на диск в виде списка?

JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
    ArrayList<OutObj> out = new ArrayList<>();
    while(iter.hasNext()) {
        InObj current = iter.next();
        out.add(someChange(current));
    }
    return out.iterator();
});

Это, кажется, общий синтаксис для использования mapPartitions в примерах Java, но я не вижу, как это будет наиболее эффективным, если у вас есть JavaRDD с десятками тысяч записей (или даже больше... так как, Spark для больших данных). В конечном итоге вы получите список всех объектов в итераторе, просто чтобы превратить его обратно в итератор (что позволяет сказать, что какая-то функция карты была бы гораздо более эффективной здесь).

Примечание: в то время как эти 8 строк кода используют mapPartitions может быть записан в виде 1 строки с map или же flatMapЯ намеренно использую mapPartitions использовать тот факт, что он работает над каждым разделом, а не над каждым элементом в RDD,

Есть идеи, пожалуйста?

1 ответ

Решение

Одним из способов предотвращения форсирования "материализации" всего раздела является преобразование Iterator в поток, а затем с помощью Streamфункциональный API (например, map функция).

Как преобразовать итератор в поток? предлагает несколько хороших способов конвертировать Iterator в StreamТаким образом, взяв один из предложенных вариантов, мы можем получить:

rdd.mapPartitions((Iterator<InObj> iter) -> {
    Iterable<InObj> iterable = () -> iter;
    return StreamSupport.stream(iterable.spliterator(), false)
            .map(s -> transformRow(s)) // or whatever transformation
            .iterator();
});

Что должно быть преобразованием "итератор-итератор", потому что все промежуточные API используются (Iterable, Stream) лениво оцениваются.

РЕДАКТИРОВАТЬ: Я не проверял это сам, но OP прокомментировал, и я цитирую, что "нет никакого повышения эффективности при использовании потока через список". Я не знаю, почему это так, и я не знаю, так ли это в целом, но стоит упомянуть.

Другие вопросы по тегам