Pyspark Dataframe add condition to `reduce(add,(F.col(x) ... `

Consider a DataFrame df like the following:

df.show()
+-----+-----+-----+-----+-----+
|col A|val_1|val_2|val_3|val_4|
+-----+-----+-----+-----+-----+
|city1|  100|  100|  200|  100|
|city2|  200|  300|  300|  100|
|city1|  100|  100|  100|  100|
|city2|  500|  200|  200|  200|
+-----+-----+-----+-----+-----+

If I want to add up the values in the `val_i` columns and put the result in a new column `sum`, I can do the following:

from functools import reduce
from operator import add
from pyspark.sql import functions as F

val_cols = [x for x in df.columns if 'val' in x]
df.withColumn('sum', reduce(add, (F.col(x) for x in val_cols))).show()
+-----+-----+-----+-----+-----+----+
|col A|val_1|val_2|val_3|val_4| sum|
+-----+-----+-----+-----+-----+----+
|city1|  100|  100|  200|  100| 500|
|city2|  200|  300|  300|  100| 900|
|city1|  100|  100|  100|  100| 400|
|city2|  500|  200|  200|  200|1100|
+-----+-----+-----+-----+-----+----+

How can I add a condition inside the `reduce(add, (F.col(x) ...` argument? For example, say I only want to include values greater than 200. I tried this:

df.withColumn('sum', (reduce(add,(F.col(x) for x in val_cols if F.col(x)>200)))).show()

but I get the following error:

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
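
The error comes from the `if` clause in the generator expression: Python evaluates it eagerly, so the Column produced by F.col(x) > 200 is passed to bool(), which a PySpark Column deliberately refuses. A minimal sketch that reproduces the error in isolation:

# Any Column used in a boolean context raises the same ValueError
bool(F.col('val_1') > 200)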

Instead, build the condition into each column expression up front with F.when(...).otherwise(...), so that values failing the condition contribute 0 to the sum:

reduce(
    add,
    [F.when(F.col(c) > 200, F.col(c)).otherwise(F.lit(0)) for c in val_cols]
)
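
Wrapped in withColumn, reusing the df and val_cols defined above, and keeping the strict > 200 cutoff (so a value of exactly 200 contributes 0), this gives:

df.withColumn(
    'sum',
    reduce(add, [F.when(F.col(c) > 200, F.col(c)).otherwise(F.lit(0)) for c in val_cols])
).show()
+-----+-----+-----+-----+-----+----+
|col A|val_1|val_2|val_3|val_4| sum|
+-----+-----+-----+-----+-----+----+
|city1|  100|  100|  200|  100|   0|
|city2|  200|  300|  300|  100| 600|
|city1|  100|  100|  100|  100|   0|
|city2|  500|  200|  200|  200| 500|
+-----+-----+-----+-----+-----+----+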