在 rdd 中映射 CompactBuffer
Map over CompactBuffer in an rdd
我有一个如下所示的 groupByKey 的 RDD,
(1, CompactBuffer(2.0, 3.0, 4.0)),
(2, CompactBuffer(1.0, -1.0, -2.0))
我希望将值映射到 (1*x_1^2, 2*x_2^2, 3*x_3^2)
应该是这样的,
(1, CompactBuffer(4.0, 18.0, 48.0)),
(2, CompactBuffer(1.0, 2.0, 12.0))
我该怎么办?
感谢您的帮助。
您可以使用 mapValues
使用 zip
和 Stream(1, 2, ...)
来处理 CompactBuffer 内容,如下所示:
val rdd = sc.parallelize(Seq(
(1, 2.0),
(1, 3.0),
(1, 4.0),
(2, 1.0),
(2, -1.0),
(2, -2.0)
))
val groupedRDD = rdd.groupByKey
// res1: Array[(Int, Iterable[Double])] = Array(
// (1,CompactBuffer(2.0, 3.0, 4.0)), (2,CompactBuffer(1.0, -1.0, -2.0))
// )
groupedRDD.mapValues( l =>
l.zip(Stream from 1).map{ case (v, i) => v * v * i }
)
// res2: Array[(Int, Iterable[Double])] = Array(
// (1,List(4.0, 18.0, 48.0)), (2,List(1.0, 2.0, 12.0))
// )
我有一个如下所示的 groupByKey 的 RDD,
(1, CompactBuffer(2.0, 3.0, 4.0)), (2, CompactBuffer(1.0, -1.0, -2.0))
我希望将值映射到 (1*x_1^2, 2*x_2^2, 3*x_3^2)
应该是这样的,
(1, CompactBuffer(4.0, 18.0, 48.0)), (2, CompactBuffer(1.0, 2.0, 12.0))
我该怎么办?
感谢您的帮助。
您可以使用 mapValues
使用 zip
和 Stream(1, 2, ...)
来处理 CompactBuffer 内容,如下所示:
val rdd = sc.parallelize(Seq(
(1, 2.0),
(1, 3.0),
(1, 4.0),
(2, 1.0),
(2, -1.0),
(2, -2.0)
))
val groupedRDD = rdd.groupByKey
// res1: Array[(Int, Iterable[Double])] = Array(
// (1,CompactBuffer(2.0, 3.0, 4.0)), (2,CompactBuffer(1.0, -1.0, -2.0))
// )
groupedRDD.mapValues( l =>
l.zip(Stream from 1).map{ case (v, i) => v * v * i }
)
// res2: Array[(Int, Iterable[Double])] = Array(
// (1,List(4.0, 18.0, 48.0)), (2,List(1.0, 2.0, 12.0))
// )