PySpark 重新分配重复行的值

PySpark reassign values of duplicate rows

我有这样一个数据框:

id,p1
1,A
2,null
3,B
4,null
4,null
2,C

我想使用 PySpark 删除所有重复项。但是,如果存在 p1 列不为空的副本,我想删除空列。例如,我想删除第一次出现的 id 2 和 id 4 中的任何一个。现在我将数据帧拆分为两个数据帧:

id,p1
1,A
3,B
2,C
id,p1
2,null
4,null
4,null

从两者中删除重复项,然后将不在第一个数据框中的重复项添加回来。就这样我得到了这个数据框。

id,p1
1,A
3,B
4,null
2,C

这是我目前拥有的:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('test').getOrCreate()
d = spark.createDataFrame(
    [(1,"A"),
     (2,None),
     (3,"B"),
     (4,None),
     (4,None),
     (2,"C")],
    ["id", "p"]
    )
d1 = d.filter(d.p.isNull())
d2 = d.filter(d.p.isNotNull())
d1 = d1.dropDuplicates()
d2 = d2.dropDuplicates()
d3 = d1.join(d2, "id", 'left_anti')
d4 = d2.unionByName(d3)

有没有更漂亮的方法呢?像这样真的感觉很多余,但我想不出更好的方法。我尝试使用 groupby 但无法实现。有任何想法吗?谢谢。

(df1.sort(col('p1').desc())#sort column descending and will put nulls low in list
 
.dropDuplicates(subset = ['id']).show()#Drop duplicates on column id
)

+---+----+
| id|  p1|
+---+----+
|  1|   A|
|  2|   C|
|  3|   B|
|  4|null|
+---+----+

使用windowrow_number()函数和排序"p" descending.

Example:

d.show()
#+---+----+
#| id|   p|
#+---+----+
#|  1|   A|
#|  2|null|
#|  3|   B|
#|  4|null|
#|  4|null|
#|  2|   C|
#+---+----+

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

window_spec=row_number().over(Window.partitionBy("id").orderBy(col("p").desc()))

d.withColumn("rn",window_spec).filter(col("rn")==1).drop("rn").show()
#+---+----+
#| id|   p|
#+---+----+
#|  1|   A|
#|  3|   B|
#|  2|   C|
#|  4|null|
#+---+----+