如何使用原始顺序克隆 spark 中的列值

How to clone column values in spark with their original order

我想将列的值按原始顺序克隆 n 次。 例如,如果我想在列下方复制 2 次:

+---+
| v |
+---+
| 1 |
| 2 |
| 3 |
+---+

我在找什么:

+---+
| v |
+---+
| 1 |
| 2 |
| 3 |
| 1 |
| 2 |
| 3 |
+---+

使用 explode 或 flatMap 我只能得到:

+---+
| v |
+---+
| 1 |
| 1 |
| 2 |
| 2 |
| 3 |
| 3 |
+---+

代码:

%spark
val ds = spark.range(1, 4)
val cloneCount = 2

val clonedDs = ds.flatMap(r => Seq.fill(cloneCount)(r))
clonedDs.show()

我可能可以对数据集 ds 进行自联合,但是如果 cloneCount 很大,例如。 cloneCount = 200000,在一个循环中union那么多次是不是首选方案?

你可以试试这个:

// If the column values are expected to be in an increasing/descresing sequence
// then we add that to the orderBy: clone_index and col_value
// to get the values in order as they were initially

val clonedDs = ds.flatMap(col_value => Range(0, cloneCount)
                   .map(clone_index=>(clone_index,col_value)) )
clonedDs.orderBy("_1", "_2").map(_._2).show()



// If the column values are not expected to follow a sequence
// then we add another rank column and use that in orderBy along with clone_index
// to get the col_values in desired order

val clonedDs = ds.withColumn("rank", monotonically_increasing_id())
    .flatMap(row => Range(0, cloneCount).map(
                clone_index=> (clone_index, row.getLong(1), row.getLong(0))
          ) )

clonedDs.orderBy("_1", "_2").map(_._3).show()