Mask/replace Pyspark 中字符串列的内部
Mask/replace inner part of string column in Pyspark
我在数据框中有一个电子邮件列,我想用星号替换其中的一部分。我无法使用 PySpark 函数解决这个问题。
我的电子邮件栏可能是这样的
email_col
abc123@gmail.com
123abc123@yahoo.com
我想达到的效果是这样的:
mod_email_col
ab**23@gmail.com
12*****23@yahoo.com
所以基本上除了第一个2个字符和最后2个字符,我希望剩下的部分用星号代替。
这是我试过的
from pyspark.sql import functions as F
split_email = F.split(df.email_address, "@")
df = df.withColumn('email_part', split_email.getItem(0))
df = df.withColumn('start', df.email_part.substr(0,2))
df = df.withColumn('end', df.email_part.substr(-2,2))
df.withColumn(
'masked_part',
F.expr("regexp_replace(email_part, email_part[email_part.index(start)+len(start):email_part.index(end)], '*')")
).show(n=5)
我认为您可以借助以下正则表达式实现此目的:(?<=.{2})\w+(?=.{2}@)
(?<=.{2})
:对 2 个字符进行正面回顾
\w+
:任意单词字符
(?=.{2}@)
:前瞻 2 个字符后跟文字 @
首先使用 regexp_extract
从字符串中提取此模式。
from pyspark.sql.functions import regexp_extract, regexp_replace
df = df.withColumn(
"pattern",
regexp_extract("email", r"(?<=.{2})\w+(?=.{2}@)", 0)
)
df.show()
#+-------------------+-------+
#| email|pattern|
#+-------------------+-------+
#| abc123@gmail.com| c1|
#|123abc123@yahoo.com| 3abc1|
#| abcd@test.com| |
#+-------------------+-------+
然后使用regexp_replace
创建相同长度的*
的替换。
df = df.withColumn(
"replacement",
regexp_replace("pattern", r"\w", "*")
)
df.show()
#+-------------------+-------+-----------+
#| email|pattern|replacement|
#+-------------------+-------+-----------+
#| abc123@gmail.com| c1| **|
#|123abc123@yahoo.com| 3abc1| *****|
#| abcd@test.com| | |
#+-------------------+-------+-----------+
接下来使用派生的 pattern
和 replacement
列在原始 email
列上再次使用 regexp_replace
。
为了安全起见,concat
the lookbehind/lookaheads from the original pattern when doing the replacment. To do this, we will have to use expr
in order to 。
from pyspark.sql.functions import concat, expr, lit
df = df.withColumn(
"mod_email_col",
expr("regexp_replace(email, concat('(?<=.{2})', pattern, '(?=.{2}@)'), replacement)")
)
df.show()
#+-------------------+-------+-----------+-------------------+
#| email|pattern|replacement| mod_email_col|
#+-------------------+-------+-----------+-------------------+
#| abc123@gmail.com| c1| **| ab**23@gmail.com|
#|123abc123@yahoo.com| 3abc1| *****|12*****23@yahoo.com|
#| abcd@test.com| | | abcd@test.com|
#+-------------------+-------+-----------+-------------------+
最后删除中间列:
df = df.drop("pattern", "replacement")
df.show()
#+-------------------+-------------------+
#| email| mod_email_col|
#+-------------------+-------------------+
#| abc123@gmail.com| ab**23@gmail.com|
#|123abc123@yahoo.com|12*****23@yahoo.com|
#| abcd@test.com| abcd@test.com|
#+-------------------+-------------------+
注意:我添加了一个测试用例来证明如果电子邮件地址部分是 4 个字符或更少,这不会执行任何操作。
更新:这里有一些方法可以处理电子邮件地址部分少于 4 个字符的边缘情况。
我使用的规则:
- 电子邮件地址长度超过 5:执行上述操作
- 电子邮件地址长度为 3、4 或 5:保留第一个和最后一个字符,用
*
屏蔽其他字符
- 电子邮件地址的长度为 1 或 2:
@
之前的单个字符的掩码
代码:
patA = "regexp_replace(email, concat('(?<=.{2})', pattern, '(?=.{2}@)'), replacement)"
patB = "regexp_replace(email, concat('(?<=.{1})', pattern, '(?=.{1}@)'), replacement)"
from pyspark.sql.functions import regexp_extract, regexp_replace
from pyspark.sql.functions import concat, expr, length, lit, split, when
df.withColumn("address_part", split("email", "@").getItem(0))\
.withColumn(
"pattern",
when(
length("address_part") > 5,
regexp_extract("email", r"(?<=.{2})\w+(?=.{2}@)", 0)
).otherwise(
regexp_extract("email", r"(?<=.{1})\w+(?=.{1}@)", 0)
)
).withColumn(
"replacement", regexp_replace("pattern", r"\w", "*")
).withColumn(
"mod_email_col",
when(
length("address_part") > 5, expr(patA)
).when(
length("address_part") > 3, expr(patB)
).otherwise(regexp_replace('email', '\w(?=@)', '*'))
).drop("pattern", "replacement", "address_part").show()
#+-------------------+-------------------+
#| email| mod_email_col|
#+-------------------+-------------------+
#| abc123@gmail.com| ab**23@gmail.com|
#|123abc123@yahoo.com|12*****23@yahoo.com|
#| abcde@test.com| a***e@test.com|
#| abcd@test.com| a**d@test.com|
#| ab@test.com| a*@test.com|
#| a@test.com| *@test.com|
#+-------------------+-------------------+
可以使用一些字符串操作来简化您的问题(Spark SQL 函数:instr, concat, left, repeat, substr):
首先找到@
在邮件字符串中的位置:pos_at = instr(email_col, '@')
,然后用户名部分的长度为pos_at - 1
。如果我们把N=2
作为要保留的字符数,那么要屏蔽的字符数应该是pos_at - 1 - 2*N
,在代码中,我们有:
from pyspark.sql.functions import instr, expr
df = spark.createDataFrame(
[(e,) for e in ['abc123@gmail.com', '123abc123@yahoo.com', 'abd@gmail.com']]
, ['email_col']
)
# set N=2 as a parameter in the SQL expression
N = 2
df.withColumn('pos_at', instr('email_col', '@')) \
.withColumn('new_col', expr("""
CONCAT(LEFT(email_col,{0}), REPEAT('*', pos_at-1-2*{0}), SUBSTR(email_col, pos_at-{0}))
""".format(N))).show(truncate=False)
#+-------------------+------+-------------------+
#|email_col |pos_at|new_col |
#+-------------------+------+-------------------+
#|abc123@gmail.com |7 |ab**23@gmail.com |
#|123abc123@yahoo.com|10 |12*****23@yahoo.com|
#|abd@gmail.com |4 |abbd@gmail.com |
#+-------------------+------+-------------------+
请注意 pos_at - 1 <= 2*N
时最后一行的问题,必须单独处理。如果我定义以下逻辑:
if `pos_at - 1 <= 2*N`: keep the first char and mask the rest
otherwise: keep the original processing routine
整个处理过程可以包含在一个带有两个参数(column_name
和 N
)的 lambda 函数中
# in the SQL expression, {0} is column_name and {1} is N
mask_email = lambda col_name, N: expr("""
IF(INSTR({0}, '@') <= {1}*2+1
, CONCAT(LEFT({0},1), REPEAT('*', INSTR({0}, '@')-2), SUBSTR({0}, INSTR({0}, '@')))
, CONCAT(LEFT({0},{1}), REPEAT('*', INSTR({0}, '@')-1-2*{1}), SUBSTR({0}, INSTR({0}, '@')-{1}))
) as `{0}_masked`
""".format(col_name, N))
df.select('*', mask_email('email_col', 2)).show()
#+-------------------+-------------------+
#| email_col| email_col_masked|
#+-------------------+-------------------+
#| abc123@gmail.com| ab**23@gmail.com|
#|123abc123@yahoo.com|12*****23@yahoo.com|
#| abd@gmail.com| a**@gmail.com|
#+-------------------+-------------------+
我在数据框中有一个电子邮件列,我想用星号替换其中的一部分。我无法使用 PySpark 函数解决这个问题。
我的电子邮件栏可能是这样的
email_col
abc123@gmail.com
123abc123@yahoo.com
我想达到的效果是这样的:
mod_email_col
ab**23@gmail.com
12*****23@yahoo.com
所以基本上除了第一个2个字符和最后2个字符,我希望剩下的部分用星号代替。
这是我试过的
from pyspark.sql import functions as F
split_email = F.split(df.email_address, "@")
df = df.withColumn('email_part', split_email.getItem(0))
df = df.withColumn('start', df.email_part.substr(0,2))
df = df.withColumn('end', df.email_part.substr(-2,2))
df.withColumn(
'masked_part',
F.expr("regexp_replace(email_part, email_part[email_part.index(start)+len(start):email_part.index(end)], '*')")
).show(n=5)
我认为您可以借助以下正则表达式实现此目的:(?<=.{2})\w+(?=.{2}@)
(?<=.{2})
:对 2 个字符进行正面回顾\w+
:任意单词字符(?=.{2}@)
:前瞻 2 个字符后跟文字@
首先使用 regexp_extract
从字符串中提取此模式。
from pyspark.sql.functions import regexp_extract, regexp_replace
df = df.withColumn(
"pattern",
regexp_extract("email", r"(?<=.{2})\w+(?=.{2}@)", 0)
)
df.show()
#+-------------------+-------+
#| email|pattern|
#+-------------------+-------+
#| abc123@gmail.com| c1|
#|123abc123@yahoo.com| 3abc1|
#| abcd@test.com| |
#+-------------------+-------+
然后使用regexp_replace
创建相同长度的*
的替换。
df = df.withColumn(
"replacement",
regexp_replace("pattern", r"\w", "*")
)
df.show()
#+-------------------+-------+-----------+
#| email|pattern|replacement|
#+-------------------+-------+-----------+
#| abc123@gmail.com| c1| **|
#|123abc123@yahoo.com| 3abc1| *****|
#| abcd@test.com| | |
#+-------------------+-------+-----------+
接下来使用派生的 pattern
和 replacement
列在原始 email
列上再次使用 regexp_replace
。
为了安全起见,concat
the lookbehind/lookaheads from the original pattern when doing the replacment. To do this, we will have to use expr
in order to
from pyspark.sql.functions import concat, expr, lit
df = df.withColumn(
"mod_email_col",
expr("regexp_replace(email, concat('(?<=.{2})', pattern, '(?=.{2}@)'), replacement)")
)
df.show()
#+-------------------+-------+-----------+-------------------+
#| email|pattern|replacement| mod_email_col|
#+-------------------+-------+-----------+-------------------+
#| abc123@gmail.com| c1| **| ab**23@gmail.com|
#|123abc123@yahoo.com| 3abc1| *****|12*****23@yahoo.com|
#| abcd@test.com| | | abcd@test.com|
#+-------------------+-------+-----------+-------------------+
最后删除中间列:
df = df.drop("pattern", "replacement")
df.show()
#+-------------------+-------------------+
#| email| mod_email_col|
#+-------------------+-------------------+
#| abc123@gmail.com| ab**23@gmail.com|
#|123abc123@yahoo.com|12*****23@yahoo.com|
#| abcd@test.com| abcd@test.com|
#+-------------------+-------------------+
注意:我添加了一个测试用例来证明如果电子邮件地址部分是 4 个字符或更少,这不会执行任何操作。
更新:这里有一些方法可以处理电子邮件地址部分少于 4 个字符的边缘情况。
我使用的规则:
- 电子邮件地址长度超过 5:执行上述操作
- 电子邮件地址长度为 3、4 或 5:保留第一个和最后一个字符,用
*
屏蔽其他字符
- 电子邮件地址的长度为 1 或 2:
@
之前的单个字符的掩码
代码:
patA = "regexp_replace(email, concat('(?<=.{2})', pattern, '(?=.{2}@)'), replacement)"
patB = "regexp_replace(email, concat('(?<=.{1})', pattern, '(?=.{1}@)'), replacement)"
from pyspark.sql.functions import regexp_extract, regexp_replace
from pyspark.sql.functions import concat, expr, length, lit, split, when
df.withColumn("address_part", split("email", "@").getItem(0))\
.withColumn(
"pattern",
when(
length("address_part") > 5,
regexp_extract("email", r"(?<=.{2})\w+(?=.{2}@)", 0)
).otherwise(
regexp_extract("email", r"(?<=.{1})\w+(?=.{1}@)", 0)
)
).withColumn(
"replacement", regexp_replace("pattern", r"\w", "*")
).withColumn(
"mod_email_col",
when(
length("address_part") > 5, expr(patA)
).when(
length("address_part") > 3, expr(patB)
).otherwise(regexp_replace('email', '\w(?=@)', '*'))
).drop("pattern", "replacement", "address_part").show()
#+-------------------+-------------------+
#| email| mod_email_col|
#+-------------------+-------------------+
#| abc123@gmail.com| ab**23@gmail.com|
#|123abc123@yahoo.com|12*****23@yahoo.com|
#| abcde@test.com| a***e@test.com|
#| abcd@test.com| a**d@test.com|
#| ab@test.com| a*@test.com|
#| a@test.com| *@test.com|
#+-------------------+-------------------+
可以使用一些字符串操作来简化您的问题(Spark SQL 函数:instr, concat, left, repeat, substr):
首先找到@
在邮件字符串中的位置:pos_at = instr(email_col, '@')
,然后用户名部分的长度为pos_at - 1
。如果我们把N=2
作为要保留的字符数,那么要屏蔽的字符数应该是pos_at - 1 - 2*N
,在代码中,我们有:
from pyspark.sql.functions import instr, expr
df = spark.createDataFrame(
[(e,) for e in ['abc123@gmail.com', '123abc123@yahoo.com', 'abd@gmail.com']]
, ['email_col']
)
# set N=2 as a parameter in the SQL expression
N = 2
df.withColumn('pos_at', instr('email_col', '@')) \
.withColumn('new_col', expr("""
CONCAT(LEFT(email_col,{0}), REPEAT('*', pos_at-1-2*{0}), SUBSTR(email_col, pos_at-{0}))
""".format(N))).show(truncate=False)
#+-------------------+------+-------------------+
#|email_col |pos_at|new_col |
#+-------------------+------+-------------------+
#|abc123@gmail.com |7 |ab**23@gmail.com |
#|123abc123@yahoo.com|10 |12*****23@yahoo.com|
#|abd@gmail.com |4 |abbd@gmail.com |
#+-------------------+------+-------------------+
请注意 pos_at - 1 <= 2*N
时最后一行的问题,必须单独处理。如果我定义以下逻辑:
if `pos_at - 1 <= 2*N`: keep the first char and mask the rest
otherwise: keep the original processing routine
整个处理过程可以包含在一个带有两个参数(column_name
和 N
)的 lambda 函数中
# in the SQL expression, {0} is column_name and {1} is N
mask_email = lambda col_name, N: expr("""
IF(INSTR({0}, '@') <= {1}*2+1
, CONCAT(LEFT({0},1), REPEAT('*', INSTR({0}, '@')-2), SUBSTR({0}, INSTR({0}, '@')))
, CONCAT(LEFT({0},{1}), REPEAT('*', INSTR({0}, '@')-1-2*{1}), SUBSTR({0}, INSTR({0}, '@')-{1}))
) as `{0}_masked`
""".format(col_name, N))
df.select('*', mask_email('email_col', 2)).show()
#+-------------------+-------------------+
#| email_col| email_col_masked|
#+-------------------+-------------------+
#| abc123@gmail.com| ab**23@gmail.com|
#|123abc123@yahoo.com|12*****23@yahoo.com|
#| abd@gmail.com| a**@gmail.com|
#+-------------------+-------------------+