pyspark replace column values
We have the following sample dataframe:
+-----------+---------------+--------------+
|customer_id|age            |post_code     |
+-----------+---------------+--------------+
|       1001|             50|      BS32 0HW|
+-----------+---------------+--------------+
Then we have a string like this:
useful_info = 'Customer [customer_id] is [age] years old and lives at [post_code].'
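This is just one example string; it can be any string that contains column names. I just need to replace those column names with the actual values.
Now I need to add a useful_info column in which the column names are replaced by the column values, i.e. the expected dataframe is: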
[Row(customer_id='1001', age=50, post_code='BS32 0HW', useful_info='Customer 1001 is 50 years old and lives at BS32 0HW.')]
Does anyone know how to do this?
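You can use the following approach. It evaluates the column values dynamically.
Notes:
(1) I wrote a UDF that uses a regex. The pattern below already allows letters, digits, underscore (_) and spaces; if your column names contain other special characters, include them in the regex as well.
(2) All of the logic assumes that Info refers to column names using the pattern [column name]. Update the regex if any other pattern is used.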
>>> from pyspark.sql.functions import col, struct, udf
>>> import re
>>> df.show(10,False)
+-----------+---+---------+----------------------------------------------------------------------+
|customer_id|age|post_code|Info |
+-----------+---+---------+----------------------------------------------------------------------+
|1001 |50 |BS32 0HW | Customer [customer_id] is [age] years old and lives at [post_code]. |
|1002 |39 |AQ74 0TH | Age of Customer '[customer_id]' is [age] and he lives at [post_code].|
|1003 |25 |RT23 0YJ | Customer [customer_id] lives at [post_code]. He is [age] years old. |
+-----------+---+---------+----------------------------------------------------------------------+
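>>> def evaluateExpr(Info, data):
...     # collect every [column name] placeholder found in the Info string
...     matchpattern = re.findall(r"\[([A-Za-z0-9_ ]+)\]", Info)
...     out = Info
...     for x in matchpattern:
...         # str() so that non-string columns (e.g. an integer age) also work
...         out = out.replace("[" + x + "]", str(data[x]))
...     return out
...
>>> evalExprUDF = udf(evaluateExpr)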
>>> df.withColumn("Info", evalExprUDF(col("Info"),struct([df[x] for x in df.columns]))).show(10,False)
+-----------+---+---------+-------------------------------------------------------+
|customer_id|age|post_code|Info |
+-----------+---+---------+-------------------------------------------------------+
|1001 |50 |BS32 0HW | Customer 1001 is 50 years old and lives at BS32 0HW. |
|1002 |39 |AQ74 0TH | Age of Customer '1002' is 39 and he lives at AQ74 0TH.|
|1003 |25 |RT23 0YJ | Customer 1003 lives at RT23 0YJ. He is 25 years old. |
+-----------+---+---------+-------------------------------------------------------+
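The substitution logic inside the UDF can be checked without a Spark session. Below is a minimal sketch in plain Python, assuming each row is available as a dict of column values. The name fill_placeholders is hypothetical (it is not part of the answer above), and the choice to leave unknown placeholders untouched is an assumption of this sketch.

```python
import re

# Same placeholder pattern as the UDF above: [column name]
PLACEHOLDER = re.compile(r"\[([A-Za-z0-9_ ]+)\]")


def fill_placeholders(template, row):
    """Replace each [column] in template with str(row[column]).

    Placeholders that do not name a key in row are left unchanged
    (an assumption of this sketch, not behaviour of the UDF above).
    """
    def substitute(match):
        name = match.group(1)
        return str(row[name]) if name in row else match.group(0)

    return PLACEHOLDER.sub(substitute, template)


row = {"customer_id": "1001", "age": 50, "post_code": "BS32 0HW"}
template = "Customer [customer_id] is [age] years old and lives at [post_code]."
result = fill_placeholders(template, row)
print(result)
# Customer 1001 is 50 years old and lives at BS32 0HW.
```

Using re.sub with a callback handles every placeholder in a single pass over the string, and str() keeps it working for non-string values such as an integer age.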
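Here is one way to do it using the regexp_replace function. You can keep the column names to replace in the useful_info string column and build an expression column like this: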
from pyspark.sql.functions import expr, lit

df = spark.createDataFrame([(1001, 50, "BS32 0HW")], ["customer_id", "age", "post_code"])
list_columns_replace = ["customer_id", "age", "post_code"]
# replace first column in the string
# (four backslashes: with default settings Spark's SQL parser unescapes the
# string literal once, which leaves the regex \[customer_id\])
to_replace = f"\\\\[{list_columns_replace[0]}\\\\]"
replace_expr = f"regexp_replace(useful_info, '{to_replace}', {list_columns_replace[0]})"
# loop through other columns to replace and update replacement expression
for c in list_columns_replace[1:]:
    to_replace = f"\\\\[{c}\\\\]"
    replace_expr = f"regexp_replace({replace_expr}, '{to_replace}', {c})"
# add new column
df.withColumn("useful_info", lit("Customer [customer_id] is [age] years old and lives at [post_code].")) \
    .withColumn("useful_info", expr(replace_expr)) \
    .show(1, False)
#+-----------+---+---------+----------------------------------------------------+
#|customer_id|age|post_code|useful_info |
#+-----------+---+---------+----------------------------------------------------+
#|1001 |50 |BS32 0HW |Customer 1001 is 50 years old and lives at BS32 0HW.|
#+-----------+---+---------+----------------------------------------------------+
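The nested expression that the loop builds is easier to follow when it is printed. Here is a sketch in plain Python (no Spark needed) that folds the column list into one SQL string with functools.reduce. The helper name build_replace_expr is hypothetical. The pattern is written with a doubled backslash in the SQL text on the assumption that Spark's SQL parser, with default settings, unescapes string literals once before the regex engine sees them.

```python
from functools import reduce


def build_replace_expr(target, columns):
    """Nest one regexp_replace call per column around target."""
    def wrap(inner, column):
        # In the generated SQL text this is \\[column\\]; after the SQL
        # parser unescapes it (default settings assumed) the regex is \[column\].
        pattern = f"\\\\[{column}\\\\]"
        return f"regexp_replace({inner}, '{pattern}', {column})"

    return reduce(wrap, columns, target)


replace_expr = build_replace_expr("useful_info", ["customer_id", "age"])
print(replace_expr)
# regexp_replace(regexp_replace(useful_info, '\\[customer_id\\]', customer_id), '\\[age\\]', age)
```

The resulting string could then be passed to expr() in the same way as replace_expr in the answer above.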