如何在 flatMap Scala-Spark 中为 breeze 矩阵赋值?
How to assign value into a breeze Matrix in flatMap Scala-Spark?
我想用 flatMap 中的数据初始化一个矩阵,这是我的数据:
-4,0,1.0 ### horrible . not-work install dozen scanner umax ofcourse . tech-support everytime call . fresh install work error . crummy product crummy tech-support crummy experience .
2,1,1.0 ### scanner run . grant product run windows . live fact driver windows lose performance . setup program alert support promptly quits . amazon . website product package requirement listing compatible windows .
1,2,1.0 ### conversion kit spare battery total better stick versionand radio blow nimh charger battery . combination operation size nimh battery . motorola kit . rechargable battery available flashlight camera game toy .
-4,3,1.0 ### recieive part autowinder catch keep place sudden break . hold listen music winder wind . extremely frustrated fix pull little hard snap half . flush drain .
这是我的代码:
val spark_context = new SparkContext(conf)
val data = spark_context.textFile(Input)
val Gama=DenseMatrix.zeros[Double](4,2)
var gmmainit = data.flatMap(line => {
val tuple = line.split("###")
val ss = tuple(0)
val re = """^(-?\d+)\s*,\s*(\d+)\s*,\s*(\d+).*$""".r
val re(n1, n2, n3) = ss // pattern match and extract values
if (n1.toInt >= 0) {
Gama(n2.toInt, 0) += 1
}
if (n1.toInt < 0) {
Gama(n2.toInt, 1) += 1
}
})
println(Gama)
但它不会改变 Gama 矩阵,
我如何修改我的代码来解决这个问题?
您不能修改分布式函数中的变量。嗯,你可以,但变量只在那个过程中被修改。请记住,火花是分布式的。因此,您需要 return 一个可以展平的值(我不太了解 DenseMatrix,无法在此处说出确切的需求)。不过,如果它可以关联和交换,您也许可以创建一个自定义累加器来完成此操作。
首先,您的代码甚至无法编译。如果你看一下 flatMap
签名:
flatMap[U](f: T => TraversableOnce[U])
您会看到它从 T
映射到 TraversableOnce[U]
。由于 DenseMatrix
returns Unit
函数的 update
方法是 String => Unit
类型,而 Unit
不是 TraversableOnce
.
此外,正如 Justin 所解释的,每个分区都有自己的闭包中引用的变量的本地副本,并且只有该副本被修改。
解决这个问题的一种方法是这样的:
val gmmainit = data.mapPartitions(iter => {
val re = """^(-?\d+)\s*,\s*(\d+)\s*,\s*(\d+).*$""".r
val gama = DenseMatrix.zeros[Double](4,2)
iter.foreach{
case re(n1, n2, n3) => gama(n2.toInt, if(n1.toInt >= 0) 0 else 1) += 1
case _ =>
}
Iterator(gama)
}).reduce(_ + _)
我想用 flatMap 中的数据初始化一个矩阵,这是我的数据:
-4,0,1.0 ### horrible . not-work install dozen scanner umax ofcourse . tech-support everytime call . fresh install work error . crummy product crummy tech-support crummy experience .
2,1,1.0 ### scanner run . grant product run windows . live fact driver windows lose performance . setup program alert support promptly quits . amazon . website product package requirement listing compatible windows .
1,2,1.0 ### conversion kit spare battery total better stick versionand radio blow nimh charger battery . combination operation size nimh battery . motorola kit . rechargable battery available flashlight camera game toy .
-4,3,1.0 ### recieive part autowinder catch keep place sudden break . hold listen music winder wind . extremely frustrated fix pull little hard snap half . flush drain .
这是我的代码:
val spark_context = new SparkContext(conf)
val data = spark_context.textFile(Input)
val Gama=DenseMatrix.zeros[Double](4,2)
var gmmainit = data.flatMap(line => {
val tuple = line.split("###")
val ss = tuple(0)
val re = """^(-?\d+)\s*,\s*(\d+)\s*,\s*(\d+).*$""".r
val re(n1, n2, n3) = ss // pattern match and extract values
if (n1.toInt >= 0) {
Gama(n2.toInt, 0) += 1
}
if (n1.toInt < 0) {
Gama(n2.toInt, 1) += 1
}
})
println(Gama)
但它不会改变 Gama 矩阵,
我如何修改我的代码来解决这个问题?
您不能修改分布式函数中的变量。嗯,你可以,但变量只在那个过程中被修改。请记住,火花是分布式的。因此,您需要 return 一个可以展平的值(我不太了解 DenseMatrix,无法在此处说出确切的需求)。不过,如果它可以关联和交换,您也许可以创建一个自定义累加器来完成此操作。
首先,您的代码甚至无法编译。如果你看一下 flatMap
签名:
flatMap[U](f: T => TraversableOnce[U])
您会看到它从 T
映射到 TraversableOnce[U]
。由于 DenseMatrix
returns Unit
函数的 update
方法是 String => Unit
类型,而 Unit
不是 TraversableOnce
.
此外,正如 Justin 所解释的,每个分区都有自己的闭包中引用的变量的本地副本,并且只有该副本被修改。
解决这个问题的一种方法是这样的:
val gmmainit = data.mapPartitions(iter => {
val re = """^(-?\d+)\s*,\s*(\d+)\s*,\s*(\d+).*$""".r
val gama = DenseMatrix.zeros[Double](4,2)
iter.foreach{
case re(n1, n2, n3) => gama(n2.toInt, if(n1.toInt >= 0) 0 else 1) += 1
case _ =>
}
Iterator(gama)
}).reduce(_ + _)