为 Spark 序列化 Scalaz 订单
Serializing Scalaz Order for Spark
我注意到大多数 Scalaz classes 是不可序列化的。在这种情况下,我尝试使用 type class 在 Spark 中对数组进行自定义排序。
一个减少的例子可能是这样的:
> val ord = Order[T]{ ... }
> sc.makeRDD[T](...).grupBy(...).map {
case (_, grouped) => IList[T](grouped.toList).sorted(ord).distinct(ord)
}
如您所料,此实现抛出 NotSerializableException
因为 Order[T]
不可序列化。
有什么方法可以使 Order[T]
可序列化吗?在一个完美的世界中,我希望仍然使用 scalaz 来避免这个问题。在 不太完美 中,我愿意考虑其他实现方式。
如果发生这种情况,则必须以可维护和可扩展的方式保持自定义排序和不同的实现。
如果您需要访问某些不可序列化的对象,您可以将其包装在 object
:
scala> class NotSerializablePrinter { def print(msg:String) = println(msg) }
defined class NotSerializablePrinter
scala> val printer = new NotSerializablePrinter
printer: NotSerializablePrinter = $iwC$$iwC$NotSerializablePrinter@3b8afdbf
scala> val rdd = sc.parallelize(Array("1","2","3"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:30
scala> rdd.foreach(msg => printer.print(msg)) // Fails
org.apache.spark.SparkException: Task not serializable
...
scala> object wrap { val printer = new NotSerializablePrinter }
defined module wrap
scala> rdd.foreach(msg => wrap.printer.print(msg))
1
3
2
在您的情况下,您可以将我的 NotSerializablePrinter
实例替换为您的 Scalaz Order
实例。此示例摘自 this useful article(item 3a).
我注意到大多数 Scalaz classes 是不可序列化的。在这种情况下,我尝试使用 type class 在 Spark 中对数组进行自定义排序。
一个减少的例子可能是这样的:
> val ord = Order[T]{ ... }
> sc.makeRDD[T](...).grupBy(...).map {
case (_, grouped) => IList[T](grouped.toList).sorted(ord).distinct(ord)
}
如您所料,此实现抛出 NotSerializableException
因为 Order[T]
不可序列化。
有什么方法可以使 Order[T]
可序列化吗?在一个完美的世界中,我希望仍然使用 scalaz 来避免这个问题。在 不太完美 中,我愿意考虑其他实现方式。
如果发生这种情况,则必须以可维护和可扩展的方式保持自定义排序和不同的实现。
如果您需要访问某些不可序列化的对象,您可以将其包装在 object
:
scala> class NotSerializablePrinter { def print(msg:String) = println(msg) }
defined class NotSerializablePrinter
scala> val printer = new NotSerializablePrinter
printer: NotSerializablePrinter = $iwC$$iwC$NotSerializablePrinter@3b8afdbf
scala> val rdd = sc.parallelize(Array("1","2","3"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:30
scala> rdd.foreach(msg => printer.print(msg)) // Fails
org.apache.spark.SparkException: Task not serializable
...
scala> object wrap { val printer = new NotSerializablePrinter }
defined module wrap
scala> rdd.foreach(msg => wrap.printer.print(msg))
1
3
2
在您的情况下,您可以将我的 NotSerializablePrinter
实例替换为您的 Scalaz Order
实例。此示例摘自 this useful article(item 3a).