在 pyspark 中执行 partitionBy 列时消除特定列的空值行
Eliminate null value rows for a specific column while doing partitionBy column in pyspark
我有一个像这样的 pyspark 数据框:
+-----+---+-----+
| id| name|state|
+-----+---+-----+
|111| null| CT|
|222|name1| CT|
|222|name2| CT|
|333|name3| CT|
|333|name4| CT|
|333| null| CT|
+---+-----+-----+
对于给定的 ID,如果 ID 不重复,即使列 "name" 为空,我也想保留该记录,但如果 ID 重复,那么我想检查名称列并确保它在该 ID 中不包含重复项,如果 "name" 为 null ONLY 对于重复的 ID,也将其删除。以下是所需的输出:
+-----+---+-----+
| id| name|state|
+-----+---+-----+
|111| null| CT|
|222|name1| CT|
|222|name2| CT|
|333|name3| CT|
|333|name4| CT|
+---+-----+-----+
如何在 PySpark 中实现此目的?
我认为您可以分两步完成。首先,按 id
计算值
import pyspark.sql.window as psw
w = psw.Window.partitionBy("id")
df = df.withColumn("n",psf.sum(psf.lit(1)).over(w))
然后过滤去除 Null
when n<1
:
df.filter(!((psf.col('name').isNull()) & (psf.col('n') > 1)))
编辑
如@Shubham Jain 所述,如果 name
有多个 Null
值(重复项),上述过滤器将保留它们。在这种情况下,@Shaido 提出的解决方案很有用:使用 .dropDuplicates(['id','name'])
添加 post 处理。或者 .dropDuplicates(['id','name','state'])
,根据您的喜好
您可以通过按 id 列分组并计算每组中的姓名数来完成此操作。默认情况下,Spark 将忽略空值,因此应保留计数为 0 的任何组。我们现在可以过滤掉计数大于 0 的组中的任何空值。
在 Scala 中,这可以通过 window 函数完成,如下所示:
val w = Window.partitionBy("id")
val df2 = df.withColumn("gCount", count($"name").over(w))
.filter($"name".isNotNull or $"gCount" === 0)
.drop("gCount")
PySpark 等价物:
w = Window.partitionBy("id")
df.withColumn("gCount", count("name").over(w))
.filter((col("name").isNotNull()) | (col("gCount") == 0))
.drop("gCount")
以上不会删除同一 ID 具有多个空值的行(所有这些都将保留)。
如果这些也应该被删除,只保留一行 name==null
,一个简单的方法是在上述代码 运行 之前或之后使用 .dropDuplicates(['id','name'])
。请注意,这也会删除任何其他重复项(在这种情况下 .dropDuplicates(['id','name', 'state'])
可能更可取)。
我有一个像这样的 pyspark 数据框:
+-----+---+-----+
| id| name|state|
+-----+---+-----+
|111| null| CT|
|222|name1| CT|
|222|name2| CT|
|333|name3| CT|
|333|name4| CT|
|333| null| CT|
+---+-----+-----+
对于给定的 ID,如果 ID 不重复,即使列 "name" 为空,我也想保留该记录,但如果 ID 重复,那么我想检查名称列并确保它在该 ID 中不包含重复项,如果 "name" 为 null ONLY 对于重复的 ID,也将其删除。以下是所需的输出:
+-----+---+-----+
| id| name|state|
+-----+---+-----+
|111| null| CT|
|222|name1| CT|
|222|name2| CT|
|333|name3| CT|
|333|name4| CT|
+---+-----+-----+
如何在 PySpark 中实现此目的?
我认为您可以分两步完成。首先,按 id
import pyspark.sql.window as psw
w = psw.Window.partitionBy("id")
df = df.withColumn("n",psf.sum(psf.lit(1)).over(w))
然后过滤去除 Null
when n<1
:
df.filter(!((psf.col('name').isNull()) & (psf.col('n') > 1)))
编辑
如@Shubham Jain 所述,如果 name
有多个 Null
值(重复项),上述过滤器将保留它们。在这种情况下,@Shaido 提出的解决方案很有用:使用 .dropDuplicates(['id','name'])
添加 post 处理。或者 .dropDuplicates(['id','name','state'])
,根据您的喜好
您可以通过按 id 列分组并计算每组中的姓名数来完成此操作。默认情况下,Spark 将忽略空值,因此应保留计数为 0 的任何组。我们现在可以过滤掉计数大于 0 的组中的任何空值。
在 Scala 中,这可以通过 window 函数完成,如下所示:
val w = Window.partitionBy("id")
val df2 = df.withColumn("gCount", count($"name").over(w))
.filter($"name".isNotNull or $"gCount" === 0)
.drop("gCount")
PySpark 等价物:
w = Window.partitionBy("id")
df.withColumn("gCount", count("name").over(w))
.filter((col("name").isNotNull()) | (col("gCount") == 0))
.drop("gCount")
以上不会删除同一 ID 具有多个空值的行(所有这些都将保留)。
如果这些也应该被删除,只保留一行 name==null
,一个简单的方法是在上述代码 运行 之前或之后使用 .dropDuplicates(['id','name'])
。请注意,这也会删除任何其他重复项(在这种情况下 .dropDuplicates(['id','name', 'state'])
可能更可取)。