How to delete redundant rows based on other values?

In the dataframe below, there are several apartments with different jobs:

+---+---------+------+                                                                 
|id |apartment|job   |
+---+---------+------+
|1  |Ap1      |dev   |
|2  |Ap1      |anyl  |
|3  |Ap2      |dev   |
|4  |Ap2      |anyl  |
|5  |Ap2      |anyl  |
|6  |Ap2      |dev   |
|7  |Ap2      |dev   |
|8  |Ap2      |dev   |
|9  |Ap3      |anyl  |
|10 |Ap3      |dev   |
|11 |Ap3      |dev   |
+---+---------+------+

For each apartment, the number of rows with job='dev' should equal the number of rows with job='anyl' (as in Ap1). How can I delete the redundant 'dev' rows in all apartments?

Expected result:

+---+---------+------+                                                                 
|id |apartment|job   |
+---+---------+------+
|1  |Ap1      |dev   |
|2  |Ap1      |anyl  |
|3  |Ap2      |dev   |
|4  |Ap2      |anyl  |
|5  |Ap2      |anyl  |
|6  |Ap2      |dev   |
|9  |Ap3      |anyl  |
|10 |Ap3      |dev   |
+---+---------+------+

I think I should use a Window function for this, but I can't figure it out.

I think you first need to find out how many 'anyl' rows each 'apartment' has, and then use that number to remove all the redundant 'dev' rows. So: first an aggregation, then a join, then the window function row_number, and only then can you filter out what you don't need.

Setup:

from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
    [(1, 'Ap1', 'dev'),
     (2, 'Ap1', 'anyl'),
     (3, 'Ap2', 'dev'),
     (4, 'Ap2', 'anyl'),
     (5, 'Ap2', 'anyl'),
     (6, 'Ap2', 'dev'),
     (7, 'Ap2', 'dev'),
     (8, 'Ap2', 'dev'),
     (9, 'Ap3', 'anyl'),
     (10, 'Ap3', 'dev'),
     (11, 'Ap3', 'dev')],
    ['id', 'apartment', 'job']
)

Script:

# Count the 'anyl' rows per apartment and attach that count to every row.
df_grp = df.filter(F.col('job') == 'anyl').groupBy('apartment').count()
df = df.join(df_grp, 'apartment', 'left')

# Number the rows within each (apartment, job) group and keep at most
# 'count' rows per job, i.e. as many as there are 'anyl' rows.
w = W.partitionBy('apartment', 'job').orderBy('id')
df = df.withColumn('_rn', F.row_number().over(w))
df = df.filter('_rn <= count')
df = df.select('id', 'apartment', 'job')

df.show()
# +---+---------+----+
# | id|apartment| job|
# +---+---------+----+
# |  2|      Ap1|anyl|
# |  1|      Ap1| dev|
# |  4|      Ap2|anyl|
# |  5|      Ap2|anyl|
# |  3|      Ap2| dev|
# |  6|      Ap2| dev|
# |  9|      Ap3|anyl|
# | 10|      Ap3| dev|
# +---+---------+----+
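
Since the question explicitly asks about window functions, here is a minimal pure-window sketch of the same idea (my own variant, not part of the answer above; it assumes the original df from the setup block, since the script above overwrites df): the 'anyl' count per apartment is computed with a conditional count over a window, which avoids the separate aggregation and join.

from pyspark.sql import functions as F, Window as W

# Count the 'anyl' rows of each apartment with a window instead of groupBy + join.
w_cnt = W.partitionBy('apartment')
# Number the rows within each (apartment, job) group by id.
w_rn = W.partitionBy('apartment', 'job').orderBy('id')

result = (
    df
    .withColumn('anyl_cnt', F.count(F.when(F.col('job') == 'anyl', 1)).over(w_cnt))
    .withColumn('_rn', F.row_number().over(w_rn))
    .filter(F.col('_rn') <= F.col('anyl_cnt'))  # keep at most anyl_cnt rows per job
    .select('id', 'apartment', 'job')
)
result.show()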

Using a left semi join instead of the groupBy+filter combination suggested by @ZygD might be more efficient:

>>> from pyspark.sql import Window
>>> from pyspark.sql.functions import *
>>> df1 = df.withColumn('rn', row_number().over(Window.partitionBy('apartment', 'job').orderBy('id')))
>>> df2 = df1.alias('a').join(df1.where("job = 'anyl'").alias('b'), (col('a.apartment') == col('b.apartment')) & (col('a.rn') == col('b.rn')), 'leftsemi')
>>> df2.show(truncate=False)
+---+---------+----+---+
|id |apartment|job |rn |
+---+---------+----+---+
|1  |Ap1      |dev |1  |
|2  |Ap1      |anyl|1  |
|3  |Ap2      |dev |1  |
|4  |Ap2      |anyl|1  |
|5  |Ap2      |anyl|2  |
|6  |Ap2      |dev |2  |
|9  |Ap3      |anyl|1  |
|10 |Ap3      |dev |1  |
+---+---------+----+---+
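
The semi join returns only the left side's columns, so the helper rn column added to df1 is still there. To get exactly the expected result from the question, drop it and sort by id (assuming df2 from above):

>>> df2.drop('rn').orderBy('id').show()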