如何通过两个RDD按键或filter()使用spark intersection()?
how to use spark intersection() by key or filter() with two RDD?
我想在 spark 中按键使用 intersection()
或 filter()
。
但是我真的不知道怎么按键使用intersection()
所以我尝试使用filter()
,但是没有用。
示例 - 这是两个 RDD:
data1 //RDD[(String, Int)] = Array(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1))
data2 //RDD[(String, Int)] = Array(("a", 3), ("b", 5))
val data3 = data2.map{_._1}
data1.filter{_._1 == data3}.collect //Array[(String, Int] = Array()
我想根据 data2
拥有的键得到一个与 data1
具有相同键的(键,值)对。
Array(("a", 1), ("a", 2), ("b", 2), ("b", 3))
就是我要的结果
有没有按键intersection()
或者filter()
解决这个问题的方法?
This can be achieved in different ways
1。 broadcast
filter()
中的变量 - 需要改进可扩展性
val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))
// broadcast data2 key list to use in filter method, which runs in executor nodes
val bcast = sc.broadcast(data2.map(_._1).collect())
val result = data1.filter(r => bcast.value.contains(r._1))
println(result.collect().toList)
//Output
List((a,1), (a,2), (b,2), (b,3))
2。 cogroup
(类似于按键分组)
val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))
val cogroupRdd: RDD[(String, (Iterable[Int], Iterable[Int]))] = data1.cogroup(data2)
/* List(
(a, (CompactBuffer(1, 2), CompactBuffer(3))),
(b, (CompactBuffer(2, 3), CompactBuffer(5))),
(c, (CompactBuffer(1), CompactBuffer()))
) */
//Now filter keys which have two non empty CompactBuffer. You can do that with
//filter(row => row._2._1.nonEmpty && row._2._2.nonEmpty) also.
val filterRdd = cogroupRdd.filter { case (k, (v1, v2)) => v1.nonEmpty && v2.nonEmpty }
/* List(
(a, (CompactBuffer(1, 2), CompactBuffer(3))),
(b, (CompactBuffer(2, 3), CompactBuffer(5)))
) */
//As we care about first data only, lets pick first compact buffer only
// by doing v1.map(val1 => (k, val1))
val result = filterRdd.flatMap { case (k, (v1, v2)) => v1.map(val1 => (k, val1)) }
//List((a, 1), (a, 2), (b, 2), (b, 3))
3。使用内连接
val resultRdd = data1.join(data2).map(r => (r._1, r._2._1)).distinct()
//List((b,2), (b,3), (a,2), (a,1))
此处 data1.join(data2)
保存具有公共键的对 (inner join)
//List((a,(1,3)), (a,(2,3)), (b,(2,5)), (b,(2,1)), (b,(3,5)), (b,(3,1)))
对于你的问题,我认为cogroup()
更适合。 intersection()
方法将同时考虑数据中的键和值,并将导致空的 rdd
.
函数 cogroup()
将 rdd
的值按键分组并给出 (key, vals1, vals2)
,其中 vals1
和 vals2
包含值每个键分别为 data1
和 data2
。请注意,如果某个键在两个数据集中不共享,vals1
或 vals2
之一将作为空 Seq
返回,因此我们首先必须过滤掉这些元组以到达两个 rdd
的 交叉点 。
接下来,我们将获取 vals1
- 其中包含来自 data1
的 值 用于常见的 键 - 并将其转换为格式 (key, Array)
。最后我们使用flatMapValues()
将结果解包为(key, value)
的格式。
val result = (data1.cogroup(data2)
.filter{case (k, (vals1, vals2)) => vals1.nonEmpty && vals2.nonEmpty }
.map{case (k, (vals1, vals2)) => (k, vals1.toArray)}
.flatMapValues(identity[Array[Int]]))
result.collect()
// Array[(String, Int)] = Array((a,1), (a,2), (b,2), (b,3))
我想在 spark 中按键使用 intersection()
或 filter()
。
但是我真的不知道怎么按键使用intersection()
所以我尝试使用filter()
,但是没有用。
示例 - 这是两个 RDD:
data1 //RDD[(String, Int)] = Array(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1))
data2 //RDD[(String, Int)] = Array(("a", 3), ("b", 5))
val data3 = data2.map{_._1}
data1.filter{_._1 == data3}.collect //Array[(String, Int] = Array()
我想根据 data2
拥有的键得到一个与 data1
具有相同键的(键,值)对。
Array(("a", 1), ("a", 2), ("b", 2), ("b", 3))
就是我要的结果
有没有按键intersection()
或者filter()
解决这个问题的方法?
This can be achieved in different ways
1。 broadcast
filter()
中的变量 - 需要改进可扩展性
val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))
// broadcast data2 key list to use in filter method, which runs in executor nodes
val bcast = sc.broadcast(data2.map(_._1).collect())
val result = data1.filter(r => bcast.value.contains(r._1))
println(result.collect().toList)
//Output
List((a,1), (a,2), (b,2), (b,3))
2。 cogroup
(类似于按键分组)
val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))
val cogroupRdd: RDD[(String, (Iterable[Int], Iterable[Int]))] = data1.cogroup(data2)
/* List(
(a, (CompactBuffer(1, 2), CompactBuffer(3))),
(b, (CompactBuffer(2, 3), CompactBuffer(5))),
(c, (CompactBuffer(1), CompactBuffer()))
) */
//Now filter keys which have two non empty CompactBuffer. You can do that with
//filter(row => row._2._1.nonEmpty && row._2._2.nonEmpty) also.
val filterRdd = cogroupRdd.filter { case (k, (v1, v2)) => v1.nonEmpty && v2.nonEmpty }
/* List(
(a, (CompactBuffer(1, 2), CompactBuffer(3))),
(b, (CompactBuffer(2, 3), CompactBuffer(5)))
) */
//As we care about first data only, lets pick first compact buffer only
// by doing v1.map(val1 => (k, val1))
val result = filterRdd.flatMap { case (k, (v1, v2)) => v1.map(val1 => (k, val1)) }
//List((a, 1), (a, 2), (b, 2), (b, 3))
3。使用内连接
val resultRdd = data1.join(data2).map(r => (r._1, r._2._1)).distinct()
//List((b,2), (b,3), (a,2), (a,1))
此处 data1.join(data2)
保存具有公共键的对 (inner join)
//List((a,(1,3)), (a,(2,3)), (b,(2,5)), (b,(2,1)), (b,(3,5)), (b,(3,1)))
对于你的问题,我认为cogroup()
更适合。 intersection()
方法将同时考虑数据中的键和值,并将导致空的 rdd
.
函数 cogroup()
将 rdd
的值按键分组并给出 (key, vals1, vals2)
,其中 vals1
和 vals2
包含值每个键分别为 data1
和 data2
。请注意,如果某个键在两个数据集中不共享,vals1
或 vals2
之一将作为空 Seq
返回,因此我们首先必须过滤掉这些元组以到达两个 rdd
的 交叉点 。
接下来,我们将获取 vals1
- 其中包含来自 data1
的 值 用于常见的 键 - 并将其转换为格式 (key, Array)
。最后我们使用flatMapValues()
将结果解包为(key, value)
的格式。
val result = (data1.cogroup(data2)
.filter{case (k, (vals1, vals2)) => vals1.nonEmpty && vals2.nonEmpty }
.map{case (k, (vals1, vals2)) => (k, vals1.toArray)}
.flatMapValues(identity[Array[Int]]))
result.collect()
// Array[(String, Int)] = Array((a,1), (a,2), (b,2), (b,3))