如何通过 Spark 控制 RDD 的隐式缓存?
How to control implicit caching of RDDs by Spark?
作为 Spark 的新手,我一直在关注他们的python example for estimation of PI。
我有兴趣通过在同一上下文中多次重新估计 PI 来了解 Spark 的性能。
我观察到的是,在这些重新估计中 PI 的值没有变化,并且性能计时似乎表明中间 RDD 正在被隐式缓存,然后在后续计算中重用。
有没有什么方法可以配置 Spark 来控制这种行为,并且中间 RDD 总是重新生成?使用 unpersist() 似乎没有效果。
我的代码是 github 上的 here,通过调用
执行
`spark-submit pi2.py`
得到如下结果:
No caching-0: 8000 generated 6256 in 1.14984297752 secs (PI = 3.128)
No caching-1: 8000 generated 6256 in 0.0597329139709 secs (PI = 3.128)
No caching-2: 8000 generated 6256 in 0.0577840805054 secs (PI = 3.128)
No caching-3: 8000 generated 6256 in 0.0545349121094 secs (PI = 3.128)
No caching-4: 8000 generated 6256 in 0.0544559955597 secs (PI = 3.128)
With caching-0: 8000 generated 6256 in 0.069139957428 secs (PI = 3.128)
With caching-1: 8000 generated 6256 in 0.0549170970917 secs (PI = 3.128)
With caching-2: 8000 generated 6256 in 0.0531771183014 secs (PI = 3.128)
With caching-3: 8000 generated 6256 in 0.0502359867096 secs (PI = 3.128)
With caching-4: 8000 generated 6256 in 0.0557379722595 secs (PI = 3.128)`
这里发生了一些事情。首先,您实际上并没有缓存 RDD。来自你的 Github link:
# Now persist the intermediate result
sc.parallelize(xrange(1, n + 1), partitions).map(f).persist()
这将创建一个新的 RDD,进行映射,然后持久化生成的 RDD。您没有保留对它的引用,所以它现在实际上已经消失了。
接下来,第一个 运行 可能较慢,因为 Spark 会将您的函数广播给您的工作人员。所以有一些缓存在工作,但不是针对数据而是针对您的代码。
最后,随机性:seed()
在驱动程序中播种 RNG。种子值与 f()
一起在第一个 运行 所有 worker 上广播(因为种子在 random()
中被引用)。当您现在再次调用 seed()
时,它会更改驱动程序中的种子,但不会更改已发送给工作人员的函数版本中的种子,因此您会一遍又一遍地得到相同的结果。
作为 Spark 的新手,我一直在关注他们的python example for estimation of PI。
我有兴趣通过在同一上下文中多次重新估计 PI 来了解 Spark 的性能。
我观察到的是,在这些重新估计中 PI 的值没有变化,并且性能计时似乎表明中间 RDD 正在被隐式缓存,然后在后续计算中重用。
有没有什么方法可以配置 Spark 来控制这种行为,并且中间 RDD 总是重新生成?使用 unpersist() 似乎没有效果。
我的代码是 github 上的 here,通过调用
执行`spark-submit pi2.py`
得到如下结果:
No caching-0: 8000 generated 6256 in 1.14984297752 secs (PI = 3.128)
No caching-1: 8000 generated 6256 in 0.0597329139709 secs (PI = 3.128)
No caching-2: 8000 generated 6256 in 0.0577840805054 secs (PI = 3.128)
No caching-3: 8000 generated 6256 in 0.0545349121094 secs (PI = 3.128)
No caching-4: 8000 generated 6256 in 0.0544559955597 secs (PI = 3.128)
With caching-0: 8000 generated 6256 in 0.069139957428 secs (PI = 3.128)
With caching-1: 8000 generated 6256 in 0.0549170970917 secs (PI = 3.128)
With caching-2: 8000 generated 6256 in 0.0531771183014 secs (PI = 3.128)
With caching-3: 8000 generated 6256 in 0.0502359867096 secs (PI = 3.128)
With caching-4: 8000 generated 6256 in 0.0557379722595 secs (PI = 3.128)`
这里发生了一些事情。首先,您实际上并没有缓存 RDD。来自你的 Github link:
# Now persist the intermediate result
sc.parallelize(xrange(1, n + 1), partitions).map(f).persist()
这将创建一个新的 RDD,进行映射,然后持久化生成的 RDD。您没有保留对它的引用,所以它现在实际上已经消失了。
接下来,第一个 运行 可能较慢,因为 Spark 会将您的函数广播给您的工作人员。所以有一些缓存在工作,但不是针对数据而是针对您的代码。
最后,随机性:seed()
在驱动程序中播种 RNG。种子值与 f()
一起在第一个 运行 所有 worker 上广播(因为种子在 random()
中被引用)。当您现在再次调用 seed()
时,它会更改驱动程序中的种子,但不会更改已发送给工作人员的函数版本中的种子,因此您会一遍又一遍地得到相同的结果。