如何在 Spark 中转置 RDD
How to transpose an RDD in Spark
我有这样一个 RDD:
1 2 3
4 5 6
7 8 9
它是一个矩阵。现在我想像这样转置 RDD:
1 4 7
2 5 8
3 6 9
我该怎么做?
假设你有一个 N×M 矩阵。
如果 N 和 M 都小到可以在内存中容纳 N×M 项,那么使用 RDD 就没有多大意义。但是转置很容易:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
val transposed = sc.parallelize(rdd.collect.toSeq.transpose)
如果 N 或 M 太大以至于您不能在内存中保存 N 或 M 条目,那么您就不能拥有这种大小的 RDD 行。在这种情况下,原始矩阵或转置矩阵都无法表示。
N和M可能是中等大小:内存中可以容纳N或M个条目,但不能容纳N×M个条目。在这种情况下,您必须炸毁矩阵并将其重新组合在一起:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
// Split the matrix into one number per line.
val byColumnAndRow = rdd.zipWithIndex.flatMap {
case (row, rowIndex) => row.zipWithIndex.map {
case (number, columnIndex) => columnIndex -> (rowIndex, number)
}
}
// Build up the transposed matrix. Group and sort by column index first.
val byColumn = byColumnAndRow.groupByKey.sortByKey().values
// Then sort by row index.
val transposed = byColumn.map {
indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2)
}
没有使用 collect() 的初稿,所以一切都在工作端运行,驱动程序没有做任何事情:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
rdd.flatMap(row => (row.map(col => (col, row.indexOf(col))))) // flatMap by keeping the column position
.map(v => (v._2, v._1)) // key by column position
.groupByKey.sortByKey // regroup on column position, thus all elements from the first column will be in the first row
.map(_._2) // discard the key, keep only value
此解决方案的问题在于,如果在分布式系统中执行操作,转置矩阵中的列最终会被打乱。会想到改进版
我的想法是,除了将 'column number' 附加到矩阵的每个元素之外,我们还附加了 'row number'。因此,我们可以像示例中那样按列位置键并按键重新分组,但随后我们可以对行号上的每一行重新排序,然后从结果中删除 row/column 数字。
将文件导入 RDD 时,我无法知道行号。
您可能认为将列号和行号附加到每个矩阵元素很繁重,但我想这是有可能以分布式方式将您的输入作为块处理并因此处理巨大矩阵所付出的代价.
当我找到解决排序问题的方法时会更新答案。
从 Spark 1.6 开始,您可以使用 pivot operation on DataFrames, depending on the actual shape of your data, if you put it into a DF you could pivot columns to rows, the following databricks blog 非常有用,因为它详细描述了许多带有代码示例的旋转用例
我有这样一个 RDD:
1 2 3
4 5 6
7 8 9
它是一个矩阵。现在我想像这样转置 RDD:
1 4 7
2 5 8
3 6 9
我该怎么做?
假设你有一个 N×M 矩阵。
如果 N 和 M 都小到可以在内存中容纳 N×M 项,那么使用 RDD 就没有多大意义。但是转置很容易:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
val transposed = sc.parallelize(rdd.collect.toSeq.transpose)
如果 N 或 M 太大以至于您不能在内存中保存 N 或 M 条目,那么您就不能拥有这种大小的 RDD 行。在这种情况下,原始矩阵或转置矩阵都无法表示。
N和M可能是中等大小:内存中可以容纳N或M个条目,但不能容纳N×M个条目。在这种情况下,您必须炸毁矩阵并将其重新组合在一起:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
// Split the matrix into one number per line.
val byColumnAndRow = rdd.zipWithIndex.flatMap {
case (row, rowIndex) => row.zipWithIndex.map {
case (number, columnIndex) => columnIndex -> (rowIndex, number)
}
}
// Build up the transposed matrix. Group and sort by column index first.
val byColumn = byColumnAndRow.groupByKey.sortByKey().values
// Then sort by row index.
val transposed = byColumn.map {
indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2)
}
没有使用 collect() 的初稿,所以一切都在工作端运行,驱动程序没有做任何事情:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
rdd.flatMap(row => (row.map(col => (col, row.indexOf(col))))) // flatMap by keeping the column position
.map(v => (v._2, v._1)) // key by column position
.groupByKey.sortByKey // regroup on column position, thus all elements from the first column will be in the first row
.map(_._2) // discard the key, keep only value
此解决方案的问题在于,如果在分布式系统中执行操作,转置矩阵中的列最终会被打乱。会想到改进版
我的想法是,除了将 'column number' 附加到矩阵的每个元素之外,我们还附加了 'row number'。因此,我们可以像示例中那样按列位置键并按键重新分组,但随后我们可以对行号上的每一行重新排序,然后从结果中删除 row/column 数字。 将文件导入 RDD 时,我无法知道行号。
您可能认为将列号和行号附加到每个矩阵元素很繁重,但我想这是有可能以分布式方式将您的输入作为块处理并因此处理巨大矩阵所付出的代价.
当我找到解决排序问题的方法时会更新答案。
从 Spark 1.6 开始,您可以使用 pivot operation on DataFrames, depending on the actual shape of your data, if you put it into a DF you could pivot columns to rows, the following databricks blog 非常有用,因为它详细描述了许多带有代码示例的旋转用例