Spark RDD 上的惰性 foreach
Lazy foreach on a Spark RDD
我有一个很大的字符串 RDD(通过几个 sc.textFile(...))
的联合获得)。
我现在想在该 RDD 中搜索给定的字符串,并且我希望搜索在找到 "good enough" 匹配项时停止。
我可以为此改造 foreach
、filter
或 map
,但所有这些都将遍历该 RDD 中的每个元素,无论匹配项是否具有已达到。
有没有办法缩短这个过程并避免遍历整个 RDD?
不是真的。没有 find
方法,就像在启发 Spark API 的 Scala 集合中一样,一旦找到满足谓词的元素,它就会停止查找。最好的选择可能是使用一个数据源,它可以最大限度地减少过度扫描,例如 Cassandra,驱动程序会在其中下推一些查询参数。您还可以查看名为 BlinkDB 的更具实验性的 Berkeley 项目。
最重要的是,Spark 的设计更多是为了扫描数据集,就像之前的 MapReduce,而不是传统的类似数据库的查询。
I could retrofit foreach, or filter, or map for this purpose, but all of these will iterate through every element in that RDD
其实,你错了。如果您限制结果(使用 take
或 first
),Spark 引擎足够智能以优化计算:
import numpy as np
from __future__ import print_function
np.random.seed(323)
acc = sc.accumulator(0)
def good_enough(x, threshold=7000):
global acc
acc += 1
return x > threshold
rdd = sc.parallelize(np.random.randint(0, 10000) for i in xrange(1000000))
x = rdd.filter(good_enough).first()
现在让我们检查累积:
>>> print("Checked {0} items, found {1}".format(acc.value, x))
Checked 6 items, found 7109
只是为了确定一切是否按预期工作:
acc = sc.accumulator(0)
rdd.filter(lambda x: good_enough(x, 100000)).take(1)
assert acc.value == rdd.count()
同样的事情也可以完成,使用数据帧和 udf 可能会更有效。
注意:在某些情况下,甚至可以在 Spark 中使用无限序列并仍然得到结果。您可以查看我对 的回答作为示例。
我有一个很大的字符串 RDD(通过几个 sc.textFile(...))
的联合获得)。
我现在想在该 RDD 中搜索给定的字符串,并且我希望搜索在找到 "good enough" 匹配项时停止。
我可以为此改造 foreach
、filter
或 map
,但所有这些都将遍历该 RDD 中的每个元素,无论匹配项是否具有已达到。
有没有办法缩短这个过程并避免遍历整个 RDD?
不是真的。没有 find
方法,就像在启发 Spark API 的 Scala 集合中一样,一旦找到满足谓词的元素,它就会停止查找。最好的选择可能是使用一个数据源,它可以最大限度地减少过度扫描,例如 Cassandra,驱动程序会在其中下推一些查询参数。您还可以查看名为 BlinkDB 的更具实验性的 Berkeley 项目。
最重要的是,Spark 的设计更多是为了扫描数据集,就像之前的 MapReduce,而不是传统的类似数据库的查询。
I could retrofit foreach, or filter, or map for this purpose, but all of these will iterate through every element in that RDD
其实,你错了。如果您限制结果(使用 take
或 first
),Spark 引擎足够智能以优化计算:
import numpy as np
from __future__ import print_function
np.random.seed(323)
acc = sc.accumulator(0)
def good_enough(x, threshold=7000):
global acc
acc += 1
return x > threshold
rdd = sc.parallelize(np.random.randint(0, 10000) for i in xrange(1000000))
x = rdd.filter(good_enough).first()
现在让我们检查累积:
>>> print("Checked {0} items, found {1}".format(acc.value, x))
Checked 6 items, found 7109
只是为了确定一切是否按预期工作:
acc = sc.accumulator(0)
rdd.filter(lambda x: good_enough(x, 100000)).take(1)
assert acc.value == rdd.count()
同样的事情也可以完成,使用数据帧和 udf 可能会更有效。
注意:在某些情况下,甚至可以在 Spark 中使用无限序列并仍然得到结果。您可以查看我对