Pyspark:检查每个单元格的条件并计算校验位

Pyspark: Check for conditions on each cell and calculate the checkdigit

我有一个列 RESULT,每个列的长度为 11,其模式是:

RESULT: string (nullable = true)

现在,我想执行以下操作并更新一个新列,最后添加一个额外的数字。下面显示的示例是第一个数字 03600024145

注意:我不想将 table 的格式更改为 pandas 但在 Pyspark 数据帧中执行所有操作。

  1. 奇数相加:0+6+0+2+1+5 = 14.
  2. 将结果乘以 3:14 × 3 = 42。
  3. 偶数相加:3+0+0+4+4 = 11.
  4. 将两个结果相加:42 + 11 = 53。
  5. 校验位的计算取(53 / 10)的余数,也就是(53模10),如果不是0则用10减去,所以校验位的值为7。即 (53 / 10) = 5 余数 3; 10 - 3 = 7.
  6. 最后加上这个校验位。所以数字变成了036000241457

因此,如果将此逻辑应用于整个列,结果将变为 UPDATED RESULT

进一步理清逻辑:https://en.wikipedia.org/wiki/Check_digit#UPC

有一个类似的 python 代码,但在第 5 步有点不同:

我们可以将逻辑转换为 Spark 函数。

  1. 首先提取各个位置的数字并将它们转换为整数。
  2. 然后分别求和奇数和偶数。
  3. 将奇数和乘以 3,再加上偶数和。
  4. 应用模运算。
  5. 将第4步的结果减去10,然后对10取模,模拟当第4步的结果为0时校验位为0的行为。
  6. 最后,将 RESULT 列与 check digit 连接起来。

工作示例

import pyspark.sql.functions as F
from pyspark.sql import Column
from typing import List

df = spark.createDataFrame([("03600024145",), ("01010101010",)], ("RESULT",))

def sum_digits(c: Column, pos: List[int]):
    sum_col = F.lit(0) 
    for p in pos:
        sum_col = sum_col + F.substring(c, p, 1).cast("int")
    return sum_col

def check_digit(c: Column) -> Column:
    odd_sum = sum_digits(c, [1, 3, 5, 7, 9, 11])
    even_sum = sum_digits(c, [2, 4, 6, 8, 10])
    sum_result = (3 * odd_sum) + even_sum
    modulo  = sum_result % 10
    return (10 - modulo) % 10    

df.withColumn("UPDATED_RESULT", F.concat(F.col("RESULT"), check_digit(F.col("RESULT")))).show()

输出

+-----------+--------------+
|     RESULT|UPDATED_RESULT|
+-----------+--------------+
|03600024145|  036000241457|
|01010101010|  010101010105|
+-----------+--------------+

使用用户定义函数 (udf) 的解决方案。

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col

df = spark.createDataFrame([('03600024145',), ('01010101010',)], ['RESULT'])

@udf(StringType())
def add_check_digit(val):
    odd = sum(int(i) for i in val[::2])
    even = sum(int(i) for i in val[1::2])
    check_val = (odd * 3 + even) % 10
    
    return val + str((10 - check_val) % 10)

df = df.withColumn('UPDATED_RESULT', add_check_digit(col('RESULT')))

df.show()

+-----------+--------------+                                                    
|     RESULT|UPDATED_RESULT|
+-----------+--------------+
|03600024145|  036000241457|
|01010101010|  010101010105|
+-----------+--------------+

您可以将列 RESULT 拆分为一个数字数组,而不是使用一些高阶函数 transformaggregate,您可以计算连接的 checkdigit到原始字符串:

import pyspark.sql.functions as F

df1 = df.withColumn(
    "digits",
    F.expr("slice(split(RESULT, ''), 1, size(split(RESULT, '')) - 1)")
).withColumn(
    "digits",
    F.expr("transform(digits, (x, i) -> struct(int(x) as d, i+1 as i))")
).withColumn(
    "odd_even",
    F.expr(
        """aggregate(digits, 
                     array(0, 0), 
                     (acc, x) -> 
                         IF (x.i%2 = 1,
                             array(acc[0] + x.d, acc[1]),
                             array(acc[0], acc[1] + x.d)
                         )
        )""")
).withColumn(
    "UPDATED RESULT",
    F.concat(F.col("RESULT"), 10 - ((F.col("odd_even")[0] * 3 + F.col("odd_even")[1]) % 10))
).select(
    "RESULT", "UPDATED RESULT"
)

df1.show(truncate=False)

#+-----------+--------------+
#|RESULT     |UPDATED RESULT|
#+-----------+--------------+
#|03600024145|036000241457  |
#|01010101010|010101010105  |
#+-----------+--------------+

解释:

  • 步骤1: split the column and slice the resulting array to remove the last empty value. Then transform数组的每个元素加上它的索引。 (示例 0 -> struct(0, 1)
  • 第2步:使用aggregate,利用我们在第一步中添加的索引
  • 对偶数位和奇数位求和
  • 第三步:计算校验位并与结果列拼接

您可以显示所有中间列以了解逻辑。