使用 dict 查找在 pyspark 中矢量化 pandas udf

Vectorized pandas udf in pyspark with dict lookup

我正在尝试学习在 pyspark (Databricks) 中使用 pandas_udf

其中一项作业是编写 pandas_udf 以按星期几排序。我知道如何使用 spark udf 来做到这一点:

from pyspark.sql.functions import *

data = [('Sun', 282905.5), ('Mon', 238195.5), ('Thu', 264620.0), ('Sat', 278482.0), ('Wed', 227214.0)]
schema = 'day string, avg_users double'
df = spark.createDataFrame(data, schema)
print('Original')
df.show()


@udf()
def udf(day: str) -> str:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return dow[day] + '-' + day

print('with spark udf')
final_df = df.select(col('avg_users'), udf(col('day')).alias('day')).sort('day')
final_df.show()

打印:

Original
+---+-----------+
|day|  avg_users|
+---+-----------+
|Sun|   282905.5|
|Mon|   238195.5|
|Thu|   264620.0|
|Sat|   278482.0|
|Wed|   227214.0|
+---+-----------+

with spark udf
+-----------+-----+
|  avg_users|  day|
+-----------+-----+
|   238195.5|1-Mon|
|   227214.0|3-Wed|
|   264620.0|4-Thu|
|   278482.0|6-Sat|
|   282905.5|7-Sun|
+-----------+-----+

尝试对 pandas_udf

做同样的事情
import pandas as pd


@pandas_udf('string')
def p_udf(day: pd.Series) -> pd.Series:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return dow[day.str] + '-' + day.str


p_final_df = df.select(df.avg_users, p_udf(df.day))

print('with pandas udf')
p_final_df.show()

我得到 KeyError: <pandas.core.strings.accessor.StringMethods object at 0x7f31197cd9a0>。我认为它来自 dow[day.str],这有点道理。

我也试过:

return dow[day.str.__str__()] + '-' + day.str # KeyError: .... StringMethods
return dow[str(day.str)] + '-' + day.str      # KeyError: .... StringMethods
return dow[day.str.upper()] + '-' + day.str   # TypeError: unhashable type: 'Series'
return f"{dow[day.str]}-{day.str}"            # KeyError: .... StringMethods (but I think this is logically
                                              # wrong, returning a string instead of a Series)

我读过:

在你执行 udf 之后,我们 return 使用 groupeddata 和 orderby 的数据框呢? Pandas sort_values 在 udfs 中有很多问题。

基本上,在 udf 中,我使用 python 生成数字,然后将它们连接回日期列。

from pyspark.sql.functions import pandas_udf
import pandas as pd
from pyspark.sql.types import *
import calendar
def sortdf(pdf):
  day=pdf.day
  pdf =pdf.assign(day=(day.map(dict(zip(calendar.day_abbr, range(7))))+1).astype(str) + '-'+day)
 
  return pdf

df.groupby('avg_users').applyInPandas(sortdf, schema=df.schema).show()

+-----+---------+
|  day|avg_users|
+-----+---------+
|3-Wed| 227214.0|
|1-Mon| 238195.5|
|4-Thu| 264620.0|
|6-Sat| 278482.0|
|7-Sun| 282905.5|
+-----+---------+

单独使用 .str 方法而不进行任何实际的矢量化转换会给您带来错误。此外,您不能将整个系列用作 dow 字典的键。对 pandas.Series 使用 map 方法:

from pyspark.sql.functions import *
import pandas as pd

data = [('Sun', 282905.5), ('Mon', 238195.5), ('Thu', 264620.0), ('Sat', 278482.0), ('Wed', 227214.0)]
schema = 'day string, avg_users double'
df = spark.createDataFrame(data, schema)

@pandas_udf("string")
def p_udf(day: pd.Series) -> pd.Series:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return day.map(dow) + '-' + day

df.select(df.avg_users, p_udf(df.day).alias("day")).show()

+---------+-----+
|avg_users|  day|
+---------+-----+
| 282905.5|7-Sun|
| 238195.5|1-Mon|
| 264620.0|4-Thu|
| 278482.0|6-Sat|
| 227214.0|3-Wed|
+---------+-----+