修改过滤条件变量时的 Pyspark RDD 过滤行为
Pyspark RDD filter behaviour when filter condition variable is modified
当下面代码为运行时:
A = ss.sparkContext.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t)
print(B.count())
t = 10
C = B.filter(lambda x: x > t)
print(C.count())
输出为:
49
0
这是不正确的,因为在 10 和 49 之间有 39 个值。似乎将 t
从 50 更改为 10 也影响了第一个过滤器,并且它得到了重新评估,因此当连续应用两个过滤器时它实际上变成了 x<10
,这将导致 1、2、3、4、5、6、7、8、9,然后是 x>10
,导致一个空的 rdd。
但是当我在代码中添加调试打印时,结果不是我所期望的,我正在寻找解释:
A = ss.sparkContext.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t)
print(B.collect())
t = 10
print(B.collect())
print(B.count())
C = B.filter(lambda x: x > t)
print(C.collect())
print(C.count())
输出为:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
9
[]
0
为什么在 t=10
之后计数是 10 但 print(B.collect())
显示预期的 rdd 值从 1 到 49?如果在更改 t
后触发收集重新执行 filter
,那么 collect()
不应该显示 1-9 的值吗?
我是pyspark的新手,我怀疑这与spark的惰性操作和缓存有关。有人可以解释一下幕后发生的事情吗?
谢谢!
您的假设是正确的,观察到的行为与 Spark 对转换的惰性求值有关。
当执行 B.count()
时,Spark 简单地应用过滤器 x < t
和 t = 50
并打印 49
.
的期望值
执行C.count()
时,Spark在C
的执行计划中看到两个过滤器,即x < t
和x > t
。此时t
已经被设置为10
并且rdd中没有元素同时满足小于和大于10
的条件。 Spark 忽略了第一个过滤器已经被评估的事实。当一个 Spark 动作被调用时 所有 当前 rdd 历史中的转换被执行(除非一些中间结果已经被缓存,见下文)。
更详细地(稍微)检查此行为的一种方法是切换到 Scala 并为两个 rdds 打印 toDebugString。1
println(B.toDebugString)
打印
(4) MapPartitionsRDD[1] at filter at SparkStarter.scala:23 []
| ParallelCollectionRDD[0] at parallelize at SparkStarter.scala:19 []
而
println(C.toDebugString)
打印
(4) MapPartitionsRDD[2] at filter at SparkStarter.scala:28 []
| MapPartitionsRDD[1] at filter at SparkStarter.scala:23 []
| ParallelCollectionRDD[0] at parallelize at SparkStarter.scala:19 []
在这里我们可以看到,对于 rdd B
应用了一个过滤器,对于 rdd C
应用了两个过滤器。
如何解决这个问题?
如果缓存了第一个过滤器的结果,则打印出预期的结果。当 t
发生变化并应用第二个过滤器时 C.count()
仅根据 B
:
的缓存结果触发第二个过滤器
A = ss.sparkContext.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t).cache()
print(B.count())
t = 10
C = B.filter(lambda x: x > t)
print(C.count())
打印预期结果。
49
39
1 不幸的是,这仅适用于 Scala 版本的 Spark。 PySpark 似乎“压缩”了 toDebugString
(版本 3.1.1)的输出。
当下面代码为运行时:
A = ss.sparkContext.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t)
print(B.count())
t = 10
C = B.filter(lambda x: x > t)
print(C.count())
输出为:
49
0
这是不正确的,因为在 10 和 49 之间有 39 个值。似乎将 t
从 50 更改为 10 也影响了第一个过滤器,并且它得到了重新评估,因此当连续应用两个过滤器时它实际上变成了 x<10
,这将导致 1、2、3、4、5、6、7、8、9,然后是 x>10
,导致一个空的 rdd。
但是当我在代码中添加调试打印时,结果不是我所期望的,我正在寻找解释:
A = ss.sparkContext.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t)
print(B.collect())
t = 10
print(B.collect())
print(B.count())
C = B.filter(lambda x: x > t)
print(C.collect())
print(C.count())
输出为:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
9
[]
0
为什么在 t=10
之后计数是 10 但 print(B.collect())
显示预期的 rdd 值从 1 到 49?如果在更改 t
后触发收集重新执行 filter
,那么 collect()
不应该显示 1-9 的值吗?
我是pyspark的新手,我怀疑这与spark的惰性操作和缓存有关。有人可以解释一下幕后发生的事情吗?
谢谢!
您的假设是正确的,观察到的行为与 Spark 对转换的惰性求值有关。
当执行 B.count()
时,Spark 简单地应用过滤器 x < t
和 t = 50
并打印 49
.
执行C.count()
时,Spark在C
的执行计划中看到两个过滤器,即x < t
和x > t
。此时t
已经被设置为10
并且rdd中没有元素同时满足小于和大于10
的条件。 Spark 忽略了第一个过滤器已经被评估的事实。当一个 Spark 动作被调用时 所有 当前 rdd 历史中的转换被执行(除非一些中间结果已经被缓存,见下文)。
更详细地(稍微)检查此行为的一种方法是切换到 Scala 并为两个 rdds 打印 toDebugString。1
println(B.toDebugString)
打印
(4) MapPartitionsRDD[1] at filter at SparkStarter.scala:23 []
| ParallelCollectionRDD[0] at parallelize at SparkStarter.scala:19 []
而
println(C.toDebugString)
打印
(4) MapPartitionsRDD[2] at filter at SparkStarter.scala:28 []
| MapPartitionsRDD[1] at filter at SparkStarter.scala:23 []
| ParallelCollectionRDD[0] at parallelize at SparkStarter.scala:19 []
在这里我们可以看到,对于 rdd B
应用了一个过滤器,对于 rdd C
应用了两个过滤器。
如何解决这个问题?
如果缓存了第一个过滤器的结果,则打印出预期的结果。当 t
发生变化并应用第二个过滤器时 C.count()
仅根据 B
:
A = ss.sparkContext.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t).cache()
print(B.count())
t = 10
C = B.filter(lambda x: x > t)
print(C.count())
打印预期结果。
49
39
1 不幸的是,这仅适用于 Scala 版本的 Spark。 PySpark 似乎“压缩”了 toDebugString
(版本 3.1.1)的输出。