PySpark partitionBy、repartition 还是什么都不做?

PySpark partitionBy, repartition, or nothing?

所以我所做的是

rdd.flatMap(lambda x: enumerate(x))

为我的数据制作键 0-49。然后我决定这样做:

rdd.flatMap(lambda x: enumerate(x)).partitionBy(50)

我注意到发生了一些奇怪的事情,所以对于以下文件大小,10GB 的文件需要 46 秒来完成我的计算,而 50GB 的文件需要 10 分 31 秒。我检查了文件,由于某种原因它只有 4 个块。

所以我所做的改变了:

sc.textFile("file", 100)

我删除了分区,50GB 的文件减少到大约 1 分钟。我想知道加载后尝试重新分区数据是否仍然有意义?也许是钥匙?

如果我正确理解你的问题,你会问你何时需要额外的重新分区。首先,您应该记住重新分区是一个 expensive operation。明智地使用它。二是没有严谨的回答,靠经验来的。但一些常见的情况是:

  1. 您可以尝试在 join, leftOuterJoin, cogroup... 之前的日期调用 repartition 有时它可以加快计算速度。

  2. flatMap你的数据进入更多"heavy-weighted"数据并遇到Java heap space Exception java.lang.OutOfMemoryError。那么你当然应该让你的分区更小以适应 flatMap.

  3. 之后的数据
  4. 您将数据加载到 database\mongoDb\elasticSearch... 您对数据调用 repartition,然后在 forEachPartition 代码块中批量插入所有这些分区到数据库。所以这些块的大小应该是合理的。