如何在 Spark 中转置 RDD

How to transpose an RDD in Spark

我有这样一个 RDD:

1 2 3
4 5 6
7 8 9

它是一个矩阵。现在我想像这样转置 RDD:

1 4 7
2 5 8
3 6 9


假设你有一个 N×M 矩阵。

如果 N 和 M 都小到可以在内存中容纳 N×M 项,那么使用 RDD 就没有多大意义。但是转置很容易:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
val transposed = sc.parallelize(rdd.collect.toSeq.transpose)

如果 N 或 M 太大以至于您不能在内存中保存 N 或 M 条目,那么您就不能拥有这种大小的 RDD 行。在这种情况下,原始矩阵或转置矩阵都无法表示。


val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
// Split the matrix into one number per line.
val byColumnAndRow = rdd.zipWithIndex.flatMap {
  case (row, rowIndex) => row.zipWithIndex.map {
    case (number, columnIndex) => columnIndex -> (rowIndex, number)
// Build up the transposed matrix. Group and sort by column index first.
val byColumn = byColumnAndRow.groupByKey.sortByKey().values
// Then sort by row index.
val transposed = byColumn.map {
  indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2)

没有使用 collect() 的初稿,所以一切都在工作端运行,驱动程序没有做任何事情:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))

rdd.flatMap(row => (row.map(col => (col, row.indexOf(col))))) // flatMap by keeping the column position
   .map(v => (v._2, v._1)) // key by column position
   .groupByKey.sortByKey   // regroup on column position, thus all elements from the first column will be in the first row
   .map(_._2)              // discard the key, keep only value


我的想法是,除了将 'column number' 附加到矩阵的每个元素之外,我们还附加了 'row number'。因此,我们可以像示例中那样按列位置键并按键重新分组,但随后我们可以对行号上的每一行重新排序,然后从结果中删除 row/column 数字。 将文件导入 RDD 时,我无法知道行号。



从 Spark 1.6 开始,您可以使用 pivot operation on DataFrames, depending on the actual shape of your data, if you put it into a DF you could pivot columns to rows, the following databricks blog 非常有用,因为它详细描述了许多带有代码示例的旋转用例