当 python 函数比它们快时,为什么我们使用 pyspark UDF? (注意。不用担心 spark SQL 命令)
Why do we use pyspark UDF when python functions are faster than them? (Note. Not worrying about spark SQL commands)
我有一个数据框:
df = (spark
.range(0, 10 * 1000 * 1000)\
.withColumn('id', (col('id') / 1000).cast('integer'))\
.withColumn('v', rand()))
输出:
+---+-------------------+
| id| v|
+---+-------------------+
| 0|0.05011803459635367|
| 0| 0.6749337782428327|
| 0| 0.9449105904567048|
| 0| 0.9183605955607251|
| 0| 0.648596393346793|
+---+-------------------+
现在,一个简单的 - 将 'v' 加 1 可以通过 SQL 函数和 UDF 完成。
如果我们忽略 SQL(最佳性能)
我们可以创建一个 UDF 为:
@udf("double")
def plus_one(v):
return v + 1
并称它为:
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
时间:16.5秒
但这是我的问题:
如果我不要使用udf并直接写:
def plus_one(v):
return v + 1
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
耗时 - 352 毫秒
简而言之,UDF 查询耗时约 16 秒,而正常的 python 函数耗时约 350 毫秒
比较,
df.selectExpr("id", "v+1 as v").agg(count(col('v'))).show()
时间:347 毫秒
这是我的困境:
如果我可以用一个正常的 python 函数执行相同的场景,该函数的性能与内置函数相比...
问。为什么我们不直接使用 python 函数?
问。如果我们打算像命令一样在 SQL 中使用它,注册 UDF 是否重要?
一定有一些我们不这样做的优化原因......或者可能与 spark 集群的工作方式有关?
[ 已经回答了 2 个问题,但是这两个问题都以“SQL 内置函数是首选...”结尾
我正在将 python 函数与 UDF 进行比较,它在 pyspark 应用程序中的可行性。 ]
编辑:
我也用 pandas_udf 做了这个:
@pandas_udf('double')
def vectorized_plus_one(v):
return v + 1
df.withColumn('v', vectorized_plus_one(df.v)).agg(count(col('v'))).show()
时间:5.26 秒
我附上了截图:
The output for Adding 1 to value - Python funtion (standalone), UDF, SQL
您的方案之所以有效,是因为实际上您没有在 python 中添加 1,它在 Java 中的添加方式与您在 SQL 中添加的方式非常相似。
让我们分开来看:
- 你做
plus_one(df.v)
等于刚刚通过 df.v + 1
- 尝试在您最喜欢的 repl 中输入
df.v + 1
,您会看到它 returns 类型为 Column
的对象。
- 怎么可能?
Column
class 覆盖了 __radd__
魔术方法(以及其他一些方法)和 returns 新的 Column
实例,其中包含将 1 添加到指定列的指令。
总而言之:withColumn
始终接受类型为 Column
的对象作为第二个参数,并且向您的列添加 1 的技巧是 python.
的魔力
这就是它比 udf
和 vectorized udf
工作得更快的原因:他们需要 运行 python 处理,serialize/deserialize 数据(向量化的 udfs 可以工作得更快使用 arrow
来避免 serializing/deserializing),在较慢的 python 过程中计算。
我有一个数据框:
df = (spark
.range(0, 10 * 1000 * 1000)\
.withColumn('id', (col('id') / 1000).cast('integer'))\
.withColumn('v', rand()))
输出:
+---+-------------------+
| id| v|
+---+-------------------+
| 0|0.05011803459635367|
| 0| 0.6749337782428327|
| 0| 0.9449105904567048|
| 0| 0.9183605955607251|
| 0| 0.648596393346793|
+---+-------------------+
现在,一个简单的 - 将 'v' 加 1 可以通过 SQL 函数和 UDF 完成。
如果我们忽略 SQL(最佳性能)
我们可以创建一个 UDF 为:
@udf("double")
def plus_one(v):
return v + 1
并称它为:
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
时间:16.5秒
但这是我的问题:
如果我不要使用udf并直接写:
def plus_one(v):
return v + 1
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
耗时 - 352 毫秒
简而言之,UDF 查询耗时约 16 秒,而正常的 python 函数耗时约 350 毫秒
比较,
df.selectExpr("id", "v+1 as v").agg(count(col('v'))).show()
时间:347 毫秒
这是我的困境:
如果我可以用一个正常的 python 函数执行相同的场景,该函数的性能与内置函数相比...
问。为什么我们不直接使用 python 函数?
问。如果我们打算像命令一样在 SQL 中使用它,注册 UDF 是否重要?
一定有一些我们不这样做的优化原因......或者可能与 spark 集群的工作方式有关?
[ 已经回答了 2 个问题,但是这两个问题都以“SQL 内置函数是首选...”结尾 我正在将 python 函数与 UDF 进行比较,它在 pyspark 应用程序中的可行性。 ]
编辑: 我也用 pandas_udf 做了这个:
@pandas_udf('double')
def vectorized_plus_one(v):
return v + 1
df.withColumn('v', vectorized_plus_one(df.v)).agg(count(col('v'))).show()
时间:5.26 秒
我附上了截图:
The output for Adding 1 to value - Python funtion (standalone), UDF, SQL
您的方案之所以有效,是因为实际上您没有在 python 中添加 1,它在 Java 中的添加方式与您在 SQL 中添加的方式非常相似。
让我们分开来看:
- 你做
plus_one(df.v)
等于刚刚通过df.v + 1
- 尝试在您最喜欢的 repl 中输入
df.v + 1
,您会看到它 returns 类型为Column
的对象。 - 怎么可能?
Column
class 覆盖了__radd__
魔术方法(以及其他一些方法)和 returns 新的Column
实例,其中包含将 1 添加到指定列的指令。
总而言之:withColumn
始终接受类型为 Column
的对象作为第二个参数,并且向您的列添加 1 的技巧是 python.
这就是它比 udf
和 vectorized udf
工作得更快的原因:他们需要 运行 python 处理,serialize/deserialize 数据(向量化的 udfs 可以工作得更快使用 arrow
来避免 serializing/deserializing),在较慢的 python 过程中计算。