在 pyspark 中执行 partitionBy 列时消除特定列的空值行

Eliminate null value rows for a specific column while doing partitionBy column in pyspark

我有一个像这样的 pyspark 数据框:

+-----+---+-----+
| id| name|state|
+-----+---+-----+
|111| null|   CT|
|222|name1|   CT|
|222|name2|   CT|
|333|name3|   CT|
|333|name4|   CT|
|333| null|   CT|
+---+-----+-----+

对于给定的 ID,如果 ID 不重复,即使列 "name" 为空,我也想保留该记录,但如果 ID 重复,那么我想检查名称列并确保它在该 ID 中不包含重复项,如果 "name" 为 null ONLY 对于重复的 ID,也将其删除。以下是所需的输出:

+-----+---+-----+
| id| name|state|
+-----+---+-----+
|111| null|   CT|
|222|name1|   CT|
|222|name2|   CT|
|333|name3|   CT|
|333|name4|   CT|
+---+-----+-----+

如何在 PySpark 中实现此目的?

我认为您可以分两步完成。首先,按 id

计算值
import pyspark.sql.window as psw
w = psw.Window.partitionBy("id")
df = df.withColumn("n",psf.sum(psf.lit(1)).over(w))

然后过滤去除 Null when n<1:

df.filter(!((psf.col('name').isNull()) & (psf.col('n') > 1)))

编辑

如@Shubham Jain 所述,如果 name 有多个 Null 值(重复项),上述过滤器将保留它们。在这种情况下,@Shaido 提出的解决方案很有用:使用 .dropDuplicates(['id','name']) 添加 post 处理。或者 .dropDuplicates(['id','name','state']),根据您的喜好

您可以通过按 id 列分组并计算每组中的姓名数来完成此操作。默认情况下,Spark 将忽略空值,因此应保留计数为 0 的任何组。我们现在可以过滤掉计数大于 0 的组中的任何空值。

在 Scala 中,这可以通过 window 函数完成,如下所示:

val w = Window.partitionBy("id")
val df2 = df.withColumn("gCount", count($"name").over(w))
  .filter($"name".isNotNull or $"gCount" === 0)
  .drop("gCount")

PySpark 等价物:

w = Window.partitionBy("id")
df.withColumn("gCount", count("name").over(w))
  .filter((col("name").isNotNull()) | (col("gCount") == 0))
  .drop("gCount")

以上不会删除同一 ID 具有多个空值的行(所有这些都将保留)。

如果这些也应该被删除,只保留一行 name==null,一个简单的方法是在上述代码 运行 之前或之后使用 .dropDuplicates(['id','name'])。请注意,这也会删除任何其他重复项(在这种情况下 .dropDuplicates(['id','name', 'state']) 可能更可取)。