Pandas UDF: AttributeError: 'NoneType' object has no attribute '_jvm' (code works fine outside of the UDF)

I know there are similar threads, but I could not resolve my error with the solutions given there.

Here is my schema:

root
 |-- embeddings: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- id: long (nullable = true)

I am trying to group by id and compute the pairwise cosine similarity across all the embeddings within each id.

Here is my code, end to end:

import pyspark.sql.types as T
import pyspark.sql.functions as F

embeddings=spark.read.parquet('directory')

schema = T.StructType([T.StructField("shop_id", T.LongType(), True),
                       T.StructField("cosine_similarities", T.ArrayType(T.ArrayType(T.DoubleType(), True), True))
                      ])


@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def cosine_sim_udf(df):
    
    single_col = df.select(F.col('embeddings'))

    single_col_flatmap = single_col.rdd.flatMap(lambda x: x).collect()
    
    cosine_sim = cosine_similarity(single_col_flatmap)

    return cosine_sim

embeddings.groupBy("shop_id").apply(cosine_sim_udf).show(1)

This raises the following error:

AttributeError: 'NoneType' object has no attribute '_jvm'

To debug it, I ran the code from inside the function for a single id only, and it ran without any problem.

single_col = embeddings.filter("id =1").select(F.col('embeddings'))

single_col_flatmap = single_col.rdd.flatMap(lambda x: x).collect()

cosine_sim = cosine_similarity(single_col_flatmap)

Any help would be greatly appreciated.

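You get that error because you are trying to use PySpark DataFrame methods inside the pandas_udf. The argument df passed to cosine_sim_udf is a pandas DataFrame, not a PySpark one, so calls like df.select(...) and .rdd are not valid there. See the docs: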

Grouped map operations with Pandas instances are supported by DataFrame.groupby().applyInPandas() which requires a Python function that takes a pandas.DataFrame and return another pandas.DataFrame

You need to change your code like this:

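import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
import pyspark.sql.functions as F

# `schema` is the StructType defined in the question (shop_id, cosine_similarities).
@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def cosine_sim_udf(pdf):
    # pdf is a pandas DataFrame holding all rows of one shop_id group.
    em = pdf["embeddings"].values.tolist()
    # One output row per group: the full pairwise similarity matrix as nested lists.
    cosine_sim = pd.DataFrame({'cosine_similarities': [cosine_similarity(em).tolist()]})

    # Every row in the group shares the same shop_id, so take the first one.
    cosine_sim["shop_id"] = pdf["shop_id"].iloc[0]

    return cosine_sim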
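
Since the function body only touches pandas objects, you can also check the per-group logic locally on a plain pandas DataFrame, without Spark. The following is only a rough sketch: the names `pairwise_cosine` and `cosine_sim_group` are made up for illustration, and the NumPy-based similarity is a stand-in for sklearn's `cosine_similarity` (it assumes no embedding is all zeros).

```python
import numpy as np
import pandas as pd


def pairwise_cosine(vectors):
    """Pairwise cosine similarity of row vectors (NumPy stand-in for sklearn)."""
    m = np.asarray(vectors, dtype=float)
    norms = np.linalg.norm(m, axis=1, keepdims=True)
    unit = m / norms  # assumption: no all-zero embedding, so no division by zero
    return unit @ unit.T


def cosine_sim_group(pdf):
    """Take one group's rows as a pandas DataFrame, return a one-row pandas DataFrame."""
    em = pdf["embeddings"].tolist()
    return pd.DataFrame({
        "shop_id": [pdf["shop_id"].iloc[0]],
        "cosine_similarities": [pairwise_cosine(em).tolist()],
    })


# Local check using two embeddings that share shop_id 1.
pdf = pd.DataFrame({
    "shop_id": [1, 1],
    "embeddings": [[1.0, 0.0, 3.0, 5.0], [6.0, 7.0, 8.0, 5.0]],
})
result = cosine_sim_group(pdf)
print(result["cosine_similarities"].iloc[0])
```

If you are on Spark 3.0 or later, a plain function of this shape can also be passed to `groupBy("shop_id").applyInPandas(cosine_sim_group, schema=schema)`, which is the API the quoted docs describe.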

Example:

embeddings = spark.createDataFrame([
    (1, [1., 0., 3., 5.]), (1, [6., 7., 8., 5.]),
    (2, [1.3, 4.4, 2.1, 3.9]), (2, [9., 5., 0., 3.]),
    (3, [1.3, 4.4, 2.1, 3.9]), (3, [9., 5., 0., 3.])
], ["shop_id", "embeddings"])

embeddings.groupBy("shop_id").apply(cosine_sim_udf).show(truncate=False)

#+-------+---------------------------------------------------------------------+
#|shop_id|cosine_similarities                                                  |
#+-------+---------------------------------------------------------------------+
#|1      |[[1.0, 0.704780765735282], [0.704780765735282, 0.9999999999999999]]  |
#|2      |[[1.0, 0.6638498270978581], [0.6638498270978581, 1.0000000000000002]]|
#|3      |[[1.0, 0.6638498270978581], [0.6638498270978581, 1.0000000000000002]]|
#+-------+---------------------------------------------------------------------+
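
As a sanity check, the off-diagonal value for shop_id 1 can be reproduced by hand with the usual dot-product-over-norms formula. This sketch uses only the standard library, and the variable names are just for illustration. The diagonal entries such as 0.9999999999999999 in the output above differ from 1.0 only by floating-point rounding.

```python
import math

# The two embeddings of shop_id 1 from the example data.
a = [1.0, 0.0, 3.0, 5.0]
b = [6.0, 7.0, 8.0, 5.0]

dot = sum(x * y for x, y in zip(a, b))      # 6 + 0 + 24 + 25 = 55
norm_a = math.sqrt(sum(x * x for x in a))   # sqrt(35)
norm_b = math.sqrt(sum(x * x for x in b))   # sqrt(174)

cos_ab = dot / (norm_a * norm_b)
print(round(cos_ab, 6))  # 0.704781
```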