Pyspark - 如果 char 存在,则拆分并 return 连接后的第一个和最后一个元素,否则 return 存在

Pyspark - If char exists, then split and return 1st and last element after concatination, else return existing

我正在尝试创建一个函数,它在处理数据后接收列名和 returns 列。

我坚持使用的功能之一如下

如果 / 出现在输入中

如果输入中不存在 /

下面是我的函数。对 df 的任何直接处理也有帮助吗?

def phone_number_processor(col):
    if isinstance(col, str):
        col = F.col(col)
    remove_unnecessary_chars = "[^0-9/]"
    col = F.regexp_replace(col, remove_unnecessary_chars, '')
    col = F.when(F.length(col) <= 10, '').otherwise(col)  # ignore if length less than 10    
    ...

    # if input has '/', then
    # ?????

    # if input doesn't have '/' then
    col = F.substring(col, 1, 10)  # get first 10 chars

    ...
    return col

示例输出:

df.withColumn('PROCESSED_PHONE', phone_number_processor('PHONE')).show()
+----------------+---------------+
|           PHONE|PROCESSED_PHONE|
+----------------+---------------+
|      1234567890|     1234567890| #-> as is
|123/2345/1234567|     1231234567| #-> first and last elements after split with '/'
|     123/1234567|     1231234567| #-> same as above
|       123/12345|           null| #-> since length last element after split is != 7
|    1234/1234567|           null| #-> since length first element after split is != 3
+----------------+---------------+

PS。我尝试使用 spark 函数 - contains, split 但是我无法做我想做的事。我已经为此工作了很长一段时间,任何 inputs/suggestions 也很感激。

当您实际上仅使用 Spark 内置函数就可以执行相同的操作时,无需定义 UDF 函数。只需拆分列 PHONE,然后在结果数组的第一个和最后一个元素上使用一些 when 表达式即可获得所需的输出,如下所示:

from pyspark.sql import functions as F

df = spark.createDataFrame([("1234567890",), ("123/2345/1234567",), ("123/1234567",), ("123/12345",), ("1234/1234567",)], ["PHONE"])

df1 = df.withColumn("split", F.split("PHONE", "/")) \
    .withColumn("first_part", F.element_at("split", 1)) \
    .withColumn("last_part", F.element_at("split", -1)) \
    .withColumn(
        "PROCESSED_PHONE",
        F.when(
            F.size("split") == 1,
            F.substring("first_part", 0, 10)
        ).otherwise(
            F.concat(
                F.when(F.length("first_part") == 3, F.col("first_part")),
                F.when(F.length("last_part") == 7, F.col("last_part"))
            )
        )
).drop("first_part", "last_part", "split")

df1.show()

#+----------------+---------------+
#|           PHONE|PROCESSED_PHONE|
#+----------------+---------------+
#|      1234567890|     1234567890|
#|123/2345/1234567|     1231234567|
#|     123/1234567|     1231234567|
#|       123/12345|           null|
#|    1234/1234567|           null|
#+----------------+---------------+

基于,如果您感兴趣,请创建函数

def phone_number_validator(col):
    if isinstance(col, str):
        col = F.col(col)
    remove_unnecessary_chars = "[^0-9/]"
    col = F.regexp_replace(col, remove_unnecessary_chars, '')
    col = F.when(F.length(col) <= 10, '').otherwise(col)  # ignore if length less than 10    

    split_col = F.split(col, '/')
    first_part= F.element_at(split_col, 1)
    last_part = F.element_at(split_col, -1)
    col = F.when(
                F.size(split_col) == 1 & F.length(col) >= 10,
                F.substring(col, 1, 10) # get first 10 chars
            ).otherwise(
                F.concat(
                    F.when(F.length(first_part) == 3, first_part),
                    F.when(F.length(last_part ) == 7, last_part ),
                    )  # concatenate any string with a NULL, will result in NULL
                )
    return col

用法:

df = df.withColumn('PROCESSED_PHONE', phone_number_validator('PHONE'))