我想根据其他两列获得一列的最大值,第四列是重复次数最多的值
i want to obtain max value of a column depending on two other columns and for the forth column the value of the most repeated number
我有这个数据框
df1 = spark.createDataFrame([
('c', 'd', 3.0, 4),
('c', 'd', 7.3, 8),
('c', 'd', 7.3, 2),
('c', 'd', 7.3, 8),
('e', 'f', 6.0, 3),
('e', 'f', 6.0, 8),
('e', 'f', 6.0, 3),
('c', 'j', 4.2, 3),
('c', 'j', 4.3, 9),
], ['a', 'b', 'c', 'd'])
df1.show()
+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| c| d|3.0| 4|
| c| d|7.3| 8|
| c| d|7.3| 2|
| c| d|7.3| 8|
| e| f|6.0| 3|
| e| f|6.0| 8|
| e| f|6.0| 3|
| c| j|4.2| 3|
| c| j|4.3| 9|
+---+---+---+---+
我这样做是为了得到一对 a 和 b 的 c 的最大值
df2 = df1.groupBy('a', 'b').agg(F.max('c').alias('c_max')).select(
F.col('a'),
F.col('b'),
F.col('c_max').alias('c')
)
df2.show()
+---+---+---+
| a| b| c|
+---+---+---+
| e| f|6.0|
| c| d|7.3|
| c| j|4.3|
+---+---+---+
但现在我需要得到 d 的值应该是
+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| c| d|7.3| 8|
| e| f|6.0| 3|
| c| j|4.3| 9|
+---+---+---+---+
我尝试在 df1 和 df2 之间进行内部连接,但没有成功:
condition = [df1.a == df2.a, df1.b == df2.b, df1.c == df2.c]
df3 = df1.join(df2,condition,"inner")
df3.show()
+---+---+---+---+---+---+---+
| a| b| c| d| a| b| c|
+---+---+---+---+---+---+---+
| c| d|7.3| 8| c| d|7.3|
| c| d|7.3| 8| c| d|7.3|
| c| d|7.3| 2| c| d|7.3|
| e| f|6.0| 3| e| f|6.0|
| e| f|6.0| 8| e| f|6.0|
| e| f|6.0| 3| e| f|6.0|
| c| j|4.3| 9| c| j|4.3|
+---+---+---+---+---+---+---+
我是 pyspark 的初学者,所以我需要一点帮助来解决这个问题
您可以“压缩”d
并计数 d
并照常聚合以保持频率
df3 = (df1
.groupBy('a', 'b', 'd')
.agg(F.count('*').alias('d_count'))
.groupBy('a', 'b')
.agg(F.max(F.array('d_count', 'd')).alias('d_freq'))
.select('a', 'b', F.col('d_freq')[1].alias('d'))
)
+---+---+---+
| a| b| d|
+---+---+---+
| c| d| 8|
| c| j| 9|
| e| f| 3|
+---+---+---+
现在加入您的 df2
,这个新的 df3
将提供您想要的输出。
df2.join(df3, on=['a', 'b']).show()
+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| c| d|7.3| 8|
| c| j|4.3| 9|
| e| f|6.0| 3|
+---+---+---+---+
你可以先统计出现频率,然后按照降序排列来分配顺序值。然后,获取阶数为1的第一个值。
这不处理平局,如果最高频率有平局,这将选择任何(non-deterministic)。
from pyspark.sql import functions as F
df1 = (df1.withColumn('d_count', F.count('*').over(Window.partitionBy(['a', 'b', 'd'])))
.withColumn('d_order', F.row_number().over(Window.partitionBy(['a', 'b']).orderBy(F.desc('d_count'))))
.groupby(['a', 'b'])
.agg(
F.max('c').alias('c'),
F.first(F.when(F.col('d_order') == 1, F.col('d'))).alias('d'))
)
# df1.show()
+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| e| f|6.0| 3|
| c| d|7.3| 8|
| c| j|4.3| 9|
+---+---+---+---+
我有这个数据框
df1 = spark.createDataFrame([
('c', 'd', 3.0, 4),
('c', 'd', 7.3, 8),
('c', 'd', 7.3, 2),
('c', 'd', 7.3, 8),
('e', 'f', 6.0, 3),
('e', 'f', 6.0, 8),
('e', 'f', 6.0, 3),
('c', 'j', 4.2, 3),
('c', 'j', 4.3, 9),
], ['a', 'b', 'c', 'd'])
df1.show()
+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| c| d|3.0| 4|
| c| d|7.3| 8|
| c| d|7.3| 2|
| c| d|7.3| 8|
| e| f|6.0| 3|
| e| f|6.0| 8|
| e| f|6.0| 3|
| c| j|4.2| 3|
| c| j|4.3| 9|
+---+---+---+---+
我这样做是为了得到一对 a 和 b 的 c 的最大值
df2 = df1.groupBy('a', 'b').agg(F.max('c').alias('c_max')).select(
F.col('a'),
F.col('b'),
F.col('c_max').alias('c')
)
df2.show()
+---+---+---+
| a| b| c|
+---+---+---+
| e| f|6.0|
| c| d|7.3|
| c| j|4.3|
+---+---+---+
但现在我需要得到 d 的值应该是
+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| c| d|7.3| 8|
| e| f|6.0| 3|
| c| j|4.3| 9|
+---+---+---+---+
我尝试在 df1 和 df2 之间进行内部连接,但没有成功:
condition = [df1.a == df2.a, df1.b == df2.b, df1.c == df2.c]
df3 = df1.join(df2,condition,"inner")
df3.show()
+---+---+---+---+---+---+---+
| a| b| c| d| a| b| c|
+---+---+---+---+---+---+---+
| c| d|7.3| 8| c| d|7.3|
| c| d|7.3| 8| c| d|7.3|
| c| d|7.3| 2| c| d|7.3|
| e| f|6.0| 3| e| f|6.0|
| e| f|6.0| 8| e| f|6.0|
| e| f|6.0| 3| e| f|6.0|
| c| j|4.3| 9| c| j|4.3|
+---+---+---+---+---+---+---+
我是 pyspark 的初学者,所以我需要一点帮助来解决这个问题
您可以“压缩”d
并计数 d
并照常聚合以保持频率
df3 = (df1
.groupBy('a', 'b', 'd')
.agg(F.count('*').alias('d_count'))
.groupBy('a', 'b')
.agg(F.max(F.array('d_count', 'd')).alias('d_freq'))
.select('a', 'b', F.col('d_freq')[1].alias('d'))
)
+---+---+---+
| a| b| d|
+---+---+---+
| c| d| 8|
| c| j| 9|
| e| f| 3|
+---+---+---+
现在加入您的 df2
,这个新的 df3
将提供您想要的输出。
df2.join(df3, on=['a', 'b']).show()
+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| c| d|7.3| 8|
| c| j|4.3| 9|
| e| f|6.0| 3|
+---+---+---+---+
你可以先统计出现频率,然后按照降序排列来分配顺序值。然后,获取阶数为1的第一个值。
这不处理平局,如果最高频率有平局,这将选择任何(non-deterministic)。
from pyspark.sql import functions as F
df1 = (df1.withColumn('d_count', F.count('*').over(Window.partitionBy(['a', 'b', 'd'])))
.withColumn('d_order', F.row_number().over(Window.partitionBy(['a', 'b']).orderBy(F.desc('d_count'))))
.groupby(['a', 'b'])
.agg(
F.max('c').alias('c'),
F.first(F.when(F.col('d_order') == 1, F.col('d'))).alias('d'))
)
# df1.show()
+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| e| f|6.0| 3|
| c| d|7.3| 8|
| c| j|4.3| 9|
+---+---+---+---+