PySpark: replace column values

We have the following sample dataframe:

+-----------+---+---------+
|customer_id|age|post_code|
+-----------+---+---------+
|       1001| 50| BS32 0HW|
+-----------+---+---------+

Then we are given a string like this:

useful_info = 'Customer [customer_id] is [age] years old and lives at [post_code].'

This is just one example; it could be any string containing column names. I simply need to replace those column names with the actual values.

Now I need to add a useful_info column with the column names replaced by their values, i.e. the expected dataframe is:

[Row(customer_id='1001', age=50, post_code='BS32 0HW', useful_info='Customer 1001 is 50 years old and lives at BS32 0HW.')]

Does anyone know how to do this?

You can take the following approach. It evaluates the column values dynamically.

Notes:

(1) I wrote a UDF that uses a regex. If your column names contain additional special characters, such as the underscore (_), include them in the regex as well.

(2) All of the logic assumes Info contains column names in the [column name] pattern. Update the regex if your strings use a different pattern.
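To make the placeholder pattern concrete, here is a minimal sketch of what the regex matches (assumption: placeholders are `[column name]` with letters, digits, underscores, and spaces allowed inside the brackets):

```python
import re

# The same placeholder pattern used by the UDF: captures the text
# between [ and ] so we know which columns the string references.
pattern = r"\[([A-Za-z0-9_ ]+)\]"
info = "Customer [customer_id] is [age] years old."

print(re.findall(pattern, info))  # -> ['customer_id', 'age']
```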

>>> from pyspark.sql.functions import *
>>> import re
>>> df.show(10,False)
+-----------+---+---------+----------------------------------------------------------------------+
|customer_id|age|post_code|Info                                                                  |
+-----------+---+---------+----------------------------------------------------------------------+
|1001       |50 |BS32 0HW | Customer [customer_id] is [age] years old and lives at [post_code].  |
|1002       |39 |AQ74 0TH | Age of Customer '[customer_id]' is [age] and he lives at [post_code].|
|1003       |25 |RT23 0YJ | Customer [customer_id] lives at [post_code]. He is [age] years old.  |
+-----------+---+---------+----------------------------------------------------------------------+

>>> def evaluateExpr(Info, data):
...     # find every [column name] placeholder in the string
...     matchpattern = re.findall(r"\[([A-Za-z0-9_ ]+)\]", Info)
...     out = Info
...     for x in matchpattern:
...         # cast to str so numeric columns (e.g. age) can be substituted
...         out = out.replace("[" + x + "]", str(data[x]))
...     return out
... 
>>> evalExprUDF = udf(evaluateExpr)
>>> df.withColumn("Info", evalExprUDF(col("Info"),struct([df[x] for x in df.columns]))).show(10,False)
+-----------+---+---------+-------------------------------------------------------+
|customer_id|age|post_code|Info                                                   |
+-----------+---+---------+-------------------------------------------------------+
|1001       |50 |BS32 0HW | Customer 1001 is 50 years old and lives at BS32 0HW.  |
|1002       |39 |AQ74 0TH | Age of Customer '1002' is 39 and he lives at AQ74 0TH.|
|1003       |25 |RT23 0YJ | Customer 1003 lives at RT23 0YJ. He is 25 years old.  |
+-----------+---+---------+-------------------------------------------------------+
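The UDF's core logic can be exercised without Spark. Here is a plain-Python sketch that uses a dict in place of the struct row (the row values are taken from the example above):

```python
import re

def evaluate_expr(info, data):
    # Substitute each [column name] placeholder with the row's value,
    # casting to str so non-string columns work too.
    for x in re.findall(r"\[([A-Za-z0-9_ ]+)\]", info):
        info = info.replace("[" + x + "]", str(data[x]))
    return info

row = {"customer_id": "1001", "age": 50, "post_code": "BS32 0HW"}
print(evaluate_expr(
    "Customer [customer_id] is [age] years old and lives at [post_code].",
    row))
# -> Customer 1001 is 50 years old and lives at BS32 0HW.
```

In the Spark version, `struct([df[x] for x in df.columns])` plays the role of this dict: it bundles the whole row so the UDF can look up any column by name.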

Here is one approach using the regexp_replace function. You list the columns to be replaced in the useful_info string column and build an expression column like this:

from pyspark.sql.functions import expr, lit

df = spark.createDataFrame([(1001, 50, "BS32 0HW")], ["customer_id", "age", "post_code"])

list_columns_replace = ["customer_id", "age", "post_code"]

# replace first column in the string
to_replace = f"\\[{list_columns_replace[0]}\\]"
replace_expr = f"regexp_replace(useful_info, '{to_replace}', {list_columns_replace[0]})"

# loop through other columns to replace and update replacement expression
for c in list_columns_replace[1:]:
    to_replace = f"\\[{c}\\]"
    replace_expr = f"regexp_replace({replace_expr}, '{to_replace}', {c})"

# add new column 
df.withColumn("useful_info", lit("Customer [customer_id] is [age] years old and lives at [post_code].")) \
  .withColumn("useful_info", expr(replace_expr)) \
  .show(1, False)

#+-----------+---+---------+----------------------------------------------------+
#|customer_id|age|post_code|useful_info                                         |
#+-----------+---+---------+----------------------------------------------------+
#|1001       |50 |BS32 0HW |Customer 1001 is 50 years old and lives at BS32 0HW.|
#+-----------+---+---------+----------------------------------------------------+
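The replacement loop above can also be folded into a single nested expression with `functools.reduce`; this sketch produces the same expression string as the loop, without touching Spark:

```python
from functools import reduce

list_columns_replace = ["customer_id", "age", "post_code"]

# Fold the column list into one nested regexp_replace(...) expression,
# starting from the innermost call on useful_info.
replace_expr = reduce(
    lambda acc, c: f"regexp_replace({acc}, '\\[{c}\\]', {c})",
    list_columns_replace[1:],
    f"regexp_replace(useful_info, '\\[{list_columns_replace[0]}\\]', "
    f"{list_columns_replace[0]})",
)
print(replace_expr)
```

Pass the result to `expr()` exactly as in the loop version; the behavior is identical.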