Spark Streaming:如何更改foreachRDD函数中外部变量的值?
Spark Streaming: How to change the value of external variables in foreachRDD function?
测试代码:
object MaxValue extends Serializable{
var max = 0
}
object Test {
def main(args: Array[String]): Unit = {
val sc = new SparkContext
val ssc = new StreamingContext(sc, Seconds(5))
val seq = Seq("testData")
val rdd = ssc.sparkContext.parallelize(seq)
val inputDStream = new ConstantInputDStream(ssc, rdd)
inputDStream.foreachRDD(rdd => { MaxValue.max = 10 }) //I change MaxValue.max value to 10.
val map = inputDStream.map(a => MaxValue.max)
map.print //Why the result is 0? Why not 10?
ssc.start
ssc.awaitTermination
}
}
这样的话,如何在foreachRDD()中改变MaxValue.max
的值呢? map.print
的结果是0,为什么不是10。我想在[=20中使用RDD.max() =]foreachRDD(),所以我需要在 foreachRDD().
中更改 MaxValue.max
值
你能帮帮我吗?谢谢!
这是不可能的。请记住,RDD 方法内部的操作是 运行 分布式的。因此,对 MaxValue.max
的更改只会在 worker 上执行,而不会在 driver 上执行。也许如果你说出你正在尝试做的事情可以帮助找到更好的解决方案,也许可以使用累加器?
一般来说,最好避免尝试以这种方式累加值,有不同的方法,如累加器或 updateStateByKey
可以正确地做到这一点。
为了更好地了解代码中发生的情况,假设您有 1 个驱动程序和分布在多个执行程序上的多个分区(最典型的场景)
在驱动程序上运行
inputDStream.foreachRDD(rdd => { MaxValue.max = 10 })
foreachRDD
运行s 内的代码块在 驱动程序 上,因此它更新驱动程序
上的对象 MaxValue
在执行器上运行
val map = inputDStream.map(a => MaxValue.max)
Will 运行 lambda 在每个 executor 上单独执行,因此将从 MaxValue 在执行器上获取值(之前从未更新过)。另请注意,每个执行程序都有自己版本的 MaxValue 对象,因为它们每个都位于单独的 JVM 进程中(通常也在集群中的单独节点上)。
当您将代码更改为
val map = inputDStream.map(a => {MaxValue.max=10; MaxValue.max})
您实际上是在执行者上更新 MaxValue,然后也在执行者上获取它 - 所以它有效。
这也应该有效:
val map = inputDStream.map(a => {MaxValue.max=10; a}).map(a => MaxValue.max)
但是,如果您执行以下操作:
val map = inputDStream.map(a => {MaxValue.max= new Random().nextInt(10); a}).map(a => MaxValue.max)
你应该得到一组包含 4 个不同整数的记录(每个分区将有不同的最大值)
意外结果
本地模式
避免的一个很好的理由是,根据情况,您可能会得到更不可预测的结果。例如,如果你的 运行 你的原始代码 returns 0 on cluster 它将 return 10 in local mode 在这种情况下,驱动程序和所有分区将存在于单个 JVM 进程中,并将共享该对象。所以你甚至可以在这样的代码上创建单元测试,感觉很安全但是当部署到集群时 - 开始出现问题。
作业调度顺序
对于这个我不是 100% 确定 - 试图在源代码中找到,但有可能会出现另一个问题。在您的代码中,您将有 2 个工作:
一个是基于你的输出
inputDStream.foreachRDD
另一个是基于map.print
的输出。尽管它们最初使用相同的流,但 Spark 将为它们生成两个独立的 DAG,并将安排两个独立的作业,这些作业可以完全独立地由 spark 处理,事实上 - 它甚至不必保证作业的执行顺序(它确实显然在一个工作中保证阶段的执行顺序),如果这在理论上发生,它可以 运行 在第一个之前的第二个工作使结果更难以预测
测试代码:
object MaxValue extends Serializable{
var max = 0
}
object Test {
def main(args: Array[String]): Unit = {
val sc = new SparkContext
val ssc = new StreamingContext(sc, Seconds(5))
val seq = Seq("testData")
val rdd = ssc.sparkContext.parallelize(seq)
val inputDStream = new ConstantInputDStream(ssc, rdd)
inputDStream.foreachRDD(rdd => { MaxValue.max = 10 }) //I change MaxValue.max value to 10.
val map = inputDStream.map(a => MaxValue.max)
map.print //Why the result is 0? Why not 10?
ssc.start
ssc.awaitTermination
}
}
这样的话,如何在foreachRDD()中改变MaxValue.max
的值呢? map.print
的结果是0,为什么不是10。我想在[=20中使用RDD.max() =]foreachRDD(),所以我需要在 foreachRDD().
MaxValue.max
值
你能帮帮我吗?谢谢!
这是不可能的。请记住,RDD 方法内部的操作是 运行 分布式的。因此,对 MaxValue.max
的更改只会在 worker 上执行,而不会在 driver 上执行。也许如果你说出你正在尝试做的事情可以帮助找到更好的解决方案,也许可以使用累加器?
一般来说,最好避免尝试以这种方式累加值,有不同的方法,如累加器或 updateStateByKey
可以正确地做到这一点。
为了更好地了解代码中发生的情况,假设您有 1 个驱动程序和分布在多个执行程序上的多个分区(最典型的场景)
在驱动程序上运行
inputDStream.foreachRDD(rdd => { MaxValue.max = 10 })
foreachRDD
运行s 内的代码块在 驱动程序 上,因此它更新驱动程序
在执行器上运行
val map = inputDStream.map(a => MaxValue.max)
Will 运行 lambda 在每个 executor 上单独执行,因此将从 MaxValue 在执行器上获取值(之前从未更新过)。另请注意,每个执行程序都有自己版本的 MaxValue 对象,因为它们每个都位于单独的 JVM 进程中(通常也在集群中的单独节点上)。
当您将代码更改为
val map = inputDStream.map(a => {MaxValue.max=10; MaxValue.max})
您实际上是在执行者上更新 MaxValue,然后也在执行者上获取它 - 所以它有效。
这也应该有效:
val map = inputDStream.map(a => {MaxValue.max=10; a}).map(a => MaxValue.max)
但是,如果您执行以下操作:
val map = inputDStream.map(a => {MaxValue.max= new Random().nextInt(10); a}).map(a => MaxValue.max)
你应该得到一组包含 4 个不同整数的记录(每个分区将有不同的最大值)
意外结果
本地模式
避免的一个很好的理由是,根据情况,您可能会得到更不可预测的结果。例如,如果你的 运行 你的原始代码 returns 0 on cluster 它将 return 10 in local mode 在这种情况下,驱动程序和所有分区将存在于单个 JVM 进程中,并将共享该对象。所以你甚至可以在这样的代码上创建单元测试,感觉很安全但是当部署到集群时 - 开始出现问题。
作业调度顺序
对于这个我不是 100% 确定 - 试图在源代码中找到,但有可能会出现另一个问题。在您的代码中,您将有 2 个工作:
一个是基于你的输出
inputDStream.foreachRDD
另一个是基于map.print
的输出。尽管它们最初使用相同的流,但 Spark 将为它们生成两个独立的 DAG,并将安排两个独立的作业,这些作业可以完全独立地由 spark 处理,事实上 - 它甚至不必保证作业的执行顺序(它确实显然在一个工作中保证阶段的执行顺序),如果这在理论上发生,它可以 运行 在第一个之前的第二个工作使结果更难以预测