Spark:将键元组对加入键列表值
Spark: join key-tuple pairs into key-list value
我有很多这样的 RDD(比如 4 个):K,(v1,v2,..,vN) 我必须加入它们,所以我只是 运行
r1.join(r2).join(r3).join(r4)
结果将类似于 K,((v1,v2,..,vN),(v1,v2,...,vN)),(v1,v2,...,vN)) ... 等等。基本上,我会得到一个元组的嵌套结构,每个连接操作一个。
我想知道是否有一种方法可以告诉 Spark 输出每个 RDD 值的并集作为连接的结果。换句话说,我想得到类似的东西:
K, [ v1,v2,..., vN,v1,v2,..., vN,v1,v2,..., v1,v2,...,vN ]
您可以进行多重连接,或者您可以避免使用嵌套语法,而是应用一个 cogroup 版本。但是,由于 cogroup()
只允许您对最多 4 个 RDD 进行分组,因此您可以对它进行猴子修补以进行更多分组。下面是一个 multiCogroup()
函数的例子:
def multiCogroup[K : ClassTag, V : ClassTag](numPartitions: Int, inputRDDs: RDD[(K, V)]*) : RDD[(K, Seq[V])] = {
val cg = new CoGroupedRDD[K](inputRDDs.toSeq, new HashPartitioner(numPartitions))
cg.mapValues { case iterables => iterables.foldLeft(Seq[V]())(_ ++ _.asInstanceOf[Iterable[V]].toSeq) }
}
运行举个例子,可以看到如下:
import org.apache.spark.rdd._
import org.apache.spark.HashPartitioner
import scala.reflect.ClassTag
val rdd1 = sc.parallelize(Seq(("a", 1),("b", 2),("c", 3),("d", 4)))
val rdd2 = sc.parallelize(Seq(("a", 4),("b", 3),("c", 2),("d", 1)))
val rdd3 = sc.parallelize(Seq(("c", 0),("d", 0),("e", 0)))
val rdd4 = sc.parallelize(Seq(("a", 5),("b", 5),("e", 5)))
val rdd5 = sc.parallelize(Seq(("b", -1),("c", -1),("d", -1)))
val combined = multiCogroup[String, Int](2, rdd1, rdd2, rdd3, rdd4, rdd5)
combined.foreach(println)
// (d,List(4, 1, 0, -1))
// (b,List(2, 3, 5, -1))
// (e,List(0, 5))
// (a,List(1, 4, 5))
// (c,List(3, 2, 0, -1))
注意事项:
- 如果您的输入 RDD 值类型不统一,您可以将输出类型
V
归纳为超级类型(例如 Int
和 Long
为 Integral
, String
和 Int
到 Any
)。这可能不值得推荐,因为它可能会在您的程序中引起一些歧义问题。一般来说,我认为最好的用例是当所有输入值类型都相同时。
- 我定义了使用
HashPartitioner
的函数,分区数作为参数 numPartitions
。通过替换 numPartitions
参数来隧道化您自己的 Partitioner 可能是有意义的。然后,您可以将输入分区器直接传递给 CoGroupedRDD[K]()
,与 cogroup 的实现类似。
- 在大型 RDD 上使用此方法时,我可能会谨慎行事。根据输入数据的大小以及键集的分布,连接本身可能有点棘手。将其扩展到将多个 RDD 分组到一个 cogroup 中可能会更快地导致类似的内存问题。
我有很多这样的 RDD(比如 4 个):K,(v1,v2,..,vN) 我必须加入它们,所以我只是 运行
r1.join(r2).join(r3).join(r4)
结果将类似于 K,((v1,v2,..,vN),(v1,v2,...,vN)),(v1,v2,...,vN)) ... 等等。基本上,我会得到一个元组的嵌套结构,每个连接操作一个。
我想知道是否有一种方法可以告诉 Spark 输出每个 RDD 值的并集作为连接的结果。换句话说,我想得到类似的东西:
K, [ v1,v2,..., vN,v1,v2,..., vN,v1,v2,..., v1,v2,...,vN ]
您可以进行多重连接,或者您可以避免使用嵌套语法,而是应用一个 cogroup 版本。但是,由于 cogroup()
只允许您对最多 4 个 RDD 进行分组,因此您可以对它进行猴子修补以进行更多分组。下面是一个 multiCogroup()
函数的例子:
def multiCogroup[K : ClassTag, V : ClassTag](numPartitions: Int, inputRDDs: RDD[(K, V)]*) : RDD[(K, Seq[V])] = {
val cg = new CoGroupedRDD[K](inputRDDs.toSeq, new HashPartitioner(numPartitions))
cg.mapValues { case iterables => iterables.foldLeft(Seq[V]())(_ ++ _.asInstanceOf[Iterable[V]].toSeq) }
}
运行举个例子,可以看到如下:
import org.apache.spark.rdd._
import org.apache.spark.HashPartitioner
import scala.reflect.ClassTag
val rdd1 = sc.parallelize(Seq(("a", 1),("b", 2),("c", 3),("d", 4)))
val rdd2 = sc.parallelize(Seq(("a", 4),("b", 3),("c", 2),("d", 1)))
val rdd3 = sc.parallelize(Seq(("c", 0),("d", 0),("e", 0)))
val rdd4 = sc.parallelize(Seq(("a", 5),("b", 5),("e", 5)))
val rdd5 = sc.parallelize(Seq(("b", -1),("c", -1),("d", -1)))
val combined = multiCogroup[String, Int](2, rdd1, rdd2, rdd3, rdd4, rdd5)
combined.foreach(println)
// (d,List(4, 1, 0, -1))
// (b,List(2, 3, 5, -1))
// (e,List(0, 5))
// (a,List(1, 4, 5))
// (c,List(3, 2, 0, -1))
注意事项:
- 如果您的输入 RDD 值类型不统一,您可以将输出类型
V
归纳为超级类型(例如Int
和Long
为Integral
,String
和Int
到Any
)。这可能不值得推荐,因为它可能会在您的程序中引起一些歧义问题。一般来说,我认为最好的用例是当所有输入值类型都相同时。 - 我定义了使用
HashPartitioner
的函数,分区数作为参数numPartitions
。通过替换numPartitions
参数来隧道化您自己的 Partitioner 可能是有意义的。然后,您可以将输入分区器直接传递给CoGroupedRDD[K]()
,与 cogroup 的实现类似。 - 在大型 RDD 上使用此方法时,我可能会谨慎行事。根据输入数据的大小以及键集的分布,连接本身可能有点棘手。将其扩展到将多个 RDD 分组到一个 cogroup 中可能会更快地导致类似的内存问题。