PySpark 根据数组列中的值替换字符串列中的多个单词
PySpark replace multiple words in string column based on values in array column
我有一个数据框,其中包含一个包含不同长度文本的字符串列,然后我有一个数组列,其中每个元素都是一个结构,在文本列中具有指定的单词、索引、开始位置和结束位置。我想替换数组中文本列中的单词。
看起来像这样:
- id:integer
- text:string
- text_entity:array
- element:struct
- word:string
- index:integer
- start:integer
- end:integer
text
示例可以是:
"I talked with Christian today at Cafe Heimdal last Wednesday"
text_entity
例子可以是:
[{"word": "Christian", "index":4, "start":14, "end":23}, {"word": "Heimdal", "index":8, "start":38, "end":45}]
然后我想更改 text
以将上述索引处的单词替换为:
"I talked with (BLEEP) today at Cafe (BLEEP) last Wednesday"
我最初的做法是把数组炸开,然后做一个regex_replace
,但是接下来就是收集文本合并的问题了。而且似乎需要大量操作。而且我不想使用 UDF,因为性能非常重要。 regex_replace
也有可能匹配子字符串的问题,那是不行的。因此,理想情况下使用索引、开始或结束。
在 text_entity
数组上使用 aggregate
函数,拆分的 text
列作为初始值,如下所示:
from pyspark.sql import functions as F
jsonSting = """{"id":1,"text":"I talked with Christian today at Cafe Heimdal last Wednesday","text_entity":[{"word":"Christian","index":4,"start":14,"end":23},{"word":"Heimdal","index":8,"start":38,"end":45}]}"""
df = spark.read.json(spark.sparkContext.parallelize([jsonSting]))
df1 = df.withColumn(
"text",
F.array_join(
F.expr(r"""aggregate(
text_entity,
split(text, " "),
(acc, x) -> transform(acc, (y, i) -> IF(i=x.index, '(BLEEP)', y))
)"""),
" "
)
)
df1.show(truncate=False)
#+---+----------------------------------------------------------+----------------------------------------------+
#|id |text |text_entity |
#+---+----------------------------------------------------------+----------------------------------------------+
#|1 |I talked with (BLEEP) today at Cafe (BLEEP) last Wednesday|[{23, 4, 14, Christian}, {45, 8, 38, Heimdal}]|
#+---+----------------------------------------------------------+----------------------------------------------+
我使用 regexp_replace
想出了这个答案。然而,使用 regex_replace
的问题在于它会替换所有出现的地方,这不是本意,因为一个词可能会在文本中多次出现,并且只有一些出现应该是 bleeped
df = df.withColumn("temp_entities", F.expr(f"transform(text_entity, (x, i) -> x.word)")) \
.withColumn("temp_entities", F.array_distinct("temp_entities")) \
.withColumn("regex_expression", F.concat_ws("|", "temp_entities")) \
.withColumn("regex_expression", F.concat(F.lit("\b("), F.col("regex_expression"), F.lit(")\b"))) \
.withColumn("text", F.when(F.size("text_entity") > 0, F.expr("regexp_replace(text, regex_expression, '(BLEEP)')")).otherwise(F.col(text)))
它会删除重复项,并且仅在至少有 1 个实体时应用 regexp_replace
。可能不是最优雅的解决方案,并且会 bleep
所有出现的单词。理想情况下应该使用位置。
我有一个数据框,其中包含一个包含不同长度文本的字符串列,然后我有一个数组列,其中每个元素都是一个结构,在文本列中具有指定的单词、索引、开始位置和结束位置。我想替换数组中文本列中的单词。
看起来像这样:
- id:integer
- text:string
- text_entity:array
- element:struct
- word:string
- index:integer
- start:integer
- end:integer
text
示例可以是:
"I talked with Christian today at Cafe Heimdal last Wednesday"
text_entity
例子可以是:
[{"word": "Christian", "index":4, "start":14, "end":23}, {"word": "Heimdal", "index":8, "start":38, "end":45}]
然后我想更改 text
以将上述索引处的单词替换为:
"I talked with (BLEEP) today at Cafe (BLEEP) last Wednesday"
我最初的做法是把数组炸开,然后做一个regex_replace
,但是接下来就是收集文本合并的问题了。而且似乎需要大量操作。而且我不想使用 UDF,因为性能非常重要。 regex_replace
也有可能匹配子字符串的问题,那是不行的。因此,理想情况下使用索引、开始或结束。
在 text_entity
数组上使用 aggregate
函数,拆分的 text
列作为初始值,如下所示:
from pyspark.sql import functions as F
jsonSting = """{"id":1,"text":"I talked with Christian today at Cafe Heimdal last Wednesday","text_entity":[{"word":"Christian","index":4,"start":14,"end":23},{"word":"Heimdal","index":8,"start":38,"end":45}]}"""
df = spark.read.json(spark.sparkContext.parallelize([jsonSting]))
df1 = df.withColumn(
"text",
F.array_join(
F.expr(r"""aggregate(
text_entity,
split(text, " "),
(acc, x) -> transform(acc, (y, i) -> IF(i=x.index, '(BLEEP)', y))
)"""),
" "
)
)
df1.show(truncate=False)
#+---+----------------------------------------------------------+----------------------------------------------+
#|id |text |text_entity |
#+---+----------------------------------------------------------+----------------------------------------------+
#|1 |I talked with (BLEEP) today at Cafe (BLEEP) last Wednesday|[{23, 4, 14, Christian}, {45, 8, 38, Heimdal}]|
#+---+----------------------------------------------------------+----------------------------------------------+
我使用 regexp_replace
想出了这个答案。然而,使用 regex_replace
的问题在于它会替换所有出现的地方,这不是本意,因为一个词可能会在文本中多次出现,并且只有一些出现应该是 bleeped
df = df.withColumn("temp_entities", F.expr(f"transform(text_entity, (x, i) -> x.word)")) \
.withColumn("temp_entities", F.array_distinct("temp_entities")) \
.withColumn("regex_expression", F.concat_ws("|", "temp_entities")) \
.withColumn("regex_expression", F.concat(F.lit("\b("), F.col("regex_expression"), F.lit(")\b"))) \
.withColumn("text", F.when(F.size("text_entity") > 0, F.expr("regexp_replace(text, regex_expression, '(BLEEP)')")).otherwise(F.col(text)))
它会删除重复项,并且仅在至少有 1 个实体时应用 regexp_replace
。可能不是最优雅的解决方案,并且会 bleep
所有出现的单词。理想情况下应该使用位置。