我想根据其他两列获得一列的最大值,第四列是重复次数最多的值

i want to obtain max value of a column depending on two other columns and for the forth column the value of the most repeated number

我有这个数据框

df1 = spark.createDataFrame([
    ('c', 'd', 3.0, 4),
    ('c', 'd', 7.3, 8),
    ('c', 'd', 7.3, 2),
    ('c', 'd', 7.3, 8),
    ('e', 'f', 6.0, 3),
    ('e', 'f', 6.0, 8),
    ('e', 'f', 6.0, 3),
    ('c', 'j', 4.2, 3),
    ('c', 'j', 4.3, 9),
], ['a', 'b', 'c', 'd'])
df1.show()
+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
|  c|  d|3.0|  4|
|  c|  d|7.3|  8|
|  c|  d|7.3|  2|
|  c|  d|7.3|  8|
|  e|  f|6.0|  3|
|  e|  f|6.0|  8|
|  e|  f|6.0|  3|
|  c|  j|4.2|  3|
|  c|  j|4.3|  9|
+---+---+---+---+

我这样做是为了得到一对 a 和 b 的 c 的最大值

df2 = df1.groupBy('a', 'b').agg(F.max('c').alias('c_max')).select(
        F.col('a'),
        F.col('b'),
        F.col('c_max').alias('c')
    )
df2.show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  e|  f|6.0|
|  c|  d|7.3|
|  c|  j|4.3|
+---+---+---+

但现在我需要得到 d 的值应该是

+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
|  c|  d|7.3|  8|
|  e|  f|6.0|  3|
|  c|  j|4.3|  9|
+---+---+---+---+

我尝试在 df1 和 df2 之间进行内部连接,但没有成功:

condition = [df1.a ==  df2.a, df1.b ==  df2.b, df1.c ==  df2.c]
df3 = df1.join(df2,condition,"inner")
df3.show()
+---+---+---+---+---+---+---+
|  a|  b|  c|  d|  a|  b|  c|
+---+---+---+---+---+---+---+
|  c|  d|7.3|  8|  c|  d|7.3|
|  c|  d|7.3|  8|  c|  d|7.3|
|  c|  d|7.3|  2|  c|  d|7.3|
|  e|  f|6.0|  3|  e|  f|6.0|
|  e|  f|6.0|  8|  e|  f|6.0|
|  e|  f|6.0|  3|  e|  f|6.0|
|  c|  j|4.3|  9|  c|  j|4.3|
+---+---+---+---+---+---+---+

我是 pyspark 的初学者,所以我需要一点帮助来解决这个问题

您可以“压缩”d 并计数 d 并照常聚合以保持频率

df3 = (df1
    .groupBy('a', 'b', 'd')
    .agg(F.count('*').alias('d_count'))
    .groupBy('a', 'b')
    .agg(F.max(F.array('d_count', 'd')).alias('d_freq'))
    .select('a', 'b', F.col('d_freq')[1].alias('d'))
)

+---+---+---+
|  a|  b|  d|
+---+---+---+
|  c|  d|  8|
|  c|  j|  9|
|  e|  f|  3|
+---+---+---+

现在加入您的 df2,这个新的 df3 将提供您想要的输出。

df2.join(df3, on=['a', 'b']).show()
+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
|  c|  d|7.3|  8|
|  c|  j|4.3|  9|
|  e|  f|6.0|  3|
+---+---+---+---+

你可以先统计出现频率,然后按照降序排列来分配顺序值。然后,获取阶数为1的第一个值。

这不处理平局,如果最高频率有平局,这将选择任何(non-deterministic)。

from pyspark.sql import functions as F

df1 = (df1.withColumn('d_count', F.count('*').over(Window.partitionBy(['a', 'b', 'd'])))
 .withColumn('d_order', F.row_number().over(Window.partitionBy(['a', 'b']).orderBy(F.desc('d_count'))))
 .groupby(['a', 'b'])
 .agg(
   F.max('c').alias('c'),
   F.first(F.when(F.col('d_order') == 1, F.col('d'))).alias('d'))
)
# df1.show()

+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
|  e|  f|6.0|  3|
|  c|  d|7.3|  8|
|  c|  j|4.3|  9|
+---+---+---+---+