Pyspark: PicklingError: Could not serialize object:
I have the following two dataframes: df_whitelist and df_text
+-------+--------------------+
|keyword|     whitelist_terms|
+-------+--------------------+
|     LA|             LA city|
|     LA|         US LA in da|
| client|this client has i...|
| client|our client has do...|
+-------+--------------------+
+--------------------+----------+
|                Text|  Keywords|
+--------------------+----------+
|the client as ada...|client;ada|
|this client has l...| client;LA|
+--------------------+----------+
In df_whitelist, every keyword corresponds to a set of terms, e.g. keyword LA corresponds to "LA city" and "US LA in da".
In df_text, I have text and some keywords found in that text.
What I want to do is, for each piece of text, such as "the client as ada...", and for each of its keywords "client" and "ada", check all the whitelist terms for that keyword and count how many times each term appears in the text.
What I have tried is as follows:
import pyspark.sql.functions as F
import pyspark.sql.types as T
import re
def whitelisting(text, listOfKeyword, df_whitelist):
    keywords = listOfKeyword.split(";")
    found_whiteterms_count = 0
    for k in keywords:
        if df_whitelist.filter(df_whitelist.keyword == k).count() == 0:
            found_whiteterms_count = found_whiteterms_count + 0
        else:
            df = df_whitelist.filter(df_whitelist.keyword == k).select("whitelist_terms")
            n = df.rdd.map(lambda x: len(re.findall(x["whitelist_terms"], text))).reduce(lambda x, y: x + y)
            found_whiteterms_count = found_whiteterms_count + n
    return found_whiteterms_count

whitelisting_udf = F.udf(lambda text, listOfKeyword: whitelisting(text, listOfKeyword, df_whitelist), T.IntegerType())
text.withColumn("whitelist_counts", whitelisting_udf(text.Text, text.Keywords))
I got the following error:
PicklingError: Could not serialize object: Py4JError: An error occurred while calling o1153.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.base/java.lang.Thread.run(Thread.java:844)
I have been trying for a while and cannot figure it out. Can anyone help point out the problem and how to fix it? Thanks.
You are passing a pyspark dataframe, df_whitelist, to a UDF; pyspark dataframes cannot be pickled. You are also doing computations on a dataframe inside a UDF, which is not acceptable (not possible). Keep in mind that your function is going to be called as many times as there are rows in your dataframe, so you should keep computations simple, and perform them only when they cannot be done with pyspark sql functions.
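To illustrate, here is a minimal sketch of the failure mode (hypothetical names, not part of the original code): the lambda's closure captures df_whitelist, and Spark has to pickle that closure to ship it to the executors. A pyspark dataframe is only a handle to a JVM object through Py4J, hence the __getstate__ error above. Capturing plain Python data instead pickles fine, though collecting the whitelist to the driver is only workable when it is small; the join shown below is the more scalable route:

import pyspark.sql.functions as F
import pyspark.sql.types as T

# Fails when evaluated: the closure holds a DataFrame, which wraps a
# Py4J reference to a JVM object and therefore cannot be pickled.
bad_udf = F.udf(lambda text: df_whitelist.count(), T.IntegerType())

# Works: collect the whitelist into a plain dict on the driver first,
# so the closure only captures picklable Python objects.
terms_by_kw = {}
for row in df_whitelist.collect():
    terms_by_kw.setdefault(row["keyword"], []).append(row["whitelist_terms"])

ok_udf = F.udf(
    lambda text, kws: sum(text.count(t)
                          for k in kws.split(";")
                          for t in terms_by_kw.get(k, [])),
    T.IntegerType())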
What you need to do is join the two dataframes on keyword.
Let's start from the two sample dataframes you provided:
df_whitelist = spark.createDataFrame(
    [["LA", "LA city"], ["LA", "US LA in da"], ["client", "this client has i"], ["client", "our client"]],
    ["keyword", "whitelist_terms"])
df_text = spark.createDataFrame(
    [["the client as ada", "client;ada"], ["this client has l", "client;LA"]],
    ["Text", "Keywords"])
The Keywords column in df_text needs some processing: we have to turn the string into an array and then explode it so that we only have one item per row:
import pyspark.sql.functions as F
df_text = df_text.select("Text", F.explode(F.split("Keywords", ";")).alias("keyword"))
+-----------------+-------+
|             Text|keyword|
+-----------------+-------+
|the client as ada| client|
|the client as ada| ada|
|this client has l| client|
|this client has l| LA|
+-----------------+-------+
Now we can join the two dataframes on keyword:
df = df_text.join(df_whitelist, "keyword", "leftouter")
+-------+-----------------+-----------------+
|keyword|             Text|  whitelist_terms|
+-------+-----------------+-----------------+
|     LA|this client has l|          LA city|
|     LA|this client has l|      US LA in da|
|    ada|the client as ada|             null|
| client|the client as ada|this client has i|
| client|the client as ada|       our client|
| client|this client has l|this client has i|
| client|this client has l|       our client|
+-------+-----------------+-----------------+
The first condition you invoked in your UDF can be translated as follows: if a keyword from df_text is not present in df_whitelist, then 0. It is equivalent to saying that the values of the df_whitelist columns will be NULL in the left join, since those keywords only appear in the left dataframe.
The second condition: you count the number of times whitelist_terms appears in Text: Text.count(whitelist_terms)
We shall write a UDF to do this:
from pyspark.sql.types import IntegerType
count_terms = F.udf(lambda Text, term: Text.count(term) if term is not None else 0, IntegerType())
df = df.select(
    "Text",
    "keyword",
    F.when(F.isnull("whitelist_terms"), 0).otherwise(count_terms("Text", "whitelist_terms")).alias("whitelist_counts"))
+-----------------+-------+----------------+
|             Text|keyword|whitelist_counts|
+-----------------+-------+----------------+
|this client has l|     LA|               0|
|this client has l|     LA|               0|
|the client as ada|    ada|               0|
|the client as ada| client|               0|
|the client as ada| client|               0|
|this client has l| client|               0|
|this client has l| client|               0|
+-----------------+-------+----------------+
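As an aside, if the whitelist terms are plain substrings with no regex metacharacters, the count can also be expressed without a Python UDF, using the SQL split function (a sketch under that assumption, not what is used above):

# Hypothetical UDF-free variant: splitting Text on the term yields
# (number of occurrences + 1) pieces, so size(...) - 1 is the count.
# Note that split() treats its second argument as a regex.
df_builtin = df_text.join(df_whitelist, "keyword", "leftouter").select(
    "Text",
    "keyword",
    F.when(F.isnull("whitelist_terms"), 0)
     .otherwise(F.expr("size(split(Text, whitelist_terms)) - 1"))
     .alias("whitelist_counts"))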
Finally, we can aggregate to come back to a dataframe with only distinct Text:
res = df.groupBy("Text").agg(
    F.collect_set("keyword").alias("Keywords"),
    F.sum("whitelist_counts").alias("whitelist_counts"))
res.show()
+-----------------+-------------+----------------+
|             Text|     Keywords|whitelist_counts|
+-----------------+-------------+----------------+
|this client has l| [client, LA]|               0|
|the client as ada|[ada, client]|               0|
+-----------------+-------------+----------------+