Pyspark 组按时间跨度
Pyspark GroupBy time span
我有包含开始和结束日期的数据,例如
+---+----------+------------+
| id| start| end|
+---+----------+------------+
| 1|2021-05-01| 2022-02-01|
| 2|2021-10-01| 2021-12-01|
| 3|2021-11-01| 2022-01-01|
| 4|2021-06-01| 2021-10-01|
| 5|2022-01-01| 2022-02-01|
| 6|2021-08-01| 2021-12-01|
+---+----------+------------+
我想统计每个月有多少观察是“活跃”的,以便在图中显示。对于 active,我的意思是我想要计算有多少观测值具有包含给定月份的开始和结束日期。示例数据的结果应如下所示:
Example of a plot for the active times
我查看了 pyspark Window 函数,但我认为这不能帮助我解决问题。到目前为止,我唯一的想法是为数据中的每个月指定一个额外的列,并指示观察在该月是否处于活动状态并从那里开始工作。但我觉得一定有更有效的方法来做到这一点。
您可以使用 sequence
SQL。 sequence
将创建包含开始、结束和间隔的日期范围以及 return 列表。
然后,您可以使用explode
将列表展平,然后进行计数。
from pyspark.sql import functions as F
# Make sure your spark session is set to UTC.
# This SQL won't work well with a month interval if timezone is set to a place that has a daylight saving.
spark = (SparkSession
.builder
.config('spark.sql.session.timeZone', 'UTC')
... # other config
.getOrCreate())
df = (df.withColumn('range', F.expr('sequence(to_date(`start`), to_date(`end`), interval 1 month) as date'))
.withColumn('observation', F.explode('range')))
df = df.groupby('observation').count()
我有包含开始和结束日期的数据,例如
+---+----------+------------+
| id| start| end|
+---+----------+------------+
| 1|2021-05-01| 2022-02-01|
| 2|2021-10-01| 2021-12-01|
| 3|2021-11-01| 2022-01-01|
| 4|2021-06-01| 2021-10-01|
| 5|2022-01-01| 2022-02-01|
| 6|2021-08-01| 2021-12-01|
+---+----------+------------+
我想统计每个月有多少观察是“活跃”的,以便在图中显示。对于 active,我的意思是我想要计算有多少观测值具有包含给定月份的开始和结束日期。示例数据的结果应如下所示: Example of a plot for the active times
我查看了 pyspark Window 函数,但我认为这不能帮助我解决问题。到目前为止,我唯一的想法是为数据中的每个月指定一个额外的列,并指示观察在该月是否处于活动状态并从那里开始工作。但我觉得一定有更有效的方法来做到这一点。
您可以使用 sequence
SQL。 sequence
将创建包含开始、结束和间隔的日期范围以及 return 列表。
然后,您可以使用explode
将列表展平,然后进行计数。
from pyspark.sql import functions as F
# Make sure your spark session is set to UTC.
# This SQL won't work well with a month interval if timezone is set to a place that has a daylight saving.
spark = (SparkSession
.builder
.config('spark.sql.session.timeZone', 'UTC')
... # other config
.getOrCreate())
df = (df.withColumn('range', F.expr('sequence(to_date(`start`), to_date(`end`), interval 1 month) as date'))
.withColumn('observation', F.explode('range')))
df = df.groupby('observation').count()