Apache Spark:通过简单的操作进行 RDD 多次传递
Apache Spark: RDD multiple passes with a simple operation
我在学习 Apache Spark 框架时遇到过这个问题。
考虑以下简单的 RDD
scala> val rdd1 = sc.parallelize(List((1, Set("C3", "C2")),
(2, Set("C1", "C5", "C3")),
(3, Set("C2", "C7"))))
rdd1: RDD[(Int, Set[String])]
我想将 rdd1
中每个元素的每个集合与 "same" rdd1
中每个其他元素的集合相交;这样结果将是以下形式:
newRDD: RDD[(Int, Int, Set[String])]
// and newRDD.collect will look like:
newRDD: Array[(Int, Int, Set[String])] = Array((1, 1, Set("C3", "C2")), (1, 2, Set("C3")), (1, 3, Set("C2")),
(2, 1, Set("C3")), (2, 2, Set("C1", "C5", "C3")), (2, 3, Set()),
(3, 1, Set("C2")), (3, 2, Set()), (1, 3, Set("C2", "C7")))
我试过像这样嵌套 rdd1
scala> val newRDD = rdd1 map (x => {rdd1 map (y => (x._1, y._1, x._2.intersect(y._2)))})
然而,这将抛出 'Task not serilizable' 异常。
现在,如果我想在执行
之前避免 rdd1.collect()
或任何其他操作操作
scala> val newRDD = rdd1 map (x => {rdd1 map (y => (x._1, y._1, x._2.intersect(y._2)))})
是否可以达到预期的效果newRDD
?
你得到 'Task not serilizable' 异常的原因是因为你试图将一个 RDD
放在另一个 RDD
的映射中,在这种情况下,Spark 会尝试序列化第二个RDD
。通常这种问题你会用连接来解决:
val newRDD = rdd1.cartesian(rdd1).map { case ((a, aSet), (b, bSet)) =>
(a, b, aSet.intersect(bSet))
}
此处笛卡尔连接在新的 RDD
中创建了一对 each 集合,您可以将其相交。
我在学习 Apache Spark 框架时遇到过这个问题。 考虑以下简单的 RDD
scala> val rdd1 = sc.parallelize(List((1, Set("C3", "C2")),
(2, Set("C1", "C5", "C3")),
(3, Set("C2", "C7"))))
rdd1: RDD[(Int, Set[String])]
我想将 rdd1
中每个元素的每个集合与 "same" rdd1
中每个其他元素的集合相交;这样结果将是以下形式:
newRDD: RDD[(Int, Int, Set[String])]
// and newRDD.collect will look like:
newRDD: Array[(Int, Int, Set[String])] = Array((1, 1, Set("C3", "C2")), (1, 2, Set("C3")), (1, 3, Set("C2")),
(2, 1, Set("C3")), (2, 2, Set("C1", "C5", "C3")), (2, 3, Set()),
(3, 1, Set("C2")), (3, 2, Set()), (1, 3, Set("C2", "C7")))
我试过像这样嵌套 rdd1
scala> val newRDD = rdd1 map (x => {rdd1 map (y => (x._1, y._1, x._2.intersect(y._2)))})
然而,这将抛出 'Task not serilizable' 异常。
现在,如果我想在执行
rdd1.collect()
或任何其他操作操作
scala> val newRDD = rdd1 map (x => {rdd1 map (y => (x._1, y._1, x._2.intersect(y._2)))})
是否可以达到预期的效果newRDD
?
你得到 'Task not serilizable' 异常的原因是因为你试图将一个 RDD
放在另一个 RDD
的映射中,在这种情况下,Spark 会尝试序列化第二个RDD
。通常这种问题你会用连接来解决:
val newRDD = rdd1.cartesian(rdd1).map { case ((a, aSet), (b, bSet)) =>
(a, b, aSet.intersect(bSet))
}
此处笛卡尔连接在新的 RDD
中创建了一对 each 集合,您可以将其相交。