Pyspark:如何使用带有 reduce 的字典链接 Column.when()?

Pyspark: How to chain Column.when() using a dictionary with reduce?

我试图从 when() 函数链中的字典中获取条件,使用 reduce() 最终传递给 dataframe.withColumn()

例如:

conditions = {
    "0": (col("a") == 1.0) & (col("b") != 1.0),
    "1": (col("c") == 1.0) & (col("d") == 1.0)
}

使用 reduce() 我实现了这个:

when_stats = reduce(lambda key, value: when(conditions[key], lit(key)), conditions)

然后在 withColumn() 中使用它:

df2 = df1.withColumn(result, when_stats)

问题是它只接受第一个条件“0”,而没有链接第二个条件。 打印 'when_stats' 给我:

Column<'CASE WHEN ((a = 1.0) AND (NOT (b = 1.0))) THEN 0 END'>

当我添加第三个条件时,它抛出错误并且不起作用:

TypeError: unhashable type: 'Column'

所以问题是,我如何遍历字典并创建完整的 when().when().when()...?如果我最后想要 otherwise() 有没有更好的解决方案?

当您将 reduce 与 dict 对象一起使用时,您实际上是在迭代 dict 的键。所以 lambda 函数采用 acc 累加器和 key 正在处理的实际密钥。

您可以改用这个:

from functools import reduce
from pyspark.sql import functions as F

conditions = {
    "0": (F.col("a") == 1.0) & (F.col("b") != 1.0),
    "1": (F.col("c") == 1.0) & (F.col("d") == 1.0)
}

when_stats = reduce(
    lambda acc, key: acc.when(conditions[key], key),
    conditions,
    F
) #.otherwise("default_value")

print(when_stats)
# Column<'CASE WHEN ((a = 1.0) AND (NOT (b = 1.0))) THEN 0 WHEN ((c = 1.0) AND (d = 1.0)) THEN 1 END'>