扁平化 RDD 的键
Flattening the key of a RDD
我有一个 (Array[breeze.linalg.DenseVector[Double]], breeze.linalg.DenseVector[Double])
类型的 Spark RDD。我希望将它的键展平以将其转换为 breeze.linalg.DenseVector[Double], breeze.linalg.DenseVector[Double])
类型的 RDD。我目前正在做:
val newRDD = oldRDD.flatMap(ob => anonymousOrdering(ob))
anonymousOrdering() 的签名是String => (Array[DenseVector[Double]], DenseVector[Double])
。
它returns type mismatch: required: TraversableOnce[?]
。 Python 做同样事情的代码是:
newRDD = oldRDD.flatMap(lambda point: [(tile, point) for tile in anonymousOrdering(point)])
如何在 Scala 中做同样的事情?我一般用flatMapValues
但是这里需要把key压平
更改代码以使用 Map 而不是 FlatMap:
val newRDD = oldRDD.map(ob => anonymousOrdering(ob)).groupByKey()
如果 anonymousOrdering 返回一个元组列表并且您希望它变平,那么您只想在此处使用平面图。
如果我正确理解你的问题,你可以这样做:
val newRDD = oldRDD.flatMap(ob => anonymousOrdering(ob))
// newRDD is RDD[(Array[DenseVector], DenseVector)]
在这种情况下,您可以使用模式匹配和 for
/yield
语句 "flatten" 元组的 Array
部分:
newRDD = newRDD.flatMap{case (a: Array[DenseVector[Double]], b: DenseVector[Double]) => for (v <- a) yield (v, b)}
// newRDD is RDD[(DenseVector, DenseVector)]
虽然我还不清楚where/how你想用groupByKey
由于 anonymousOrdering()
是您代码中的一个函数,更新它以便 return 成为 Seq[(breeze.linalg.DenseVector[Double], breeze.linalg.DenseVector[Double])]
。这就像做 (tile, point) for tile in anonymousOrdering(point)]
但直接在匿名函数的末尾。 flatMap
然后会注意为序列的每个元素创建一个分区。
作为一般规则,避免将集合作为 RDD 中的键。
我有一个 (Array[breeze.linalg.DenseVector[Double]], breeze.linalg.DenseVector[Double])
类型的 Spark RDD。我希望将它的键展平以将其转换为 breeze.linalg.DenseVector[Double], breeze.linalg.DenseVector[Double])
类型的 RDD。我目前正在做:
val newRDD = oldRDD.flatMap(ob => anonymousOrdering(ob))
anonymousOrdering() 的签名是String => (Array[DenseVector[Double]], DenseVector[Double])
。
它returns type mismatch: required: TraversableOnce[?]
。 Python 做同样事情的代码是:
newRDD = oldRDD.flatMap(lambda point: [(tile, point) for tile in anonymousOrdering(point)])
如何在 Scala 中做同样的事情?我一般用flatMapValues
但是这里需要把key压平
更改代码以使用 Map 而不是 FlatMap:
val newRDD = oldRDD.map(ob => anonymousOrdering(ob)).groupByKey()
如果 anonymousOrdering 返回一个元组列表并且您希望它变平,那么您只想在此处使用平面图。
如果我正确理解你的问题,你可以这样做:
val newRDD = oldRDD.flatMap(ob => anonymousOrdering(ob))
// newRDD is RDD[(Array[DenseVector], DenseVector)]
在这种情况下,您可以使用模式匹配和 for
/yield
语句 "flatten" 元组的 Array
部分:
newRDD = newRDD.flatMap{case (a: Array[DenseVector[Double]], b: DenseVector[Double]) => for (v <- a) yield (v, b)}
// newRDD is RDD[(DenseVector, DenseVector)]
虽然我还不清楚where/how你想用groupByKey
由于 anonymousOrdering()
是您代码中的一个函数,更新它以便 return 成为 Seq[(breeze.linalg.DenseVector[Double], breeze.linalg.DenseVector[Double])]
。这就像做 (tile, point) for tile in anonymousOrdering(point)]
但直接在匿名函数的末尾。 flatMap
然后会注意为序列的每个元素创建一个分区。
作为一般规则,避免将集合作为 RDD 中的键。