Pyspark: PicklingError: Could not serialize object:

I have the following two dataframes: df_whitelist and df_text

+-------+--------------------+
|keyword|     whitelist_terms|
+-------+--------------------+
|     LA|             LA city|
|     LA|        US LA in da |
| client|this client has i...|
| client|our client has do...|
+-------+--------------------+
+--------------------+----------+
|                Text|  Keywords|
+--------------------+----------+
|the client as ada...|client;ada|
|this client has l...| client;LA|
+--------------------+----------+

In df_whitelist, each keyword corresponds to a set of whitelist terms; for example, the keyword LA corresponds to "LA city" and "US LA in da". In df_text, I have a text and the keywords that were found in that text. What I want to do is: for each text, e.g. "the client as ada...", and for each of its keywords, e.g. "client" and "ada", check all the whitelist terms for that keyword and count how many times each term occurs in the text. What I have tried is the following:

import pyspark.sql.functions as F
import pyspark.sql.types as T
import re
def whitelisting(text,listOfKeyword,df_whitelist):
    keywords = listOfKeyword.split(";")
    found_whiteterms_count = 0
    for k in keywords:
        if df_whitelist.filter(df_whitelist.keyword == k).count() == 0:
            found_whiteterms_count = found_whiteterms_count + 0
        else:
            df = df_whitelist.filter(df_whitelist.keyword == k).select("whitelist_terms")
            n = df.rdd.map(lambda x:len(re.findall(x["whitelist_terms"],text))).reduce(lambda x, y: x+y)
            found_whiteterms_count = found_whiteterms_count + n    
    return found_whiteterms_count     
whitelisting_udf = F.udf(lambda text, listOfKeyword: whitelisting(text, listOfKeyword, df_whitelist), T.IntegerType())
df_text.withColumn("whitelist_counts", whitelisting_udf(df_text.Text, df_text.Keywords))

I get the following error:

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o1153.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.base/java.lang.Thread.run(Thread.java:844)

I have been trying for a while and cannot figure it out. Could anyone help point out the problem and how to fix it? Thank you.

You are passing the pyspark dataframe df_whitelist to a UDF, and pyspark dataframes cannot be pickled. You are also doing computations on a dataframe inside the UDF, which is not acceptable (nor possible). Keep in mind that your function is going to be called as many times as there are rows in your dataframe, so you should keep computations simple, and only resort to a UDF when they cannot be done with pyspark sql functions.
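(As an aside, and not what this answer recommends: if a Python-side lookup were truly required, a minimal sketch of a workaround, assuming df_whitelist is small enough to collect to the driver, is to turn it into a plain dict first; ordinary Python objects pickle fine, unlike dataframes.)

# Hypothetical workaround sketch, NOT the approach this answer recommends:
# collect the (small) whitelist into a plain dict and close over that instead
# of the DataFrame, since plain Python objects can be pickled.
from collections import defaultdict
import pyspark.sql.functions as F
import pyspark.sql.types as T

whitelist = defaultdict(list)
for row in df_whitelist.collect():  # only reasonable for a small whitelist
    whitelist[row["keyword"]].append(row["whitelist_terms"])

def count_whitelisted(text, keywords):
    # literal substring count over every term of every keyword in the row
    return sum(text.count(term)
               for k in keywords.split(";")
               for term in whitelist.get(k, []))

count_whitelisted_udf = F.udf(count_whitelisted, T.IntegerType())
df_text.withColumn("whitelist_counts", count_whitelisted_udf("Text", "Keywords"))

The join approach below is still preferable: it keeps the whitelist out of the Python closure entirely and lets Spark parallelize the matching.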

What you need to do instead is join the two dataframes on keyword. Let's start from the two sample dataframes you provided:

df_whitelist = spark.createDataFrame(
    [["LA", "LA city"], ["LA", "US LA in da"], ["client", "this client has i"], ["client", "our client"]], 
    ["keyword", "whitelist_terms"])
df_text = spark.createDataFrame(
    [["the client as ada", "client;ada"], ["this client has l", "client;LA"]], 
    ["Text", "Keywords"])

The Keywords column in df_text needs some processing: we have to turn the string into an array and then explode it, so that we have only one item per row:

import pyspark.sql.functions as F
df_text = df_text.select("Text", F.explode(F.split("Keywords", ";")).alias("keyword"))

    +-----------------+-------+
    |             Text|keyword|
    +-----------------+-------+
    |the client as ada| client|
    |the client as ada|    ada|
    |this client has l| client|
    |this client has l|     LA|
    +-----------------+-------+

Now we can join the two dataframes on keyword:

df = df_text.join(df_whitelist, "keyword", "leftouter")

    +-------+-----------------+-----------------+
    |keyword|             Text|  whitelist_terms|
    +-------+-----------------+-----------------+
    |     LA|this client has l|          LA city|
    |     LA|this client has l|      US LA in da|
    |    ada|the client as ada|             null|
    | client|the client as ada|this client has i|
    | client|the client as ada|       our client|
    | client|this client has l|this client has i|
    | client|this client has l|       our client|
    +-------+-----------------+-----------------+

  • The first condition you check in your UDF can be translated as follows: if a keyword from df_text does not appear in df_whitelist, the count is 0. This is equivalent to saying that the df_whitelist columns will be NULL after the left join, since those keywords only appear in the left dataframe.

  • The second condition: you count the number of times whitelist_terms occurs in Text, i.e. Text.count(whitelist_terms).

We will write a UDF to do this (note that str.count counts literal, non-overlapping substring occurrences, whereas the re.findall in the question would treat the term as a regular expression):

from pyspark.sql.types import IntegerType
count_terms = F.udf(lambda Text, term: Text.count(term) if term is not None else 0, IntegerType())
df = df.select(
    "Text", 
    "keyword", 
    F.when(F.isnull("whitelist_terms"), 0).otherwise(count_terms("Text", "whitelist_terms")).alias("whitelist_counts"))

    +-----------------+-------+----------------+
    |             Text|keyword|whitelist_counts|
    +-----------------+-------+----------------+
    |this client has l|     LA|               0|
    |this client has l|     LA|               0|
    |the client as ada|    ada|               0|
    |the client as ada| client|               0|
    |the client as ada| client|               0|
    |this client has l| client|               0|
    |this client has l| client|               0|
    +-----------------+-------+----------------+

All the counts are 0 here only because the sample strings are truncated: none of the whitelist terms actually occur in these shortened texts.

Finally, we can aggregate to get back to a dataframe with only distinct Text values:
res = df.groupBy("Text").agg(
    F.collect_set("keyword").alias("Keywords"),
    F.sum("whitelist_counts").alias("whitelist_counts"))
res.show()

    +-----------------+-------------+----------------+
    |             Text|     Keywords|whitelist_counts|
    +-----------------+-------------+----------------+
    |this client has l| [client, LA]|               0|
    |the client as ada|[ada, client]|               0|
    +-----------------+-------------+----------------+