如何使用 Spark 的映射转换在 Scala 中 return 多个键值对?
How do I return multiple key-value pairs in Scala using Spark's map transformation?
我是 Scala 和 Spark 的新手。我正在尝试 return 映射转换期间的多个键值对。我的输入数据是一个简单的 CSV 文件。
1, 2, 3
4, 5, 6
7, 8, 9
我的 Scala 脚本如下所示。
class Key(_i:Integer, _j:Integer) {
def i = _i
def j = _j
}
class Val(_x:Double, _y:Double) {
def x = _x
def y = _y
}
val arr = "1,2,3".split(",")
for(i <- 0 until arr.length) {
val x = arr(i).toDouble
for(j <- 0 until arr.length) {
val y = arr(j).toDouble
val k = new Key(i, j)
val v = new Val(x, y)
//note that i want to return the tuples, (k, v)
}
}
我希望能够将上面的 for 循环和数据结构用于 return 多个元组 (k, v)。类似于下面的代码。
val file = sc.textFile("/path/to/test.csv")
file.map(line => {
val arr = line.split(",")
for(i <- 0 until arr.length) {
val x = arr(i).toDouble
for(j <- (i+1) until arr.length) {
val y = arr(j).toDouble
val k = new Index(i,j)
val v = new Val(x,y)
(k,v)
}
}
}).collect //reduceByKey is not there, reduce is there, but not what i want
当我将上面的代码 copy/paste 放入 lambda 表达式(以及 Scala REPL shell 上的 运行 )时,我收到以下错误:
error: illegal start of simple expression
val arr = line.split(",")
^
我也意识到我仍然停留在 imperative/procedural 风格的编程思维中,所以请多多包涵(和 Scala/Spark 的新手)。
您忘记了箭头后面的括号。如果是简单表达式(一个表达式),则只能省略它们。
file.map(line => {
//multiple lines of code here
})
编辑后的完整答案:
case class Index(i:Integer, j:Integer)
case class Val(x:Double, y:Double)
val data = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9"))
data.flatMap(line=>{
val arr = line.split(",")
val doubleSeq = for(i <- 0 until arr.length) yield {
val x = arr(i).toDouble
for(j <- (i+1) until arr.length) yield {
val y = arr(j).toDouble
val k = Index(i,j)
val v = Val(x,y)
(k,v)
}
}
doubleSeq.flatten
})
实际上有很多问题:
- 请注意,我将您的 类 更改为大小写 类,因为它们是可序列化的。否则,您将需要实施
Serializable
- 我将
map
更改为 flatMap
,并且 flatten
编辑了你的数组,因为 flatMap
仍然会给你留下一个内部数组。现在,两者的组合将为您提供 RDD[(Index, Val)]
,现在可以隐式地与 reduceByKey
一起使用
- 我使用
yield
将你的 for
循环变成了 for
理解。您得到的是 Unit
的最终类型,因为 for
循环的 return 类型是 Unit
使用 RDD.flatMap
和 yield
来自 for
循环的列表:
val file = sc.textFile("/path/to/test.csv")
file.flatMap { line =>
val arr = line.split(",")
for {
i <- 0 until arr.length
j <- (i + 1) until arr.length
} yield {
val x = arr(i).toDouble
val y = arr(j).toDouble
val k = new Index(i, j)
val v = new Val(x, y)
(k, v)
}
}.collect
我是 Scala 和 Spark 的新手。我正在尝试 return 映射转换期间的多个键值对。我的输入数据是一个简单的 CSV 文件。
1, 2, 3 4, 5, 6 7, 8, 9
我的 Scala 脚本如下所示。
class Key(_i:Integer, _j:Integer) {
def i = _i
def j = _j
}
class Val(_x:Double, _y:Double) {
def x = _x
def y = _y
}
val arr = "1,2,3".split(",")
for(i <- 0 until arr.length) {
val x = arr(i).toDouble
for(j <- 0 until arr.length) {
val y = arr(j).toDouble
val k = new Key(i, j)
val v = new Val(x, y)
//note that i want to return the tuples, (k, v)
}
}
我希望能够将上面的 for 循环和数据结构用于 return 多个元组 (k, v)。类似于下面的代码。
val file = sc.textFile("/path/to/test.csv")
file.map(line => {
val arr = line.split(",")
for(i <- 0 until arr.length) {
val x = arr(i).toDouble
for(j <- (i+1) until arr.length) {
val y = arr(j).toDouble
val k = new Index(i,j)
val v = new Val(x,y)
(k,v)
}
}
}).collect //reduceByKey is not there, reduce is there, but not what i want
当我将上面的代码 copy/paste 放入 lambda 表达式(以及 Scala REPL shell 上的 运行 )时,我收到以下错误:
error: illegal start of simple expression val arr = line.split(",") ^
我也意识到我仍然停留在 imperative/procedural 风格的编程思维中,所以请多多包涵(和 Scala/Spark 的新手)。
您忘记了箭头后面的括号。如果是简单表达式(一个表达式),则只能省略它们。
file.map(line => {
//multiple lines of code here
})
编辑后的完整答案:
case class Index(i:Integer, j:Integer)
case class Val(x:Double, y:Double)
val data = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9"))
data.flatMap(line=>{
val arr = line.split(",")
val doubleSeq = for(i <- 0 until arr.length) yield {
val x = arr(i).toDouble
for(j <- (i+1) until arr.length) yield {
val y = arr(j).toDouble
val k = Index(i,j)
val v = Val(x,y)
(k,v)
}
}
doubleSeq.flatten
})
实际上有很多问题:
- 请注意,我将您的 类 更改为大小写 类,因为它们是可序列化的。否则,您将需要实施
Serializable
- 我将
map
更改为flatMap
,并且flatten
编辑了你的数组,因为flatMap
仍然会给你留下一个内部数组。现在,两者的组合将为您提供RDD[(Index, Val)]
,现在可以隐式地与reduceByKey
一起使用
- 我使用
yield
将你的for
循环变成了for
理解。您得到的是Unit
的最终类型,因为for
循环的 return 类型是Unit
使用 RDD.flatMap
和 yield
来自 for
循环的列表:
val file = sc.textFile("/path/to/test.csv")
file.flatMap { line =>
val arr = line.split(",")
for {
i <- 0 until arr.length
j <- (i + 1) until arr.length
} yield {
val x = arr(i).toDouble
val y = arr(j).toDouble
val k = new Index(i, j)
val v = new Val(x, y)
(k, v)
}
}.collect