Apache Spark RDD 替换

Apache Spark RDD substitution

我正在尝试解决一个问题,因此我有一个这样的数据集:

(1, 3)
(1, 4)
(1, 7)
(1, 2)   <-
(2, 7)   <-
(6, 6)    
(3, 7)   <-
(7, 4)   <-
...

由于 (1 -> 2)(2 -> 7),我想将集合 (2, 7) 替换为 (1, 7) 同样,(3 -> 7)(7 -> 4) 也将 (7,4) 替换为 (3, 4)

因此,我的数据集变成了

(1, 3)
(1, 4)
(1, 7)
(1, 2)  
(1, 7)  
(6, 6)    
(3, 7)
(3, 4)
...

知道如何解决这个问题吗?

谢谢

这个问题看起来像图的传递闭包,以分布式边列表的形式表示。

与旧的 Hadoop MR 相比,Spark 的一个关键特性是 Spark 支持交互式算法。为了解决这样的图遍历问题,我们在递归函数中利用了该功能:

def closure(rdd:RDD[(Int, Int)]):RDD[(Int,Int)] = {
  val transitiveValues = rdd.map(_.swap).join(rdd).filter{case (_,(x,y)) => x != y}
  if (transitiveValues.isEmpty) {
    rdd
  } else {
    val usedTransitions = transitiveValues.flatMap{case (a,(x,y)) => Seq((x,a),(a,y))}
    val newTransitions = transitiveValues.map{case (a,(x,y)) => (x,y)}
    closure(rdd.subtract(usedTransitions).union(newTransitions)).distinct
  }
}

这并没有完全产生上面预期的输出,因为没有优先级的概念(隐式排序),所以 closure((1, 2),(2, 7)) = (1,7) 而不是上面预期的 (1, 2), (1, 7)。可以以额外的复杂性为代价添加排序。此外,它不支持循环图(带循环)。

此算法应仅作为根据特定内部要求进行调整的起点。