How to count the number of rows in a given time interval window
I have a DataFrame with two columns, event-time (timestamp) and color (string), and I want to count the number of rows in each one-minute interval.
event-time color
2019-08-01 00:00:00 orange
2019-08-01 00:00:20 orange
2019-08-01 00:00:44 yellow
2019-08-01 00:01:00 pink
2019-08-01 00:01:20 pink
2019-08-01 00:02:00 black
.... ...
2019-08-07 00:01:00 pink
I want something like this:
event-time count
2019-08-01 00:00:00 3
2019-08-01 00:01:00 2
2019-08-01 00:02:00 1
... ...
I tried using the window function, but did not get the expected output.
You can create a bucket column and use it to group and count. Something like the following should help:
import pyspark.sql.functions as F

seconds = 1
# Truncate each timestamp to the start of its `seconds`-wide bucket
seconds_window = F.from_unixtime(F.unix_timestamp('event-time')
                                 - F.unix_timestamp('event-time') % seconds)
df = df.withColumn('1sec_window', seconds_window)
df.groupBy('1sec_window').count().show()
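The snippet above relies on plain epoch arithmetic: subtracting `epoch % seconds` snaps each timestamp to the start of its bucket. As an illustration of that arithmetic outside Spark, here is a plain-Python sketch (the function name `bucket_start` is mine, not part of the answer; timestamps are treated as UTC to keep the epoch math deterministic):

```python
from datetime import datetime, timezone

def bucket_start(ts_str, seconds=1):
    """Snap a 'yyyy-MM-dd HH:mm:ss' timestamp down to the start of its
    `seconds`-wide bucket, mirroring unix_timestamp - unix_timestamp % seconds."""
    dt = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    epoch = int(dt.timestamp())
    start = epoch - epoch % seconds  # start of the bucket, in epoch seconds
    return datetime.fromtimestamp(start, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

print(bucket_start("2019-08-01 00:00:44", seconds=60))  # 2019-08-01 00:00:00
```

With `seconds=60` this reproduces the per-minute buckets in the expected output; with `seconds=1` every row keeps its own second, which is what the answer's `1sec_window` column computes.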
You can use the window function here. First create the DataFrame and, if event-time is a StringType, convert it to a TimestampType:
df = df.withColumn('time', F.to_timestamp(df['event-time'], 'yyyy-MM-dd HH:mm:ss'))
df.show()
This is our DataFrame:
+-------------------+------+-------------------+
|         event-time| color|               time|
+-------------------+------+-------------------+
|2019-08-01 00:00:00|orange|2019-08-01 00:00:00|
|2019-08-01 00:00:20|orange|2019-08-01 00:00:20|
|2019-08-01 00:00:44|yellow|2019-08-01 00:00:44|
|2019-08-01 00:01:00|  pink|2019-08-01 00:01:00|
|2019-08-01 00:01:20|  pink|2019-08-01 00:01:20|
|2019-08-01 00:02:00| black|2019-08-01 00:02:00|
+-------------------+------+-------------------+
Next, group the event-time into 1 minute windows and use agg to count:
w = df.groupBy(F.window("time", "1 minute")).agg(F.count("event-time").alias("count"))
w.orderBy('window').show()
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "count").orderBy('start').show()
This is what you end up with:
+--------------------+-----+
| window|count|
+--------------------+-----+
|[2019-08-01 00:00...| 3|
|[2019-08-01 00:01...| 2|
|[2019-08-01 00:02...| 1|
+--------------------+-----+
+-------------------+-------------------+-----+
| start| end|count|
+-------------------+-------------------+-----+
|2019-08-01 00:00:00|2019-08-01 00:01:00| 3|
|2019-08-01 00:01:00|2019-08-01 00:02:00| 2|
|2019-08-01 00:02:00|2019-08-01 00:03:00| 1|
+-------------------+-------------------+-----+
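F.window with a single duration argument builds tumbling windows, so every row lands in exactly one [start, end) interval. To make that grouping concrete, here is a hypothetical plain-Python equivalent (the function name `tumbling_counts` is mine) applied to the sample rows above:

```python
from collections import Counter
from datetime import datetime, timedelta

rows = [
    ("2019-08-01 00:00:00", "orange"),
    ("2019-08-01 00:00:20", "orange"),
    ("2019-08-01 00:00:44", "yellow"),
    ("2019-08-01 00:01:00", "pink"),
    ("2019-08-01 00:01:20", "pink"),
    ("2019-08-01 00:02:00", "black"),
]

def tumbling_counts(rows, minutes=1):
    """Count rows per [start, start + minutes) window, like F.window(..., '1 minute')."""
    counts = Counter()
    for ts, _color in rows:
        dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
        # Snap to the window start: zero out seconds, then surplus minutes
        start = dt.replace(second=0, microsecond=0)
        start -= timedelta(minutes=start.minute % minutes)
        counts[start.strftime("%Y-%m-%d %H:%M:%S")] += 1
    return dict(sorted(counts.items()))

print(tumbling_counts(rows))
# {'2019-08-01 00:00:00': 3, '2019-08-01 00:01:00': 2, '2019-08-01 00:02:00': 1}
```

The per-window counts match the `start`/`end`/`count` table above.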
You can replace 1 minute with other time intervals, such as 1 second, 1 day 12 hours, 2 minutes, etc.
IIUC, you want to group the event times by minute. You can try pyspark.sql.functions.date_trunc (Spark 2.3+):
>>> from pyspark.sql.functions import date_trunc, to_timestamp
>>> df.show()
+-------------------+------+
| event-time| color|
+-------------------+------+
|2019-08-01 00:00:00|orange|
|2019-08-01 00:00:20|orange|
|2019-08-01 00:00:44|yellow|
|2019-08-01 00:01:00| pink|
|2019-08-01 00:01:20| pink|
|2019-08-01 00:02:00| black|
+-------------------+------+
>>> df.withColumn('event-time', date_trunc('minute', to_timestamp('event-time'))).show()
+-------------------+------+
| event-time| color|
+-------------------+------+
|2019-08-01 00:00:00|orange|
|2019-08-01 00:00:00|orange|
|2019-08-01 00:00:00|yellow|
|2019-08-01 00:01:00| pink|
|2019-08-01 00:01:00| pink|
|2019-08-01 00:02:00| black|
+-------------------+------+
Then group on the updated event-time and count the rows:
>>> df.withColumn('event-time', date_trunc('minute', to_timestamp('event-time'))) \
.groupBy('event-time') \
.count() \
.show()
+-------------------+-----+
| event-time|count|
+-------------------+-----+
|2019-08-01 00:01:00| 2|
|2019-08-01 00:00:00| 3|
|2019-08-01 00:02:00| 1|
+-------------------+-----+
Note: if event-time is already a TimestampType, skip the to_timestamp() call and use the event-time field directly.
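date_trunc('minute', ...) simply zeroes out the seconds (and smaller fields) of each timestamp. A plain-Python sketch of the same truncate-then-count approach (the helper name `trunc_minute` is mine, not a Spark API):

```python
from collections import Counter
from datetime import datetime

events = [
    "2019-08-01 00:00:00", "2019-08-01 00:00:20", "2019-08-01 00:00:44",
    "2019-08-01 00:01:00", "2019-08-01 00:01:20", "2019-08-01 00:02:00",
]

def trunc_minute(ts):
    """Zero out the seconds, mirroring date_trunc('minute', ts)."""
    dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
    return dt.replace(second=0, microsecond=0).strftime("%Y-%m-%d %H:%M:%S")

# Truncate, then count occurrences per minute -- the groupBy().count() step
per_minute = Counter(trunc_minute(ts) for ts in events)
print(dict(per_minute))
# {'2019-08-01 00:00:00': 3, '2019-08-01 00:01:00': 2, '2019-08-01 00:02:00': 1}
```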