如何使用 Spark RDD 中的唯一值填充 Scala Seq of Sets?

How to fill Scala Seq of Sets with unique values from Spark RDD?

我正在使用 Spark 和 Scala。我有一个 Array[String] 的 RDD,我将对其进行迭代。 RDD 包含 (name, age, work, ...) 等属性的值。我正在使用一系列可变字符串集(称为 attributes)来收集每个属性的所有唯一值。

将 RDD 想象成这样:

("name1","21","JobA")
("name2","21","JobB")
("name3","22","JobA")

最后我想要这样的东西:

attributes = (("name1","name2","name3"),("21","22"),("JobA","JobB"))

我有以下代码:

val someLength = 10
val attributes = Seq.fill[mutable.Set[String]](someLength)(mutable.Set())
val splitLines = rdd.map(line => line.split("\t"))

lines.foreach(line => {
  for {(value, index) <- line.zipWithIndex} {
    attributes(index).add(value)
    // #1
  }
})

// #2

当我调试并停在标有 #1 的行时,一切正常,attributes 正确地填充了唯一值。

但是在循环之后,在第#2行,属性再次为空。查看它表明,属性是一系列集合,它们的大小都是 0.

Seq()
Seq()
...

我做错了什么?是否存在某种我不知道的范围界定?

答案在于Spark是一个分布式引擎。我会给你一个你所面临的问题的粗略概念。这里每个 RDD 中的元素被分装到 Partitions 中,每个 Partition 可能存在于不同的节点上。

当你写 rdd1.foreach(f) 时,f 被包裹在一个闭包中(它获取相应对象的副本)。现在,这个闭包被序列化,然后被发送到每个节点,它被应用到 Partition.

中的每个元素。

在这里,你的 f 将在其包装的闭包中获得 attributescopy,因此当 f 被执行时,它与 [= 的副本交互19=] 而不是你想要的 attributes 。这导致您的 attributes 没有任何更改就被排除在外。

我希望现在问题已经清楚了。

val yourRdd = sc.parallelize(List(
    ("name1","21","JobA"),
    ("name2","21","JobB"),
    ("name3","22","JobA")
))

val yourNeededRdd = yourRdd
  .flatMap({ case (name, age, work) => List(("name", name), ("age", age), ("work", work)) })
  .groupBy({ case (attrName, attrVal) => attrName })
  .map({ case (attrName, group) => (attrName, group.toList.map(_._2).distinct })

// RDD(
//     ("name", List("name1", "name2", "name3")),
//     ("age", List("21", "22")),
//     ("work", List("JobA", "JobB"))
// )

// Or

val distinctNamesRdd = yourRdd.map(_._1).distinct
// RDD("name1", "name2", "name3")

val distinctAgesRdd = yourRdd.map(_._2).distinct
// RDD("21", "22")

val distinctWorksRdd = yourRdd.map(_._3).distinct
// RDD("JobA", "JobB")