Эффективный способ объединения нескольких таблиц в Spark

Подобный вопрос был задан здесь, но он не решает мой вопрос должным образом. У меня есть около 100 фреймов данных, каждый из которых имеет по крайней мере 200,000 строки и мне нужно присоединиться к ним, выполнив full присоединиться на основе столбца ID, тем самым создавая DataFrame со столбцами - ID, Col1, Col2,Col3,Col4, Col5..., Col102,

Просто для иллюстрации, структура моих DataFrames -

df1 =                          df2 =            df3 =          .....  df100 = 
+----+------+------+------+    +----+------+    +----+------+         +----+------+ 
|  ID|  Col1|  Col2|  Col3|    |  ID|  Col4|    |  ID|  Col5|         |  ID|Col102|
+----+------+-------------+    +----+------+    +----+------+         +----+------+
| 501|  25.1|  34.9| 436.9|    | 501| 22.33|    | 503| 22.33|         | 501|  78,1|
| 502|  12.2|3225.9|  46.2|    | 502| 645.1|    | 505| 645.1|         | 502|  54.9|
| 504| 754.5| 131.0| 667.3|    | 504| 547.2|    | 504| 547.2|         | 507|     0|
| 505|324.12| 48.93|  -1.3|    | 506|     2|    | 506|     2|         | 509| 71.57|
| 506| 27.51| 88.99|  67.7|    | 507| 463.7|    | 507| 463.7|         | 510|  82.1|
.
.
+----+------+------|------|    |----|------|    |----|------|         |----|------|

Я начинаю присоединяться к этим DataFrames, делая full Присоединяйтесь последовательно на всех из них. Естественно, это вычислительно сложная процедура, и нужно стремиться уменьшить количество shuffles через разные рабочие узлы. Поэтому я начал с разбиения DataFrame df1 основанный на ID используя repartition (), который hash-partitions DataFrame на основе ID на 30 разделов -

df1 = df1.repartition(30,'ID')

Теперь я делаю full соединить между df1 а также df2,

df = df1.join(df2,['ID'],how='full')
df.persist()

поскольку df1 уже hash-partitionedтак что я ожидал, что это join выше пропустил бы перемешивание и поддерживал бы partitioner из df1, но я замечаю, что shuffle произошло, и это увеличило количество разделов на df в 200, Теперь, если я продолжу присоединяться к последующим DataFrames, вызывая их с помощью функции, как показано ниже, я получаю ошибку java.io.IOException: No space left on device -

def rev(df,num):
     df_temp = spark.read.load(filename+str(num)+'.csv')
     df_temp.persist()
     df = df.join(df_temp,['ID'],how='full')
     df_temp.unpersist()
     return df

df = rev(df,3)
df = rev(df,4)
.
.
df = rev(df,100)
# I get the ERROR here below, when I call the first action count() - 
print("Total number of rows: "+str(df.count()))
df.unpersist()  # Never reached this stage.

Обновление: сообщение об ошибке -

Py4JJavaError: An error occurred while calling o3487.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 255.0 failed 1 times, most recent failure: Lost task 42.0 in stage 255.0 (TID 8755, localhost, executor driver): java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)

Вопросы: 1. Почему был разделитель df1 не поддерживается, когда мы сделали первый join?

2.Как я могу эффективно объединить эти несколько таблиц, а также избежать этого OOM вопрос? Пользователь @silvio здесь предлагает использовать .bucketBy (), но он также ссылается на тот факт, что разделитель будет поддерживаться, чего не произошло. Так что я не уверен, что будет эффективным способом объединения этих нескольких фреймов данных.

Любые предложения / советы будут очень благодарны.

2 ответа

1-й попытайтесь сохранить ваш большой df каждые N итераций с помощью цикла for (что вы, вероятно, уже сделали)

2-й попробуйте контролировать номер раздела по умолчанию, установив sqlContext.sql("set spark.sql.shuffle.partitions=100") вместо 200 это по умолчанию.

Ваш код должен выглядеть так:

num_partitions = 10
big_df = spark.createDataFrame(...) #empty df
for i in range(num_partitions):
   big_df = big_df.join(df, ....)

   if i % num_partitions == 0:
     big_df = big_df.persist()

Здесь я призываю сохраняться каждые 10 итераций, вы, конечно, можете настроить это число в соответствии с поведением вашей работы.

РЕДАКТИРОВАТЬ: В вашем случае вы сохраняете локальный df_temp внутри функции rev, но не весь фрейм данных, который содержит все предыдущие объединения (df в вашем случае). Это не повлияет на окончательный план выполнения, так как он будет локальным. Что касается моего предложения, давайте предположим, что вам нужно в общей сложности 100 объединений, тогда с кодом выше вы должны выполнить итерацию цикла [1..100] и сохранять накопленные результаты каждые 10 итераций. После сохранения большого фрейма данных DAG будет содержать меньше вычислений в памяти, поскольку промежуточные шаги будут сохранены, и Spark знает, как восстановить их из хранилища, а не пересчитывать все с нуля.

В прошлом у меня была похожая проблема, за исключением того, что у меня не было столько RDD. Самым эффективным решением, которое я смог найти, было использование низкоуровневого API RDD. Сначала сохраните все RDD так, чтобы они были (хэш) разделены и отсортированы в разделах по столбцам столбцов соединения: https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/OrderedRDDFunctions.html

После этого объединение может быть реализовано с использованием zip-разделов без перестановки или большого объема памяти: https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/RDD.html#zipPartitions -org.apache.spark.rdd.RDD-логическое_выражение scala.Function2-scala.reflect.ClassTag-scala.reflect.ClassTag-

Другие вопросы по тегам