限制 RDD 大小

Limiting an RDD size

我有一个 RDD 如下:

rdd
   .filter { case (_, record) => predicates.forall(_.accept(record)) }
   .toDS()
   .cache()

它基本上是在应用谓词后过滤 RDD。

我遇到的问题是...我的一些数据集 RDD 很大,谓词可能为空,这意味着我们试图缓存整个数据集。

相反,我想要做的总是 limit 在缓存数据集之前的数据集大小。 我试过如下设置限制:

dataSet
   .filter { case (_, record) => predicates.forall(_.accept(record)) }
   .limit(10000)
   .toDS()
   .cache()

但我收到 OOM 错误。在我看来,分区在应用限制之前已经过载。 因此,我想知道是否有某种方法可以将限制应用于分区。因此,一旦达到限制,有效过滤就会暂停。

进一步扩展不是一种选择,因为这些数据集太大

您可能应该研究一下 sampling the rdd。如果您提供一致的种子,您将获得一致的结果。您可能不想要“withReplace”。这将 运行 比使用限制更快。 Sample 确实适用于整个数据,但会在减少数据集时进行过滤。

RDD.sample(withReplacement, fraction, seed=None)

Parameters: withReplacement - bool can elements be sampled multiple times (replaced when sampled out)

fraction - float expected size of the sample as a fraction of this RDD’s size without replacement: probability that each element is chosen; fraction must be [0, 1] with replacement: expected number of times each element is chosen; fraction must be >= 0

seed - int, optional seed for the random number generation

相关代码链接(rdd.sample), (subclass that does actual work work.)