Pyspark 替代循环数组的 UDF 函数
Pyspark alternative to UDF function which loops an array
我已搜索但找不到适合我的 Pyspark 问题的答案。我正在寻找一种更高效且不使用 UDF 的替代方法。
我在 UDF 中有一个简单的等式,它的输入来自 (a) 文字常量、(b) 列值和 (c) 来自列表(或字典)的值。必须多次创建输出并将其存储在数组中。是否可以在 UDF 之外执行此操作?
我已经编写了这个简单的示例,尽管我的实际问题稍微复杂一些,行数更多,方程更大,并且循环超过 40 次:
注意:V3 示例问题:
from pyspark.sql.functions import *
from pyspark.sql.types import *
test_data = [("A1",10.5), ("A2",40.5), ("A3",60.5)]
schema = StructType([ \
StructField("ID",StringType(),True), \
StructField("num1",DoubleType(),True)])
df = spark.createDataFrame(data=test_data,schema=schema)
const1 = 10
const2 = 20
num_lst1 = [2.1,4.2,6.3,8.4,10.5]
num_lst2 = [20,40,60,80,100]
num_lst3 = [100.1,200.2,300.3,400.4,500.5]
def udf_whatever(num_lst1,num_lst2,num_lst3):
def whatever(const1, const2, val1):
DH = [None for t in range(5)]
for i in range(5):
DH[i] = const1+val1+const2+(num_lst1[i]*num_lst2[i])+num_lst3[i]
return DH
return udf(whatever, ArrayType(DoubleType()))
df2 = df.withColumn("UDF_OUT",udf_whatever(num_lst1,num_lst2,num_lst3)(lit(const1),lit(const2),col("num1")))
df2.show(truncate=False)
+---+----+-------------------------------------+
|ID |num1|UDF_OUT |
+---+----+-------------------------------------+
|A1 |10.5|[182.6, 408.7, 718.8, 1112.9, 1591.0]|
|A2 |40.5|[212.6, 438.7, 748.8, 1142.9, 1621.0]|
|A3 |60.5|[232.6, 458.7, 768.8, 1162.9, 1641.0]|
+---+----+-------------------------------------+
在 Emma 的帮助下(在评论中)我已经完成了这项工作,但为每个列表创建新列似乎有点贵,尤其是在有数百万行的情况下。有没有更好的方法?
df3 = df.withColumn('MAP_LIST1', array(*map(lit, num_lst1)))\
.withColumn('MAP_LIST2', array(*map(lit, num_lst2)))\
.withColumn('MAP_LIST3', array(*map(lit, num_lst3)))\
.withColumn('EQUATION_OUT', expr(f"""transform(MAP_LIST1, (x, i) -> {const1} + num1 + {const2} + (x * MAP_LIST2[i]) + MAP_LIST3[i])"""))
df3.show()
非常感谢任何帮助!
瑞克
一种方法是使用 array_repeat and transform.
首先,使用 array_repeat
创建仅包含 num3
值的基本数组。
然后,使用transform
计算数组中每个num3
值的值。
对于 Spark 3.1+
repeat = 5
const = 10
df = (df.withColumn('arr', array_repeat('num3', repeat))
.withColumn('arr', transform(col('arr'), lambda x, i: lit(const) + col('num1') + col('num2') + i * x)))
对于 Spark 2.4+ < 3.1
df = (df.withColumn('arr', array_repeat('num3', repeat))
.withColumn('arr', expr('transform(arr, (x, i) -> 10 + num1 + num2 + i * x)')))
============================================= ===============
用新方程更新(const + col + 列表元素)
如果只有 1 个数组(num_lst
),您可以用数组初始化 UDF_OUT
并执行 transform
将其他变量添加到 UDF_OUT
.
df = (df.withColumn('UDF_OUT', array(*map(lit, num_lst)))
.withColumn('UDF_OUT', expr(f"""
transform(UDF_OUT, (x, i) -> {const} + num1 + x)
""")))
我已搜索但找不到适合我的 Pyspark 问题的答案。我正在寻找一种更高效且不使用 UDF 的替代方法。
我在 UDF 中有一个简单的等式,它的输入来自 (a) 文字常量、(b) 列值和 (c) 来自列表(或字典)的值。必须多次创建输出并将其存储在数组中。是否可以在 UDF 之外执行此操作?
我已经编写了这个简单的示例,尽管我的实际问题稍微复杂一些,行数更多,方程更大,并且循环超过 40 次:
注意:V3 示例问题:
from pyspark.sql.functions import *
from pyspark.sql.types import *
test_data = [("A1",10.5), ("A2",40.5), ("A3",60.5)]
schema = StructType([ \
StructField("ID",StringType(),True), \
StructField("num1",DoubleType(),True)])
df = spark.createDataFrame(data=test_data,schema=schema)
const1 = 10
const2 = 20
num_lst1 = [2.1,4.2,6.3,8.4,10.5]
num_lst2 = [20,40,60,80,100]
num_lst3 = [100.1,200.2,300.3,400.4,500.5]
def udf_whatever(num_lst1,num_lst2,num_lst3):
def whatever(const1, const2, val1):
DH = [None for t in range(5)]
for i in range(5):
DH[i] = const1+val1+const2+(num_lst1[i]*num_lst2[i])+num_lst3[i]
return DH
return udf(whatever, ArrayType(DoubleType()))
df2 = df.withColumn("UDF_OUT",udf_whatever(num_lst1,num_lst2,num_lst3)(lit(const1),lit(const2),col("num1")))
df2.show(truncate=False)
+---+----+-------------------------------------+
|ID |num1|UDF_OUT |
+---+----+-------------------------------------+
|A1 |10.5|[182.6, 408.7, 718.8, 1112.9, 1591.0]|
|A2 |40.5|[212.6, 438.7, 748.8, 1142.9, 1621.0]|
|A3 |60.5|[232.6, 458.7, 768.8, 1162.9, 1641.0]|
+---+----+-------------------------------------+
在 Emma 的帮助下(在评论中)我已经完成了这项工作,但为每个列表创建新列似乎有点贵,尤其是在有数百万行的情况下。有没有更好的方法?
df3 = df.withColumn('MAP_LIST1', array(*map(lit, num_lst1)))\
.withColumn('MAP_LIST2', array(*map(lit, num_lst2)))\
.withColumn('MAP_LIST3', array(*map(lit, num_lst3)))\
.withColumn('EQUATION_OUT', expr(f"""transform(MAP_LIST1, (x, i) -> {const1} + num1 + {const2} + (x * MAP_LIST2[i]) + MAP_LIST3[i])"""))
df3.show()
非常感谢任何帮助! 瑞克
一种方法是使用 array_repeat and transform.
首先,使用 array_repeat
创建仅包含 num3
值的基本数组。
然后,使用transform
计算数组中每个num3
值的值。
对于 Spark 3.1+
repeat = 5
const = 10
df = (df.withColumn('arr', array_repeat('num3', repeat))
.withColumn('arr', transform(col('arr'), lambda x, i: lit(const) + col('num1') + col('num2') + i * x)))
对于 Spark 2.4+ < 3.1
df = (df.withColumn('arr', array_repeat('num3', repeat))
.withColumn('arr', expr('transform(arr, (x, i) -> 10 + num1 + num2 + i * x)')))
============================================= ===============
用新方程更新(const + col + 列表元素)
如果只有 1 个数组(num_lst
),您可以用数组初始化 UDF_OUT
并执行 transform
将其他变量添加到 UDF_OUT
.
df = (df.withColumn('UDF_OUT', array(*map(lit, num_lst)))
.withColumn('UDF_OUT', expr(f"""
transform(UDF_OUT, (x, i) -> {const} + num1 + x)
""")))