如何从配置文件或数组动态创建 pyspark 代码?
How to dynamically create pyspark code from config file or array?
有人知道有办法从配置输入动态构建 pyspark 命令吗?
我试图以一个命令结束,结果类似于;
newDF = df.select('*', when(df.a == 1,'1').when(df.a == 2, '2').alias('new_col') )
when 表达式在数量和内容上是可变的。所以可能是这样的;
conditions =
[
"when(df.a == 'something')",
"when(df.a >= 'something else')"
]
我可以设计条件的结构,所以那部分待定。更多的是我如何使用这种方法构建命令,而不会想到我正在尝试向它传递一个字符串。我的第一次尝试看起来像;
command = ".when(df.a == '1', 'something').when(df.a == '2', 'somethingelse')"
newDF = df.select("*", command)
但是,我收到的错误是 Spark 不喜欢我传递字符串。
感谢任何帮助!
可能有很多方法,但根据您在问题中提供的示例,您可以考虑以下两个选项:
使用字符串表达式
您可以创建一个包含要创建的列名的元组列表以及传递给 F.expr
函数的相应 SQL 表达式,如下所示:
from pyspark.sql import functions as F
new_cols = [
("new_col", "case when a = 1 then 'something' when a = 2 then 'somethingelse' end"),
("new_col2", "case when a = 1 then true when a = 2 then false end")
]
df.select("*", *[F.expr(x[1]).alias(x[0]) for x in new_cols])
动态构造when
表达式
您可以为要创建的列定义一个 case/when 条件列表,然后使用 python functools.reduce
构建 when
表达式,如下所示:
from functools import reduce
from pyspark.sql import functions as F
conditions = [
('1', 'something'),
('2', 'somethingelse')
]
new_col = reduce(lambda acc, x: acc.when(F.col("a") == x[0], x[1]), conditions, F)
df.select("*", new_col.alias("new_col"))
有人知道有办法从配置输入动态构建 pyspark 命令吗?
我试图以一个命令结束,结果类似于;
newDF = df.select('*', when(df.a == 1,'1').when(df.a == 2, '2').alias('new_col') )
when 表达式在数量和内容上是可变的。所以可能是这样的;
conditions =
[
"when(df.a == 'something')",
"when(df.a >= 'something else')"
]
我可以设计条件的结构,所以那部分待定。更多的是我如何使用这种方法构建命令,而不会想到我正在尝试向它传递一个字符串。我的第一次尝试看起来像;
command = ".when(df.a == '1', 'something').when(df.a == '2', 'somethingelse')"
newDF = df.select("*", command)
但是,我收到的错误是 Spark 不喜欢我传递字符串。
感谢任何帮助!
可能有很多方法,但根据您在问题中提供的示例,您可以考虑以下两个选项:
使用字符串表达式
您可以创建一个包含要创建的列名的元组列表以及传递给 F.expr
函数的相应 SQL 表达式,如下所示:
from pyspark.sql import functions as F
new_cols = [
("new_col", "case when a = 1 then 'something' when a = 2 then 'somethingelse' end"),
("new_col2", "case when a = 1 then true when a = 2 then false end")
]
df.select("*", *[F.expr(x[1]).alias(x[0]) for x in new_cols])
动态构造when
表达式
您可以为要创建的列定义一个 case/when 条件列表,然后使用 python functools.reduce
构建 when
表达式,如下所示:
from functools import reduce
from pyspark.sql import functions as F
conditions = [
('1', 'something'),
('2', 'somethingelse')
]
new_col = reduce(lambda acc, x: acc.when(F.col("a") == x[0], x[1]), conditions, F)
df.select("*", new_col.alias("new_col"))