在 pyspark UDF 中使用 class 方法
Using a class method inside of a pyspark UDF
您好数据工程师!
我正在尝试使用 class 中名为 Astral
的方法编写 pyspark udf
这是 udf :
def time_from_solar_noon(d, y):
noon = astral.Astral().solar_noon_utc
time = noon(d, y)
return time
solarNoon = F.udf(lambda d, y: time_from_solar_noon(d,y), TimestampType())
现在我的理解是,class 将为我的数据框中的 每一行 实例化,导致工作非常缓慢。
如果我从我的函数中取出 class 实例化:
noon = astral.Astral().solar_noon_utc
def time_from_solar_noon(d, y):
time = noon(d, y)
return time
我收到以下错误消息:
[Previous line repeated 326 more times]
RecursionError: maximum recursion depth exceeded while calling a Python object
所以这是我的问题,我认为应该可以通过 executor/thread 至少有一个 class 实例化,而不是在我的数据框中逐行实例化,我该怎么做?
感谢您的帮助
就像数据库连接一样,您可以使用 mapPartitions
:
仅实例化有限数量的 class 个实例
In [1]: from datetime import date
...: from astral import Astral
...:
...: df = spark.createDataFrame(
...: ((date(2019, 10, 4), 0),
...: (date(2019, 10, 4), 19)),
...: schema=("date", "longitude"))
...:
...:
...: def solar_noon(rows):
...: a = Astral() # initialize the class once per partition
...: return ((a.solar_noon_utc(date=r.date, longitude=r.longitude), *r)
...: for r in rows) # reuses the same Astral instance for all rows in this partition
...:
...:
...: (df.rdd
...: .mapPartitions(solar_noon)
...: .toDF(schema=("solar_noon_utc", *df.columns))
...: .show()
...: )
...:
...:
+-------------------+----------+---------+
| solar_noon_utc| date|longitude|
+-------------------+----------+---------+
|2019-10-04 13:48:58|2019-10-04| 0|
|2019-10-04 12:32:58|2019-10-04| 19|
+-------------------+----------+---------+
这是相当有效的,因为函数 (solar_noon
) 已分配给每个工作人员,并且 class 每个分区仅初始化一次,可以容纳很多行。
您好数据工程师!
我正在尝试使用 class 中名为 Astral
的方法编写 pyspark udf这是 udf :
def time_from_solar_noon(d, y):
noon = astral.Astral().solar_noon_utc
time = noon(d, y)
return time
solarNoon = F.udf(lambda d, y: time_from_solar_noon(d,y), TimestampType())
现在我的理解是,class 将为我的数据框中的 每一行 实例化,导致工作非常缓慢。
如果我从我的函数中取出 class 实例化:
noon = astral.Astral().solar_noon_utc
def time_from_solar_noon(d, y):
time = noon(d, y)
return time
我收到以下错误消息:
[Previous line repeated 326 more times]
RecursionError: maximum recursion depth exceeded while calling a Python object
所以这是我的问题,我认为应该可以通过 executor/thread 至少有一个 class 实例化,而不是在我的数据框中逐行实例化,我该怎么做?
感谢您的帮助
就像数据库连接一样,您可以使用 mapPartitions
:
In [1]: from datetime import date
...: from astral import Astral
...:
...: df = spark.createDataFrame(
...: ((date(2019, 10, 4), 0),
...: (date(2019, 10, 4), 19)),
...: schema=("date", "longitude"))
...:
...:
...: def solar_noon(rows):
...: a = Astral() # initialize the class once per partition
...: return ((a.solar_noon_utc(date=r.date, longitude=r.longitude), *r)
...: for r in rows) # reuses the same Astral instance for all rows in this partition
...:
...:
...: (df.rdd
...: .mapPartitions(solar_noon)
...: .toDF(schema=("solar_noon_utc", *df.columns))
...: .show()
...: )
...:
...:
+-------------------+----------+---------+
| solar_noon_utc| date|longitude|
+-------------------+----------+---------+
|2019-10-04 13:48:58|2019-10-04| 0|
|2019-10-04 12:32:58|2019-10-04| 19|
+-------------------+----------+---------+
这是相当有效的,因为函数 (solar_noon
) 已分配给每个工作人员,并且 class 每个分区仅初始化一次,可以容纳很多行。