Spark: Intersection of Lists not working
I have an RDD of the form:
t1 -> (Long, List[Long])
and a list of the form:
t2 -> List[Long]
I need to perform the union and intersection of the lists. I am trying the following code:
val t1 = a.map(x => (x._1, (List(x._2)))).reduceByKey(_ ++ _)
val t2 = b.map(x => (x._1, (List(x._2)))).reduceByKey(_ ++ _).map(x => x._2).collect()
val t3intersect = t1.map(x => (x._1, (x._2.intersect(t2))))
val t3union = t1.map(x => (x._1, (x._2.union(t2))))
While the union returns correct results, the intersection is always an empty list. I can't pinpoint the problem. Please help! Thanks!
Here is an example:
(1, List(1596, 1617, 1929, 2399, 2674))
(2, List(1702, 1785, 1933, 2054, 2583, 2913))
(3, List(1982, 2002, 2048, 2341, 2666))
and the list
List(2002, 2399)
This should return the intersection:
(1, List(2399))
(2, List())
(3, List(2002))
and the union:
(1, List(1596, 1617, 1929, 2399, 2674, 2002))
(2, List(1702, 1785, 1933, 2054, 2583, 2913, 2002, 2399))
(3, List(1982, 2002, 2048, 2341, 2666, 2399))
I think your intersection code is fine; it should work. Also try this, for clarity and possibly better performance:
val t3intersect = t1.mapValues( _ intersect t2 )
Edit: I don't know what a and b are, or what logic produces t1 and t2 from them, but if you initialize t1 and t2 in the Spark REPL as follows, for testing:
scala> val t1 = sc.parallelize( List(
| (1, List(1596, 1617, 1929, 2399, 2674)),
| (2, List(1702, 1785, 1933, 2054, 2583, 2913)),
| (3, List(1982, 2002, 2048, 2341, 2666)) ), 2)
t1: org.apache.spark.rdd.RDD[(Int, List[Int])] = ParallelCollectionRDD[10] at parallelize at <console>:12
scala> val t2 = List(2002, 2399)
t2: List[Int] = List(2002, 2399)
then you get the expected result:
scala> val tr = t1.mapValues( _ intersect t2 )
tr: org.apache.spark.rdd.RDD[(Int, List[Int])] = MappedValuesRDD[12] at mapValues at <console>:16
scala> tr.collect()
res13: Array[(Int, List[Int])] = Array((1,List(2399)), (2,List()), (3,List(2002)))
So, look for an error somewhere else.
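The per-key intersection logic itself can also be checked in plain Scala, without Spark, using the example data from the question (a minimal sketch; the Map here just stands in for the keyed RDD):

```scala
object IntersectCheck extends App {
  // Same example data as in the question, keyed like the RDD.
  val data = Map(
    1 -> List(1596, 1617, 1929, 2399, 2674),
    2 -> List(1702, 1785, 1933, 2054, 2583, 2913),
    3 -> List(1982, 2002, 2048, 2341, 2666))
  val t2 = List(2002, 2399)

  // The same per-value intersect that mapValues applies on the RDD:
  data.toList.sortBy(_._1).foreach { case (k, v) =>
    println((k, v.intersect(t2)))
  }
  // Prints:
  // (1,List(2399))
  // (2,List())
  // (3,List(2002))
}
```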
I have reproduced your problem case as follows:
import org.apache.spark.{SparkConf, SparkContext}

object ItersectionList {
  def main(args: Array[String]) {
    val spConf = new SparkConf().setMaster("local[2]").setAppName("ItersectionList")
    val sc = new SparkContext(spConf)
    val a = Array(
      (1, List(1596, 1617, 1929, 2399, 2674)),
      (2, List(1702, 1785, 1933, 2054, 2583, 2913)),
      (3, List(1982, 2002, 2048, 2341, 2666))
    )
    val t2 = List(2002, 2399)
    val t1 = sc.makeRDD(a).map(x => (x._1, (List(x._2)))).reduceByKey(_ ++ _)
    val t3intersect = t1.map(x => (x._1, (x._2.intersect(t2))))
    val t3union = t1.map(x => (x._1, (x._2.union(t2))))
    t3intersect.foreach(println)
    t3union.foreach(println)
  }
}
The result is as follows:
Intersection:
(2,List())
(1,List())
(3,List())
Union:
(2,List(List(1702, 1785, 1933, 2054, 2583, 2913), 2002, 2399))
(1,List(List(1596, 1617, 1929, 2399, 2674), 2002, 2399))
(3,List(List(1982, 2002, 2048, 2341, 2666), 2002, 2399))
I found that the problem is the List(x._2) in map(x => (x._1, (List(x._2)))).reduceByKey(_ ++ _), which turns List(a, b, c) into List(List(a, b, c)). Since List(List(a, b, c)) does not match List(a, b, c), the intersection is empty. You can remove the List() wrapper as follows, and the result will be correct.
val t1 = sc.makeRDD(a).map(x => (x._1, x._2)).reduceByKey(_ ++ _)
or
val t1 = sc.makeRDD(a).reduceByKey(_ ++ _)
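The mismatch described above can be demonstrated in plain Scala, with no Spark involved (a minimal sketch using one of the example lists from the question):

```scala
object NestingDemo extends App {
  val inner = List(1982, 2002, 2048, 2341, 2666)
  val t2 = List(2002, 2399)

  // Wrapping in an extra List() nests the data one level deeper,
  // which is what List(x._2) does when x._2 is already a List:
  val nested = List(inner) // List(List(1982, 2002, ...))

  // The elements of `nested` are Lists, not Ints, so nothing matches t2:
  println(nested.intersect(t2)) // List()

  // Without the extra wrapper, the intersection works as expected:
  println(inner.intersect(t2)) // List(2002)
}
```

This is why the union in the reproduced output still "looks" like it works: ++ happily concatenates the nested List with the Ints of t2 into a mixed-element List, while intersect compares elements and finds no match.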