根据当前和过去的信息按时间步长汇总标签

Summarizing labels at time steps based on current and past info

给定以下输入数据帧

npos = 3

inp = spark.createDataFrame([
    ['1', 23, 0, 2],
    ['1', 45, 1, 2],
    ['1', 89, 1, 3],
    ['1', 95, 2, 2],
    ['1', 95, 0, 4],
    ['2', 20, 2, 2],
    ['2', 40, 1, 4],
  ], schema=["id","elap","pos","lbl"])

需要构建一个看起来像这样的数据框

out = spark.createDataFrame([
    ['1', 23, [2,0,0]],
    ['1', 45, [2,2,0]],
    ['1', 89, [2,3,0]],
    ['1', 95, [4,3,2]],
    ['2', 20, [0,0,2]],
    ['2', 40, [0,4,2]],
  ], schema=["id","elap","vec"])

输入数据框有数百万条记录。

上例中的一些细节(设计)

您可以在数组上使用一些高阶函数来实现:

  1. 使用 array_repeat 函数添加 vec 列并从 lbl
  2. 初始化 pos
  3. 使用 collect_list 得到 vec 超过 window 的累积 id
  4. aggregate 如果与 0
  5. 不同,则通过选择之前的位置得到结果数组
from pyspark.sql import Window
import pyspark.sql.functions as F

npos = 3

out = inp.withColumn(
    "vec",
    F.expr(f"transform(array_repeat(0, {npos}), (x, i) -> IF(i=pos, lbl, x))")
).withColumn(
    "vec",
    F.collect_list("vec").over(Window.partitionBy("id").orderBy("elap"))
).withColumn(
    "vec",
    F.expr(f"""aggregate(
                  vec, 
                  array_repeat(0, {npos}),
                  (acc, x) -> transform(acc, (y, i) -> int(IF(x[i]!=0, x[i], y)))
            )""")
).drop("lbl", "pos")

out.show(truncate=False)

#+---+----+---------+
#|id |elap|vec      |
#+---+----+---------+
#|1  |23  |[2, 0, 0]|
#|1  |45  |[2, 2, 0]|
#|1  |89  |[2, 3, 0]|
#|1  |95  |[4, 3, 2]|
#|1  |95  |[4, 3, 2]|
#|2  |20  |[0, 0, 2]|
#|2  |40  |[0, 4, 2]|
#+---+----+---------+