PySpark 根据数组列中的值替换字符串列中的多个单词

PySpark replace multiple words in string column based on values in array column

我有一个数据框,其中包含一个包含不同长度文本的字符串列,然后我有一个数组列,其中每个元素都是一个结构,在文本列中具有指定的单词、索引、开始位置和结束位置。我想替换数组中文本列中的单词。

看起来像这样:

- id:integer
- text:string
- text_entity:array
  - element:struct
    - word:string
    - index:integer
    - start:integer
    - end:integer

text 示例可以是:

"I talked with Christian today at Cafe Heimdal last Wednesday"

text_entity 例子可以是:

[{"word": "Christian", "index":4, "start":14, "end":23}, {"word": "Heimdal", "index":8, "start":38, "end":45}]

然后我想更改 text 以将上述索引处的单词替换为:

"I talked with (BLEEP) today at Cafe (BLEEP) last Wednesday"

我最初的做法是把数组炸开,然后做一个regex_replace,但是接下来就是收集文本合并的问题了。而且似乎需要大量操作。而且我不想使用 UDF,因为性能非常重要。 regex_replace 也有可能匹配子字符串的问题,那是不行的。因此,理想情况下使用索引、开始或结束。

text_entity 数组上使用 aggregate 函数,拆分的 text 列作为初始值,如下所示:

from pyspark.sql import functions as F

jsonSting = """{"id":1,"text":"I talked with Christian today at Cafe Heimdal last Wednesday","text_entity":[{"word":"Christian","index":4,"start":14,"end":23},{"word":"Heimdal","index":8,"start":38,"end":45}]}"""
df = spark.read.json(spark.sparkContext.parallelize([jsonSting]))

df1 = df.withColumn(
    "text",
    F.array_join(
        F.expr(r"""aggregate(
                  text_entity, 
                  split(text, " "), 
                  (acc, x) -> transform(acc, (y, i) -> IF(i=x.index, '(BLEEP)', y))
           )"""),
        " "
    )
)

df1.show(truncate=False)
#+---+----------------------------------------------------------+----------------------------------------------+
#|id |text                                                      |text_entity                                   |
#+---+----------------------------------------------------------+----------------------------------------------+
#|1  |I talked with (BLEEP) today at Cafe (BLEEP) last Wednesday|[{23, 4, 14, Christian}, {45, 8, 38, Heimdal}]|
#+---+----------------------------------------------------------+----------------------------------------------+

我使用 regexp_replace 想出了这个答案。然而,使用 regex_replace 的问题在于它会替换所有出现的地方,这不是本意,因为一个词可能会在文本中多次出现,并且只有一些出现应该是 bleeped

df = df.withColumn("temp_entities", F.expr(f"transform(text_entity, (x, i) -> x.word)")) \
    .withColumn("temp_entities", F.array_distinct("temp_entities")) \
    .withColumn("regex_expression", F.concat_ws("|", "temp_entities")) \
    .withColumn("regex_expression", F.concat(F.lit("\b("), F.col("regex_expression"), F.lit(")\b"))) \
    .withColumn("text", F.when(F.size("text_entity") > 0, F.expr("regexp_replace(text, regex_expression, '(BLEEP)')")).otherwise(F.col(text)))

它会删除重复项,并且仅在至少有 1 个实体时应用 regexp_replace。可能不是最优雅的解决方案,并且会 bleep 所有出现的单词。理想情况下应该使用位置。