Apache Spark 的 RDD[Vector] 不变性问题
Apache Spark's RDD[Vector] Immutability issue
我知道 RDD 是不可变的,因此它们的值无法更改,但我看到以下行为:
我为 FuzzyCMeans (https://github.com/salexln/FinalProject_FCM) 算法编写了一个实现,现在我正在测试它,所以我 运行 下面的例子:
import org.apache.spark.mllib.clustering.FuzzyCMeans
import org.apache.spark.mllib.linalg.Vectors
val data = sc.textFile("/home/development/myPrjects/R/butterfly/butterfly.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
> parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[2] at map at <console>:31
val numClusters = 2
val numIterations = 20
parsedData.foreach{ point => println(point) }
> [0.0,-8.0]
[-3.0,-2.0]
[-3.0,0.0]
[-3.0,2.0]
[-2.0,-1.0]
[-2.0,0.0]
[-2.0,1.0]
[-1.0,0.0]
[0.0,0.0]
[1.0,0.0]
[2.0,-1.0]
[2.0,0.0]
[2.0,1.0]
[3.0,-2.0]
[3.0,0.0]
[3.0,2.0]
[0.0,8.0]
val clusters = FuzzyCMeans.train(parsedData, numClusters, numIteration
parsedData.foreach{ point => println(point) }
>
[0.0,-0.4803333185624595]
[-0.1811743096972924,-0.12078287313152826]
[-0.06638890786148487,0.0]
[-0.04005925925925929,0.02670617283950619]
[-0.12193263222069807,-0.060966316110349035]
[-0.0512,0.0]
[NaN,NaN]
[-0.049382716049382706,0.0]
[NaN,NaN]
[0.006830134553650707,0.0]
[0.05120000000000002,-0.02560000000000001]
[0.04755220304297078,0.0]
[0.06581619798335057,0.03290809899167529]
[0.12010867103812725,-0.0800724473587515]
[0.10946638900458144,0.0]
[0.14814814814814817,0.09876543209876545]
[0.0,0.49119985188436205]
但是我的方法怎么会改变不可变的RDD呢?
顺便说一句,训练方法的签名如下:
train(数据:RDD[Vector],集群:Int,maxIterations:Int)
中准确描述了您所做的事情
Printing elements of an RDD
Another common idiom is attempting to print out the elements of an RDD
using rdd.foreach(println) or rdd.map(println). On a single machine,
this will generate the expected output and print all the RDD’s
elements. However, in cluster mode, the output to stdout being called
by the executors is now writing to the executor’s stdout instead, not
the one on the driver, so stdout on the driver won’t show these! To
print all elements on the driver, one can use the collect() method to
first bring the RDD to the driver node thus:
rdd.collect().foreach(println). This can cause the driver to run out
of memory, though, because collect() fetches the entire RDD to a
single machine; if you only need to print a few elements of the RDD, a
safer approach is to use the take(): rdd.take(100).foreach(println).
因此,由于数据可以在节点之间迁移,因此无法保证 foreach
的输出相同。 RDD 是不可变的,但您应该以适当的方式提取数据,因为您的节点上没有整个 RDD。
另一个可能的问题(不是你的情况,因为你使用的是不可变向量)是在 Point
自己内部使用可变数据,这是完全不正确的,所以你会失去所有保证 - [=然而,12=] 本身仍然是不可变的。
为了让 RDD 完全不可变,它的内容也应该是不可变的:
scala> val m = Array.fill(2, 2)(0)
m: Array[Array[Int]] = Array(Array(0, 0), Array(0, 0))
scala> val rdd = sc.parallelize(m)
rdd: org.apache.spark.rdd.RDD[Array[Int]] = ParallelCollectionRDD[1]
at parallelize at <console>:23
scala> rdd.collect()
res6: Array[Array[Int]] = Array(Array(0, 0), Array(0, 0))
scala> m(0)(1) = 2
scala> rdd.collect()
res8: Array[Array[Int]] = Array(Array(0, 2), Array(0, 0))
因为数组是可变的,所以我可以更改它,因此 RDD 已用新数据更新
我知道 RDD 是不可变的,因此它们的值无法更改,但我看到以下行为:
我为 FuzzyCMeans (https://github.com/salexln/FinalProject_FCM) 算法编写了一个实现,现在我正在测试它,所以我 运行 下面的例子:
import org.apache.spark.mllib.clustering.FuzzyCMeans
import org.apache.spark.mllib.linalg.Vectors
val data = sc.textFile("/home/development/myPrjects/R/butterfly/butterfly.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
> parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[2] at map at <console>:31
val numClusters = 2
val numIterations = 20
parsedData.foreach{ point => println(point) }
> [0.0,-8.0]
[-3.0,-2.0]
[-3.0,0.0]
[-3.0,2.0]
[-2.0,-1.0]
[-2.0,0.0]
[-2.0,1.0]
[-1.0,0.0]
[0.0,0.0]
[1.0,0.0]
[2.0,-1.0]
[2.0,0.0]
[2.0,1.0]
[3.0,-2.0]
[3.0,0.0]
[3.0,2.0]
[0.0,8.0]
val clusters = FuzzyCMeans.train(parsedData, numClusters, numIteration
parsedData.foreach{ point => println(point) }
>
[0.0,-0.4803333185624595]
[-0.1811743096972924,-0.12078287313152826]
[-0.06638890786148487,0.0]
[-0.04005925925925929,0.02670617283950619]
[-0.12193263222069807,-0.060966316110349035]
[-0.0512,0.0]
[NaN,NaN]
[-0.049382716049382706,0.0]
[NaN,NaN]
[0.006830134553650707,0.0]
[0.05120000000000002,-0.02560000000000001]
[0.04755220304297078,0.0]
[0.06581619798335057,0.03290809899167529]
[0.12010867103812725,-0.0800724473587515]
[0.10946638900458144,0.0]
[0.14814814814814817,0.09876543209876545]
[0.0,0.49119985188436205]
但是我的方法怎么会改变不可变的RDD呢?
顺便说一句,训练方法的签名如下:
train(数据:RDD[Vector],集群:Int,maxIterations:Int)
Printing elements of an RDD
Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(println) or rdd.map(println). On a single machine, this will generate the expected output and print all the RDD’s elements. However, in cluster mode, the output to stdout being called by the executors is now writing to the executor’s stdout instead, not the one on the driver, so stdout on the driver won’t show these! To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use the take(): rdd.take(100).foreach(println).
因此,由于数据可以在节点之间迁移,因此无法保证 foreach
的输出相同。 RDD 是不可变的,但您应该以适当的方式提取数据,因为您的节点上没有整个 RDD。
另一个可能的问题(不是你的情况,因为你使用的是不可变向量)是在 Point
自己内部使用可变数据,这是完全不正确的,所以你会失去所有保证 - [=然而,12=] 本身仍然是不可变的。
为了让 RDD 完全不可变,它的内容也应该是不可变的:
scala> val m = Array.fill(2, 2)(0)
m: Array[Array[Int]] = Array(Array(0, 0), Array(0, 0))
scala> val rdd = sc.parallelize(m)
rdd: org.apache.spark.rdd.RDD[Array[Int]] = ParallelCollectionRDD[1]
at parallelize at <console>:23
scala> rdd.collect()
res6: Array[Array[Int]] = Array(Array(0, 0), Array(0, 0))
scala> m(0)(1) = 2
scala> rdd.collect()
res8: Array[Array[Int]] = Array(Array(0, 2), Array(0, 0))
因为数组是可变的,所以我可以更改它,因此 RDD 已用新数据更新