Secondary sorting in Spark using Clojure/flambo
I have a Scala program in which I have implemented a secondary sort that works perfectly. This is how I wrote it:
import org.apache.spark.Partitioner

object rfmc {
  // Custom key and partitioner
  case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double)

  class RFMCPartitioner(partitions: Int) extends Partitioner {
    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
    override def numPartitions: Int = partitions
    override def getPartition(key: Any): Int = {
      val k = key.asInstanceOf[RFMCKey]
      k.cId.hashCode() % numPartitions
    }
  }

  object RFMCKey {
    // Sort by R ascending, then by F, M and C descending
    implicit def orderingBycId[A <: RFMCKey]: Ordering[A] = {
      Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
    }
  }

  // The body of the code
  //
  //
  // Assumes rdd holds (cust, r, f, m, c) tuples
  val x = rdd.map { case (cust, r, f, m, c) => (RFMCKey(cust, r, f, m, c), r + "," + f + "," + m + "," + c) }
  val y = x.repartitionAndSortWithinPartitions(new RFMCPartitioner(1))
}
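I want to do the same thing with flambo, a Clojure DSL for Spark. Since I can't write the partitioner in Clojure, I reused the code defined above, compiled it, and added it as a dependency of my Clojure code.
Now I import the partitioner and the key into my Clojure code like this: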
(ns xyz
  (:import
    [package RFMCPartitioner]
    [package RFMCKey]))
But when I try to create an RFMCKey with (RFMCKey. cust_id r f m c), it throws the following error:
java.lang.ClassCastException: org.formcept.wisdom.RFMCKey cannot be cast to java.lang.Comparable
at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
at scala.math.LowPriorityOrderingImplicits$$anon.compare(Ordering.scala:153)
at org.apache.spark.util.collection.ExternalSorter$$anon.compare(ExternalSorter.scala:170)
at org.apache.spark.util.collection.ExternalSorter$$anon.compare(ExternalSorter.scala:164)
at org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:252)
at org.apache.spark.util.collection.TimSort.sort(TimSort.java:110)
at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
at org.apache.spark.util.collection.SizeTrackingPairBuffer.destructiveSortedIterator(SizeTrackingPairBuffer.scala:83)
at org.apache.spark.util.collection.ExternalSorter.partitionedIterator(ExternalSorter.scala:687)
at org.apache.spark.util.collection.ExternalSorter.iterator(ExternalSorter.scala:705)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:64)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
My guess is that it cannot find the ordering I defined after the partitioner. But if it works in Scala, why doesn't it work in Clojure?
So I finally figured it out myself. I basically had to write my custom sorting function as a separate Scala project and then call it from Clojure.
My Scala file looks like this:
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

case class RFMCKey(cId: String, R: Double, F: Long, M: Double, C: Double)

class RFMCPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[RFMCKey]
    // hashCode can be negative, so shift the remainder into [0, numPartitions)
    val mod = k.cId.hashCode() % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }
}

object RFMCKey {
  // Sort by R ascending, then by F, M and C descending
  implicit def orderingBycId[A <: RFMCKey]: Ordering[A] = {
    Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
  }
}

class rfmcSort {
  // Builds the (RFMCKey, String) pairs and sorts them on the Scala side,
  // where the implicit Ordering from the companion object is in scope
  def sortWithRFMC(a: RDD[(String, (((Double, Long), Double), Double))], parts: Int): RDD[(RFMCKey, String)] = {
    a.map {
      case (custId, (((rVal, fVal), mVal), cVal)) =>
        (RFMCKey(custId, rVal, fVal, mVal, cVal), rVal + "," + fVal + "," + mVal + "," + cVal)
    }.repartitionAndSortWithinPartitions(new RFMCPartitioner(parts))
  }
}
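The ordering defined in the RFMCKey companion object can be checked without a Spark cluster. The following is a minimal plain-Scala sketch: the key and its ordering are copied from the file above, while OrderingSketch, sortedIds and the sample values are hypothetical names and data made up for illustration.

```scala
// Plain-Scala sketch: no Spark required.
case class RFMCKey(cId: String, R: Double, F: Long, M: Double, C: Double)

object RFMCKey {
  // Same ordering as in the Scala file above:
  // R ascending, then F, M and C descending.
  implicit def orderingBycId[A <: RFMCKey]: Ordering[A] =
    Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
}

object OrderingSketch {
  // Hypothetical helper: returns the customer ids in sorted key order.
  def sortedIds(keys: Seq[RFMCKey]): Seq[String] = {
    // The Scala compiler resolves the implicit from the companion object here.
    val ord = implicitly[Ordering[RFMCKey]]
    keys.sorted(ord).map(_.cId)
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample keys.
    val keys = Seq(
      RFMCKey("a", 2.0, 5L, 1.0, 1.0),
      RFMCKey("b", 1.0, 3L, 1.0, 1.0),
      RFMCKey("c", 1.0, 7L, 1.0, 1.0)
    )
    println(sortedIds(keys).mkString(",")) // prints "c,b,a"
  }
}
```

Keys "b" and "c" tie on R, so the larger F value ("c") comes first, and "a" comes last because its R is largest.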
I compiled it as a Scala project and used it in my Clojure code like this:
(:import [org.formcept.wisdom rfmcSort]
         [org.apache.spark.rdd RDD])

sorted-rfmc-records (.toJavaRDD (.sortWithRFMC (rfmcSort.) (.rdd rfmc-records) num_partitions))
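Note how I call the sortWithRFMC function on the rfmcSort object I created. One more very important thing to note here: when you pass your JavaPairRDD to the Scala function, you first have to convert it into a plain Spark RDD by calling its .rdd method. You then have to convert the returned Spark RDD back into a Java RDD (here with .toJavaRDD, which returns a JavaRDD of key/value tuples) before you can use it in Clojure.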
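As for why the original attempt failed: the stack trace in the question passes through NaturalOrdering.compare and fails while casting RFMCKey to java.lang.Comparable. That suggests the sort invoked from Clojure fell back to the key's natural ordering. Implicit values such as orderingBycId are resolved by the Scala compiler, so a call made from Clojure never picks them up. An untested alternative would be to make the key itself comparable. The following is a minimal sketch under that assumption: ComparableRFMCKey, ComparableSketch and sortedIds are hypothetical names, and java.util.Arrays.sort stands in for Spark's sort so that the example runs without a cluster.

```scala
import java.util.Arrays

// Hypothetical key that carries its own natural ordering.
// Ordered[A] extends java.lang.Comparable[A], so a cast to Comparable succeeds.
case class ComparableRFMCKey(cId: String, R: Double, F: Long, M: Double, C: Double)
    extends Ordered[ComparableRFMCKey] {

  // Intended to mirror orderingBycId: R ascending, then F, M and C descending.
  override def compare(that: ComparableRFMCKey): Int = {
    val steps = Seq(
      java.lang.Double.compare(R, that.R), // ascending
      java.lang.Long.compare(that.F, F),   // descending
      java.lang.Double.compare(that.M, M), // descending
      java.lang.Double.compare(that.C, C)  // descending
    )
    // The first non-zero comparison decides the order.
    steps.find(_ != 0).getOrElse(0)
  }
}

object ComparableSketch {
  // Sorts using only the natural ordering, the way a Comparable-based sort would.
  def sortedIds(keys: Seq[ComparableRFMCKey]): Seq[String] = {
    val arr: Array[AnyRef] = keys.toArray[AnyRef]
    Arrays.sort(arr) // casts each element to Comparable internally
    arr.toSeq.map(_.asInstanceOf[ComparableRFMCKey].cId)
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample keys.
    val keys = Seq(
      ComparableRFMCKey("a", 2.0, 5L, 1.0, 1.0),
      ComparableRFMCKey("b", 1.0, 3L, 1.0, 1.0),
      ComparableRFMCKey("c", 1.0, 7L, 1.0, 1.0)
    )
    println(sortedIds(keys).mkString(",")) // prints "c,b,a"
  }
}
```

Whether such a key behaves as intended when created from flambo is an assumption, not something confirmed here. The Java API also has an overload of repartitionAndSortWithinPartitions that takes an explicit java.util.Comparator, which may be another route worth exploring.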