修改过滤条件变量时的 Pyspark RDD 过滤行为

Pyspark RDD filter behaviour when filter condition variable is modified

当下面代码为运行时:

A = ss.sparkContext.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t)
print(B.count())
t = 10
C = B.filter(lambda x: x > t)
print(C.count())

输出为:

49
0

这是不正确的,因为在 10 和 49 之间有 39 个值。似乎将 t 从 50 更改为 10 也影响了第一个过滤器,并且它得到了重新评估,因此当连续应用两个过滤器时它实际上变成了 x<10,这将导致 1、2、3、4、5、6、7、8、9,然后是 x>10,导致一个空的 rdd。

但是当我在代码中添加调试打印时,结果不是我所期望的,我正在寻找解释:

A = ss.sparkContext.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t)
print(B.collect())
t = 10
print(B.collect())
print(B.count())
C = B.filter(lambda x: x > t)
print(C.collect())
print(C.count())

输出为:

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
9
[]
0

为什么在 t=10 之后计数是 10 但 print(B.collect()) 显示预期的 rdd 值从 1 到 49?如果在更改 t 后触发收集重新执行 filter,那么 collect() 不应该显示 1-9 的值吗?

我是pyspark的新手,我怀疑这与spark的惰性操作和缓存有关。有人可以解释一下幕后发生的事情吗?

谢谢!

您的假设是正确的,观察到的行为与 Spark 对转换的惰性求值有关。

当执行 B.count() 时,Spark 简单地应用过滤器 x < tt = 50 并打印 49.

的期望值

执行C.count()时,Spark在C的执行计划中看到两个过滤器,即x < tx > t。此时t已经被设置为10并且rdd中没有元素同时满足小于和大于10的条件。 Spark 忽略了第一个过滤器已经被评估的事实。当一个 Spark 动作被调用时 所有 当前 rdd 历史中的转换被执行(除非一些中间结果已经被缓存,见下文)。

更详细地(稍微)检查此行为的一种方法是切换到 Scala 并为两个 rdds 打印 toDebugString1

println(B.toDebugString)

打印

(4) MapPartitionsRDD[1] at filter at SparkStarter.scala:23 []
 |  ParallelCollectionRDD[0] at parallelize at SparkStarter.scala:19 []

println(C.toDebugString)

打印

(4) MapPartitionsRDD[2] at filter at SparkStarter.scala:28 []
 |  MapPartitionsRDD[1] at filter at SparkStarter.scala:23 []
 |  ParallelCollectionRDD[0] at parallelize at SparkStarter.scala:19 []

在这里我们可以看到,对于 rdd B 应用了一个过滤器,对于 rdd C 应用了两个过滤器。

如何解决这个问题?

如果缓存了第一个过滤器的结果,则打印出预期的结果。当 t 发生变化并应用第二个过滤器时 C.count() 仅根据 B:

的缓存结果触发第二个过滤器
A = ss.sparkContext.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t).cache()
print(B.count())
t = 10
C = B.filter(lambda x: x > t)
print(C.count())

打印预期结果。

49
39

1 不幸的是,这仅适用于 Scala 版本的 Spark。 PySpark 似乎“压缩”了 toDebugString(版本 3.1.1)的输出。