Pandas UDF: AttributeError: 'NoneType' object has no attribute '_jvm' (Coding working fine outside of UDF)
I know there are similar topics, but I couldn't resolve my error with those solutions.
Here is my schema:
root
|-- embeddings: array (nullable = true)
| |-- element: float (containsNull = true)
|-- id: long (nullable = true)
I'm trying to group by id and compute pairwise cosine similarity across all embeddings within each id.
Here is my code, end to end:
import pyspark.sql.types as T
import pyspark.sql.functions as F

embeddings = spark.read.parquet('directory')

schema = T.StructType([
    T.StructField("shop_id", T.LongType(), True),
    T.StructField("cosine_similarities", T.ArrayType(T.ArrayType(T.DoubleType(), True), True))
])

@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def cosine_sim_udf(df):
    single_col = df.select(F.col('embeddings'))
    single_col_flatmap = single_col.rdd.flatMap(lambda x: x).collect()
    cosine_sim = cosine_similarity(single_col_flatmap)
    return cosine_sim

embeddings.groupBy("shop_id").apply(cosine_sim_udf).show(1)
This throws the following error:
AttributeError: 'NoneType' object has no attribute '_jvm'
Now, to debug it, I ran the code from inside the function on a single id, and it ran without any issue:
single_col = embeddings.filter("id =1").select(F.col('embeddings'))
single_col_flatmap = single_col.rdd.flatMap(lambda x: x).collect()
cosine_sim = cosine_similarity(single_col_flatmap)
Any help would be greatly appreciated.
You're getting that error because you're trying to use PySpark DataFrame operations inside a pandas_udf, where no SparkSession is available on the executors. The argument df passed to cosine_sim_udf is actually a pandas DataFrame. See the docs:
Grouped map operations with Pandas instances are supported by DataFrame.groupby().applyInPandas() which requires a Python function that takes a pandas.DataFrame and returns another pandas.DataFrame
You need to change your code like this:
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
import pyspark.sql.functions as F

@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def cosine_sim_udf(pdf):
    em = pdf["embeddings"].values.tolist()
    cosine_sim = pd.DataFrame({'cosine_similarities': [cosine_similarity(em).tolist()]})
    cosine_sim["shop_id"] = pdf["shop_id"]
    return cosine_sim
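As a side note (my own addition, not from the original answer): PandasUDFType.GROUPED_MAP is deprecated as of Spark 3.0 in favor of DataFrame.groupby().applyInPandas(), which takes a plain Python function plus a result schema. A minimal sketch of the same logic in that style:

```python
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity

# Plain function: receives one group's data as a pandas DataFrame
# and must return a pandas DataFrame matching the declared schema.
def cosine_sim_fn(pdf: pd.DataFrame) -> pd.DataFrame:
    em = pdf["embeddings"].values.tolist()
    return pd.DataFrame({
        "shop_id": [pdf["shop_id"].iloc[0]],
        "cosine_similarities": [cosine_similarity(em).tolist()],
    })

# Usage (no decorator; the schema can be passed as a DDL string):
# embeddings.groupBy("shop_id").applyInPandas(
#     cosine_sim_fn,
#     schema="shop_id long, cosine_similarities array<array<double>>")
```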
Example:
embeddings = spark.createDataFrame([
    (1, [1., 0., 3., 5.]), (1, [6., 7., 8., 5.]),
    (2, [1.3, 4.4, 2.1, 3.9]), (2, [9., 5., 0., 3.]),
    (3, [1.3, 4.4, 2.1, 3.9]), (3, [9., 5., 0., 3.])
], ["shop_id", "embeddings"])

embeddings.groupBy("shop_id").apply(cosine_sim_udf).show(truncate=False)
#+-------+---------------------------------------------------------------------+
#|shop_id|cosine_similarities |
#+-------+---------------------------------------------------------------------+
#|1 |[[1.0, 0.704780765735282], [0.704780765735282, 0.9999999999999999]] |
#|2 |[[1.0, 0.6638498270978581], [0.6638498270978581, 1.0000000000000002]]|
#|3 |[[1.0, 0.6638498270978581], [0.6638498270978581, 1.0000000000000002]]|
#+-------+---------------------------------------------------------------------+
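As a quick sanity check (my own addition, using plain NumPy rather than Spark), the off-diagonal value for shop_id 1 can be reproduced straight from the definition of cosine similarity:

```python
import numpy as np

# The two embeddings for shop_id 1 in the example above.
a = np.array([1., 0., 3., 5.])
b = np.array([6., 7., 8., 5.])

# Cosine similarity by definition: dot product divided by
# the product of the two vectors' Euclidean norms.
cos_ab = float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))
print(cos_ab)  # ~0.70478, matching the table entry for shop_id 1
```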