Эффективный способ объединения нескольких таблиц в Spark
Подобный вопрос был задан здесь, но он не решает мой вопрос должным образом. У меня есть около 100 фреймов данных, каждый из которых имеет по крайней мере 200,000
строки и мне нужно присоединиться к ним, выполнив full
присоединиться на основе столбца ID
, тем самым создавая DataFrame со столбцами - ID, Col1, Col2,Col3,Col4, Col5..., Col102
,
Просто для иллюстрации, структура моих DataFrames -
df1 = df2 = df3 = ..... df100 =
+----+------+------+------+ +----+------+ +----+------+ +----+------+
| ID| Col1| Col2| Col3| | ID| Col4| | ID| Col5| | ID|Col102|
+----+------+-------------+ +----+------+ +----+------+ +----+------+
| 501| 25.1| 34.9| 436.9| | 501| 22.33| | 503| 22.33| | 501| 78,1|
| 502| 12.2|3225.9| 46.2| | 502| 645.1| | 505| 645.1| | 502| 54.9|
| 504| 754.5| 131.0| 667.3| | 504| 547.2| | 504| 547.2| | 507| 0|
| 505|324.12| 48.93| -1.3| | 506| 2| | 506| 2| | 509| 71.57|
| 506| 27.51| 88.99| 67.7| | 507| 463.7| | 507| 463.7| | 510| 82.1|
.
.
+----+------+------|------| |----|------| |----|------| |----|------|
Я начинаю присоединяться к этим DataFrames, делая full
Присоединяйтесь последовательно на всех из них. Естественно, это вычислительно сложная процедура, и нужно стремиться уменьшить количество shuffles
через разные рабочие узлы. Поэтому я начал с разбиения DataFrame df1
основанный на ID
используя repartition (), который hash-partitions
DataFrame на основе ID
на 30 разделов -
df1 = df1.repartition(30,'ID')
Теперь я делаю full
соединить между df1
а также df2
,
df = df1.join(df2,['ID'],how='full')
df.persist()
поскольку df1
уже hash-partitioned
так что я ожидал, что это join
выше пропустил бы перемешивание и поддерживал бы partitioner
из df1
, но я замечаю, что shuffle
произошло, и это увеличило количество разделов на df
в 200
, Теперь, если я продолжу присоединяться к последующим DataFrames, вызывая их с помощью функции, как показано ниже, я получаю ошибку java.io.IOException: No space left on device
-
def rev(df,num):
df_temp = spark.read.load(filename+str(num)+'.csv')
df_temp.persist()
df = df.join(df_temp,['ID'],how='full')
df_temp.unpersist()
return df
df = rev(df,3)
df = rev(df,4)
.
.
df = rev(df,100)
# I get the ERROR here below, when I call the first action count() -
print("Total number of rows: "+str(df.count()))
df.unpersist() # Never reached this stage.
Обновление: сообщение об ошибке -
Py4JJavaError: An error occurred while calling o3487.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 255.0 failed 1 times, most recent failure: Lost task 42.0 in stage 255.0 (TID 8755, localhost, executor driver): java.io.IOException: No space left on device
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
Вопросы: 1. Почему был разделитель df1
не поддерживается, когда мы сделали первый join
?
2.Как я могу эффективно объединить эти несколько таблиц, а также избежать этого OOM
вопрос? Пользователь @silvio здесь предлагает использовать .bucketBy (), но он также ссылается на тот факт, что разделитель будет поддерживаться, чего не произошло. Так что я не уверен, что будет эффективным способом объединения этих нескольких фреймов данных.
Любые предложения / советы будут очень благодарны.
2 ответа
1-й попытайтесь сохранить ваш большой df каждые N итераций с помощью цикла for (что вы, вероятно, уже сделали)
2-й попробуйте контролировать номер раздела по умолчанию, установив sqlContext.sql("set spark.sql.shuffle.partitions=100")
вместо 200 это по умолчанию.
Ваш код должен выглядеть так:
num_partitions = 10
big_df = spark.createDataFrame(...) #empty df
for i in range(num_partitions):
big_df = big_df.join(df, ....)
if i % num_partitions == 0:
big_df = big_df.persist()
Здесь я призываю сохраняться каждые 10 итераций, вы, конечно, можете настроить это число в соответствии с поведением вашей работы.
РЕДАКТИРОВАТЬ: В вашем случае вы сохраняете локальный df_temp внутри функции rev, но не весь фрейм данных, который содержит все предыдущие объединения (df в вашем случае). Это не повлияет на окончательный план выполнения, так как он будет локальным. Что касается моего предложения, давайте предположим, что вам нужно в общей сложности 100 объединений, тогда с кодом выше вы должны выполнить итерацию цикла [1..100] и сохранять накопленные результаты каждые 10 итераций. После сохранения большого фрейма данных DAG будет содержать меньше вычислений в памяти, поскольку промежуточные шаги будут сохранены, и Spark знает, как восстановить их из хранилища, а не пересчитывать все с нуля.
В прошлом у меня была похожая проблема, за исключением того, что у меня не было столько RDD. Самым эффективным решением, которое я смог найти, было использование низкоуровневого API RDD. Сначала сохраните все RDD так, чтобы они были (хэш) разделены и отсортированы в разделах по столбцам столбцов соединения: https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/OrderedRDDFunctions.html
После этого объединение может быть реализовано с использованием zip-разделов без перестановки или большого объема памяти: https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/RDD.html#zipPartitions -org.apache.spark.rdd.RDD-логическое_выражение scala.Function2-scala.reflect.ClassTag-scala.reflect.ClassTag-