从 pyspark DF 创建字典的最快方法
Fastest way to create Dictionary from pyspark DF
我将 Snappydata 与 pyspark 一起用于 运行 我的 sql 查询,并将输出 DF 转换为字典以将其批量插入 mongo。
我经历了许多类似的问题来测试 spark DF 到 Dictionary 的转换。
目前我正在使用 map(lambda row: row.asDict(), x.collect())
这种方法将我的 bulk DF 转换为字典。 10K 条记录需要 2-3 秒。
我在下面说明了我是如何实现我的想法的:
x = snappySession.sql("select * from test")
df = map(lambda row: row.asDict(), x.collect())
db.collection.insert_many(df)
有没有更快的方法?
我会研究您是否可以直接从 Spark 写入 Mongo,因为那将是最好的方法。
否则,您可以使用此方法:
x = snappySession.sql("select * from test")
dictionary_rdd = x.rdd.map(lambda row: row.asDict())
for d in dictionary_rdd.toLocalIterator():
db.collection.insert_many(d)
这将以分布式方式在 Spark 中创建所有字典。这些行将返回给驱动程序并一次插入 Mongo 一行,这样您就不会 运行 内存不足。
我建议使用 foreachPartition
:
(snappySession
.sql("select * from test")
.foreachPartition(insert_to_mongo))
其中 insert_to_mongo
:
def insert_to_mongo(rows):
client = ...
db = ...
db.collection.insert_many((row.asDict() for row in rows))
我将 Snappydata 与 pyspark 一起用于 运行 我的 sql 查询,并将输出 DF 转换为字典以将其批量插入 mongo。 我经历了许多类似的问题来测试 spark DF 到 Dictionary 的转换。
目前我正在使用 map(lambda row: row.asDict(), x.collect())
这种方法将我的 bulk DF 转换为字典。 10K 条记录需要 2-3 秒。
我在下面说明了我是如何实现我的想法的:
x = snappySession.sql("select * from test")
df = map(lambda row: row.asDict(), x.collect())
db.collection.insert_many(df)
有没有更快的方法?
我会研究您是否可以直接从 Spark 写入 Mongo,因为那将是最好的方法。
否则,您可以使用此方法:
x = snappySession.sql("select * from test")
dictionary_rdd = x.rdd.map(lambda row: row.asDict())
for d in dictionary_rdd.toLocalIterator():
db.collection.insert_many(d)
这将以分布式方式在 Spark 中创建所有字典。这些行将返回给驱动程序并一次插入 Mongo 一行,这样您就不会 运行 内存不足。
我建议使用 foreachPartition
:
(snappySession
.sql("select * from test")
.foreachPartition(insert_to_mongo))
其中 insert_to_mongo
:
def insert_to_mongo(rows):
client = ...
db = ...
db.collection.insert_many((row.asDict() for row in rows))