Spark/Scala 在多个列上使用相同的函数重复调用 withColumn()
Spark/Scala repeated calls to withColumn() using the same function on multiple columns
我目前有一些代码,其中我通过多个 .withColumn 链将相同的过程重复应用于多个 DataFrame 列,并且我想创建一个函数来简化该过程。在我的例子中,我找到了按键聚合的列的累计总和:
val newDF = oldDF
.withColumn("cumA", sum("A").over(Window.partitionBy("ID").orderBy("time")))
.withColumn("cumB", sum("B").over(Window.partitionBy("ID").orderBy("time")))
.withColumn("cumC", sum("C").over(Window.partitionBy("ID").orderBy("time")))
//.withColumn(...)
我想要的是:
def createCumulativeColums(cols: Array[String], df: DataFrame): DataFrame = {
// Implement the above cumulative sums, partitioning, and ordering
}
或更好:
def withColumns(cols: Array[String], df: DataFrame, f: function): DataFrame = {
// Implement a udf/arbitrary function on all the specified columns
}
您可以将 select
与可变参数一起使用,包括 *
:
import spark.implicits._
df.select($"*" +: Seq("A", "B", "C").map(c =>
sum(c).over(Window.partitionBy("ID").orderBy("time")).alias(s"cum$c")
): _*)
这个:
- 使用
Seq("A", ...).map(...)
将列名称映射到 window 表达式
- 在所有预先存在的列前添加
$"*" +: ...
。
- 使用
... : _*
解压组合序列。
可以归纳为:
import org.apache.spark.sql.{Column, DataFrame}
/**
* @param cols a sequence of columns to transform
* @param df an input DataFrame
* @param f a function to be applied on each col in cols
*/
def withColumns(cols: Seq[String], df: DataFrame, f: String => Column) =
df.select($"*" +: cols.map(c => f(c)): _*)
如果您发现 withColumn
语法更具可读性,您可以使用 foldLeft
:
Seq("A", "B", "C").foldLeft(df)((df, c) =>
df.withColumn(s"cum$c", sum(c).over(Window.partitionBy("ID").orderBy("time")))
)
可以概括为:
/**
* @param cols a sequence of columns to transform
* @param df an input DataFrame
* @param f a function to be applied on each col in cols
* @param name a function mapping from input to output name.
*/
def withColumns(cols: Seq[String], df: DataFrame,
f: String => Column, name: String => String = identity) =
cols.foldLeft(df)((df, c) => df.withColumn(name(c), f(c)))
这个问题有点老了,但我认为(也许对其他人)注意使用 DataFrame
作为累加器折叠列列表并映射到 [=10= 会很有用] 当列数不是微不足道时,性能结果会大不相同(有关完整说明,请参阅 here)。
长话短说......对于几列 foldLeft
很好,否则 map
更好。
在 PySpark 中:
from pyspark.sql import Window
import pyspark.sql.functions as F
window = Window.partitionBy("ID").orderBy("time")
df.select(
"*", # selects all existing columns
*[
F.sum(col).over(windowval).alias(col_name)
for col, col_name in zip(["A", "B", "C"], ["cumA", "cumB", "cumC"])
]
)
我目前有一些代码,其中我通过多个 .withColumn 链将相同的过程重复应用于多个 DataFrame 列,并且我想创建一个函数来简化该过程。在我的例子中,我找到了按键聚合的列的累计总和:
val newDF = oldDF
.withColumn("cumA", sum("A").over(Window.partitionBy("ID").orderBy("time")))
.withColumn("cumB", sum("B").over(Window.partitionBy("ID").orderBy("time")))
.withColumn("cumC", sum("C").over(Window.partitionBy("ID").orderBy("time")))
//.withColumn(...)
我想要的是:
def createCumulativeColums(cols: Array[String], df: DataFrame): DataFrame = {
// Implement the above cumulative sums, partitioning, and ordering
}
或更好:
def withColumns(cols: Array[String], df: DataFrame, f: function): DataFrame = {
// Implement a udf/arbitrary function on all the specified columns
}
您可以将 select
与可变参数一起使用,包括 *
:
import spark.implicits._
df.select($"*" +: Seq("A", "B", "C").map(c =>
sum(c).over(Window.partitionBy("ID").orderBy("time")).alias(s"cum$c")
): _*)
这个:
- 使用
Seq("A", ...).map(...)
将列名称映射到 window 表达式
- 在所有预先存在的列前添加
$"*" +: ...
。 - 使用
... : _*
解压组合序列。
可以归纳为:
import org.apache.spark.sql.{Column, DataFrame}
/**
* @param cols a sequence of columns to transform
* @param df an input DataFrame
* @param f a function to be applied on each col in cols
*/
def withColumns(cols: Seq[String], df: DataFrame, f: String => Column) =
df.select($"*" +: cols.map(c => f(c)): _*)
如果您发现 withColumn
语法更具可读性,您可以使用 foldLeft
:
Seq("A", "B", "C").foldLeft(df)((df, c) =>
df.withColumn(s"cum$c", sum(c).over(Window.partitionBy("ID").orderBy("time")))
)
可以概括为:
/**
* @param cols a sequence of columns to transform
* @param df an input DataFrame
* @param f a function to be applied on each col in cols
* @param name a function mapping from input to output name.
*/
def withColumns(cols: Seq[String], df: DataFrame,
f: String => Column, name: String => String = identity) =
cols.foldLeft(df)((df, c) => df.withColumn(name(c), f(c)))
这个问题有点老了,但我认为(也许对其他人)注意使用 DataFrame
作为累加器折叠列列表并映射到 [=10= 会很有用] 当列数不是微不足道时,性能结果会大不相同(有关完整说明,请参阅 here)。
长话短说......对于几列 foldLeft
很好,否则 map
更好。
在 PySpark 中:
from pyspark.sql import Window
import pyspark.sql.functions as F
window = Window.partitionBy("ID").orderBy("time")
df.select(
"*", # selects all existing columns
*[
F.sum(col).over(windowval).alias(col_name)
for col, col_name in zip(["A", "B", "C"], ["cumA", "cumB", "cumC"])
]
)