如何根据给定分区过滤 RDD?
How to filter RDDs based on a given partition?
考虑以下示例:
JavaPairRDD<String, Row> R = input.textFile("test").mapToPair(new PairFunction<String, String, Row>() {
public Tuple2<String, Row> call(String arg0) throws Exception {
String[] parts = arg0.split(" ");
Row r = RowFactory.create(parts[0],parts[1]);
return new Tuple2<String, Row>(r.get(0).toString(), r);
}}).partitionBy(new HashPartitioner(20));
上面的代码创建了一个名为 R
的 RDD,通过对名为 "test".
的 txt 文件的第一列进行散列将其分成 20 块
考虑 test.txt
文件的格式如下:
...
valueA1 valueB1
valueA1 valueB2
valueA1 valueB3
valueA1 valueB4
...
在我的上下文中,我有一个已知值,例如 valueA1,我想检索所有其他值。通过使用具有指定值的现有过滤器操作来完成它是微不足道的。但是,我想避免这种情况,因为基本上过滤操作将在整个 RDD 上执行。
假设 hash(valueA1)=3,我只想对分区 3 执行给定的操作。更一般地说,我对 RDD 中的 dropping/selecting 特定分区感兴趣并对其执行操作.
从 SPARK API 看来,直接是不可能的,是否有解决方法来实现同样的事情?
对于单个键,您可以使用 lookup
方法:
rdd.lookup("a")
// Seq[Int] = ArrayBuffer(1, 4)
为了高效查找,您需要一个分区的 RDD,例如使用如下 HashPartitioner
。
如果您只想过滤包含特定键的分区,可以使用 mapPartitionsWithIndex
:
import org.apache.spark.HashPartitioner
val rdd = sc.parallelize(
Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)
// A particular number is used only to get a reproducible output
)).partitionBy(new HashPartitioner(8))
val keys = Set("a", "c")
val parts = keys.map(_.## % rdd.partitions.size)
rdd.mapPartitionsWithIndex((i, iter) =>
if (parts.contains(i)) iter.filter{ case (k, _) => keys.contains(k) }
else Iterator()
).collect
// Array[(String, Int)] = Array((a,1), (a,4), (c,3))
考虑以下示例:
JavaPairRDD<String, Row> R = input.textFile("test").mapToPair(new PairFunction<String, String, Row>() {
public Tuple2<String, Row> call(String arg0) throws Exception {
String[] parts = arg0.split(" ");
Row r = RowFactory.create(parts[0],parts[1]);
return new Tuple2<String, Row>(r.get(0).toString(), r);
}}).partitionBy(new HashPartitioner(20));
上面的代码创建了一个名为 R
的 RDD,通过对名为 "test".
考虑 test.txt
文件的格式如下:
...
valueA1 valueB1
valueA1 valueB2
valueA1 valueB3
valueA1 valueB4
...
在我的上下文中,我有一个已知值,例如 valueA1,我想检索所有其他值。通过使用具有指定值的现有过滤器操作来完成它是微不足道的。但是,我想避免这种情况,因为基本上过滤操作将在整个 RDD 上执行。
假设 hash(valueA1)=3,我只想对分区 3 执行给定的操作。更一般地说,我对 RDD 中的 dropping/selecting 特定分区感兴趣并对其执行操作.
从 SPARK API 看来,直接是不可能的,是否有解决方法来实现同样的事情?
对于单个键,您可以使用 lookup
方法:
rdd.lookup("a")
// Seq[Int] = ArrayBuffer(1, 4)
为了高效查找,您需要一个分区的 RDD,例如使用如下 HashPartitioner
。
如果您只想过滤包含特定键的分区,可以使用 mapPartitionsWithIndex
:
import org.apache.spark.HashPartitioner
val rdd = sc.parallelize(
Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)
// A particular number is used only to get a reproducible output
)).partitionBy(new HashPartitioner(8))
val keys = Set("a", "c")
val parts = keys.map(_.## % rdd.partitions.size)
rdd.mapPartitionsWithIndex((i, iter) =>
if (parts.contains(i)) iter.filter{ case (k, _) => keys.contains(k) }
else Iterator()
).collect
// Array[(String, Int)] = Array((a,1), (a,4), (c,3))