Spark: Efficient way to test if an RDD is empty
There is no isEmpty method on RDDs, so what is the most efficient way of testing if an RDD is empty?
RDD.isEmpty() will be part of Spark 1.3.0.
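A minimal usage sketch (an illustration only, assuming Spark 1.3+ and an active SparkContext sc):
val rdd = sc.parallelize(Seq(1, 2, 3))
rdd.isEmpty()                         // false
sc.parallelize(Seq[Int]()).isEmpty()  // true (note the explicit element type, see the RDD[Nothing] caveat below)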
Based on the suggestions in this apache mail-thread and some later comments to this answer, I did some small local experiments. The best method is to use take(1).length == 0.
def isEmpty[T](rdd : RDD[T]) = {
  rdd.take(1).length == 0
}
It should run in O(1) except when the RDD is empty, in which case it is linear in the number of partitions: take(1) only scans partitions until it finds a first element, so a non-empty RDD usually answers after inspecting a single partition, while an empty RDD has to touch every partition before it can conclude there is nothing in it.
Thanks to Josh Rosen and Nick Chammas for pointing me to this.
Note: this fails if the RDD is of type RDD[Nothing], e.g. isEmpty(sc.parallelize(Seq())), but that is probably not an issue in real life. isEmpty(sc.parallelize(Seq[Any]())) works fine.
Edits:
- Edit 1: added the take(1).length == 0 solution, thanks to the comments.
My original suggestion: use mapPartitions.
def isEmpty[T](rdd : RDD[T]) = {
  rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_)
}
It should scale with the number of partitions and is not nearly as clean as take(1). It is, however, robust to RDDs of type RDD[Nothing].
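As a quick illustration of that robustness (a sketch, assuming an active SparkContext sc): the take(1)-based version fails at runtime on an RDD[Nothing], whereas the mapPartitions version only moves Booleans between partitions and works fine.
val nothingRdd = sc.parallelize(Seq())  // inferred as RDD[Nothing]
nothingRdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_ && _)  // true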
Experiments:
I used this code for the timings.
def time(n : Long, f : (RDD[Long]) => Boolean): Unit = {
  val start = System.currentTimeMillis()
  // n elements spread over 100 partitions
  val rdd = sc.parallelize(1L to n, numSlices = 100)
  val result = f(rdd)
  printf("Time: " + (System.currentTimeMillis() - start) + " Result: " + result)
}
time(1000000000L, rdd => rdd.take(1).length == 0L)
time(1000000000L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1000000000L, rdd => rdd.count() == 0L)
time(1000000000L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1000000000L, rdd => rdd.fold(0)(_ + _) == 0L)
time(1L, rdd => rdd.take(1).length == 0L)
time(1L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1L, rdd => rdd.count() == 0L)
time(1L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1L, rdd => rdd.fold(0)(_ + _) == 0L)
time(0L, rdd => rdd.take(1).length == 0L)
time(0L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(0L, rdd => rdd.count() == 0L)
time(0L, rdd => rdd.takeSample(true, 1).isEmpty)
time(0L, rdd => rdd.fold(0)(_ + _) == 0L)
On my local machine with 3 worker cores I got these results (one line per call above, in the same order):
Time: 21 Result: false
Time: 75 Result: false
Time: 8664 Result: false
Time: 18266 Result: false
Time: 23836 Result: false
Time: 113 Result: false
Time: 101 Result: false
Time: 68 Result: false
Time: 221 Result: false
Time: 46 Result: false
Time: 79 Result: true
Time: 93 Result: true
Time: 79 Result: true
Time: 100 Result: true
Time: 64 Result: true
As of Spark 1.3, isEmpty() is part of the RDD API. A bug that caused isEmpty to fail was later fixed in Spark 1.4.
For a DataFrame you can do:
val df: DataFrame = ...
df.rdd.isEmpty()
Here is a paste of the code from the RDD implementation (as of 1.4.1).
/**
 * @note due to complications in the internal implementation, this method will raise an
 * exception if called on an RDD of `Nothing` or `Null`. This may be come up in practice
 * because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
 * (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
 * @return true if and only if the RDD contains no elements at all. Note that an RDD
 *         may be empty even when it has at least 1 partition.
 */
def isEmpty(): Boolean = withScope {
  partitions.length == 0 || take(1).length == 0
}
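Note the partitions.length == 0 short-circuit: an RDD with no partitions at all is answered without launching a job. A small sketch (assuming an active SparkContext sc):
sc.emptyRDD[Int].isEmpty()  // true; no partitions, so the check returns before running a job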