PySpark partitionBy、repartition 还是什么都不做?
PySpark partitionBy, repartition, or nothing?
所以我所做的是
rdd.flatMap(lambda x: enumerate(x))
为我的数据制作键 0-49。然后我决定这样做:
rdd.flatMap(lambda x: enumerate(x)).partitionBy(50)
我注意到发生了一些奇怪的事情,所以对于以下文件大小,10GB 的文件需要 46 秒来完成我的计算,而 50GB 的文件需要 10 分 31 秒。我检查了文件,由于某种原因它只有 4 个块。
所以我所做的改变了:
sc.textFile("file", 100)
我删除了分区,50GB 的文件减少到大约 1 分钟。我想知道加载后尝试重新分区数据是否仍然有意义?也许是钥匙?
如果我正确理解你的问题,你会问你何时需要额外的重新分区。首先,您应该记住重新分区是一个 expensive operation。明智地使用它。二是没有严谨的回答,靠经验来的。但一些常见的情况是:
您可以尝试在 join, leftOuterJoin, cogroup...
之前的日期调用 repartition
有时它可以加快计算速度。
你flatMap
你的数据进入更多"heavy-weighted"数据并遇到Java heap space Exception java.lang.OutOfMemoryError
。那么你当然应该让你的分区更小以适应 flatMap
.
之后的数据
您将数据加载到 database\mongoDb\elasticSearch... 您对数据调用 repartition
,然后在 forEachPartition
代码块中批量插入所有这些分区到数据库。所以这些块的大小应该是合理的。
所以我所做的是
rdd.flatMap(lambda x: enumerate(x))
为我的数据制作键 0-49。然后我决定这样做:
rdd.flatMap(lambda x: enumerate(x)).partitionBy(50)
我注意到发生了一些奇怪的事情,所以对于以下文件大小,10GB 的文件需要 46 秒来完成我的计算,而 50GB 的文件需要 10 分 31 秒。我检查了文件,由于某种原因它只有 4 个块。
所以我所做的改变了:
sc.textFile("file", 100)
我删除了分区,50GB 的文件减少到大约 1 分钟。我想知道加载后尝试重新分区数据是否仍然有意义?也许是钥匙?
如果我正确理解你的问题,你会问你何时需要额外的重新分区。首先,您应该记住重新分区是一个 expensive operation。明智地使用它。二是没有严谨的回答,靠经验来的。但一些常见的情况是:
您可以尝试在
join, leftOuterJoin, cogroup...
之前的日期调用repartition
有时它可以加快计算速度。你
flatMap
你的数据进入更多"heavy-weighted"数据并遇到Java heap space Exception java.lang.OutOfMemoryError
。那么你当然应该让你的分区更小以适应flatMap
. 之后的数据
您将数据加载到 database\mongoDb\elasticSearch... 您对数据调用
repartition
,然后在forEachPartition
代码块中批量插入所有这些分区到数据库。所以这些块的大小应该是合理的。