扁平化 RDD 的键

Flattening the key of a RDD

我有一个 (Array[breeze.linalg.DenseVector[Double]], breeze.linalg.DenseVector[Double]) 类型的 Spark RDD。我希望将它的键展平以将其转换为 breeze.linalg.DenseVector[Double], breeze.linalg.DenseVector[Double]) 类型的 RDD。我目前正在做:

val newRDD = oldRDD.flatMap(ob => anonymousOrdering(ob))

anonymousOrdering() 的签名是String => (Array[DenseVector[Double]], DenseVector[Double])

它returns type mismatch: required: TraversableOnce[?]。 Python 做同样事情的代码是:

newRDD = oldRDD.flatMap(lambda point: [(tile, point) for tile in anonymousOrdering(point)])

如何在 Scala 中做同样的事情?我一般用flatMapValues但是这里需要把key压平

更改代码以使用 Map 而不是 FlatMap:

val newRDD = oldRDD.map(ob => anonymousOrdering(ob)).groupByKey()

如果 anonymousOrdering 返回一个元组列表并且您希望它变平,那么您只想在此处使用平面图。

如果我正确理解你的问题,你可以这样做:

val newRDD = oldRDD.flatMap(ob => anonymousOrdering(ob))
// newRDD is RDD[(Array[DenseVector], DenseVector)]

在这种情况下,您可以使用模式匹配和 for/yield 语句 "flatten" 元组的 Array 部分:

newRDD = newRDD.flatMap{case (a: Array[DenseVector[Double]], b: DenseVector[Double]) => for (v <- a) yield (v, b)}
// newRDD is RDD[(DenseVector, DenseVector)]

虽然我还不清楚where/how你想用groupByKey

由于 anonymousOrdering() 是您代码中的一个函数,更新它以便 return 成为 Seq[(breeze.linalg.DenseVector[Double], breeze.linalg.DenseVector[Double])]。这就像做 (tile, point) for tile in anonymousOrdering(point)] 但直接在匿名函数的末尾。 flatMap 然后会注意为序列的每个元素创建一个分区。

作为一般规则,避免将集合作为 RDD 中的键。