如何计算给定时间间隔 window 之间的行数

how to count number of rows between a given time interval window

我有包含 2 列事件时间(时间戳)和颜色(字符串)的数据框我想计算每秒之间的行数。

  event-time              color
  2019-08-01 00:00:00    orange
  2019-08-01 00:00:20    orange
  2019-08-01 00:00:44    yellow
  2019-08-01 00:01:00    pink
  2019-08-01 00:01:20    pink
  2019-08-01 00:02:00    black
      ....               ...
  2019-08-07 00:01:00    pink

我想要这样

    event-time            count
    2019-08-01 00:00:00   3
    2019-08-01 00:01:00   2
    2019-08-01 00:02:00   1
         ...              ...

我尝试使用 window 函数,但没有得到预期的输出。

您可以创建一个范围变量并使用它进行分组和计数。像下面这样的东西应该有所帮助

import pyspark.sql.functions as F

seconds = 1
seconds_window = F.from_unixtime(F.unix_timestamp('event-time')\
       - F.unix_timestamp('event-time') % seconds)
df = df.withColumn('1sec_window', seconds_window)

您可以在这里使用window功能。

首先创建DataFrame,如果event-timeStringType中,将其转换为TimestampType

df = df.withColumn('time', F.to_timestamp(df['event-time'], 'yyyy-MM-ddHH:mm:ss'))
df.show()

这是我们的 DataFrame:

+------------------+------+-------------------+
|        event-time| color|               time|
+------------------+------+-------------------+
|2019-08-0100:00:00|orange|2019-08-01 00:00:00|
|2019-08-0100:00:20|orange|2019-08-01 00:00:20|
|2019-08-0100:00:44|yellow|2019-08-01 00:00:44|
|2019-08-0100:01:00|  pink|2019-08-01 00:01:00|
|2019-08-0100:01:20|  pink|2019-08-01 00:01:20|
|2019-08-0100:02:00| black|2019-08-01 00:02:00|
+------------------+------+-------------------+

接下来,将 event-time 分组为 1 minute window,并使用 aggcount:

w = df.groupBy(F.window("time", "1 minute")).agg(F.count("event-time").alias("count"))
w.orderBy('window').show()
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "count").orderBy('start').show()

这是你最后得到的:

+--------------------+-----+
|              window|count|
+--------------------+-----+
|[2019-08-01 00:00...|    3|
|[2019-08-01 00:01...|    2|
|[2019-08-01 00:02...|    1|
+--------------------+-----+


+-------------------+-------------------+-----+
|              start|                end|count|
+-------------------+-------------------+-----+
|2019-08-01 00:00:00|2019-08-01 00:01:00|    3|
|2019-08-01 00:01:00|2019-08-01 00:02:00|    2|
|2019-08-01 00:02:00|2019-08-01 00:03:00|    1|
+-------------------+-------------------+-----+

您可以将 1 minute 替换为其他时间间隔,例如 1 second1 day 12 hours2 minutes

the documentation here

IIUC,你想按分钟对事件时间进行分组,你可以试试pyspark.sql.functions.date_trunc (spark 2.3+)

>>> from pyspark.sql.functions import date_trunc, to_timestamp

>>> df.show()                                                                                                                   
+-------------------+------+
|         event-time| color|
+-------------------+------+
|2019-08-01 00:00:00|orange|
|2019-08-01 00:00:20|orange|
|2019-08-01 00:00:44|yellow|
|2019-08-01 00:01:00|  pink|
|2019-08-01 00:01:20|  pink|
|2019-08-01 00:02:00| black|
+-------------------+------+

>>> df.withColumn('event-time', date_trunc('minute', to_timestamp('event-time'))).show()                                    
+-------------------+------+
|         event-time| color|
+-------------------+------+
|2019-08-01 00:00:00|orange|
|2019-08-01 00:00:00|orange|
|2019-08-01 00:00:00|yellow|
|2019-08-01 00:01:00|  pink|
|2019-08-01 00:01:00|  pink|
|2019-08-01 00:02:00| black|
+-------------------+------+

然后对更新后的 event-time 进行分组并计算行数:

>>> df.withColumn('event-time', date_trunc('minute', to_timestamp('event-time'))) \
  .groupBy('event-time') \
  .count() \
  .show()     
+-------------------+-----+                                                     
|         event-time|count|
+-------------------+-----+
|2019-08-01 00:01:00|    2|
|2019-08-01 00:00:00|    3|
|2019-08-01 00:02:00|    1|
+-------------------+-----+

注:如果event-time已经是一个TimestampType,则跳过函数to_timestamp() 并直接使用 event-time 字段。