Pyspark - 如果 char 存在,则拆分并 return 连接后的第一个和最后一个元素,否则 return 存在
Pyspark - If char exists, then split and return 1st and last element after concatination, else return existing
我正在尝试创建一个函数,它在处理数据后接收列名和 returns 列。
我坚持使用的功能之一如下
如果 /
出现在输入中
- 在
/
上拆分给定的输入
- Select拆分后的第一个元素和最后一个元素
- 如果第一个元素的长度是
3
或10
则处理,否则使col值为空
- 如果最后一个元素的长度是
7
或 10
则处理,否则将 col 值设置为 null
如果输入中不存在 /
- 从输入中取出前 10 个字符
下面是我的函数。对 df 的任何直接处理也有帮助吗?
def phone_number_processor(col):
if isinstance(col, str):
col = F.col(col)
remove_unnecessary_chars = "[^0-9/]"
col = F.regexp_replace(col, remove_unnecessary_chars, '')
col = F.when(F.length(col) <= 10, '').otherwise(col) # ignore if length less than 10
...
# if input has '/', then
# ?????
# if input doesn't have '/' then
col = F.substring(col, 1, 10) # get first 10 chars
...
return col
示例输出:
df.withColumn('PROCESSED_PHONE', phone_number_processor('PHONE')).show()
+----------------+---------------+
| PHONE|PROCESSED_PHONE|
+----------------+---------------+
| 1234567890| 1234567890| #-> as is
|123/2345/1234567| 1231234567| #-> first and last elements after split with '/'
| 123/1234567| 1231234567| #-> same as above
| 123/12345| null| #-> since length last element after split is != 7
| 1234/1234567| null| #-> since length first element after split is != 3
+----------------+---------------+
PS。我尝试使用 spark 函数 - contains, split 但是我无法做我想做的事。我已经为此工作了很长一段时间,任何 inputs/suggestions 也很感激。
当您实际上仅使用 Spark 内置函数就可以执行相同的操作时,无需定义 UDF 函数。只需拆分列 PHONE
,然后在结果数组的第一个和最后一个元素上使用一些 when
表达式即可获得所需的输出,如下所示:
from pyspark.sql import functions as F
df = spark.createDataFrame([("1234567890",), ("123/2345/1234567",), ("123/1234567",), ("123/12345",), ("1234/1234567",)], ["PHONE"])
df1 = df.withColumn("split", F.split("PHONE", "/")) \
.withColumn("first_part", F.element_at("split", 1)) \
.withColumn("last_part", F.element_at("split", -1)) \
.withColumn(
"PROCESSED_PHONE",
F.when(
F.size("split") == 1,
F.substring("first_part", 0, 10)
).otherwise(
F.concat(
F.when(F.length("first_part") == 3, F.col("first_part")),
F.when(F.length("last_part") == 7, F.col("last_part"))
)
)
).drop("first_part", "last_part", "split")
df1.show()
#+----------------+---------------+
#| PHONE|PROCESSED_PHONE|
#+----------------+---------------+
#| 1234567890| 1234567890|
#|123/2345/1234567| 1231234567|
#| 123/1234567| 1231234567|
#| 123/12345| null|
#| 1234/1234567| null|
#+----------------+---------------+
基于,如果您感兴趣,请创建函数
def phone_number_validator(col):
if isinstance(col, str):
col = F.col(col)
remove_unnecessary_chars = "[^0-9/]"
col = F.regexp_replace(col, remove_unnecessary_chars, '')
col = F.when(F.length(col) <= 10, '').otherwise(col) # ignore if length less than 10
split_col = F.split(col, '/')
first_part= F.element_at(split_col, 1)
last_part = F.element_at(split_col, -1)
col = F.when(
F.size(split_col) == 1 & F.length(col) >= 10,
F.substring(col, 1, 10) # get first 10 chars
).otherwise(
F.concat(
F.when(F.length(first_part) == 3, first_part),
F.when(F.length(last_part ) == 7, last_part ),
) # concatenate any string with a NULL, will result in NULL
)
return col
用法:
df = df.withColumn('PROCESSED_PHONE', phone_number_validator('PHONE'))
我正在尝试创建一个函数,它在处理数据后接收列名和 returns 列。
我坚持使用的功能之一如下
如果 /
出现在输入中
- 在
/
上拆分给定的输入
- Select拆分后的第一个元素和最后一个元素
- 如果第一个元素的长度是
3
或10
则处理,否则使col值为空 - 如果最后一个元素的长度是
7
或10
则处理,否则将 col 值设置为 null
如果输入中不存在 /
- 从输入中取出前 10 个字符
下面是我的函数。对 df 的任何直接处理也有帮助吗?
def phone_number_processor(col):
if isinstance(col, str):
col = F.col(col)
remove_unnecessary_chars = "[^0-9/]"
col = F.regexp_replace(col, remove_unnecessary_chars, '')
col = F.when(F.length(col) <= 10, '').otherwise(col) # ignore if length less than 10
...
# if input has '/', then
# ?????
# if input doesn't have '/' then
col = F.substring(col, 1, 10) # get first 10 chars
...
return col
示例输出:
df.withColumn('PROCESSED_PHONE', phone_number_processor('PHONE')).show()
+----------------+---------------+
| PHONE|PROCESSED_PHONE|
+----------------+---------------+
| 1234567890| 1234567890| #-> as is
|123/2345/1234567| 1231234567| #-> first and last elements after split with '/'
| 123/1234567| 1231234567| #-> same as above
| 123/12345| null| #-> since length last element after split is != 7
| 1234/1234567| null| #-> since length first element after split is != 3
+----------------+---------------+
PS。我尝试使用 spark 函数 - contains, split 但是我无法做我想做的事。我已经为此工作了很长一段时间,任何 inputs/suggestions 也很感激。
当您实际上仅使用 Spark 内置函数就可以执行相同的操作时,无需定义 UDF 函数。只需拆分列 PHONE
,然后在结果数组的第一个和最后一个元素上使用一些 when
表达式即可获得所需的输出,如下所示:
from pyspark.sql import functions as F
df = spark.createDataFrame([("1234567890",), ("123/2345/1234567",), ("123/1234567",), ("123/12345",), ("1234/1234567",)], ["PHONE"])
df1 = df.withColumn("split", F.split("PHONE", "/")) \
.withColumn("first_part", F.element_at("split", 1)) \
.withColumn("last_part", F.element_at("split", -1)) \
.withColumn(
"PROCESSED_PHONE",
F.when(
F.size("split") == 1,
F.substring("first_part", 0, 10)
).otherwise(
F.concat(
F.when(F.length("first_part") == 3, F.col("first_part")),
F.when(F.length("last_part") == 7, F.col("last_part"))
)
)
).drop("first_part", "last_part", "split")
df1.show()
#+----------------+---------------+
#| PHONE|PROCESSED_PHONE|
#+----------------+---------------+
#| 1234567890| 1234567890|
#|123/2345/1234567| 1231234567|
#| 123/1234567| 1231234567|
#| 123/12345| null|
#| 1234/1234567| null|
#+----------------+---------------+
基于
def phone_number_validator(col):
if isinstance(col, str):
col = F.col(col)
remove_unnecessary_chars = "[^0-9/]"
col = F.regexp_replace(col, remove_unnecessary_chars, '')
col = F.when(F.length(col) <= 10, '').otherwise(col) # ignore if length less than 10
split_col = F.split(col, '/')
first_part= F.element_at(split_col, 1)
last_part = F.element_at(split_col, -1)
col = F.when(
F.size(split_col) == 1 & F.length(col) >= 10,
F.substring(col, 1, 10) # get first 10 chars
).otherwise(
F.concat(
F.when(F.length(first_part) == 3, first_part),
F.when(F.length(last_part ) == 7, last_part ),
) # concatenate any string with a NULL, will result in NULL
)
return col
用法:
df = df.withColumn('PROCESSED_PHONE', phone_number_validator('PHONE'))