如何在 Spark 中进行二次排序?

How to perform Secondary Sort in Spark?

我正在使用 Spark 搜索辅助排序并找到了这个解决方案:

case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double)
class RFMCPartitioner(partitions: Int) extends Partitioner {
    require(partitions >= 0, "Number of partitions ($partitions) cannot be negative.")
    override def numPartitions: Int = partitions
    override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[RFMCKey]
    k.cId.hashCode() % numPartitions
    }
}
object RFMCKey {
    implicit def orderingBycId[A <: RFMCKey] : Ordering[A] = {
    Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
    }
}

现在这是我用于我的 RFMC(新近度、频率、货币、聚集性)程序的代码。 在相同的代码中,最后,我在做:

val rfmcTableSorted = rfmcTable.repartitionAndSortWithinPartitions(new RFMCPartitioner(1))

但是当我在 spark-shell 中加载此文件时,出现以下错误:

<console>:130: error: RFMCKey is already defined as (compiler-generated) case class companion object RFMCKey
            object RFMCKey {
                       ^
<console>:198: error: RFMCKey.type does not take parameters
                                case (custId, (((rVal, fVal), mVal),cVal)) => (RFMCKey(custId, rVal, fVal, mVal, cVal), rVal+","+fVal+","+mVal+","+cVal)
                                                                                                              ^
<console>:200: error: value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[Nothing]
val rfmcTableSorted = rfmcTable.repartitionAndSortWithinPartitions(new RFMCPartitioner(1)).cache()

如何避免这个问题?

更新 1

我尝试更改案例 class 和对象 class 的声明顺序,令人惊讶的是 shell 加载文件时没有抛出任何错误。但是当我 运行 我的程序时,它抛出了一个新错误:

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.map(RDD.scala:286)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$.constructRFMC(<console>:113)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
at $iwC$$iwC$$iwC.<init>(<console>:53)
at $iwC$$iwC.<init>(<console>:55)
at $iwC.<init>(<console>:57)
at <init>(<console>:59)
at .<init>(<console>:63)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:856)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:664)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:996)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:944)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$
Serialization stack:
    - object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$@757fc606)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$$anonfun, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$$anonfun, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 52 more

更新 2

我定义对象和函数的方式是这样的:

object rfmc {
  def constructrfmc() = {
    // Everything goes inside including the custom key and partitioner
    // code defined above
  }
}

更新 3

我在 eclipse 中定义代码的方式非常有效:

object rfmc extends App {
  // Everything goes inside including the custom key and partitioner
  // code defined above
}

我还为此代码创建了一个 JAR,并且 运行 使用 spark-submit 并且效果也很好。

要解决 RFMCKey 已定义的问题,您需要交换案例 class 和对象声明的顺序,如 this issue 中所述。

关于您的更新,spark-shell 中可能存在一些限制,无法执行任何任意代码(例如使用累加器)。要更深入地了解序列化机制,您应该传递以下选项 -Dsun.io.serialization.extendedDebugInfo=true。请记住,由于 REPL,spark-shell 更像是一个探索性实用程序,用于迭代测试一小部分代码或新功能,而不是一个应该广泛用于测试代码的成熟的生产就绪实用程序.

这里最安全的选择是将您的应用程序打包到一个 jar 中,然后在 standalone mode 中设置 Spark,然后 运行 spark-submit 使用您打包的 jar。正如您 post 的更新 3 和更新 4 中所反映的,您需要更新代码以将其包装到一个对象中,使其成为您作业的入口点。这将使您能够确保您的代码在这里没有错误。