在 Spark SQL Dataframe 中压缩和分解多个列
Zip and Explode multiple Columns in Spark SQL Dataframe
我有一个具有以下结构的数据框:
A: Array[String] | B: Array[String] | [ ... multiple other columns ...]
=========================================================================
[A, B, C, D] | [1, 2, 3, 4] | [ ... array with 4 elements ... ]
[E, F, G, H, I] | [5, 6, 7, 8, 9] | [ ... array with 5 elements ... ]
[J] | [10] | [ ... array with 1 element ... ]
我想写一个UDF,那个
- 压缩 DF 中每列第 i 个位置的元素
- 在每个压缩元组上展开 DF
结果列应如下所示:
ZippedAndExploded: Array[String]
=================================
[A, 1, ...]
[B, 2, ...]
[C, 3, ...]
[D, 4, ...]
[E, 5, ...]
[F, 6, ...]
[G, 7, ...]
[H, 8, ...]
[I, 9, ...]
[J, 10, ...]
目前我正在对 UDF 使用多重调用(每个列名一个,列名列表是在运行时之前收集的):
val myudf6 = udf((xa:Seq[Seq[String]],xb:Seq[String]) => {
xa.indices.map(i => {
xa(i) :+ xb(i) // Add one element to the zip column
})
})
val allColumnNames = df.columns.filter(...)
for (columnName <- allColumnNames) {
df = df.withColumn("zipped", myudf8(df("zipped"), df(columnName))
}
df = df.explode("zipped")
由于数据框可以有数百列,withColumn
的这种迭代调用似乎需要很长时间。
问题:使用一个 UDF 和一个 DF.withColumn(...)
调用是否可行?
重要:UDF 应压缩动态数量的列(在运行时读取)。
如果您知道并确定数组中值的数量,下面可能是一个更简单的解决方案
select A[0], B[0]..... from your_table
union all
select A[1], B[1]..... from your_table
union all
select A[2], B[2]..... from your_table
union all
select A[3], B[3]..... from your_table
使用将可变数量的列作为输入的 UDF
。这可以通过数组的数组来完成(假设类型相同)。由于您有一个数组数组,因此可以使用 transpose
这将获得与将列表压缩在一起相同的结果。然后可以分解生成的数组。
val array_zip_udf = udf((cols: Seq[Seq[String]]) => {
cols.transpose
})
val allColumnNames = df.columns.filter(...).map(col)
val df2 = df.withColumn("exploded", explode(array_zip_udf(array(allColumnNames: _*))))
请注意,在 Spark 2.4+ 中,可以使用 arrays_zip
而不是 UDF
:
val df2 = df.withColumn("exploded", explode(arrays_zip(allColumnNames: _*)))
我有一个具有以下结构的数据框:
A: Array[String] | B: Array[String] | [ ... multiple other columns ...]
=========================================================================
[A, B, C, D] | [1, 2, 3, 4] | [ ... array with 4 elements ... ]
[E, F, G, H, I] | [5, 6, 7, 8, 9] | [ ... array with 5 elements ... ]
[J] | [10] | [ ... array with 1 element ... ]
我想写一个UDF,那个
- 压缩 DF 中每列第 i 个位置的元素
- 在每个压缩元组上展开 DF
结果列应如下所示:
ZippedAndExploded: Array[String]
=================================
[A, 1, ...]
[B, 2, ...]
[C, 3, ...]
[D, 4, ...]
[E, 5, ...]
[F, 6, ...]
[G, 7, ...]
[H, 8, ...]
[I, 9, ...]
[J, 10, ...]
目前我正在对 UDF 使用多重调用(每个列名一个,列名列表是在运行时之前收集的):
val myudf6 = udf((xa:Seq[Seq[String]],xb:Seq[String]) => {
xa.indices.map(i => {
xa(i) :+ xb(i) // Add one element to the zip column
})
})
val allColumnNames = df.columns.filter(...)
for (columnName <- allColumnNames) {
df = df.withColumn("zipped", myudf8(df("zipped"), df(columnName))
}
df = df.explode("zipped")
由于数据框可以有数百列,withColumn
的这种迭代调用似乎需要很长时间。
问题:使用一个 UDF 和一个 DF.withColumn(...)
调用是否可行?
重要:UDF 应压缩动态数量的列(在运行时读取)。
如果您知道并确定数组中值的数量,下面可能是一个更简单的解决方案
select A[0], B[0]..... from your_table
union all
select A[1], B[1]..... from your_table
union all
select A[2], B[2]..... from your_table
union all
select A[3], B[3]..... from your_table
使用将可变数量的列作为输入的 UDF
。这可以通过数组的数组来完成(假设类型相同)。由于您有一个数组数组,因此可以使用 transpose
这将获得与将列表压缩在一起相同的结果。然后可以分解生成的数组。
val array_zip_udf = udf((cols: Seq[Seq[String]]) => {
cols.transpose
})
val allColumnNames = df.columns.filter(...).map(col)
val df2 = df.withColumn("exploded", explode(array_zip_udf(array(allColumnNames: _*))))
请注意,在 Spark 2.4+ 中,可以使用 arrays_zip
而不是 UDF
:
val df2 = df.withColumn("exploded", explode(arrays_zip(allColumnNames: _*)))