如何从配置文件或数组动态创建 pyspark 代码?

How to dynamically create pyspark code from config file or array?

有人知道有办法从配置输入动态构建 pyspark 命令吗?

我试图以一个命令结束,结果类似于;

newDF = df.select('*', when(df.a == 1,'1').when(df.a == 2, '2').alias('new_col') )

when 表达式在数量和内容上是可变的。所以可能是这样的;

conditions = 
[
"when(df.a == 'something')",
"when(df.a >= 'something else')"
]

我可以设计条件的结构,所以那部分待定。更多的是我如何使用这种方法构建命令,而不会想到我正在尝试向它传递一个字符串。我的第一次尝试看起来像;

command = ".when(df.a == '1', 'something').when(df.a == '2', 'somethingelse')"

newDF = df.select("*", command)

但是,我收到的错误是 Spark 不喜欢我传递字符串。

感谢任何帮助!

可能有很多方法,但根据您在问题中提供的示例,您可以考虑以下两个选项:

使用字符串表达式

您可以创建一个包含要创建的列名的元组列表以及传递给 F.expr 函数的相应 SQL 表达式,如下所示:

from pyspark.sql import functions as F

new_cols = [
    ("new_col", "case when a = 1 then 'something' when a = 2 then 'somethingelse' end"),
    ("new_col2", "case when a = 1 then true when a = 2 then false end")
]

df.select("*", *[F.expr(x[1]).alias(x[0]) for x in new_cols])
动态构造when表达式

您可以为要创建的列定义一个 case/when 条件列表,然后使用 python functools.reduce 构建 when 表达式,如下所示:

from functools import reduce
from pyspark.sql import functions as F

conditions = [
    ('1', 'something'),
    ('2', 'somethingelse')
]

new_col = reduce(lambda acc, x: acc.when(F.col("a") == x[0], x[1]), conditions, F)

df.select("*", new_col.alias("new_col"))