如何在pyspark中将数据分成几组
How to split data into groups in pyspark
我需要在时间序列数据中查找组。
数据样本
我需要根据 value
和 day
输出列 group
。
我尝试过使用滞后、超前和 row_number,但最终一无所获。
您似乎想在每次值更改时递增该组。如果是这样,这就是一种间隙和孤岛问题。
这是一种使用 lag()
和累积 sum()
的方法:
select
value,
day,
sum(case when value = lag_value then 0 else 1 end) over(order by day) grp
from (
select t.*, lag(value) over(order by day) lag_value
from mytable t
) t
PySpark
这样做的方法。使用 lag
查找组的端点,在此 lag
上执行 incremental sum
得到 groups
, add 1
到组得到你的 desired groups.
from pypsark.sql.window import Window
from pyspark.sql import functions as F
w1=Window().orderBy("day")
df.withColumn("lag", F.when(F.lag("value").over(w1)!=F.col("value"), F.lit(1)).otherwise(F.lit(0)))\
.withColumn("group", F.sum("lag").over(w1) + 1).drop("lag").show()
#+-----+---+-----+
#|value|day|group|
#+-----+---+-----+
#| 1| 1| 1|
#| 1| 2| 1|
#| 1| 3| 1|
#| 1| 4| 1|
#| 1| 5| 1|
#| 2| 6| 2|
#| 2| 7| 2|
#| 1| 8| 3|
#| 1| 9| 3|
#| 1| 10| 3|
#| 1| 11| 3|
#| 1| 12| 3|
#| 1| 13| 3|
#+-----+---+-----+
我需要在时间序列数据中查找组。
数据样本
我需要根据 value
和 day
输出列 group
。
我尝试过使用滞后、超前和 row_number,但最终一无所获。
您似乎想在每次值更改时递增该组。如果是这样,这就是一种间隙和孤岛问题。
这是一种使用 lag()
和累积 sum()
的方法:
select
value,
day,
sum(case when value = lag_value then 0 else 1 end) over(order by day) grp
from (
select t.*, lag(value) over(order by day) lag_value
from mytable t
) t
PySpark
这样做的方法。使用 lag
查找组的端点,在此 lag
上执行 incremental sum
得到 groups
, add 1
到组得到你的 desired groups.
from pypsark.sql.window import Window
from pyspark.sql import functions as F
w1=Window().orderBy("day")
df.withColumn("lag", F.when(F.lag("value").over(w1)!=F.col("value"), F.lit(1)).otherwise(F.lit(0)))\
.withColumn("group", F.sum("lag").over(w1) + 1).drop("lag").show()
#+-----+---+-----+
#|value|day|group|
#+-----+---+-----+
#| 1| 1| 1|
#| 1| 2| 1|
#| 1| 3| 1|
#| 1| 4| 1|
#| 1| 5| 1|
#| 2| 6| 2|
#| 2| 7| 2|
#| 1| 8| 3|
#| 1| 9| 3|
#| 1| 10| 3|
#| 1| 11| 3|
#| 1| 12| 3|
#| 1| 13| 3|
#+-----+---+-----+