Pyspark:从列中获取最大值 [性能改进]

Pyspark: Get maximum value from a column [performance improvement]

我有一个 spark 数据框如下:

+-----+------+
|A    | count|
+-----+------+
|dummy|23    |
|ABC  |157   |
|abc  |15    |
+-----+------+

我正在尝试从此列中找出最大值 [上面示例中的 157],这就是我所做的:

max_value = df.agg({"count": "max"}).collect()[0][0]

我是 spark 编程的新手。尽管上面的解决方案有效,对于大数据 [比如几百万行],我不确定这个解决方案的效率如何[因为它涉及到一个减少组件]。是否有更有效的解决方案可用于从列中获取最大值?

PS: 我在互联网上浏览了很多解决方案[例如:并且还没有遇到一个处理性能的。

编辑 1: 我正在处理的数据框有多个大数据列。

编辑 2: 在获取最大值之前正在对数据执行转换:

a) 我从 Google 云平台(在 Parquet 中)获取我的输入数据。

b) 此数据被转换为 pyspark 数据帧。

c) 然后我向该数据框添加一个“计数”列。

d) 然后,我想从“计数”列中获取最大值。

聚合而不排序的动机是什么?如果您只对根据您的示例已经拥有的最大计数感兴趣。

那为什么不 DF.sort(f.col("count").desc()).first().asDict()['count'] 呢?

-- 不是答案--

@BennyElgazar 的快速 POC。
使用具有 4 个内核的单个 VM 集群在 Azure Databricks 上执行。

import pyspark.sql.functions as F

df = spark.range(1000000000).orderBy(F.rand())

df.agg({"id":"max"}).first()[0]

Out[34]: 999999999 Command took 0.64 seconds

df.sort(F.col("id").desc()).first().asDict()['id']

Out[35]: 999999999 Command took 55.38 seconds


P.S.
不需要asDict()

以下任何一项都可以正常工作:

df.sort(F.col("id").desc()).first()[0]
df.sort(F.col("id").desc()).first().id
df.sort(F.col("id").desc()).first()['id']

正如上面@DavidדודוMarkovitz 所建议的,减少执行时间的最佳解决方案是使用 spark 缓存或持久性机制。详细说明如下:

  • 将要缓存的数据帧 and/or 存储到磁盘。
  • 运行 一个简单的数据帧操作 [例如 count()] 在此数据帧上。
  • Post这个,运行实际的[在这种情况下,得到最大值]转换。

中间步骤 [count()] 会花费一些时间,但是 post 这 [由于 spark 的延迟加载特性],数据帧上的所有其他转换将花费最短的时间来完成。