Spark паркетная перегородка: большое количество файлов
Я пытаюсь использовать искровое разделение. Я пытался сделать что-то вроде
data.write.partitionBy("key").parquet("/location")
Проблема в том, что каждый раздел создает огромное количество паркетных файлов, что приводит к медленному чтению, если я пытаюсь читать из корневого каталога.
Чтобы избежать этого я пытался
data.coalese(numPart).write.partitionBy("key").parquet("/location")
Это, однако, создает количество паркетных файлов numPart в каждом разделе. Теперь размер моего раздела другой. Так что в идеале я хотел бы иметь отдельную объединение на раздел. Это, однако, не выглядит легким делом. Мне нужно посетить все разделы, объединить к определенному номеру и хранить в отдельном месте.
Как мне использовать разбиение, чтобы избежать много файлов после записи?
5 ответов
Во-первых, я бы действительно избегал использования coalesce
поскольку это часто продвигается дальше в цепочке преобразований и может разрушить параллелизм вашей работы (я спрашивал об этой проблеме здесь: как предотвратить оптимизацию Spark)
Записать 1 файл на паркетный раздел очень просто (см. Метод записи Spark для данных на множество маленьких файлов):
data.repartition($"key").write.partitionBy("key").parquet("/location")
Если вы хотите установить произвольное количество файлов (или файлов одинакового размера), вам необходимо дополнительно перераспределить ваши данные, используя другой атрибут, который можно использовать (я не могу сказать вам, что это может быть в вашем случае):
data.repartition($"key",$"another_key").write.partitionBy("key").parquet("/location")
another_key
может быть другим атрибутом вашего набора данных или производным атрибутом, использующим некоторые модули или операции округления существующих атрибутов. Вы даже можете использовать оконные функции с row_number
над key
а затем вокруг этого чем-то вроде
data.repartition($"key",floor($"row_number"/N)*N).write.partitionBy("key").parquet("/location")
Это поставило бы вас N
записи в 1 паркетный файл
Давайте расширим ответ Рафаэля Рота дополнительным подходом, который создаст верхнюю границу количества файлов, которые может содержать каждый раздел, как обсуждается в этом ответе:
import org.apache.spark.sql.functions.rand
df.repartition(numPartitions, $"some_col", rand)
.write.partitionBy("some_col")
.parquet("partitioned_lake")
В этом сообщении блога подробно объясняются все параметры разделения, которые можно использовать вместе с partitionBy.
Другие ответы здесь очень хороши, но имеют некоторые проблемы:
Разбивать большие разделы на более мелкие файлы очень удобно, но с двумя оговорками:
Если ваши столбцы секционирования сильно перекошены, перераспределение по ним означает потенциальное перемещение всех данных для самого большого раздела данных в один раздел DataFrame. Если этот раздел DataFrame станет слишком большим, это само по себе может привести к сбою вашей работы.
Чтобы привести простой пример, представьте, что
подойдет для DataFrame, в котором есть 1 строка для каждого человека в мире. гарантирует, что ваши выходные файлы не превышают определенного количества строк, но только одна задача сможет последовательно записывать эти файлы. Одна задача должна будет работать со всем разделом данных вместо того, чтобы записывать этот большой раздел данных несколькими задачами.
— элегантное решение, но оно плохо справляется с небольшими разделами данных. Он будет выписывать файлы для каждого раздела данных, даже если они крошечные. Во многих ситуациях это может не быть проблемой, но если у вас есть большое озеро данных, вы знаете, что запись большого количества маленьких файлов со временем убьет производительность вашего озера данных.
Таким образом, одно решение плохо работает с очень большими разделами данных, а другое — с очень маленькими разделами данных.
Нам нужен способ динамического масштабирования количества выходных файлов по размеру раздела данных. Если он очень большой, нам нужно много файлов. Если он очень маленький, нам нужно всего несколько файлов или даже один файл.
Решение состоит в том, чтобы расширить подход, используя
Вот суть по очень похожему вопросу:
# In this example, `id` is a column in `skewed_data`.
partition_by_columns = ['id']
desired_rows_per_output_file = 10
partition_count = skewed_data.groupBy(partition_by_columns).count()
partition_balanced_data = (
skewed_data
.join(partition_count, on=partition_by_columns)
.withColumn(
'repartition_seed',
(
rand() * partition_count['count'] / desired_rows_per_output_file
).cast('int')
)
.repartition(*partition_by_columns, 'repartition_seed')
)
Это сбалансирует размер выходных файлов независимо от перекоса разделов, не ограничивая ваш параллелизм и не создавая слишком много маленьких файлов для небольших разделов.
Если вы хотите запустить этот код самостоятельно, я предоставил решения, которое я разместилавтономный пример вместе с доказательством того, что разделы DataFrame сбалансированы правильно.
Это работает для меня очень хорошо:
data.repartition(n, "key").write.partitionBy("key").parquet("/location")
Он создает N файлов в каждом выходном разделе (каталоге) и (анекдотически) быстрее, чем при использовании coalesce
и (опять же, по моим собственным данным) быстрее, чем только перераспределение на выходе.
Если вы работаете с S3, я также рекомендую делать все на локальных дисках (Spark много делает для создания / переименования / удаления файлов во время записи), и как только все решено, используйте hadoop FileUtil
(или просто aws cli), чтобы скопировать все:
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
// ...
def copy(
in : String,
out : String,
sparkSession: SparkSession
) = {
FileUtil.copy(
FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration),
new Path(in),
FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration),
new Path(out),
false,
sparkSession.sparkContext.hadoopConfiguration
)
}
Редактировать: Согласно обсуждению в комментариях:
Вы - набор данных со столбцом раздела YEAR, но в каждом данном ГОДЕ содержится много разных данных. Таким образом, один год может иметь 1 ГБ данных, а другой может иметь 100 ГБ.
Вот psuedocode для одного способа справиться с этим:
val partitionSize = 10000 // Number of rows you want per output file.
val yearValues = df.select("YEAR").distinct
distinctGroupByValues.each((yearVal) -> {
val subDf = df.filter(s"YEAR = $yearVal")
val numPartitionsToUse = subDf.count / partitionSize
subDf.repartition(numPartitionsToUse).write(outputPath + "/year=$yearVal")
})
Но я на самом деле не знаю, что это будет работать. Вполне возможно, что Spark будет иметь проблемы с чтением в переменном количестве файлов на раздел столбца.
Другой способ сделать это - написать свой собственный разделитель, но я понятия не имею, что с этим связано, поэтому я не могу предоставить какой-либо код.
Эти ответы великолепны в теории, но, насколько я могу судить, PySpark серьезно сломан. Единственное, что, кажется, работает, это использовать как номер, так и ложный разделитель.F.floor(F.rand() * num_files_per_partition)
. Кроме того, установка количества разделов ничего не дает. Это касается Databricks, работающих на двух исполнителях r4.2xlarge в Databricks Runtime 12.2 ML с Apache Spark 3.3.2.
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit
mk_temp_path = lambda s: os.path.join('s3://<my path>', s)
N = 10000
num_part = 10
num_file = 4
def ls_recursive(path, depth=0):
fs = dbutils.fs.ls(path)
maxdepth=2
if depth > maxdepth:
print(f"reached max depth > {maxdepth} at {path}")
return []
return [
y
for x in fs
for y in (ls_recursive(x.path, depth + 1) if (x.size == 0 and x.path != path) else [x.path])
]
df = spark.createDataFrame(pd.DataFrame({"id":np.arange(N), "x":np.random.rand(N)})).withColumn('hp', F.abs(F.hash('id')%num_part))
fn='basic_write'
df.write.mode('overwrite').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in dbutils.fs.ls(mk_temp_path(fn)) if x.path.endswith('parquet')]))
fn='partition_write'
df.write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))
fn='repartition_col'
df.repartition('hp').write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))
fn='repartition_num'
df.repartition(num_part*num_file).write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))
fn='repartition_colsplit'
df.repartition(col('hp'), F.floor(F.rand()*lit(num_file))).write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))
fn='repartition_numcol'
df.repartition(num_part*num_file, col('hp')).write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))
fn='repartition_numcolsplit'
df.repartition(num_part*num_file, col('hp'), F.floor(F.rand()*lit(num_file))).write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))
fn='repartition_numcolrand'
df.repartition(num_part*num_file, col('hp'), F.rand()).write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))
fn='repart_file_only_BAD'
df.repartition(num_file).write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))
spark.conf.set("spark.sql.shuffle.partitions", 32)
df = spark.createDataFrame(pd.DataFrame({"id":np.arange(N), "x":np.random.rand(N)})).withColumn('hp', F.abs(F.hash('id')%num_part))
fn='basic_write_32'
df.write.mode('overwrite').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in dbutils.fs.ls(mk_temp_path(fn)) if x.path.endswith('parquet')]))
fn='partition_write_32'
df.write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))
basic_write: 16
partition_write: 236
repartition_col: 10
repartition_num: 400
repartition_colsplit: 10
repartition_numcol: 10
repartition_numcolsplit: 39
repartition_numcolrand: 400
repart_file_only_BAD: 40
basic_write_32: 16
partition_write_32: 160