Spark 不同的操作不适用于自定义类型

Spark distinct operation does not work on custom type

我正在使用 Scala 中的 Spark,我正在尝试 distinct 一个内部包含自定义对象元素的 RDD。

distinct操作没有找到,"delete"重复,操作后的RDD还是和之前一样。 我不明白为什么会这样。在我尝试在原始类型的 RDD 上执行 distinct 之后,我得到了一个提示(并且有效)。所以我尝试在我的自定义对象中扩展 Ordered 但没有发生新的事情。

按照我的测试代码。

class AA (val a: Int, val b: Int) extends Ordered[AA] {

    override def toString = "("+a+","+b+")"

    override def equals(t : Any) = t match {
       case that: AA => this.a == that.a & this.b == that.b
       case _ => super.equals(t)
    }

    override def compare(that: AA): Int = { this.b compare that.b }
}

distinct操作在这里:

val par = sc.parallelize[AA](List(new AA(1,2), new AA(2,3), new AA(2,1), new AA(3,2), new AA(3,2)))

println("Count Before: "+par.count())                 // 5
println("Count After: "+par.distinct().count())       // still 5

我什至尝试使用不同的执行流程来计算 distinct,使用 reduceByKey()aggregateByKey() 但也不走运。

这里发生了什么?

提前谢谢大家。

您还需要提供hashCode

override def hashCode = (a, b).hashCode

或者,您可以使用保护套 class

免费获得 equalshashCode

您可以使用 case class,因为它随 hashCodeequals 一起提供,因此 distinct 将立即生效:

case class AA (val a: Int, val b: Int) {
  override def toString = "("+a+","+b+")"
}

请注意,它可能会在 Spark REPL 中出现错误:https://issues.apache.org/jira/browse/SPARK-2620 但它会在 实际上 执行您的作业时起作用。

当我 运行 你的代码时,我得到 java.io.NotSerializableException

您不需要实现 Ordering 并覆盖这些方法,只需使用案例 类:

import org.apache.spark.{SparkContext, SparkConf}

case class AA (val a: Int, val b: Int)

val conf = new SparkConf().setMaster("local").setAppName("example")
val sc = new SparkContext(conf)

val par = sc.parallelize[AA](List(AA(1, 2), AA(2, 3), AA(2, 1), AA(3, 2), AA(3, 2)))

println("Count Before: " + par.count()) // 5
println("Count After: " + par.distinct().count()) // 4