Сбой перераспределения фрейма данных pyspark и как избежать начального размера раздела
Я пытаюсь настроить производительность spark, используя разделы на кадре данных spark. Вот код:
file_path1 = spark.read.parquet(*paths[:15])
df = file_path1.select(columns) \
.where((func.col("organization") == organization))
df = df.repartition(10)
#execute an action just to make spark execute the repartition step
df.first()
Во время исполнения first()
Я проверяю этапы работы в Spark UI и вот что я нахожу:
- Почему нет
repartition
шаг в стадии? - Почему существует также этап 8? Я просил только одно действие
first()
, Это из-за тасования, вызванногоrepartition
? - Есть ли способ изменить перераспределение файлов паркета без необходимости выполнять такие операции? Как изначально, когда я прочитал
df
Вы можете видеть, что он разделен на 43 тыс. разделов, что очень много (по сравнению с его размером, когда я сохраняю его в CSV-файл: 4 МБ с 13 тыс. строк) и создает проблемы на дальнейших шагах, поэтому я хотел переразбить его. - Должен ли я использовать
cache()
после передела?df = df.repartition(10).cache()
? Как когда я казнилdf.first()
во второй раз, я также получаю запланированный этап с 43k разделов, несмотря наdf.rdd.getNumPartitions()
который вернул 10. РЕДАКТИРОВАТЬ: количество разделов просто попробовать. мои вопросы направлены на то, чтобы помочь мне понять, как сделать правильное перераспределение.
Примечание: изначально Dataframe считывается из набора файлов паркета в Hadoop.
Я уже прочитал это как ссылку. Как раздел Spark (ING) работает с файлами в HDFS?
1 ответ
- Всякий раз, когда происходит перетасовка, наступает новый этап. и
перераспределение вызывает тасование, поэтому у вас есть два этапа. - кэширование используется, когда вы используете фрейм данных несколько раз, чтобы не читать его дважды.
Используйте coalesce вместо repartiton. Я думаю, что это вызывает меньше перетасовки, поскольку это только уменьшает количество разделов.