Самый быстрый способ создать словарь из pyspark DF

Я использую Snappydata с pyspark для запуска своих sql-запросов и преобразования выходного DF в словарь, чтобы массово вставить его в монго. Я прошел через много похожих запросов, чтобы проверить преобразование искрового DF в словарь.

В настоящее время я использую map(lambda row: row.asDict(), x.collect()) этот метод для преобразования моего большого DF в словарь. И это занимает 2-3 секунды для 10K записей.

Ниже я изложил, как я реализую свою идею:

x = snappySession.sql("select * from test")
df = map(lambda row: row.asDict(), x.collect())
db.collection.insert_many(df)

Есть ли более быстрый способ?

2 ответа

Решение

Я бы порекомендовал использовать foreachPartition:

(snappySession
    .sql("select * from test")
    .foreachPartition(insert_to_mongo))

где insert_to_mongo:

def insert_to_mongo(rows):
    client  = ...
    db = ...
    db.collection.insert_many((row.asDict() for row in rows))

Я хотел бы выяснить, можете ли вы написать напрямую в Mongo из Spark, так как это будет лучшим методом.

В противном случае вы можете использовать этот метод:

x = snappySession.sql("select * from test")
dictionary_rdd = x.rdd.map(lambda row: row.asDict())

for d in dictionary_rdd.toLocalIterator():
    db.collection.insert_many(d)

Это создаст все словари в Spark распределенным способом. Строки будут возвращены драйверу и вставлены в Mongo по одной строке за раз, чтобы вам не хватило памяти.

Другие вопросы по тегам