How to select rows with max values in categories?
For each ID key, I want to select the row with the maximum Day (i.e. aggregate by ID and keep the max-Day row).
ID | col1 | col2 | month | Day
---|------|------|-------|----
AI1 | 5 | 2 | janv | 15
AI2 | 6 | 0 | Dec | 16
AI1 | 1 | 7 | March | 16
AI3 | 9 | 4 | Nov | 18
AI2 | 3 | 20 | Fev | 20
AI3 | 10 | 8 | June | 06
Desired result:
ID | col1 | col2 | month | Day
---|------|------|-------|----
AI1 | 1 | 7 | March | 16
AI2 | 3 | 20 | Fev | 20
AI3 | 9 | 4 | Nov | 18
The only solution I could come up with is:
- get the highest Day for each ID (using groupBy)
- use a join to attach that highest Day to every row (with matching ID)
- then a simple filter keeping the rows where the two values match
# get the max Day for each ID
maxDayForIDs = df.groupBy("ID").max("Day").withColumnRenamed("max(Day)", "maxDay")
# attach that max Day to every row (with matching ID)
df = df.join(maxDayForIDs, "ID")
# keep only the rows where "Day" equals "maxDay"
df = df.filter(df.Day == df.maxDay)
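For reference, the same groupBy / join / filter idea can be written a bit more compactly with an explicit alias for the aggregate (a minimal sketch assuming the schema above; the names maxDayForIDs and result are just illustrative):
from pyspark.sql import functions as F

# aggregate once per ID, alias the result, then join back and filter
maxDayForIDs = df.groupBy("ID").agg(F.max("Day").alias("maxDay"))
result = (df.join(maxDayForIDs, "ID")
            .filter(F.col("Day") == F.col("maxDay"))
            .drop("maxDay"))  # drop the helper column so the output matches the desired result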
Usually this kind of operation is done with a window function such as rank, dense_rank or row_number.
from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
[('AI1', 5, 2, 'janv', '15'),
('AI2', 6, 0, 'Dec', '16'),
('AI1', 1, 7, 'March', '16'),
('AI3', 9, 4, 'Nov', '18'),
('AI2', 3, 20, 'Fev', '20'),
('AI3', 10, 8, 'June', '06')],
['ID', 'col1', 'col2', 'month', 'Day']
)
# number the rows within each ID, highest Day first
w = W.partitionBy('ID').orderBy(F.desc('Day'))
df = df.withColumn('_rn', F.row_number().over(w))
# keep only the first row per ID (the max Day) and drop the helper column
df = df.filter('_rn = 1').drop('_rn')
df.show()
# +---+----+----+-----+---+
# | ID|col1|col2|month|Day|
# +---+----+----+-----+---+
# |AI1| 1| 7|March| 16|
# |AI2| 3| 20| Fev| 20|
# |AI3| 9| 4| Nov| 18|
# +---+----+----+-----+---+
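Note that row_number keeps exactly one row per ID even if several rows tie on the maximum Day. To keep all tied rows, rank (or dense_rank) can be substituted in the same pattern (a sketch using the same window w, applied to the original DataFrame before the row_number filter; df_ties is just an illustrative name):
# rank() gives ties the same value, so every row sharing the max Day survives the filter
df_ties = (df.withColumn('_rk', F.rank().over(w))
             .filter('_rk = 1')
             .drop('_rk'))
df_ties.show()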
Simpler:
from pyspark.sql import functions as F, Window as W

w = W.partitionBy('ID').orderBy(F.desc('Day'))
new = (df.withColumn('max', F.first('Day').over(w))  # order by Day descending and keep the first value per ID group as the max
         .where(F.col('Day') == F.col('max'))        # keep only the rows where Day equals the group max
         .drop('max'))                               # drop the helper column
new.show()