Spark:成对的 RDD 中的高效质量查找
Spark: Efficient mass lookup in pair RDD's
在 Apache Spark 中,我有两个 RDD。第一个 data : RDD[(K,V)]
包含键值形式的数据。第二个 pairs : RDD[(K,K)]
包含此数据的一组有趣的密钥对。
如何有效地构造一个 RDD pairsWithData : RDD[((K,K)),(V,V))]
,使其包含来自 pairs
的所有元素作为键元组及其对应的值(来自 data
)作为值元组?
数据的一些属性:
data
中的键是唯一的
pairs
中的所有条目都是唯一的
- 对于
pairs
中的所有对(k1,k2)
,保证k1 <= k2
- 'pairs'的大小只是数据大小的常数
|pairs| = O(|data|)
- 当前数据大小(预计会增长):
|data| ~ 10^8, |pairs| ~ 10^10
当前尝试
以下是 Scala 中的一些示例代码:
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
// This kind of show the idea, but fails at runtime.
def massPairLookup1(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
keyPairs map {case (k1,k2) =>
val v1 : String = data lookup k1 head;
val v2 : String = data lookup k2 head;
((k1, k2), (v1,v2))
}
}
// Works but is O(|data|^2)
def massPairLookup2(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
// Construct all possible pairs of values
val cartesianData = data cartesian data map {case((k1,v1),(k2,v2)) => ((k1,k2),(v1,v2))}
// Select only the values who's keys are in keyPairs
keyPairs map {(_,0)} join cartesianData mapValues {_._2}
}
// Example function that find pairs of keys
// Runs in O(|data|) in real life, but cannot maintain the values
def relevantPairs(data : RDD[(Int, String)]) = {
val keys = data map (_._1)
keys cartesian keys filter {case (x,y) => x*y == 12 && x < y}
}
// Example run
val data = sc parallelize(1 to 12) map (x => (x, "Number " + x))
val pairs = relevantPairs(data)
val pairsWithData = massPairLookup2(pairs, data)
// Print:
// ((1,12),(Number1,Number12))
// ((2,6),(Number2,Number6))
// ((3,4),(Number3,Number4))
pairsWithData.foreach(println)
尝试 1
首先,我尝试仅在 data
上使用 lookup
函数,但在执行时会抛出运行时错误。似乎 self
在 PairRDDFunctions
特征中为空。
此外,我不确定 lookup
的性能。 The documentation 说 如果 RDD 有一个已知的分区器,只搜索键映射到的分区,这个操作就可以有效地完成。 这听起来像 n
查找需要 O (n*|partition|) 时间最多,我怀疑可以优化。
尝试 2
此尝试有效,但我创建了 |data|^2
对,这会降低性能。我不希望 Spark 能够优化它。
您的查找 1 不起作用,因为您无法在 worker 内部(在另一个转换内部)执行 RDD 转换。
在查找2中,我认为没有必要进行全笛卡尔...
你可以这样做:
val firstjoin = pairs.map({case (k1,k2) => (k1, (k1,k2))})
.join(data)
.map({case (_, ((k1, k2), v1)) => ((k1, k2), v1)})
val result = firstjoin.map({case ((k1,k2),v1) => (k2, ((k1,k2),v1))})
.join(data)
.map({case(_, (((k1,k2), v1), v2))=>((k1, k2), (v1, v2))})
或更密集的形式:
val firstjoin = pairs.map(x => (x._1, x)).join(data).map(_._2)
val result = firstjoin.map({case (x,y) => (x._2, (x,y))})
.join(data).map({case(x, (y, z))=>(y._1, (y._2, z))})
我不认为你可以更有效地做到这一点,但我可能是错的...
在 Apache Spark 中,我有两个 RDD。第一个 data : RDD[(K,V)]
包含键值形式的数据。第二个 pairs : RDD[(K,K)]
包含此数据的一组有趣的密钥对。
如何有效地构造一个 RDD pairsWithData : RDD[((K,K)),(V,V))]
,使其包含来自 pairs
的所有元素作为键元组及其对应的值(来自 data
)作为值元组?
数据的一些属性:
data
中的键是唯一的pairs
中的所有条目都是唯一的- 对于
pairs
中的所有对(k1,k2)
,保证k1 <= k2
- 'pairs'的大小只是数据大小的常数
|pairs| = O(|data|)
- 当前数据大小(预计会增长):
|data| ~ 10^8, |pairs| ~ 10^10
当前尝试
以下是 Scala 中的一些示例代码:
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
// This kind of show the idea, but fails at runtime.
def massPairLookup1(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
keyPairs map {case (k1,k2) =>
val v1 : String = data lookup k1 head;
val v2 : String = data lookup k2 head;
((k1, k2), (v1,v2))
}
}
// Works but is O(|data|^2)
def massPairLookup2(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
// Construct all possible pairs of values
val cartesianData = data cartesian data map {case((k1,v1),(k2,v2)) => ((k1,k2),(v1,v2))}
// Select only the values who's keys are in keyPairs
keyPairs map {(_,0)} join cartesianData mapValues {_._2}
}
// Example function that find pairs of keys
// Runs in O(|data|) in real life, but cannot maintain the values
def relevantPairs(data : RDD[(Int, String)]) = {
val keys = data map (_._1)
keys cartesian keys filter {case (x,y) => x*y == 12 && x < y}
}
// Example run
val data = sc parallelize(1 to 12) map (x => (x, "Number " + x))
val pairs = relevantPairs(data)
val pairsWithData = massPairLookup2(pairs, data)
// Print:
// ((1,12),(Number1,Number12))
// ((2,6),(Number2,Number6))
// ((3,4),(Number3,Number4))
pairsWithData.foreach(println)
尝试 1
首先,我尝试仅在 data
上使用 lookup
函数,但在执行时会抛出运行时错误。似乎 self
在 PairRDDFunctions
特征中为空。
此外,我不确定 lookup
的性能。 The documentation 说 如果 RDD 有一个已知的分区器,只搜索键映射到的分区,这个操作就可以有效地完成。 这听起来像 n
查找需要 O (n*|partition|) 时间最多,我怀疑可以优化。
尝试 2
此尝试有效,但我创建了 |data|^2
对,这会降低性能。我不希望 Spark 能够优化它。
您的查找 1 不起作用,因为您无法在 worker 内部(在另一个转换内部)执行 RDD 转换。
在查找2中,我认为没有必要进行全笛卡尔...
你可以这样做:
val firstjoin = pairs.map({case (k1,k2) => (k1, (k1,k2))})
.join(data)
.map({case (_, ((k1, k2), v1)) => ((k1, k2), v1)})
val result = firstjoin.map({case ((k1,k2),v1) => (k2, ((k1,k2),v1))})
.join(data)
.map({case(_, (((k1,k2), v1), v2))=>((k1, k2), (v1, v2))})
或更密集的形式:
val firstjoin = pairs.map(x => (x._1, x)).join(data).map(_._2)
val result = firstjoin.map({case (x,y) => (x._2, (x,y))})
.join(data).map({case(x, (y, z))=>(y._1, (y._2, z))})
我不认为你可以更有效地做到这一点,但我可能是错的...