PySpark 重新分配重复行的值
PySpark reassign values of duplicate rows
我有这样一个数据框:
id,p1
1,A
2,null
3,B
4,null
4,null
2,C
我想使用 PySpark 删除所有重复项。但是,如果存在 p1 列不为空的副本,我想删除空列。例如,我想删除第一次出现的 id 2 和 id 4 中的任何一个。现在我将数据帧拆分为两个数据帧:
id,p1
1,A
3,B
2,C
id,p1
2,null
4,null
4,null
从两者中删除重复项,然后将不在第一个数据框中的重复项添加回来。就这样我得到了这个数据框。
id,p1
1,A
3,B
4,null
2,C
这是我目前拥有的:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate()
d = spark.createDataFrame(
[(1,"A"),
(2,None),
(3,"B"),
(4,None),
(4,None),
(2,"C")],
["id", "p"]
)
d1 = d.filter(d.p.isNull())
d2 = d.filter(d.p.isNotNull())
d1 = d1.dropDuplicates()
d2 = d2.dropDuplicates()
d3 = d1.join(d2, "id", 'left_anti')
d4 = d2.unionByName(d3)
有没有更漂亮的方法呢?像这样真的感觉很多余,但我想不出更好的方法。我尝试使用 groupby 但无法实现。有任何想法吗?谢谢。
(df1.sort(col('p1').desc())#sort column descending and will put nulls low in list
.dropDuplicates(subset = ['id']).show()#Drop duplicates on column id
)
+---+----+
| id| p1|
+---+----+
| 1| A|
| 2| C|
| 3| B|
| 4|null|
+---+----+
使用windowrow_number()
函数和排序"p"
列 descending
.
Example:
d.show()
#+---+----+
#| id| p|
#+---+----+
#| 1| A|
#| 2|null|
#| 3| B|
#| 4|null|
#| 4|null|
#| 2| C|
#+---+----+
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
window_spec=row_number().over(Window.partitionBy("id").orderBy(col("p").desc()))
d.withColumn("rn",window_spec).filter(col("rn")==1).drop("rn").show()
#+---+----+
#| id| p|
#+---+----+
#| 1| A|
#| 3| B|
#| 2| C|
#| 4|null|
#+---+----+
我有这样一个数据框:
id,p1
1,A
2,null
3,B
4,null
4,null
2,C
我想使用 PySpark 删除所有重复项。但是,如果存在 p1 列不为空的副本,我想删除空列。例如,我想删除第一次出现的 id 2 和 id 4 中的任何一个。现在我将数据帧拆分为两个数据帧:
id,p1
1,A
3,B
2,C
id,p1
2,null
4,null
4,null
从两者中删除重复项,然后将不在第一个数据框中的重复项添加回来。就这样我得到了这个数据框。
id,p1
1,A
3,B
4,null
2,C
这是我目前拥有的:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate()
d = spark.createDataFrame(
[(1,"A"),
(2,None),
(3,"B"),
(4,None),
(4,None),
(2,"C")],
["id", "p"]
)
d1 = d.filter(d.p.isNull())
d2 = d.filter(d.p.isNotNull())
d1 = d1.dropDuplicates()
d2 = d2.dropDuplicates()
d3 = d1.join(d2, "id", 'left_anti')
d4 = d2.unionByName(d3)
有没有更漂亮的方法呢?像这样真的感觉很多余,但我想不出更好的方法。我尝试使用 groupby 但无法实现。有任何想法吗?谢谢。
(df1.sort(col('p1').desc())#sort column descending and will put nulls low in list
.dropDuplicates(subset = ['id']).show()#Drop duplicates on column id
)
+---+----+
| id| p1|
+---+----+
| 1| A|
| 2| C|
| 3| B|
| 4|null|
+---+----+
使用windowrow_number()
函数和排序"p"
列 descending
.
Example:
d.show()
#+---+----+
#| id| p|
#+---+----+
#| 1| A|
#| 2|null|
#| 3| B|
#| 4|null|
#| 4|null|
#| 2| C|
#+---+----+
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
window_spec=row_number().over(Window.partitionBy("id").orderBy(col("p").desc()))
d.withColumn("rn",window_spec).filter(col("rn")==1).drop("rn").show()
#+---+----+
#| id| p|
#+---+----+
#| 1| A|
#| 3| B|
#| 2| C|
#| 4|null|
#+---+----+