根据当前和过去的信息按时间步长汇总标签
Summarizing labels at time steps based on current and past info
给定以下输入数据帧
npos = 3
inp = spark.createDataFrame([
['1', 23, 0, 2],
['1', 45, 1, 2],
['1', 89, 1, 3],
['1', 95, 2, 2],
['1', 95, 0, 4],
['2', 20, 2, 2],
['2', 40, 1, 4],
], schema=["id","elap","pos","lbl"])
需要构建一个看起来像这样的数据框
out = spark.createDataFrame([
['1', 23, [2,0,0]],
['1', 45, [2,2,0]],
['1', 89, [2,3,0]],
['1', 95, [4,3,2]],
['2', 20, [0,0,2]],
['2', 40, [0,4,2]],
], schema=["id","elap","vec"])
输入数据框有数百万条记录。
上例中的一些细节(设计)
npos
是输出中要构造的向量的大小
pos
保证在 [0,npos)
- 在每个时间步 (
elap
) 对于 pos
最多会有 1 label
- 如果
lbl
未在某个时间步给出,则必须从上次为该 pos
指定的时间推断
- 如果之前没有指定
lbl
,可以假定为0
您可以在数组上使用一些高阶函数来实现:
- 使用
array_repeat
函数添加 vec
列并从 lbl
初始化 pos
值
- 使用
collect_list
得到 vec
超过 window 的累积 id
aggregate
如果与 0
不同,则通过选择之前的位置得到结果数组
from pyspark.sql import Window
import pyspark.sql.functions as F
npos = 3
out = inp.withColumn(
"vec",
F.expr(f"transform(array_repeat(0, {npos}), (x, i) -> IF(i=pos, lbl, x))")
).withColumn(
"vec",
F.collect_list("vec").over(Window.partitionBy("id").orderBy("elap"))
).withColumn(
"vec",
F.expr(f"""aggregate(
vec,
array_repeat(0, {npos}),
(acc, x) -> transform(acc, (y, i) -> int(IF(x[i]!=0, x[i], y)))
)""")
).drop("lbl", "pos")
out.show(truncate=False)
#+---+----+---------+
#|id |elap|vec |
#+---+----+---------+
#|1 |23 |[2, 0, 0]|
#|1 |45 |[2, 2, 0]|
#|1 |89 |[2, 3, 0]|
#|1 |95 |[4, 3, 2]|
#|1 |95 |[4, 3, 2]|
#|2 |20 |[0, 0, 2]|
#|2 |40 |[0, 4, 2]|
#+---+----+---------+
给定以下输入数据帧
npos = 3
inp = spark.createDataFrame([
['1', 23, 0, 2],
['1', 45, 1, 2],
['1', 89, 1, 3],
['1', 95, 2, 2],
['1', 95, 0, 4],
['2', 20, 2, 2],
['2', 40, 1, 4],
], schema=["id","elap","pos","lbl"])
需要构建一个看起来像这样的数据框
out = spark.createDataFrame([
['1', 23, [2,0,0]],
['1', 45, [2,2,0]],
['1', 89, [2,3,0]],
['1', 95, [4,3,2]],
['2', 20, [0,0,2]],
['2', 40, [0,4,2]],
], schema=["id","elap","vec"])
输入数据框有数百万条记录。
上例中的一些细节(设计)
npos
是输出中要构造的向量的大小pos
保证在[0,npos)
- 在每个时间步 (
elap
) 对于pos
最多会有 1 - 如果
lbl
未在某个时间步给出,则必须从上次为该pos
指定的时间推断
- 如果之前没有指定
lbl
,可以假定为0
label
您可以在数组上使用一些高阶函数来实现:
- 使用
array_repeat
函数添加vec
列并从lbl
初始化 - 使用
collect_list
得到vec
超过 window 的累积id
aggregate
如果与0
不同,则通过选择之前的位置得到结果数组
pos
值
from pyspark.sql import Window
import pyspark.sql.functions as F
npos = 3
out = inp.withColumn(
"vec",
F.expr(f"transform(array_repeat(0, {npos}), (x, i) -> IF(i=pos, lbl, x))")
).withColumn(
"vec",
F.collect_list("vec").over(Window.partitionBy("id").orderBy("elap"))
).withColumn(
"vec",
F.expr(f"""aggregate(
vec,
array_repeat(0, {npos}),
(acc, x) -> transform(acc, (y, i) -> int(IF(x[i]!=0, x[i], y)))
)""")
).drop("lbl", "pos")
out.show(truncate=False)
#+---+----+---------+
#|id |elap|vec |
#+---+----+---------+
#|1 |23 |[2, 0, 0]|
#|1 |45 |[2, 2, 0]|
#|1 |89 |[2, 3, 0]|
#|1 |95 |[4, 3, 2]|
#|1 |95 |[4, 3, 2]|
#|2 |20 |[0, 0, 2]|
#|2 |40 |[0, 4, 2]|
#+---+----+---------+