Pyspark:检查每个单元格的条件并计算校验位
Pyspark: Check for conditions on each cell and calculate the checkdigit
我有一个列 RESULT
,每个列的长度为 11,其模式是:
RESULT: string (nullable = true)
现在,我想执行以下操作并更新一个新列,最后添加一个额外的数字。下面显示的示例是第一个数字 03600024145
注意:我不想将 table 的格式更改为 pandas 但在 Pyspark 数据帧中执行所有操作。
- 奇数相加:0+6+0+2+1+5 = 14.
- 将结果乘以 3:14 × 3 = 42。
- 偶数相加:3+0+0+4+4 = 11.
- 将两个结果相加:42 + 11 = 53。
- 校验位的计算取(53 / 10)的余数,也就是(53模10),如果不是0则用10减去,所以校验位的值为7。即 (53 / 10) = 5 余数 3; 10 - 3 = 7.
- 最后加上这个校验位。所以数字变成了
036000241457
因此,如果将此逻辑应用于整个列,结果将变为 UPDATED RESULT
进一步理清逻辑:https://en.wikipedia.org/wiki/Check_digit#UPC
有一个类似的 python 代码,但在第 5 步有点不同:
我们可以将逻辑转换为 Spark 函数。
- 首先提取各个位置的数字并将它们转换为整数。
- 然后分别求和奇数和偶数。
- 将奇数和乘以 3,再加上偶数和。
- 应用模运算。
- 将第4步的结果减去10,然后对10取模,模拟当第4步的结果为0时校验位为0的行为。
- 最后,将
RESULT
列与 check digit
连接起来。
工作示例
import pyspark.sql.functions as F
from pyspark.sql import Column
from typing import List
df = spark.createDataFrame([("03600024145",), ("01010101010",)], ("RESULT",))
def sum_digits(c: Column, pos: List[int]):
sum_col = F.lit(0)
for p in pos:
sum_col = sum_col + F.substring(c, p, 1).cast("int")
return sum_col
def check_digit(c: Column) -> Column:
odd_sum = sum_digits(c, [1, 3, 5, 7, 9, 11])
even_sum = sum_digits(c, [2, 4, 6, 8, 10])
sum_result = (3 * odd_sum) + even_sum
modulo = sum_result % 10
return (10 - modulo) % 10
df.withColumn("UPDATED_RESULT", F.concat(F.col("RESULT"), check_digit(F.col("RESULT")))).show()
输出
+-----------+--------------+
| RESULT|UPDATED_RESULT|
+-----------+--------------+
|03600024145| 036000241457|
|01010101010| 010101010105|
+-----------+--------------+
使用用户定义函数 (udf) 的解决方案。
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col
df = spark.createDataFrame([('03600024145',), ('01010101010',)], ['RESULT'])
@udf(StringType())
def add_check_digit(val):
odd = sum(int(i) for i in val[::2])
even = sum(int(i) for i in val[1::2])
check_val = (odd * 3 + even) % 10
return val + str((10 - check_val) % 10)
df = df.withColumn('UPDATED_RESULT', add_check_digit(col('RESULT')))
df.show()
+-----------+--------------+
| RESULT|UPDATED_RESULT|
+-----------+--------------+
|03600024145| 036000241457|
|01010101010| 010101010105|
+-----------+--------------+
您可以将列 RESULT
拆分为一个数字数组,而不是使用一些高阶函数 transform
和 aggregate
,您可以计算连接的 checkdigit
到原始字符串:
import pyspark.sql.functions as F
df1 = df.withColumn(
"digits",
F.expr("slice(split(RESULT, ''), 1, size(split(RESULT, '')) - 1)")
).withColumn(
"digits",
F.expr("transform(digits, (x, i) -> struct(int(x) as d, i+1 as i))")
).withColumn(
"odd_even",
F.expr(
"""aggregate(digits,
array(0, 0),
(acc, x) ->
IF (x.i%2 = 1,
array(acc[0] + x.d, acc[1]),
array(acc[0], acc[1] + x.d)
)
)""")
).withColumn(
"UPDATED RESULT",
F.concat(F.col("RESULT"), 10 - ((F.col("odd_even")[0] * 3 + F.col("odd_even")[1]) % 10))
).select(
"RESULT", "UPDATED RESULT"
)
df1.show(truncate=False)
#+-----------+--------------+
#|RESULT |UPDATED RESULT|
#+-----------+--------------+
#|03600024145|036000241457 |
#|01010101010|010101010105 |
#+-----------+--------------+
解释:
- 步骤1: split the column and slice the resulting array to remove the last empty value. Then transform数组的每个元素加上它的索引。 (示例
0 -> struct(0, 1)
)
- 第2步:使用aggregate,利用我们在第一步中添加的索引
对偶数位和奇数位求和
- 第三步:计算校验位并与结果列拼接
您可以显示所有中间列以了解逻辑。
我有一个列 RESULT
,每个列的长度为 11,其模式是:
RESULT: string (nullable = true)
现在,我想执行以下操作并更新一个新列,最后添加一个额外的数字。下面显示的示例是第一个数字 03600024145
注意:我不想将 table 的格式更改为 pandas 但在 Pyspark 数据帧中执行所有操作。
- 奇数相加:0+6+0+2+1+5 = 14.
- 将结果乘以 3:14 × 3 = 42。
- 偶数相加:3+0+0+4+4 = 11.
- 将两个结果相加:42 + 11 = 53。
- 校验位的计算取(53 / 10)的余数,也就是(53模10),如果不是0则用10减去,所以校验位的值为7。即 (53 / 10) = 5 余数 3; 10 - 3 = 7.
- 最后加上这个校验位。所以数字变成了
036000241457
因此,如果将此逻辑应用于整个列,结果将变为 UPDATED RESULT
进一步理清逻辑:https://en.wikipedia.org/wiki/Check_digit#UPC
有一个类似的 python 代码,但在第 5 步有点不同:
我们可以将逻辑转换为 Spark 函数。
- 首先提取各个位置的数字并将它们转换为整数。
- 然后分别求和奇数和偶数。
- 将奇数和乘以 3,再加上偶数和。
- 应用模运算。
- 将第4步的结果减去10,然后对10取模,模拟当第4步的结果为0时校验位为0的行为。
- 最后,将
RESULT
列与check digit
连接起来。
工作示例
import pyspark.sql.functions as F
from pyspark.sql import Column
from typing import List
df = spark.createDataFrame([("03600024145",), ("01010101010",)], ("RESULT",))
def sum_digits(c: Column, pos: List[int]):
sum_col = F.lit(0)
for p in pos:
sum_col = sum_col + F.substring(c, p, 1).cast("int")
return sum_col
def check_digit(c: Column) -> Column:
odd_sum = sum_digits(c, [1, 3, 5, 7, 9, 11])
even_sum = sum_digits(c, [2, 4, 6, 8, 10])
sum_result = (3 * odd_sum) + even_sum
modulo = sum_result % 10
return (10 - modulo) % 10
df.withColumn("UPDATED_RESULT", F.concat(F.col("RESULT"), check_digit(F.col("RESULT")))).show()
输出
+-----------+--------------+
| RESULT|UPDATED_RESULT|
+-----------+--------------+
|03600024145| 036000241457|
|01010101010| 010101010105|
+-----------+--------------+
使用用户定义函数 (udf) 的解决方案。
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col
df = spark.createDataFrame([('03600024145',), ('01010101010',)], ['RESULT'])
@udf(StringType())
def add_check_digit(val):
odd = sum(int(i) for i in val[::2])
even = sum(int(i) for i in val[1::2])
check_val = (odd * 3 + even) % 10
return val + str((10 - check_val) % 10)
df = df.withColumn('UPDATED_RESULT', add_check_digit(col('RESULT')))
df.show()
+-----------+--------------+
| RESULT|UPDATED_RESULT|
+-----------+--------------+
|03600024145| 036000241457|
|01010101010| 010101010105|
+-----------+--------------+
您可以将列 RESULT
拆分为一个数字数组,而不是使用一些高阶函数 transform
和 aggregate
,您可以计算连接的 checkdigit
到原始字符串:
import pyspark.sql.functions as F
df1 = df.withColumn(
"digits",
F.expr("slice(split(RESULT, ''), 1, size(split(RESULT, '')) - 1)")
).withColumn(
"digits",
F.expr("transform(digits, (x, i) -> struct(int(x) as d, i+1 as i))")
).withColumn(
"odd_even",
F.expr(
"""aggregate(digits,
array(0, 0),
(acc, x) ->
IF (x.i%2 = 1,
array(acc[0] + x.d, acc[1]),
array(acc[0], acc[1] + x.d)
)
)""")
).withColumn(
"UPDATED RESULT",
F.concat(F.col("RESULT"), 10 - ((F.col("odd_even")[0] * 3 + F.col("odd_even")[1]) % 10))
).select(
"RESULT", "UPDATED RESULT"
)
df1.show(truncate=False)
#+-----------+--------------+
#|RESULT |UPDATED RESULT|
#+-----------+--------------+
#|03600024145|036000241457 |
#|01010101010|010101010105 |
#+-----------+--------------+
解释:
- 步骤1: split the column and slice the resulting array to remove the last empty value. Then transform数组的每个元素加上它的索引。 (示例
0 -> struct(0, 1)
) - 第2步:使用aggregate,利用我们在第一步中添加的索引 对偶数位和奇数位求和
- 第三步:计算校验位并与结果列拼接
您可以显示所有中间列以了解逻辑。