如何使用 Spark RDD 中的唯一值填充 Scala Seq of Sets?
How to fill Scala Seq of Sets with unique values from Spark RDD?
我正在使用 Spark 和 Scala。我有一个 Array[String]
的 RDD,我将对其进行迭代。 RDD 包含 (name, age, work, ...)
等属性的值。我正在使用一系列可变字符串集(称为 attributes
)来收集每个属性的所有唯一值。
将 RDD 想象成这样:
("name1","21","JobA")
("name2","21","JobB")
("name3","22","JobA")
最后我想要这样的东西:
attributes = (("name1","name2","name3"),("21","22"),("JobA","JobB"))
我有以下代码:
val someLength = 10
val attributes = Seq.fill[mutable.Set[String]](someLength)(mutable.Set())
val splitLines = rdd.map(line => line.split("\t"))
lines.foreach(line => {
for {(value, index) <- line.zipWithIndex} {
attributes(index).add(value)
// #1
}
})
// #2
当我调试并停在标有 #1 的行时,一切正常,attributes
正确地填充了唯一值。
但是在循环之后,在第#2行,属性再次为空。查看它表明,属性是一系列集合,它们的大小都是 0.
Seq()
Seq()
...
我做错了什么?是否存在某种我不知道的范围界定?
答案在于Spark是一个分布式引擎。我会给你一个你所面临的问题的粗略概念。这里每个 RDD
中的元素被分装到 Partitions
中,每个 Partition
可能存在于不同的节点上。
当你写 rdd1.foreach(f)
时,f
被包裹在一个闭包中(它获取相应对象的副本)。现在,这个闭包被序列化,然后被发送到每个节点,它被应用到 Partition
.
中的每个元素。
在这里,你的 f
将在其包装的闭包中获得 attributes
的 copy
,因此当 f
被执行时,它与 [= 的副本交互19=] 而不是你想要的 attributes
。这导致您的 attributes
没有任何更改就被排除在外。
我希望现在问题已经清楚了。
val yourRdd = sc.parallelize(List(
("name1","21","JobA"),
("name2","21","JobB"),
("name3","22","JobA")
))
val yourNeededRdd = yourRdd
.flatMap({ case (name, age, work) => List(("name", name), ("age", age), ("work", work)) })
.groupBy({ case (attrName, attrVal) => attrName })
.map({ case (attrName, group) => (attrName, group.toList.map(_._2).distinct })
// RDD(
// ("name", List("name1", "name2", "name3")),
// ("age", List("21", "22")),
// ("work", List("JobA", "JobB"))
// )
// Or
val distinctNamesRdd = yourRdd.map(_._1).distinct
// RDD("name1", "name2", "name3")
val distinctAgesRdd = yourRdd.map(_._2).distinct
// RDD("21", "22")
val distinctWorksRdd = yourRdd.map(_._3).distinct
// RDD("JobA", "JobB")
我正在使用 Spark 和 Scala。我有一个 Array[String]
的 RDD,我将对其进行迭代。 RDD 包含 (name, age, work, ...)
等属性的值。我正在使用一系列可变字符串集(称为 attributes
)来收集每个属性的所有唯一值。
将 RDD 想象成这样:
("name1","21","JobA")
("name2","21","JobB")
("name3","22","JobA")
最后我想要这样的东西:
attributes = (("name1","name2","name3"),("21","22"),("JobA","JobB"))
我有以下代码:
val someLength = 10
val attributes = Seq.fill[mutable.Set[String]](someLength)(mutable.Set())
val splitLines = rdd.map(line => line.split("\t"))
lines.foreach(line => {
for {(value, index) <- line.zipWithIndex} {
attributes(index).add(value)
// #1
}
})
// #2
当我调试并停在标有 #1 的行时,一切正常,attributes
正确地填充了唯一值。
但是在循环之后,在第#2行,属性再次为空。查看它表明,属性是一系列集合,它们的大小都是 0.
Seq()
Seq()
...
我做错了什么?是否存在某种我不知道的范围界定?
答案在于Spark是一个分布式引擎。我会给你一个你所面临的问题的粗略概念。这里每个 RDD
中的元素被分装到 Partitions
中,每个 Partition
可能存在于不同的节点上。
当你写 rdd1.foreach(f)
时,f
被包裹在一个闭包中(它获取相应对象的副本)。现在,这个闭包被序列化,然后被发送到每个节点,它被应用到 Partition
.
在这里,你的 f
将在其包装的闭包中获得 attributes
的 copy
,因此当 f
被执行时,它与 [= 的副本交互19=] 而不是你想要的 attributes
。这导致您的 attributes
没有任何更改就被排除在外。
我希望现在问题已经清楚了。
val yourRdd = sc.parallelize(List(
("name1","21","JobA"),
("name2","21","JobB"),
("name3","22","JobA")
))
val yourNeededRdd = yourRdd
.flatMap({ case (name, age, work) => List(("name", name), ("age", age), ("work", work)) })
.groupBy({ case (attrName, attrVal) => attrName })
.map({ case (attrName, group) => (attrName, group.toList.map(_._2).distinct })
// RDD(
// ("name", List("name1", "name2", "name3")),
// ("age", List("21", "22")),
// ("work", List("JobA", "JobB"))
// )
// Or
val distinctNamesRdd = yourRdd.map(_._1).distinct
// RDD("name1", "name2", "name3")
val distinctAgesRdd = yourRdd.map(_._2).distinct
// RDD("21", "22")
val distinctWorksRdd = yourRdd.map(_._3).distinct
// RDD("JobA", "JobB")