Vectorized pandas udf in pyspark with dict lookup
I'm trying to learn to use pandas_udf in pyspark (Databricks).
One of the assignments is to write a pandas_udf to sort by day of week. I know how to do this with a plain Spark udf:
from pyspark.sql.functions import *
data = [('Sun', 282905.5), ('Mon', 238195.5), ('Thu', 264620.0), ('Sat', 278482.0), ('Wed', 227214.0)]
schema = 'day string, avg_users double'
df = spark.createDataFrame(data, schema)
print('Original')
df.show()
@udf()
def udf(day: str) -> str:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return dow[day] + '-' + day
print('with spark udf')
final_df = df.select(col('avg_users'), udf(col('day')).alias('day')).sort('day')
final_df.show()
This prints:
Original
+---+-----------+
|day| avg_users|
+---+-----------+
|Sun| 282905.5|
|Mon| 238195.5|
|Thu| 264620.0|
|Sat| 278482.0|
|Wed| 227214.0|
+---+-----------+
with spark udf
+-----------+-----+
| avg_users| day|
+-----------+-----+
| 238195.5|1-Mon|
| 227214.0|3-Wed|
| 264620.0|4-Thu|
| 278482.0|6-Sat|
| 282905.5|7-Sun|
+-----------+-----+
Trying to do the same thing with pandas_udf:
import pandas as pd
@pandas_udf('string')
def p_udf(day: pd.Series) -> pd.Series:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return dow[day.str] + '-' + day.str
p_final_df = df.select(df.avg_users, p_udf(df.day))
print('with pandas udf')
p_final_df.show()
I get KeyError: <pandas.core.strings.accessor.StringMethods object at 0x7f31197cd9a0>. I assume it comes from dow[day.str], which kind of makes sense.
I have also tried:
return dow[day.str.__str__()] + '-' + day.str # KeyError: .... StringMethods
return dow[str(day.str)] + '-' + day.str # KeyError: .... StringMethods
return dow[day.str.upper()] + '-' + day.str # TypeError: unhashable type: 'Series'
return f"{dow[day.str]}-{day.str}" # KeyError: .... StringMethods (but I think this is logically
# wrong, returning a string instead of a Series)
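The KeyError itself can be reproduced in plain pandas, outside Spark entirely; a minimal sketch, assuming only pandas:
import pandas as pd
day = pd.Series(['Sun', 'Mon'])
dow = {"Mon": "1", "Sun": "7"}
# day.str is the StringMethods accessor object, not the string contents,
# so dow[day.str] looks that one object up as a dict key -> KeyError
print(type(day.str))  # <class 'pandas.core.strings.accessor.StringMethods'>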
I have read:
- API reference
- How to convert Scalar Pyspark UDF to Pandas UDF?
What about returning the DataFrame with groupedData and orderBy after you perform the udf? Pandas sort_values is quite problematic inside udfs.
Basically, inside the udf I generate the numbers with Python, then concatenate them back onto the day column.
from pyspark.sql.functions import pandas_udf
import pandas as pd
from pyspark.sql.types import *
import calendar
def sortdf(pdf):
    day = pdf.day
    # calendar.day_abbr is ('Mon', ..., 'Sun'); the mapping yields 0-6, +1 gives 1-7
    pdf = pdf.assign(day=(day.map(dict(zip(calendar.day_abbr, range(7)))) + 1).astype(str) + '-' + day)
    return pdf
df.groupby('avg_users').applyInPandas(sortdf, schema=df.schema).show()
+-----+---------+
| day|avg_users|
+-----+---------+
|3-Wed| 227214.0|
|1-Mon| 238195.5|
|4-Thu| 264620.0|
|6-Sat| 278482.0|
|7-Sun| 282905.5|
+-----+---------+
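The output above is ordered by the grouping key (avg_users). To sort by day of week explicitly, one option (a sketch, not part of the original snippet) is to chain orderBy on the result, since the '1-' through '7-' prefixes make a plain string sort match weekday order:
result = df.groupby('avg_users').applyInPandas(sortdf, schema=df.schema)
# lexicographic sort on '1-Mon' .. '7-Sun' equals day-of-week order
result.orderBy('day').show()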
Using the .str accessor alone, without any actual vectorized transformation, gives you the error. Besides, you cannot use a whole Series as a key into the dow dictionary. Use the map method of pandas.Series:
from pyspark.sql.functions import *
import pandas as pd
data = [('Sun', 282905.5), ('Mon', 238195.5), ('Thu', 264620.0), ('Sat', 278482.0), ('Wed', 227214.0)]
schema = 'day string, avg_users double'
df = spark.createDataFrame(data, schema)
@pandas_udf("string")
def p_udf(day: pd.Series) -> pd.Series:
dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
"Fri": "5", "Sat": "6", "Sun": "7"}
return day.map(dow) + '-' + day
df.select(df.avg_users, p_udf(df.day).alias("day")).show()
+---------+-----+
|avg_users| day|
+---------+-----+
| 282905.5|7-Sun|
| 238195.5|1-Mon|
| 264620.0|4-Thu|
| 278482.0|6-Sat|
| 227214.0|3-Wed|
+---------+-----+
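If the final goal is the day-of-week ordering from the question, the same trick applies here; a sketch, not part of the original answer:
# the '1-'..'7-' prefix makes sort('day') produce weekday order
df.select(df.avg_users, p_udf(df.day).alias("day")).sort('day').show()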