Разбиение большого асимметричного набора данных в S3 методом Spark's partitionBy

Я пытаюсь записать большой разделенный набор данных на диск с помощью Spark и partitionBy Алгоритм борется с обоими подходами, которые я пробовал.

Перегородки сильно перекошены - некоторые перегородки массивные, а другие крошечные.

Проблема № 1:

Когда я использую передел перед repartitionBySpark записывает все разделы в один файл, даже огромные

val df = spark.read.parquet("some_data_lake")
df
  .repartition('some_col).write.partitionBy("some_col")
  .parquet("partitioned_lake")

Это выполняется вечно, потому что Spark не записывает большие разделы параллельно. Если один из разделов имеет 1 ТБ данных, Spark попытается записать все 1 ТБ данных в один файл.

Проблема № 2:

Когда я не пользуюсь repartitionSpark пишет слишком много файлов.

Этот код запишет безумное количество файлов.

df.write.partitionBy("some_col").parquet("partitioned_lake")

Я запустил это на крошечном подмножестве данных 8 ГБ, и Spark записал более 85 000 файлов!

Когда я попытался запустить это для производственного набора данных, один раздел с 1,3 ГБ данных был записан в виде 3100 файлов.

Что бы я хотел

Я бы хотел, чтобы каждый раздел записывался в виде файлов размером 1 ГБ. Таким образом, раздел с 7 ГБ данных будет записан в виде 7 файлов, а раздел с 0,3 ГБ данных будет записан в виде одного файла.

Какой мой лучший путь вперед?

4 ответа

Решение

Самое простое решение - добавить один или несколько столбцов в repartition и явно установить количество разделов.

val numPartitions = ???

df.repartition(numPartitions, $"some_col", $"some_other_col")
 .write.partitionBy("some_col")
 .parquet("partitioned_lake")

где:

  • numPartitions - должно быть верхней границей (фактическое число может быть меньше) желаемого количества файлов, записанных в каталог раздела.
  • $"some_other_col" (и необязательные дополнительные столбцы) должны иметь большую мощность и быть независимыми от $"some_column (между этими двумя должна быть функциональная зависимость, и она не должна быть сильно коррелированной).

    Если данные не содержат такой столбец, вы можете использовать o.a.s.sql.functions.rand,

    import org.apache.spark.sql.functions.rand
    
    df.repartition(numPartitions, $"some_col", rand)
      .write.partitionBy("some_col")
      .parquet("partitioned_lake")
    

Я бы хотел, чтобы каждый раздел записывался как файлы размером 1 ГБ. Таким образом, раздел с 7 ГБ данных будет записан как 7 файлов, а раздел с 0,3 ГБ данных будет записан как один файл.

Принятый в настоящее время ответ, вероятно, в большинстве случаев достаточно хорош, но не отвечает на запрос о том, чтобы раздел размером 0,3 ГБ был записан в один файл. Вместо этого он напишет numPartitions файлы для каждого каталога выходного раздела, включая раздел размером 0,3 ГБ.

То, что вы ищете, - это способ динамического масштабирования количества выходных файлов по размеру раздела данных. Для этого мы будем опираться на подход 10465355 к использованию для управления поведением repartition(), и масштабируйте диапазон rand() в зависимости от количества файлов, которые мы хотим для этого раздела.

Сложно контролировать поведение секционирования по размеру выходного файла, поэтому вместо этого мы будем управлять им, используя приблизительное количество строк, которое мы хотим для каждого выходного файла.

Я продемонстрирую это на Python, но подход в Scala практически такой же.

      from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

spark = SparkSession.builder.getOrCreate()
skewed_data = (
    spark.createDataFrame(
        [(1,)] * 100 + [(2,)] * 10 + [(3,), (4,), (5,)],
        schema=['id'],
    )
)
partition_by_columns = ['id']
desired_rows_per_output_file = 10

partition_count = skewed_data.groupBy(partition_by_columns).count()

partition_balanced_data = (
    skewed_data
    .join(partition_count, on=partition_by_columns)
    .withColumn(
        'repartition_seed',
        (
            rand() * partition_count['count'] / desired_rows_per_output_file
        ).cast('int')
    )
    .repartition(*partition_by_columns, 'repartition_seed')
)

Такой подход позволит сбалансировать размер выходных файлов, независимо от того, насколько искажены размеры разделов. Каждый раздел данных получит необходимое количество файлов, чтобы каждый выходной файл имел примерно требуемое количество строк.

Обязательным условием этого подхода является расчет размера каждого раздела, который вы можете увидеть в partition_count. Это неизбежно, если вы действительно хотите динамически масштабировать количество выходных файлов на раздел.

Чтобы продемонстрировать, что это правильно, давайте проверим содержимое раздела:

      from pyspark.sql.functions import spark_partition_id

(
    skewed_data
    .groupBy('id')
    .count()
    .orderBy('id')
    .show()
)

(
    partition_balanced_data
    .select(
        *partition_by_columns,
        spark_partition_id().alias('partition_id'),
    )
    .groupBy(*partition_by_columns, 'partition_id')
    .count()
    .orderBy(*partition_by_columns, 'partition_id')
    .show(30)
)

Вот как выглядит результат:

      +---+-----+
| id|count|
+---+-----+
|  1|  100|
|  2|   10|
|  3|    1|
|  4|    1|
|  5|    1|
+---+-----+

+---+------------+-----+
| id|partition_id|count|
+---+------------+-----+
|  1|           7|    9|
|  1|          49|    6|
|  1|          53|   14|
|  1|         117|   12|
|  1|         126|   10|
|  1|         136|   11|
|  1|         147|   15|
|  1|         161|    7|
|  1|         177|    7|
|  1|         181|    9|
|  2|          85|   10|
|  3|          76|    1|
|  4|         197|    1|
|  5|          10|    1|
+---+------------+-----+

По желанию, каждый выходной файл имеет примерно 10 строк. id=1 получает 10 разделов, id=2 получает 1 раздел и id={3,4,5} каждый получает 1 раздел.

Это решение уравновешивает размеры выходных файлов независимо от перекоса данных и без ограничения параллелизма, полагаясь на maxRecordsPerFile.

Альтернативой методу Ника Чаммаса является создание столбца row_number(), разделенного ключом первичного раздела, а затем деление его на точное количество записей, которые вы хотите разместить в каждом разделе. Выражаясь в SPARK SQL, это выглядит следующим образом:

      SELECT /*+ REPARTITION(id, file_num) */
  id,
  FLOOR(ROW_NUMBER() OVER(PARTITION BY id ORDER BY NULL) / rows_per_file) AS file_num
FROM skewed_data

Дополнительным преимуществом этого является то, что он позволяет размещать большую часть данных в одном разделе по файлам с помощью предложение о вторичном ключе. Не гарантируется, что вторичные ключи будут размещены в одном месте, если номера строк, связанные с вторичным ключом, охватывают два значения. Также возможно, и на самом деле довольно вероятно, что в конечном итоге останется один файл с несколькими записями в каждом разделе.

Ник Чаммас предложил умный подход, который должен сработать во многих случаях. Однако у него есть тот недостаток, что он не может масштабироваться при наличии большого количества разделов. Например, если у вас 100 разделов данных, то часть каждого раздела данных окажется в разделе Spark в памяти (т. е. в том разделе, где). Если у вас много разделов данных, это может превысить объем памяти рабочего Spark.

Это решение немного сложнее, но оно гарантирует, что разные разделы данных не будут перетасованы в одни и те же разделы Spark в памяти.

      partition_col = "partition"
num_rows_in_output = 100_000

# Compute the number of records in each partition.
# Then, convert that into a number of desired output files:
partition_counts = (
    df
    .groupby(partition_col).count()
    .withColumn("num_files", F.ceil(col("count") / num_rows_in_output))
    # Cumulative sum to use as an offset:
    .withColumn(
        "file_offset",
        F.sum("num_files").over(Window.rowsBetween(Window.unboundedPreceding, -1)),
    )
    .na.fill(0, "file_offset")
    .cache()
)

# Use the custom partitioning to write the files:
(
    df
    .join(partition_counts, on=partition_col)
    .withColumn(
        'repartition_seed', 
        F.floor(F.rand() * F.col("num_files")) + F.col("file_offset")
    )
    .repartition("repartition_seed")
    .write
    .partitionBy(partition_counts)
    .parquet("/path/to/output")
)

Однако проблема все еще существует. будет хешировать и распределять на основе хэш-значения. Что нам действительно нужно, так это использовать значение напрямую. Проблема в том, что два разныхможет иметь одно и то же значение и оказаться в одном и том же разделе Spark в памяти. В лучшем случае данные поступают из двух разных разделов данных и по-прежнему записываются в виде двух файлов нужного размера. Однако в худшем случае они берутся из одного и того же раздела данных и в конечном итоге создают файл, который в 2 раза больше, чем вы хотите. (Возможно, что произойдет еще 3 таких хэш-конфликта, в результате чего вы получите файлы в 3 раза больше и так далее.)

Оказывается, решение состоит в том, чтобы временно использовать RDD, который позволяет вам указать точное разбиение. Я протестировал следующее на наборе данных из 500 миллионов записей со смещенными разделами, с которыми я работаю, и этот подход работает хорошо.

Вот вся функция:

      import pyspark.sql.functions as F
from pyspark.sql.window import Window


def repartition_within_partition(
    df: "pyspark.sql.dataframe.DataFrame",
    partition_col,
    records_per_partition: int,
) -> "pyspark.sql.dataframe.DataFrame":
    """Repartition data such that files are the same size, even across partitions.

    :param df: The DataFrame to repartition, partition, and then write.
    :param partition_col: The column(s) on which to partition.
    :param records_per_partition: The approximate number of records to store in each file.
    :return: A DataFrame that's ready to be written.

    Examples:
        >>> (
        ...     repartition_within_partition(df, "partition", 100_000)
        ...     .write.partitionBy("partition").parquet("/path/to/directory")
        ... )
    """
    # The record count per partition, plus the fields we need to compute the partitioning:
    partition_counts = (
        df.groupby(partition_col)
        .count()
        # The number of files to write for this partition:
        .withColumn("num_files", F.ceil(F.col("count") / records_per_partition))
        # The file offset is the cumulative sum of the number of files:
        .withColumn(
            "file_offset",
            F.sum("num_files").over(Window.rowsBetween(Window.unboundedPreceding, -1)),
        )
        .na.fill(0, "file_offset")
        .cache()
    )

    num_partitions = partition_counts.agg(F.sum("num_files")).collect()[0][0]

    return (
        df.join(partition_counts, on=partition_col)
        .withColumn(
            "partition_index", F.floor(F.rand() * F.col("num_files")) + F.col("file_offset")
        )
        # The DataFrame API doesn't let you explicitly set the partition key; only RDDs do.
        # So we convert to an RDD, repartition according to the partition index, then convert back.
        .rdd.map(lambda r: (int(r["partition_index"]), r))
        .partitionBy(num_partitions)
        .map(lambda r: r[1])
        .toDF()
        .drop("count", "num_files", "file_offset", "partition_index")
    )


Другие вопросы по тегам