Сбой перераспределения фрейма данных pyspark и как избежать начального размера раздела

Я пытаюсь настроить производительность spark, используя разделы на кадре данных spark. Вот код:

file_path1 = spark.read.parquet(*paths[:15])
df = file_path1.select(columns) \
    .where((func.col("organization") == organization)) 
df = df.repartition(10)
#execute an action just to make spark execute the repartition step
df.first()

Во время исполнения first() Я проверяю этапы работы в Spark UI и вот что я нахожу:

  • Почему нет repartition шаг в стадии?
  • Почему существует также этап 8? Я просил только одно действие first(), Это из-за тасования, вызванного repartition?
  • Есть ли способ изменить перераспределение файлов паркета без необходимости выполнять такие операции? Как изначально, когда я прочитал df Вы можете видеть, что он разделен на 43 тыс. разделов, что очень много (по сравнению с его размером, когда я сохраняю его в CSV-файл: 4 МБ с 13 тыс. строк) и создает проблемы на дальнейших шагах, поэтому я хотел переразбить его.
  • Должен ли я использовать cache() после передела? df = df.repartition(10).cache()? Как когда я казнил df.first() во второй раз, я также получаю запланированный этап с 43k разделов, несмотря на df.rdd.getNumPartitions() который вернул 10. РЕДАКТИРОВАТЬ: количество разделов просто попробовать. мои вопросы направлены на то, чтобы помочь мне понять, как сделать правильное перераспределение.

Примечание: изначально Dataframe считывается из набора файлов паркета в Hadoop.

Я уже прочитал это как ссылку. Как раздел Spark (ING) работает с файлами в HDFS?

1 ответ

  • Всякий раз, когда происходит перетасовка, наступает новый этап. и
    перераспределение вызывает тасование, поэтому у вас есть два этапа.
  • кэширование используется, когда вы используете фрейм данных несколько раз, чтобы не читать его дважды.

Используйте coalesce вместо repartiton. Я думаю, что это вызывает меньше перетасовки, поскольку это только уменьшает количество разделов.

Другие вопросы по тегам