Самый быстрый способ создать словарь из pyspark DF
Я использую Snappydata с pyspark для запуска своих sql-запросов и преобразования выходного DF в словарь, чтобы массово вставить его в монго. Я прошел через много похожих запросов, чтобы проверить преобразование искрового DF в словарь.
В настоящее время я использую map(lambda row: row.asDict(), x.collect())
этот метод для преобразования моего большого DF в словарь. И это занимает 2-3 секунды для 10K записей.
Ниже я изложил, как я реализую свою идею:
x = snappySession.sql("select * from test")
df = map(lambda row: row.asDict(), x.collect())
db.collection.insert_many(df)
Есть ли более быстрый способ?
2 ответа
Я бы порекомендовал использовать foreachPartition
:
(snappySession
.sql("select * from test")
.foreachPartition(insert_to_mongo))
где insert_to_mongo
:
def insert_to_mongo(rows):
client = ...
db = ...
db.collection.insert_many((row.asDict() for row in rows))
Я хотел бы выяснить, можете ли вы написать напрямую в Mongo из Spark, так как это будет лучшим методом.
В противном случае вы можете использовать этот метод:
x = snappySession.sql("select * from test")
dictionary_rdd = x.rdd.map(lambda row: row.asDict())
for d in dictionary_rdd.toLocalIterator():
db.collection.insert_many(d)
Это создаст все словари в Spark распределенным способом. Строки будут возвращены драйверу и вставлены в Mongo по одной строке за раз, чтобы вам не хватило памяти.