如何将数组拆分为块并找到块的总和并将输出作为数组存储在 pyspark 中
How to split an array into chunks and find the sum of the chunks and store the output as an array in pyspark
我有一个如下所示的数据框:
+-----+------------------------+
|Index| finalArray |
+-----+------------------------+
|1 |[0, 2, 0, 3, 1, 4, 2, 7]|
|2 |[0, 4, 4, 3, 4, 2, 2, 5]|
+-----+------------------------+
我想将数组分成 2 个块,然后求出每个块的总和并将结果数组存储在列 finalArray 中。它将如下所示:
+-----+---------------------+
|Index| finalArray |
+-----+---------------------+
|1 |[2, 3, 5, 9] |
|2 |[4, 7, 6, 7] |
+-----+---------------------+
我可以通过创建一个 UDF 来做到这一点,但我正在寻找一种更好的优化方法。最好是,如果我可以使用 withColumn 并传递 flagArray 来处理它而无需编写 UDF。
@udf(ArrayType(DoubleType()))
def aggregate(finalArray,chunkSize):
n = int(chunkSize)
aggsum = []
final = [finalArray[i * n:(i + 1) * n] for i in range((len(finalArray) + n - 1) // n )]
for item in final:
agg = 0
for j in item:
agg += j
aggsum.append(agg)
return aggsum
我无法在 UDF 中使用以下表达式,因此我使用了循环
[sum(finalArray[x:x+2]) for x in range(0, len(finalArray), chunkSize)]
对于spark 2.4+,你可以试试sequence + transform:
from pyspark.sql.function import expr
df = spark.createDataFrame([
(1, [0, 2, 0, 3, 1, 4, 2, 7]),
(2, [0, 4, 4, 3, 4, 2, 2, 5])
], ["Index", "finalArray"])
df.withColumn("finalArray", expr("""
transform(
sequence(0,ceil(size(finalArray)/2)-1),
i -> finalArray[2*i] + ifnull(finalArray[2*i+1],0))
""")).show(truncate=False)
+-----+------------+
|Index|finalArray |
+-----+------------+
|1 |[2, 3, 5, 9]|
|2 |[4, 7, 6, 7]|
+-----+------------+
对于任意 N 的块大小,使用 aggregate 函数进行小计:
N = 3
sql_expr = """
transform(
/* create a sequence from 0 to number_of_chunks-1 */
sequence(0,ceil(size(finalArray)/{0})-1),
/* iterate the above sequence */
i ->
/* create a sequence from 0 to chunk_size-1
calculate the sum of values containing every chunk_size items by their indices
*/
aggregate(
sequence(0,{0}-1),
0L,
(acc, y) -> acc + ifnull(finalArray[i*{0}+y],0)
)
)
"""
df.withColumn("finalArray", expr(sql_expr.format(N))).show()
+-----+----------+
|Index|finalArray|
+-----+----------+
| 1| [2, 8, 9]|
| 2| [8, 9, 7]|
+-----+----------+
这是@jxc 解决方案的一个略有不同的版本,它使用 slice
函数以及 transform
和 aggregate
函数。
逻辑是对于数组的每个元素,我们检查其索引是否是 chunk size
的倍数,并使用 slice
获取 chunk size
的子数组。使用 aggregate
我们对每个子数组的元素求和。最后使用filter
去空(对应不满足i % chunk = 0
的索引。
chunk = 2
transform_expr = f"""
filter(transform(finalArray,
(x, i) -> IF (i % {chunk} = 0,
aggregate(slice(finalArray, i+1, {chunk}), 0L, (acc, y) -> acc + y),
null
)
),
x -> x is not null)
"""
df.withColumn("finalArray", expr(transform_expr)).show()
#+-----+------------+
#|Index| finalArray|
#+-----+------------+
#| 1|[2, 3, 5, 9]|
#| 2|[4, 7, 6, 7]|
#+-----+------------+
我有一个如下所示的数据框:
+-----+------------------------+
|Index| finalArray |
+-----+------------------------+
|1 |[0, 2, 0, 3, 1, 4, 2, 7]|
|2 |[0, 4, 4, 3, 4, 2, 2, 5]|
+-----+------------------------+
我想将数组分成 2 个块,然后求出每个块的总和并将结果数组存储在列 finalArray 中。它将如下所示:
+-----+---------------------+
|Index| finalArray |
+-----+---------------------+
|1 |[2, 3, 5, 9] |
|2 |[4, 7, 6, 7] |
+-----+---------------------+
我可以通过创建一个 UDF 来做到这一点,但我正在寻找一种更好的优化方法。最好是,如果我可以使用 withColumn 并传递 flagArray 来处理它而无需编写 UDF。
@udf(ArrayType(DoubleType()))
def aggregate(finalArray,chunkSize):
n = int(chunkSize)
aggsum = []
final = [finalArray[i * n:(i + 1) * n] for i in range((len(finalArray) + n - 1) // n )]
for item in final:
agg = 0
for j in item:
agg += j
aggsum.append(agg)
return aggsum
我无法在 UDF 中使用以下表达式,因此我使用了循环
[sum(finalArray[x:x+2]) for x in range(0, len(finalArray), chunkSize)]
对于spark 2.4+,你可以试试sequence + transform:
from pyspark.sql.function import expr
df = spark.createDataFrame([
(1, [0, 2, 0, 3, 1, 4, 2, 7]),
(2, [0, 4, 4, 3, 4, 2, 2, 5])
], ["Index", "finalArray"])
df.withColumn("finalArray", expr("""
transform(
sequence(0,ceil(size(finalArray)/2)-1),
i -> finalArray[2*i] + ifnull(finalArray[2*i+1],0))
""")).show(truncate=False)
+-----+------------+
|Index|finalArray |
+-----+------------+
|1 |[2, 3, 5, 9]|
|2 |[4, 7, 6, 7]|
+-----+------------+
对于任意 N 的块大小,使用 aggregate 函数进行小计:
N = 3
sql_expr = """
transform(
/* create a sequence from 0 to number_of_chunks-1 */
sequence(0,ceil(size(finalArray)/{0})-1),
/* iterate the above sequence */
i ->
/* create a sequence from 0 to chunk_size-1
calculate the sum of values containing every chunk_size items by their indices
*/
aggregate(
sequence(0,{0}-1),
0L,
(acc, y) -> acc + ifnull(finalArray[i*{0}+y],0)
)
)
"""
df.withColumn("finalArray", expr(sql_expr.format(N))).show()
+-----+----------+
|Index|finalArray|
+-----+----------+
| 1| [2, 8, 9]|
| 2| [8, 9, 7]|
+-----+----------+
这是@jxc 解决方案的一个略有不同的版本,它使用 slice
函数以及 transform
和 aggregate
函数。
逻辑是对于数组的每个元素,我们检查其索引是否是 chunk size
的倍数,并使用 slice
获取 chunk size
的子数组。使用 aggregate
我们对每个子数组的元素求和。最后使用filter
去空(对应不满足i % chunk = 0
的索引。
chunk = 2
transform_expr = f"""
filter(transform(finalArray,
(x, i) -> IF (i % {chunk} = 0,
aggregate(slice(finalArray, i+1, {chunk}), 0L, (acc, y) -> acc + y),
null
)
),
x -> x is not null)
"""
df.withColumn("finalArray", expr(transform_expr)).show()
#+-----+------------+
#|Index| finalArray|
#+-----+------------+
#| 1|[2, 3, 5, 9]|
#| 2|[4, 7, 6, 7]|
#+-----+------------+