Spark RDD 等同于 Scala 集合分区
Spark RDD equivalent to Scala collections partition
这是我的一项 spark 作业的一个小问题,它似乎不会引起任何问题 -- 但每次我看到它都会让我烦恼,并且无法想出更好的解决方案。
假设我有一个像这样的 Scala 集合:
val myStuff = List(Try(2/2), Try(2/0))
我可以使用分区将此列表分为成功和失败:
val (successes, failures) = myStuff.partition(_.isSuccess)
这很好。分区的实现只遍历一次源集合来构建两个新集合。但是,使用 Spark,我能够设计出的最佳等价物是:
val myStuff: RDD[Try[???]] = sourceRDD.map(someOperationThatMayFail)
val successes: RDD[???] = myStuff.collect { case Success(v) => v }
val failures: RDD[Throwable] = myStuff.collect { case Failure(ex) => ex }
除了解包 Try 的不同(这很好)还需要遍历数据两次。这很烦人。
是否有更好的 Spark 替代方案可以拆分 RDD 而无需多次遍历?即具有这样的签名,其中分区具有 Scala 集合分区而不是 RDD 分区的行为:
val (successes: RDD[Try[???]], failures: RDD[Try[???]]) = myStuff.partition(_.isSuccess)
作为参考,我以前使用类似下面的方法来解决这个问题。可能失败的操作是从二进制格式反序列化某些数据,并且失败变得非常有趣,需要对其进行处理并保存为 RDD 而不是记录。
def someOperationThatMayFail(data: Array[Byte]): Option[MyDataType] = {
try {
Some(deserialize(data))
} catch {
case e: MyDesrializationError => {
logger.error(e)
None
}
}
}
可能还有其他解决方案,但这里是:
设置:
import scala.util._
val myStuff = List(Try(2/2), Try(2/0))
val myStuffInSpark = sc.parallelize(myStuff)
执行:
val myStuffInSparkPartitioned = myStuffInSpark.aggregate((List[Try[Int]](),List[Try[Int]]()))(
(accum, curr)=>if(curr.isSuccess) (curr :: accum._1,accum._2) else (accum._1, curr :: accum._2),
(first, second)=> (first._1 ++ second._1,first._2 ++ second._2))
如果您需要解释,请告诉我
这是我的一项 spark 作业的一个小问题,它似乎不会引起任何问题 -- 但每次我看到它都会让我烦恼,并且无法想出更好的解决方案。
假设我有一个像这样的 Scala 集合:
val myStuff = List(Try(2/2), Try(2/0))
我可以使用分区将此列表分为成功和失败:
val (successes, failures) = myStuff.partition(_.isSuccess)
这很好。分区的实现只遍历一次源集合来构建两个新集合。但是,使用 Spark,我能够设计出的最佳等价物是:
val myStuff: RDD[Try[???]] = sourceRDD.map(someOperationThatMayFail)
val successes: RDD[???] = myStuff.collect { case Success(v) => v }
val failures: RDD[Throwable] = myStuff.collect { case Failure(ex) => ex }
除了解包 Try 的不同(这很好)还需要遍历数据两次。这很烦人。
是否有更好的 Spark 替代方案可以拆分 RDD 而无需多次遍历?即具有这样的签名,其中分区具有 Scala 集合分区而不是 RDD 分区的行为:
val (successes: RDD[Try[???]], failures: RDD[Try[???]]) = myStuff.partition(_.isSuccess)
作为参考,我以前使用类似下面的方法来解决这个问题。可能失败的操作是从二进制格式反序列化某些数据,并且失败变得非常有趣,需要对其进行处理并保存为 RDD 而不是记录。
def someOperationThatMayFail(data: Array[Byte]): Option[MyDataType] = {
try {
Some(deserialize(data))
} catch {
case e: MyDesrializationError => {
logger.error(e)
None
}
}
}
可能还有其他解决方案,但这里是:
设置:
import scala.util._
val myStuff = List(Try(2/2), Try(2/0))
val myStuffInSpark = sc.parallelize(myStuff)
执行:
val myStuffInSparkPartitioned = myStuffInSpark.aggregate((List[Try[Int]](),List[Try[Int]]()))(
(accum, curr)=>if(curr.isSuccess) (curr :: accum._1,accum._2) else (accum._1, curr :: accum._2),
(first, second)=> (first._1 ++ second._1,first._2 ++ second._2))
如果您需要解释,请告诉我