Pandas UDF loop over PySpark dataframe rows

I am trying to use pandas_udf because my data is in a PySpark dataframe but I want to use the pandas library. I have too many rows to simply convert my PySpark dataframe to a pandas dataframe.

I use textdistance (pip3 install textdistance) and import it with import textdistance.

from pyspark.sql.functions import pandas_udf, PandasUDFType, col

test = spark.createDataFrame(
    [('dog cat', 'dog cat'), 
     ('cup dad', 'mug'),],
    ['value1', 'value2']
)

@pandas_udf('float', PandasUDFType.SCALAR)
def textdistance_jaro_winkler(a, b):
    return textdistance.jaro_winkler(a, b)

test = test.withColumn('jaro_winkler', textdistance_jaro_winkler(col('value1'), col('value2')))
test.show()

I get the following error:

ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
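
This error comes from inside textdistance rather than from Spark itself: a SCALAR pandas UDF hands each batch of rows to the function as whole pandas Series, while textdistance.jaro_winkler expects two plain strings and compares its arguments directly, so the comparison lands in a boolean context and pandas raises. A minimal sketch outside Spark (only pandas and textdistance, no cluster involved) should reproduce the same ValueError:

import pandas as pd
import textdistance

a = pd.Series(['dog cat', 'cup dad'])
b = pd.Series(['dog cat', 'mug'])

# jaro_winkler treats its inputs as two sequences to compare; with Series the
# internal equality checks produce a Series, which cannot be used in an
# if-statement -> "The truth value of a Series is ambiguous."
textdistance.jaro_winkler(a, b)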

I also tried passing the whole dataframe as the function argument and handling the string values inside the function, but I think that only made things worse:

from pyspark.sql.types import StructType, StructField, StringType, FloatType

schema = StructType([StructField("value1", StringType(), True)
                     ,StructField("value2", StringType(), True)
                     ,StructField("jaro_winkler", FloatType(), True)
                    ])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def textdistance_jaro_winkler(df):
    df['jaro_winkler'] = df.apply(lambda x: textdistance.jaro_winkler(x['value1'],  x['value2']))
    
    return df
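
Two details about this attempt: apply needs axis=1 to run row by row, and a GROUPED_MAP pandas UDF is applied through groupby(...).apply(...), not withColumn. A sketch of a corrected version is below; grouping by value1 is an arbitrary choice just to form pandas-sized groups, and PandasUDFType.GROUPED_MAP is deprecated in Spark 3.x in favour of applyInPandas:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, FloatType
import textdistance

schema = StructType([StructField("value1", StringType(), True),
                     StructField("value2", StringType(), True),
                     StructField("jaro_winkler", FloatType(), True)])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def textdistance_jaro_winkler(df):
    # axis=1 applies the lambda to each row instead of each column
    df['jaro_winkler'] = df.apply(
        lambda x: textdistance.jaro_winkler(x['value1'], x['value2']), axis=1)
    return df

# GROUPED_MAP UDFs are invoked through a groupby, not withColumn
test.groupby('value1').apply(textdistance_jaro_winkler).show()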

A plain Python UDF does the job:

import pyspark.sql.functions as F
import textdistance

test2 = test.withColumn(
    'jaro_winkler',
    F.udf(textdistance.jaro_winkler)('value1', 'value2').cast('float')
)

test2.show()
+-------+-------+------------+
| value1| value2|jaro_winkler|
+-------+-------+------------+
|dog cat|dog cat|         1.0|
|cup dad|    mug|   0.4920635|
+-------+-------+------------+

But it is not impossible with a pandas UDF either...

import textdistance

def textdistance_jaro_winkler(iterator):
    for df in iterator:
        df['jaro_winkler'] = df.apply(lambda x: textdistance.jaro_winkler(x['value1'], x['value2']), axis=1)
        yield df

test2 = test.mapInPandas(textdistance_jaro_winkler, 'value1 string, value2 string, jaro_winkler float')

test2.show()
+-------+-------+------------+
| value1| value2|jaro_winkler|
+-------+-------+------------+
|dog cat|dog cat|         1.0|
|cup dad|    mug|   0.4920635|
+-------+-------+------------+
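
mapInPandas passes the function an iterator of pandas DataFrames, one per Arrow batch, so a partition never has to fit into a single pandas frame. If the per-batch work is heavy, the batch size can be tuned through the usual Arrow setting (a sketch; 5000 is just an example value):

# number of rows per pandas batch handed to mapInPandas (the default is 10000)
spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', 5000)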

You need to rewrite the function to use a Series-to-Series pandas UDF:

import pandas as pd
import textdistance
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType

def textdistance_jaro_winkler(a: pd.Series, b: pd.Series) -> pd.Series:
    return pd.Series([textdistance.jaro_winkler(x, y) for x, y in zip(a, b)])


jaro_winkler_udf = F.pandas_udf(textdistance_jaro_winkler, returnType=FloatType())

test = test.withColumn('jaro_winkler', jaro_winkler_udf(col('value1'), col('value2')))
test.show()

#+-------+-------+------------+
#| value1| value2|jaro_winkler|
#+-------+-------+------------+
#|dog cat|dog cat|         1.0|
#|cup dad|    mug|   0.4920635|
#+-------+-------+------------+
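
On Spark 3.0+ the same Series-to-Series UDF can also be written with the decorator form and Python type hints, which saves the separate F.pandas_udf(...) call. A sketch equivalent to the code above:

import pandas as pd
import textdistance
from pyspark.sql import functions as F

@F.pandas_udf('float')
def jaro_winkler_udf(a: pd.Series, b: pd.Series) -> pd.Series:
    # jaro_winkler works on plain strings, so walk the two Series in lockstep
    return pd.Series([textdistance.jaro_winkler(x, y) for x, y in zip(a, b)])

test.withColumn('jaro_winkler', jaro_winkler_udf('value1', 'value2')).show()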