Apache Spark 中的案例 class 相等
Case class equality in Apache Spark
为什么 Spark 中的模式匹配与 Scala 中的模式匹配不同?请参见下面的示例...函数 f()
尝试在 class 上进行模式匹配,这在 Scala REPL 中有效但在 Spark 中失败并导致所有“???”。 f2()
是一种使用 .isInstanceOf()
在 Spark 中获得所需结果的解决方法,但我知道这在 Scala 中是错误的形式。
对于在这种情况下在 Spark 中以正确方式进行模式匹配的任何帮助,我们将不胜感激。
abstract class a extends Serializable {val a: Int}
case class b(a: Int) extends a
case class bNull(a: Int=0) extends a
val x: List[a] = List(b(0), b(1), bNull())
val xRdd = sc.parallelize(x)
尝试在 Scala REPL 中工作但在 Spark 中失败的模式匹配
def f(x: a) = x match {
case b(n) => "b"
case bNull(n) => "bnull"
case _ => "???"
}
在 Spark 中起作用的解决方法,但形式不佳(我认为)
def f2(x: a) = {
if (x.isInstanceOf[b]) {
"b"
} else if (x.isInstanceOf[bNull]) {
"bnull"
} else {
"???"
}
}
查看结果
xRdd.map(f).collect //does not work in Spark
// result: Array("???", "???", "???")
xRdd.map(f2).collect // works in Spark
// resut: Array("b", "b", "bnull")
x.map(f(_)) // works in Scala REPL
// result: List("b", "b", "bnull")
使用的版本...
Spark 结果 运行 in spark-shell(AWS EMR-4.3 上的 Spark 1.6)
SBT 0.13.9 中的 Scala REPL (Scala 2.10.5)
这是 Spark REPL 的一个已知问题。您可以在 SPARK-2620 中找到更多详细信息。它会影响 Spark REPL 中的多个操作,包括 PairwiseRDDs
上的大部分转换。例如:
case class Foo(x: Int)
val foos = Seq(Foo(1), Foo(1), Foo(2), Foo(2))
foos.distinct.size
// Int = 2
val foosRdd = sc.parallelize(foos, 4)
foosRdd.distinct.count
// Long = 4
foosRdd.map((_, 1)).reduceByKey(_ + _).collect
// Array[(Foo, Int)] = Array((Foo(1),1), (Foo(1),1), (Foo(2),1), (Foo(2),1))
foosRdd.first == foos.head
// Boolean = false
Foo.unapply(foosRdd.first) == Foo.unapply(foos.head)
// Boolean = true
更糟糕的是结果取决于数据分布:
sc.parallelize(foos, 1).distinct.count
// Long = 2
sc.parallelize(foos, 1).map((_, 1)).reduceByKey(_ + _).collect
// Array[(Foo, Int)] = Array((Foo(2),2), (Foo(1),2))
您可以做的最简单的事情是在 REPL 之外定义和打包所需的案例 类。任何直接使用 spark-submit
提交的代码也应该有效。
在 Scala 2.11+ 中,您可以使用 paste -raw
直接在 REPL 中创建包。
scala> :paste -raw
// Entering paste mode (ctrl-D to finish)
package bar
case class Bar(x: Int)
// Exiting paste mode, now interpreting.
scala> import bar.Bar
import bar.Bar
scala> sc.parallelize(Seq(Bar(1), Bar(1), Bar(2), Bar(2))).distinct.collect
res1: Array[bar.Bar] = Array(Bar(1), Bar(2))
为什么 Spark 中的模式匹配与 Scala 中的模式匹配不同?请参见下面的示例...函数 f()
尝试在 class 上进行模式匹配,这在 Scala REPL 中有效但在 Spark 中失败并导致所有“???”。 f2()
是一种使用 .isInstanceOf()
在 Spark 中获得所需结果的解决方法,但我知道这在 Scala 中是错误的形式。
对于在这种情况下在 Spark 中以正确方式进行模式匹配的任何帮助,我们将不胜感激。
abstract class a extends Serializable {val a: Int}
case class b(a: Int) extends a
case class bNull(a: Int=0) extends a
val x: List[a] = List(b(0), b(1), bNull())
val xRdd = sc.parallelize(x)
尝试在 Scala REPL 中工作但在 Spark 中失败的模式匹配
def f(x: a) = x match {
case b(n) => "b"
case bNull(n) => "bnull"
case _ => "???"
}
在 Spark 中起作用的解决方法,但形式不佳(我认为)
def f2(x: a) = {
if (x.isInstanceOf[b]) {
"b"
} else if (x.isInstanceOf[bNull]) {
"bnull"
} else {
"???"
}
}
查看结果
xRdd.map(f).collect //does not work in Spark
// result: Array("???", "???", "???")
xRdd.map(f2).collect // works in Spark
// resut: Array("b", "b", "bnull")
x.map(f(_)) // works in Scala REPL
// result: List("b", "b", "bnull")
使用的版本... Spark 结果 运行 in spark-shell(AWS EMR-4.3 上的 Spark 1.6) SBT 0.13.9 中的 Scala REPL (Scala 2.10.5)
这是 Spark REPL 的一个已知问题。您可以在 SPARK-2620 中找到更多详细信息。它会影响 Spark REPL 中的多个操作,包括 PairwiseRDDs
上的大部分转换。例如:
case class Foo(x: Int)
val foos = Seq(Foo(1), Foo(1), Foo(2), Foo(2))
foos.distinct.size
// Int = 2
val foosRdd = sc.parallelize(foos, 4)
foosRdd.distinct.count
// Long = 4
foosRdd.map((_, 1)).reduceByKey(_ + _).collect
// Array[(Foo, Int)] = Array((Foo(1),1), (Foo(1),1), (Foo(2),1), (Foo(2),1))
foosRdd.first == foos.head
// Boolean = false
Foo.unapply(foosRdd.first) == Foo.unapply(foos.head)
// Boolean = true
更糟糕的是结果取决于数据分布:
sc.parallelize(foos, 1).distinct.count
// Long = 2
sc.parallelize(foos, 1).map((_, 1)).reduceByKey(_ + _).collect
// Array[(Foo, Int)] = Array((Foo(2),2), (Foo(1),2))
您可以做的最简单的事情是在 REPL 之外定义和打包所需的案例 类。任何直接使用 spark-submit
提交的代码也应该有效。
在 Scala 2.11+ 中,您可以使用 paste -raw
直接在 REPL 中创建包。
scala> :paste -raw
// Entering paste mode (ctrl-D to finish)
package bar
case class Bar(x: Int)
// Exiting paste mode, now interpreting.
scala> import bar.Bar
import bar.Bar
scala> sc.parallelize(Seq(Bar(1), Bar(1), Bar(2), Bar(2))).distinct.collect
res1: Array[bar.Bar] = Array(Bar(1), Bar(2))