Pyspark:从列中获取最大值 [性能改进]
Pyspark: Get maximum value from a column [performance improvement]
我有一个 spark 数据框如下:
+-----+------+
|A | count|
+-----+------+
|dummy|23 |
|ABC |157 |
|abc |15 |
+-----+------+
我正在尝试从此列中找出最大值 [上面示例中的 157],这就是我所做的:
max_value = df.agg({"count": "max"}).collect()[0][0]
我是 spark 编程的新手。尽管上面的解决方案有效,对于大数据 [比如几百万行],我不确定这个解决方案的效率如何[因为它涉及到一个减少组件]。是否有更有效的解决方案可用于从列中获取最大值?
PS: 我在互联网上浏览了很多解决方案[例如:并且还没有遇到一个处理性能的。
编辑 1: 我正在处理的数据框有多个大数据列。
编辑 2: 在获取最大值之前正在对数据执行转换:
a) 我从 Google 云平台(在 Parquet 中)获取我的输入数据。
b) 此数据被转换为 pyspark 数据帧。
c) 然后我向该数据框添加一个“计数”列。
d) 然后,我想从“计数”列中获取最大值。
聚合而不排序的动机是什么?如果您只对根据您的示例已经拥有的最大计数感兴趣。
那为什么不 DF.sort(f.col("count").desc()).first().asDict()['count']
呢?
-- 不是答案--
@BennyElgazar 的快速 POC。
使用具有 4 个内核的单个 VM 集群在 Azure Databricks 上执行。
import pyspark.sql.functions as F
df = spark.range(1000000000).orderBy(F.rand())
df.agg({"id":"max"}).first()[0]
Out[34]: 999999999 Command took 0.64 seconds
df.sort(F.col("id").desc()).first().asDict()['id']
Out[35]: 999999999 Command took 55.38 seconds
P.S.
不需要asDict()
以下任何一项都可以正常工作:
df.sort(F.col("id").desc()).first()[0]
df.sort(F.col("id").desc()).first().id
df.sort(F.col("id").desc()).first()['id']
正如上面@DavidדודוMarkovitz 所建议的,减少执行时间的最佳解决方案是使用 spark 缓存或持久性机制。详细说明如下:
- 将要缓存的数据帧 and/or 存储到磁盘。
- 运行 一个简单的数据帧操作 [例如 count()] 在此数据帧上。
- Post这个,运行实际的[在这种情况下,得到最大值]转换。
中间步骤 [count()] 会花费一些时间,但是 post 这 [由于 spark 的延迟加载特性],数据帧上的所有其他转换将花费最短的时间来完成。
我有一个 spark 数据框如下:
+-----+------+
|A | count|
+-----+------+
|dummy|23 |
|ABC |157 |
|abc |15 |
+-----+------+
我正在尝试从此列中找出最大值 [上面示例中的 157],这就是我所做的:
max_value = df.agg({"count": "max"}).collect()[0][0]
我是 spark 编程的新手。尽管上面的解决方案有效,对于大数据 [比如几百万行],我不确定这个解决方案的效率如何[因为它涉及到一个减少组件]。是否有更有效的解决方案可用于从列中获取最大值?
PS: 我在互联网上浏览了很多解决方案[例如:
编辑 1: 我正在处理的数据框有多个大数据列。
编辑 2: 在获取最大值之前正在对数据执行转换:
a) 我从 Google 云平台(在 Parquet 中)获取我的输入数据。
b) 此数据被转换为 pyspark 数据帧。
c) 然后我向该数据框添加一个“计数”列。
d) 然后,我想从“计数”列中获取最大值。
聚合而不排序的动机是什么?如果您只对根据您的示例已经拥有的最大计数感兴趣。
那为什么不 DF.sort(f.col("count").desc()).first().asDict()['count']
呢?
-- 不是答案--
@BennyElgazar 的快速 POC。
使用具有 4 个内核的单个 VM 集群在 Azure Databricks 上执行。
import pyspark.sql.functions as F
df = spark.range(1000000000).orderBy(F.rand())
df.agg({"id":"max"}).first()[0]
Out[34]: 999999999 Command took 0.64 seconds
df.sort(F.col("id").desc()).first().asDict()['id']
Out[35]: 999999999 Command took 55.38 seconds
P.S.
不需要asDict()
以下任何一项都可以正常工作:
df.sort(F.col("id").desc()).first()[0]
df.sort(F.col("id").desc()).first().id
df.sort(F.col("id").desc()).first()['id']
正如上面@DavidדודוMarkovitz 所建议的,减少执行时间的最佳解决方案是使用 spark 缓存或持久性机制。详细说明如下:
- 将要缓存的数据帧 and/or 存储到磁盘。
- 运行 一个简单的数据帧操作 [例如 count()] 在此数据帧上。
- Post这个,运行实际的[在这种情况下,得到最大值]转换。
中间步骤 [count()] 会花费一些时间,但是 post 这 [由于 spark 的延迟加载特性],数据帧上的所有其他转换将花费最短的时间来完成。