Apply groupby over a window in a continuous manner in PySpark
I want to apply a groupby with a 60-minute time window, but it only collects values for the hours in which data is present and shows nothing for windows that contain no values. What I want is for a window with no values to give 0, so that the data comes out in a more continuous manner.
For example:
from pyspark.sql import Row, functions as sf
from pyspark.sql.types import TimestampType

df = sc.parallelize(
    [Row(datetime='2015/01/01 03:00:36', value=2.0),
     Row(datetime='2015/01/01 03:40:12', value=3.0),
     Row(datetime='2015/01/01 05:25:30', value=1.0)]).toDF()
df1 = df.select(sf.unix_timestamp(sf.column("datetime"), 'yyyy/MM/dd HH:mm:ss').cast(TimestampType()).alias("timestamp"), sf.column("value"))
df1.groupBy(sf.window(sf.col("timestamp"), "60 minutes")).agg(sf.sum("value")).show(truncate=False)
The output I get is:
+------------------------------------------+----------+
|window |sum(value)|
+------------------------------------------+----------+
|[2015-01-01 03:00:00, 2015-01-01 04:00:00]|5.0 |
|[2015-01-01 05:00:00, 2015-01-01 06:00:00]|1.0 |
+------------------------------------------+----------+
However, I would like the output to be:
+------------------------------------------+----------+
|window |sum(value)|
+------------------------------------------+----------+
|[2015-01-01 03:00:00, 2015-01-01 04:00:00]|5.0 |
|[2015-01-01 04:00:00, 2015-01-01 05:00:00]|0.0 |
|[2015-01-01 05:00:00, 2015-01-01 06:00:00]|1.0 |
+------------------------------------------+----------+
Edit:
How would I then extend this to a double groupby, with an equal number of windows for each "name":
df = sc.parallelize(
    [Row(name='ABC', datetime='2015/01/01 03:00:36', value=2.0),
     Row(name='ABC', datetime='2015/01/01 03:40:12', value=3.0),
     Row(name='ABC', datetime='2015/01/01 05:25:30', value=1.0),
     Row(name='XYZ', datetime='2015/01/01 05:15:30', value=2.0)]).toDF()
df1 = df.select('name', sf.unix_timestamp(sf.column("datetime"), 'yyyy/MM/dd HH:mm:ss').cast(TimestampType()).alias("timestamp"), sf.column("value"))
df1.show(truncate=False)
+----+-------------------+-----+
|name|timestamp |value|
+----+-------------------+-----+
|ABC |2015-01-01 03:00:36|2.0 |
|ABC |2015-01-01 03:40:12|3.0 |
|ABC |2015-01-01 05:25:30|1.0 |
|XYZ |2015-01-01 05:15:30|2.0 |
+----+-------------------+-----+
I would like the result to be:
+----+------------------------------------------+----------+
|name|window |sum(value)|
+----+------------------------------------------+----------+
|ABC |[2015-01-01 03:00:00, 2015-01-01 04:00:00]|5.0 |
|ABC |[2015-01-01 04:00:00, 2015-01-01 05:00:00]|0.0 |
|ABC |[2015-01-01 05:00:00, 2015-01-01 06:00:00]|1.0 |
|XYZ |[2015-01-01 03:00:00, 2015-01-01 04:00:00]|0.0 |
|XYZ |[2015-01-01 04:00:00, 2015-01-01 05:00:00]|0.0 |
|XYZ |[2015-01-01 05:00:00, 2015-01-01 06:00:00]|2.0 |
+----+------------------------------------------+----------+
This is actually the expected behavior of grouping by window, since you have no corresponding rows between hour 4 and hour 5.
However, you can make it work by generating the intervals in a separate DataFrame using the sequence function, from min(timestamp) to max(timestamp) truncated to the hour. Then, use the transform function on the generated sequence to create a struct holding the start and end time of each bucket:
from pyspark.sql import functions as sf

buckets = df1.agg(
    sf.expr("""transform(
                 sequence(date_trunc('hour', min(timestamp)),
                          date_trunc('hour', max(timestamp)),
                          interval 1 hour),
                 x -> struct(x as start, x + interval 1 hour as end)
               )""").alias("buckets")
).select(sf.explode("buckets").alias("window"))
buckets.show(truncate=False)
#+------------------------------------------+
#|window |
#+------------------------------------------+
#|[2015-01-01 03:00:00, 2015-01-01 04:00:00]|
#|[2015-01-01 04:00:00, 2015-01-01 05:00:00]|
#|[2015-01-01 05:00:00, 2015-01-01 06:00:00]|
#+------------------------------------------+
Now, you join with the original DataFrame and group by the window column to sum value:
df2 = buckets.join(
    df1,
    (sf.col("timestamp") >= sf.col("window.start")) &
    (sf.col("timestamp") < sf.col("window.end")),
    "left"
).groupBy("window").agg(
    sf.sum(sf.coalesce(sf.col("value"), sf.lit(0))).alias("sum")
)
df2.show(truncate=False)
#+------------------------------------------+---+
#|window |sum|
#+------------------------------------------+---+
#|[2015-01-01 04:00:00, 2015-01-01 05:00:00]|0.0|
#|[2015-01-01 03:00:00, 2015-01-01 04:00:00]|5.0|
#|[2015-01-01 05:00:00, 2015-01-01 06:00:00]|1.0|
#+------------------------------------------+---+
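For the per-name case from the edit, the same idea should carry over: cross-join the distinct names with the hourly buckets so every name gets the full set of windows, then left-join the data on both the name and the timestamp range. The following is a minimal, untested sketch that continues from the buckets DataFrame above and the per-name df1 from the edit; names, grid, and df3 are just illustrative variable names:

# Continuing from df1 (with the name column) and buckets defined above.
names = df1.select("name").distinct()

# Every (name, hourly window) combination, so each name gets the same buckets.
grid = names.crossJoin(buckets)

df3 = grid.alias("g").join(
    df1.alias("d"),
    (sf.col("g.name") == sf.col("d.name")) &
    (sf.col("d.timestamp") >= sf.col("g.window.start")) &
    (sf.col("d.timestamp") < sf.col("g.window.end")),
    "left"
).groupBy("g.name", "g.window").agg(
    sf.sum(sf.coalesce(sf.col("d.value"), sf.lit(0))).alias("sum")
).orderBy("name", "window")

df3.show(truncate=False)

The aliases ("g", "d") avoid ambiguity between the two name columns, since grid is derived from df1; the trailing orderBy is optional and only makes the output order deterministic. On the sample data from the edit this should produce one row per name for every hour between the global min and max timestamps, with 0 for the empty buckets.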