Spark 不同的操作不适用于自定义类型
Spark distinct operation does not work on custom type
我正在使用 Scala 中的 Spark,我正在尝试 distinct
一个内部包含自定义对象元素的 RDD。
distinct操作没有找到,"delete"重复,操作后的RDD还是和之前一样。
我不明白为什么会这样。在我尝试在原始类型的 RDD 上执行 distinct 之后,我得到了一个提示(并且有效)。所以我尝试在我的自定义对象中扩展 Ordered
但没有发生新的事情。
按照我的测试代码。
class AA (val a: Int, val b: Int) extends Ordered[AA] {
override def toString = "("+a+","+b+")"
override def equals(t : Any) = t match {
case that: AA => this.a == that.a & this.b == that.b
case _ => super.equals(t)
}
override def compare(that: AA): Int = { this.b compare that.b }
}
distinct
操作在这里:
val par = sc.parallelize[AA](List(new AA(1,2), new AA(2,3), new AA(2,1), new AA(3,2), new AA(3,2)))
println("Count Before: "+par.count()) // 5
println("Count After: "+par.distinct().count()) // still 5
我什至尝试使用不同的执行流程来计算 distinct,使用 reduceByKey()
或 aggregateByKey()
但也不走运。
这里发生了什么?
提前谢谢大家。
您还需要提供hashCode
override def hashCode = (a, b).hashCode
或者,您可以使用保护套 class
免费获得 equals
和 hashCode
您可以使用 case class
,因为它随 hashCode
和 equals
一起提供,因此 distinct
将立即生效:
case class AA (val a: Int, val b: Int) {
override def toString = "("+a+","+b+")"
}
请注意,它可能会在 Spark REPL 中出现错误:https://issues.apache.org/jira/browse/SPARK-2620 但它会在 实际上 执行您的作业时起作用。
当我 运行 你的代码时,我得到 java.io.NotSerializableException
。
您不需要实现 Ordering
并覆盖这些方法,只需使用案例 类:
import org.apache.spark.{SparkContext, SparkConf}
case class AA (val a: Int, val b: Int)
val conf = new SparkConf().setMaster("local").setAppName("example")
val sc = new SparkContext(conf)
val par = sc.parallelize[AA](List(AA(1, 2), AA(2, 3), AA(2, 1), AA(3, 2), AA(3, 2)))
println("Count Before: " + par.count()) // 5
println("Count After: " + par.distinct().count()) // 4
我正在使用 Scala 中的 Spark,我正在尝试 distinct
一个内部包含自定义对象元素的 RDD。
distinct操作没有找到,"delete"重复,操作后的RDD还是和之前一样。
我不明白为什么会这样。在我尝试在原始类型的 RDD 上执行 distinct 之后,我得到了一个提示(并且有效)。所以我尝试在我的自定义对象中扩展 Ordered
但没有发生新的事情。
按照我的测试代码。
class AA (val a: Int, val b: Int) extends Ordered[AA] {
override def toString = "("+a+","+b+")"
override def equals(t : Any) = t match {
case that: AA => this.a == that.a & this.b == that.b
case _ => super.equals(t)
}
override def compare(that: AA): Int = { this.b compare that.b }
}
distinct
操作在这里:
val par = sc.parallelize[AA](List(new AA(1,2), new AA(2,3), new AA(2,1), new AA(3,2), new AA(3,2)))
println("Count Before: "+par.count()) // 5
println("Count After: "+par.distinct().count()) // still 5
我什至尝试使用不同的执行流程来计算 distinct,使用 reduceByKey()
或 aggregateByKey()
但也不走运。
这里发生了什么?
提前谢谢大家。
您还需要提供hashCode
override def hashCode = (a, b).hashCode
或者,您可以使用保护套 class
免费获得equals
和 hashCode
您可以使用 case class
,因为它随 hashCode
和 equals
一起提供,因此 distinct
将立即生效:
case class AA (val a: Int, val b: Int) {
override def toString = "("+a+","+b+")"
}
请注意,它可能会在 Spark REPL 中出现错误:https://issues.apache.org/jira/browse/SPARK-2620 但它会在 实际上 执行您的作业时起作用。
当我 运行 你的代码时,我得到 java.io.NotSerializableException
。
您不需要实现 Ordering
并覆盖这些方法,只需使用案例 类:
import org.apache.spark.{SparkContext, SparkConf}
case class AA (val a: Int, val b: Int)
val conf = new SparkConf().setMaster("local").setAppName("example")
val sc = new SparkContext(conf)
val par = sc.parallelize[AA](List(AA(1, 2), AA(2, 3), AA(2, 1), AA(3, 2), AA(3, 2)))
println("Count Before: " + par.count()) // 5
println("Count After: " + par.distinct().count()) // 4