Apache Spark RDD 过滤成两个 RDD
Apache Spark RDD filter into two RDDs
我需要将一个 RDD 分成两部分:
1个满足条件的部分;另一部分没有。我可以在原始 RDD 上执行 filter
两次,但效率似乎很低。有没有办法可以做我想要的?我在 API 和文献中都找不到任何内容。
如果您可以 T
而不是 RDD[T]
,那么您可以 。否则,您可以这样做:
val data = sc.parallelize(1 to 100)
val splitData = data.mapPartitions{iter => {
val splitList = (iter.toList).partition(_%2 == 0)
Tuple1(splitList).productIterator
}
}.map(_.asInstanceOf[Tuple2[List[Int],List[Int]]])
然后,当您执行某项操作时,您可能需要减少它以合并列表
Spark 默认不支持此功能。如果您事先缓存相同的数据,则对相同的数据进行两次过滤并没有那么糟糕,而且过滤本身很快。
如果真的只是两种不同的类型,你可以使用辅助方法:
implicit class RDDOps[T](rdd: RDD[T]) {
def partitionBy(f: T => Boolean): (RDD[T], RDD[T]) = {
val passes = rdd.filter(f)
val fails = rdd.filter(e => !f(e)) // Spark doesn't have filterNot
(passes, fails)
}
}
val (matches, matchesNot) = sc.parallelize(1 to 100).cache().partitionBy(_ % 2 == 0)
但是一旦你有多种类型的数据,只需将过滤后的数据分配给一个新的值。
Spark RDD 没有这样的api。
这是一个基于 pull request for rdd.span 的版本,应该可以工作:
import scala.reflect.ClassTag
import org.apache.spark.rdd._
def split[T:ClassTag](rdd: RDD[T], p: T => Boolean): (RDD[T], RDD[T]) = {
val splits = rdd.mapPartitions { iter =>
val (left, right) = iter.partition(p)
val iterSeq = Seq(left, right)
iterSeq.iterator
}
val left = splits.mapPartitions { iter => iter.next().toIterator}
val right = splits.mapPartitions { iter =>
iter.next()
iter.next().toIterator
}
(left, right)
}
val rdd = sc.parallelize(0 to 10, 2)
val (first, second) = split[Int](rdd, _ % 2 == 0 )
first.collect
// Array[Int] = Array(0, 2, 4, 6, 8, 10)
重点是,你要做的不是滤镜,而是地图。
(T) -> (Boolean, T)
抱歉,我的 Scala 语法效率低下。但想法是,您通过将答案集映射到 Key/Value 对来拆分答案集。 Key 可以是一个布尔值,指示它是否正在传递 'Filter' 谓词。
您可以通过分区处理来控制输出到不同的目标。只要确保您不将并行处理限制为仅下游的两个分区即可。
另见
你可以使用subtract function
(如果过滤操作太昂贵)。
PySpark 代码:
rdd1 = data.filter(filterFunction)
rdd2 = data.subtract(rdd1)
我需要将一个 RDD 分成两部分:
1个满足条件的部分;另一部分没有。我可以在原始 RDD 上执行 filter
两次,但效率似乎很低。有没有办法可以做我想要的?我在 API 和文献中都找不到任何内容。
如果您可以 T
而不是 RDD[T]
,那么您可以
val data = sc.parallelize(1 to 100)
val splitData = data.mapPartitions{iter => {
val splitList = (iter.toList).partition(_%2 == 0)
Tuple1(splitList).productIterator
}
}.map(_.asInstanceOf[Tuple2[List[Int],List[Int]]])
然后,当您执行某项操作时,您可能需要减少它以合并列表
Spark 默认不支持此功能。如果您事先缓存相同的数据,则对相同的数据进行两次过滤并没有那么糟糕,而且过滤本身很快。
如果真的只是两种不同的类型,你可以使用辅助方法:
implicit class RDDOps[T](rdd: RDD[T]) {
def partitionBy(f: T => Boolean): (RDD[T], RDD[T]) = {
val passes = rdd.filter(f)
val fails = rdd.filter(e => !f(e)) // Spark doesn't have filterNot
(passes, fails)
}
}
val (matches, matchesNot) = sc.parallelize(1 to 100).cache().partitionBy(_ % 2 == 0)
但是一旦你有多种类型的数据,只需将过滤后的数据分配给一个新的值。
Spark RDD 没有这样的api。
这是一个基于 pull request for rdd.span 的版本,应该可以工作:
import scala.reflect.ClassTag
import org.apache.spark.rdd._
def split[T:ClassTag](rdd: RDD[T], p: T => Boolean): (RDD[T], RDD[T]) = {
val splits = rdd.mapPartitions { iter =>
val (left, right) = iter.partition(p)
val iterSeq = Seq(left, right)
iterSeq.iterator
}
val left = splits.mapPartitions { iter => iter.next().toIterator}
val right = splits.mapPartitions { iter =>
iter.next()
iter.next().toIterator
}
(left, right)
}
val rdd = sc.parallelize(0 to 10, 2)
val (first, second) = split[Int](rdd, _ % 2 == 0 )
first.collect
// Array[Int] = Array(0, 2, 4, 6, 8, 10)
重点是,你要做的不是滤镜,而是地图。
(T) -> (Boolean, T)
抱歉,我的 Scala 语法效率低下。但想法是,您通过将答案集映射到 Key/Value 对来拆分答案集。 Key 可以是一个布尔值,指示它是否正在传递 'Filter' 谓词。
您可以通过分区处理来控制输出到不同的目标。只要确保您不将并行处理限制为仅下游的两个分区即可。
另见
你可以使用subtract function
(如果过滤操作太昂贵)。
PySpark 代码:
rdd1 = data.filter(filterFunction)
rdd2 = data.subtract(rdd1)