将 Spark RDD 减少到 return 多个值

Reduce Spark RDD to return multiple values

我有以下 RDD,其中包含我想按项目相似性分组的项目集(同一集中的项目被认为是相似的。相似性是可传递的,集合中至少有一个共同项目的所有项目也是认为相似)

输入RDD:

Set(w1, w2)
Set(w1, w2, w3, w4)
Set(w5, w2, w6)
Set(w7, w8, w9)
Set(w10, w5, w8) --> All the first 5 set elements are similar as each of the sets have atleast one common item
Set(w11, w12, w13)

我希望将上面的RDD缩减为

Set(w1, w2, w3, w4, w5, w6, w7, w8, w9, w10)
Set(w11, w12, w13)

关于我如何做到这一点有什么建议吗?我无法执行类似下面的操作,如果它们不包含任何公共元素,我可以忽略减少两个集合:

data.reduce((a,b) => if (a.intersect(b).size > 0) a ++ b ***else (a,b)***)

谢谢。

您的 reduce 算法实际上是不正确的。例如,如果一个集合不能与下一个集合合并,但仍可以与收集中的另一个集合合并怎么办。

可能有更好的方法,但我想出了一个解决方案,将其转换为图形问题并使用 Graphx。

val data = Array(Set("w1", "w2", "w3"), Set("w5", "w6"), Set("w7"), Set("w2", "w3", "w4"))
val setRdd = sc.parallelize(data).cache

// Generate an unique id for each item to use as vertex's id in the graph
val itemToId = setRdd.flatMap(_.toSeq).distinct.zipWithUniqueId.cache
val idToItem = itemToId.map { case (item, itemId) => (itemId, item) }

// Convert to a RDD of set of itemId
val newSetRdd = setRdd.zipWithUniqueId
  .flatMap { case (sets, setId) =>
    sets.map { item => (item, setId) }
  }.join(itemToId).values.groupByKey().values

// Create an RDD containing edges of the graph
val edgeRdd = newSetRdd.flatMap { set =>
    val seq = set.toSeq
    val head = seq.head
    // Add an edge from the first item to each item in a set, 
    // including itself
    seq.map { item => Edge[Long](head, item)}
  }

val graph = Graph.fromEdges(edgeRdd, Nil)

// Run connected component algorithm to check which items are similar.
// Items in the same component are similar
val verticesRDD = graph.connectedComponents().vertices

verticesRDD.join(idToItem).values.groupByKey.values.collect.foreach(println)