PySpark:如何过滤来自列表的多个列?

PySpark: How to filter on multiple columns coming from a list?

我有一个数据框,它将在多个列上被过滤(非空)。这些列来自配置文件。
假设我有一个包含 filterCols: COLUMN_1,COLUMN_2...,COLUMN_10.
的配置文件 在我的代码中,我可以对列名称进行硬编码,例如:

df = dataDF.filter(~col("COLUMN_1").isNull() & 
                   ~col("COLUMN_2").isNull() &
                   ......
                   ~col("COLUMN_10").isNull())

但可以根据需要删除这些列,也可以添加更多列。
如果我按照上面的做法,每次都得改部署代码。

有没有一种方法可以让我遍历这些列然后进行过滤?
我试过这样的事情:

colList = ['COLUMN_1', 'COLUMN_2', ...,'COLUMN_10']
df = dataDF
for name in colList:
    df = df.filter(~col(name).isNull())

但是 df 显示零条记录。

您可以根据列生成查询字符串并使用 SparkSQL。

示例:

spark = SparkSession.builder.getOrCreate()
data = [
    {"a": 1, "b": 2, "c": 3},
    {"a": 1, "b": 2, "c": 3},
    {"a": 1, "b": 2, "c": 3, "d": 4},
]
df = spark.createDataFrame(data)
columns = ["a", "b", "c", "d"]
df.createTempView("table")
df = spark.sql(
    "SELECT * FROM table WHERE {}".format(
        " AND ".join(x + " IS NOT NULL" for x in columns)
    )
)

结果:

+---+---+---+---+                                                               
|a  |b  |c  |d  |
+---+---+---+---+
|1  |2  |3  |4  |
+---+---+---+---+

使用python functools.reduce链接多个条件:

from functools import reduce
import pyspark.sql.functions as F


filter_expr = reduce(lambda a, b: a & b, [F.col(c).isNotNull() for c in colList])

df = df.filter(filter_expr)