Pyspark:如何使用带有 reduce 的字典链接 Column.when()?
Pyspark: How to chain Column.when() using a dictionary with reduce?
我试图从 when()
函数链中的字典中获取条件,使用 reduce()
最终传递给 dataframe.withColumn()
。
例如:
conditions = {
"0": (col("a") == 1.0) & (col("b") != 1.0),
"1": (col("c") == 1.0) & (col("d") == 1.0)
}
使用 reduce() 我实现了这个:
when_stats = reduce(lambda key, value: when(conditions[key], lit(key)), conditions)
然后在 withColumn() 中使用它:
df2 = df1.withColumn(result, when_stats)
问题是它只接受第一个条件“0”,而没有链接第二个条件。
打印 'when_stats' 给我:
Column<'CASE WHEN ((a = 1.0) AND (NOT (b = 1.0))) THEN 0 END'>
当我添加第三个条件时,它抛出错误并且不起作用:
TypeError: unhashable type: 'Column'
所以问题是,我如何遍历字典并创建完整的 when().when().when()
...?如果我最后想要 otherwise()
有没有更好的解决方案?
当您将 reduce
与 dict 对象一起使用时,您实际上是在迭代 dict 的键。所以 lambda 函数采用 acc
累加器和 key
正在处理的实际密钥。
您可以改用这个:
from functools import reduce
from pyspark.sql import functions as F
conditions = {
"0": (F.col("a") == 1.0) & (F.col("b") != 1.0),
"1": (F.col("c") == 1.0) & (F.col("d") == 1.0)
}
when_stats = reduce(
lambda acc, key: acc.when(conditions[key], key),
conditions,
F
) #.otherwise("default_value")
print(when_stats)
# Column<'CASE WHEN ((a = 1.0) AND (NOT (b = 1.0))) THEN 0 WHEN ((c = 1.0) AND (d = 1.0)) THEN 1 END'>
我试图从 when()
函数链中的字典中获取条件,使用 reduce()
最终传递给 dataframe.withColumn()
。
例如:
conditions = {
"0": (col("a") == 1.0) & (col("b") != 1.0),
"1": (col("c") == 1.0) & (col("d") == 1.0)
}
使用 reduce() 我实现了这个:
when_stats = reduce(lambda key, value: when(conditions[key], lit(key)), conditions)
然后在 withColumn() 中使用它:
df2 = df1.withColumn(result, when_stats)
问题是它只接受第一个条件“0”,而没有链接第二个条件。 打印 'when_stats' 给我:
Column<'CASE WHEN ((a = 1.0) AND (NOT (b = 1.0))) THEN 0 END'>
当我添加第三个条件时,它抛出错误并且不起作用:
TypeError: unhashable type: 'Column'
所以问题是,我如何遍历字典并创建完整的 when().when().when()
...?如果我最后想要 otherwise()
有没有更好的解决方案?
当您将 reduce
与 dict 对象一起使用时,您实际上是在迭代 dict 的键。所以 lambda 函数采用 acc
累加器和 key
正在处理的实际密钥。
您可以改用这个:
from functools import reduce
from pyspark.sql import functions as F
conditions = {
"0": (F.col("a") == 1.0) & (F.col("b") != 1.0),
"1": (F.col("c") == 1.0) & (F.col("d") == 1.0)
}
when_stats = reduce(
lambda acc, key: acc.when(conditions[key], key),
conditions,
F
) #.otherwise("default_value")
print(when_stats)
# Column<'CASE WHEN ((a = 1.0) AND (NOT (b = 1.0))) THEN 0 WHEN ((c = 1.0) AND (d = 1.0)) THEN 1 END'>