接收未知列数的 Spark UDF
Spark UDF that takes in unknown number of columns
我有一个具有不同模式的 spark 数据帧列表。示例:
list_df = [df1, df2, df3, df4]
# df1.columns = ['a', 'b']
# df2.columns = ['a', 'b', 'c']
# df3.columns = ['a', 'b', 'c', 'd']
# df4.columns = ['a', 'b', 'c', 'd', 'e']
现在,我想编写一个能够对具有不同列数的数据帧列表进行操作的单个 udf。
之前有一篇关于如何使用 scala 执行此操作的 post:,其中 udf 接受列数组。
但该方法似乎不适用于 python。有什么建议吗?
谢谢。
实际上这种方法在 Python:
中工作得很好
from pyspark.sql.functions import array, udf
df = sc.parallelize([("a", "b", "c", "d")]).toDF()
f = udf(lambda xs: "+".join(xs))
df.select(f("_1")).show()
## +------------+
## |<lambda>(_1)|
## +------------+
## | a|
## +------------+
df.select(f(array("_1", "_2"))).show()
## +-----------------------+
## |<lambda>(array(_1, _2))|
## +-----------------------+
## | a+b|
## +-----------------------+
df.select(f(array("_1", "_2", "_3"))).show()
## +---------------------------+
## |<lambda>(array(_1, _2, _3))|
## +---------------------------+
## | a+b+c|
## +---------------------------+
因为 Python UDF 不像它们的 Scala 对应物那样是同一类型的实体,不受输入参数的类型和数量的限制,您也使用 args:
g = udf(lambda *xs: "+".join(xs))
df.select(g("_1", "_2", "_3", "_4")).show()
## +------------------------+
## |<lambda>(_1, _2, _3, _4)|
## +------------------------+
## | a+b+c+d|
## +------------------------+
避免用 array
换行输入。
您还可以使用 struct
作为替代包装器来访问列名:
h = udf(lambda row: "+".join(row.asDict().keys()))
df.select(h(struct("_1", "_2", "_3"))).show()
## +----------------------------+
## |<lambda>(struct(_1, _2, _3))|
## +----------------------------+
## | _1+_3+_2|
## +----------------------------+
我有一个具有不同模式的 spark 数据帧列表。示例:
list_df = [df1, df2, df3, df4]
# df1.columns = ['a', 'b']
# df2.columns = ['a', 'b', 'c']
# df3.columns = ['a', 'b', 'c', 'd']
# df4.columns = ['a', 'b', 'c', 'd', 'e']
现在,我想编写一个能够对具有不同列数的数据帧列表进行操作的单个 udf。
之前有一篇关于如何使用 scala 执行此操作的 post:
但该方法似乎不适用于 python。有什么建议吗?
谢谢。
实际上这种方法在 Python:
中工作得很好from pyspark.sql.functions import array, udf
df = sc.parallelize([("a", "b", "c", "d")]).toDF()
f = udf(lambda xs: "+".join(xs))
df.select(f("_1")).show()
## +------------+
## |<lambda>(_1)|
## +------------+
## | a|
## +------------+
df.select(f(array("_1", "_2"))).show()
## +-----------------------+
## |<lambda>(array(_1, _2))|
## +-----------------------+
## | a+b|
## +-----------------------+
df.select(f(array("_1", "_2", "_3"))).show()
## +---------------------------+
## |<lambda>(array(_1, _2, _3))|
## +---------------------------+
## | a+b+c|
## +---------------------------+
因为 Python UDF 不像它们的 Scala 对应物那样是同一类型的实体,不受输入参数的类型和数量的限制,您也使用 args:
g = udf(lambda *xs: "+".join(xs))
df.select(g("_1", "_2", "_3", "_4")).show()
## +------------------------+
## |<lambda>(_1, _2, _3, _4)|
## +------------------------+
## | a+b+c+d|
## +------------------------+
避免用 array
换行输入。
您还可以使用 struct
作为替代包装器来访问列名:
h = udf(lambda row: "+".join(row.asDict().keys()))
df.select(h(struct("_1", "_2", "_3"))).show()
## +----------------------------+
## |<lambda>(struct(_1, _2, _3))|
## +----------------------------+
## | _1+_3+_2|
## +----------------------------+