PySpark 和时间序列数据:如何巧妙地避免日期重叠?
PySpark and time series data: how to smartly avoid overlapping dates?
我有以下示例 Spark 数据帧
import pandas as pd
import pyspark
import pyspark.sql.functions as fn
from pyspark.sql.window import Window
raw_df = pd.DataFrame([
(1115, dt.datetime(2019,8,5,18,20), dt.datetime(2019,8,5,18,40)),
(484, dt.datetime(2019,8,5,18,30), dt.datetime(2019,8,9,18,40)),
(484, dt.datetime(2019,8,4,18,30), dt.datetime(2019,8,6,18,40)),
(484, dt.datetime(2019,8,2,18,30), dt.datetime(2019,8,3,18,40)),
(484, dt.datetime(2019,8,7,18,50), dt.datetime(2019,8,9,18,50)),
(1115, dt.datetime(2019,8,6,18,20), dt.datetime(2019,8,6,18,40)),
], columns=['server_id', 'start_time', 'end_time'])
df = spark.createDataFrame(raw_df)
这导致
+---------+-------------------+-------------------+
|server_id| start_time| end_time|
+---------+-------------------+-------------------+
| 1115|2019-08-05 18:20:00|2019-08-05 18:40:00|
| 484|2019-08-05 18:30:00|2019-08-09 18:40:00|
| 484|2019-08-04 18:30:00|2019-08-06 18:40:00|
| 484|2019-08-02 18:30:00|2019-08-03 18:40:00|
| 484|2019-08-07 18:50:00|2019-08-09 18:50:00|
| 1115|2019-08-06 18:20:00|2019-08-06 18:40:00|
+---------+-------------------+-------------------+
这表示每个服务器的使用日期范围。我想将其转换为非重叠日期的时间序列。
我想在不使用 UDF 的情况下实现此 。
这就是我现在做的,这是错误的
w = Window().orderBy(fn.lit('A'))
# Separate start/end date of usage into rows
df = (df.withColumn('start_end_time', fn.array('start_time', 'end_time'))
.withColumn('event_dt', fn.explode('start_end_time'))
.withColumn('row_num', fn.row_number().over(w)))
# Indicate start/end date of the usage (start date will always be on odd rows)
df = (df.withColumn('is_start', fn.when(fn.col('row_num')%2 == 0, 0).otherwise(1))
.select('server_id', 'event_dt', 'is_start'))
这给出了
+---------+-------------------+--------+
|server_id| event_dt|is_start|
+---------+-------------------+--------+
| 1115|2019-08-05 18:20:00| 1|
| 1115|2019-08-05 18:40:00| 0|
| 484|2019-08-05 18:30:00| 1|
| 484|2019-08-09 18:40:00| 0|
| 484|2019-08-04 18:30:00| 1|
| 484|2019-08-06 18:40:00| 0|
| 484|2019-08-02 18:30:00| 1|
| 484|2019-08-03 18:40:00| 0|
| 484|2019-08-07 18:50:00| 1|
| 484|2019-08-09 18:50:00| 0|
| 1115|2019-08-06 18:20:00| 1|
| 1115|2019-08-06 18:40:00| 0|
+---------+-------------------+--------+
但是我想达到的最终结果如下:
+---------+-------------------+--------+
|server_id| event_dt|is_start|
+---------+-------------------+--------+
| 1115|2019-08-05 18:20:00| 1|
| 1115|2019-08-05 18:40:00| 0|
| 1115|2019-08-06 18:20:00| 1|
| 1115|2019-08-06 18:40:00| 0|
| 484|2019-08-02 18:30:00| 1|
| 484|2019-08-03 18:40:00| 0|
| 484|2019-08-04 18:30:00| 1|
| 484|2019-08-09 18:50:00| 0|
+---------+-------------------+--------+
所以对于 server_id
484,我有实际的开始和结束日期,中间没有任何干扰。
对于如何在不使用 UDF 的情况下实现这一点,您有什么建议吗?
谢谢
IIUC,这是可以用Window lag(), sum()[=55=解决的问题之一] 函数为符合某些特定条件的有序连续行添加子组标签。类似于我们在 Pandas 中使用 shift()+cumsum().
所做的事情
设置 Window 规范 w1
:
w1 = Window.partitionBy('server_id').orderBy('start_time')
并计算以下内容:
- max('end_time'): 当前行之前的最大值
end_time
超过window-w1
- 滞后('end_time'):前一个
end_time
- sum('prev_end_time < current_start_time ? 1 : 0'): 标识子组的标志
以上三项可以对应Pandascummax(),shift()和cumsum().
通过用 max(end_time).over(w1)
更新 df.end_time 并设置子来计算 df1 -组标签g,然后做groupby(server_id, g)
计算min(start_time)
和max(end_time)
df1 = df.withColumn('end_time', fn.max('end_time').over(w1)) \
.withColumn('g', fn.sum(fn.when(fn.lag('end_time').over(w1) < fn.col('start_time'),1).otherwise(0)).over(w1)) \
.groupby('server_id', 'g') \
.agg(fn.min('start_time').alias('start_time'), fn.max('end_time').alias('end_time'))
df1.show()
+---------+---+-------------------+-------------------+
|server_id| g| start_time| end_time|
+---------+---+-------------------+-------------------+
| 1115| 0|2019-08-05 18:20:00|2019-08-05 18:40:00|
| 1115| 1|2019-08-06 18:20:00|2019-08-06 18:40:00|
| 484| 0|2019-08-02 18:30:00|2019-08-03 18:40:00|
| 484| 1|2019-08-04 18:30:00|2019-08-09 18:50:00|
+---------+---+-------------------+-------------------+
在我们有df1之后,我们可以使用两个选择拆分数据,然后合并结果集:
df_new = df1.selectExpr('server_id', 'start_time as event_dt', '1 as is_start').union(
df1.selectExpr('server_id', 'end_time as event_dt', '0 as is_start')
)
df_new.orderBy('server_id', 'event_dt').show()
+---------+-------------------+--------+
|server_id| event_dt|is_start|
+---------+-------------------+--------+
| 484|2019-08-02 18:30:00| 1|
| 484|2019-08-03 18:40:00| 0|
| 484|2019-08-04 18:30:00| 1|
| 484|2019-08-09 18:50:00| 0|
| 1115|2019-08-05 18:20:00| 1|
| 1115|2019-08-05 18:40:00| 0|
| 1115|2019-08-06 18:20:00| 1|
| 1115|2019-08-06 18:40:00| 0|
+---------+-------------------+--------+
我有以下示例 Spark 数据帧
import pandas as pd
import pyspark
import pyspark.sql.functions as fn
from pyspark.sql.window import Window
raw_df = pd.DataFrame([
(1115, dt.datetime(2019,8,5,18,20), dt.datetime(2019,8,5,18,40)),
(484, dt.datetime(2019,8,5,18,30), dt.datetime(2019,8,9,18,40)),
(484, dt.datetime(2019,8,4,18,30), dt.datetime(2019,8,6,18,40)),
(484, dt.datetime(2019,8,2,18,30), dt.datetime(2019,8,3,18,40)),
(484, dt.datetime(2019,8,7,18,50), dt.datetime(2019,8,9,18,50)),
(1115, dt.datetime(2019,8,6,18,20), dt.datetime(2019,8,6,18,40)),
], columns=['server_id', 'start_time', 'end_time'])
df = spark.createDataFrame(raw_df)
这导致
+---------+-------------------+-------------------+
|server_id| start_time| end_time|
+---------+-------------------+-------------------+
| 1115|2019-08-05 18:20:00|2019-08-05 18:40:00|
| 484|2019-08-05 18:30:00|2019-08-09 18:40:00|
| 484|2019-08-04 18:30:00|2019-08-06 18:40:00|
| 484|2019-08-02 18:30:00|2019-08-03 18:40:00|
| 484|2019-08-07 18:50:00|2019-08-09 18:50:00|
| 1115|2019-08-06 18:20:00|2019-08-06 18:40:00|
+---------+-------------------+-------------------+
这表示每个服务器的使用日期范围。我想将其转换为非重叠日期的时间序列。
我想在不使用 UDF 的情况下实现此 。
这就是我现在做的,这是错误的
w = Window().orderBy(fn.lit('A'))
# Separate start/end date of usage into rows
df = (df.withColumn('start_end_time', fn.array('start_time', 'end_time'))
.withColumn('event_dt', fn.explode('start_end_time'))
.withColumn('row_num', fn.row_number().over(w)))
# Indicate start/end date of the usage (start date will always be on odd rows)
df = (df.withColumn('is_start', fn.when(fn.col('row_num')%2 == 0, 0).otherwise(1))
.select('server_id', 'event_dt', 'is_start'))
这给出了
+---------+-------------------+--------+
|server_id| event_dt|is_start|
+---------+-------------------+--------+
| 1115|2019-08-05 18:20:00| 1|
| 1115|2019-08-05 18:40:00| 0|
| 484|2019-08-05 18:30:00| 1|
| 484|2019-08-09 18:40:00| 0|
| 484|2019-08-04 18:30:00| 1|
| 484|2019-08-06 18:40:00| 0|
| 484|2019-08-02 18:30:00| 1|
| 484|2019-08-03 18:40:00| 0|
| 484|2019-08-07 18:50:00| 1|
| 484|2019-08-09 18:50:00| 0|
| 1115|2019-08-06 18:20:00| 1|
| 1115|2019-08-06 18:40:00| 0|
+---------+-------------------+--------+
但是我想达到的最终结果如下:
+---------+-------------------+--------+
|server_id| event_dt|is_start|
+---------+-------------------+--------+
| 1115|2019-08-05 18:20:00| 1|
| 1115|2019-08-05 18:40:00| 0|
| 1115|2019-08-06 18:20:00| 1|
| 1115|2019-08-06 18:40:00| 0|
| 484|2019-08-02 18:30:00| 1|
| 484|2019-08-03 18:40:00| 0|
| 484|2019-08-04 18:30:00| 1|
| 484|2019-08-09 18:50:00| 0|
+---------+-------------------+--------+
所以对于 server_id
484,我有实际的开始和结束日期,中间没有任何干扰。
对于如何在不使用 UDF 的情况下实现这一点,您有什么建议吗?
谢谢
IIUC,这是可以用Window lag(), sum()[=55=解决的问题之一] 函数为符合某些特定条件的有序连续行添加子组标签。类似于我们在 Pandas 中使用 shift()+cumsum().
所做的事情设置 Window 规范
w1
:w1 = Window.partitionBy('server_id').orderBy('start_time')
并计算以下内容:
- max('end_time'): 当前行之前的最大值
end_time
超过window-w1
- 滞后('end_time'):前一个
end_time
- sum('prev_end_time < current_start_time ? 1 : 0'): 标识子组的标志
以上三项可以对应Pandascummax(),shift()和cumsum().
- max('end_time'): 当前行之前的最大值
通过用
max(end_time).over(w1)
更新 df.end_time 并设置子来计算 df1 -组标签g,然后做groupby(server_id, g)
计算min(start_time)
和max(end_time)
df1 = df.withColumn('end_time', fn.max('end_time').over(w1)) \ .withColumn('g', fn.sum(fn.when(fn.lag('end_time').over(w1) < fn.col('start_time'),1).otherwise(0)).over(w1)) \ .groupby('server_id', 'g') \ .agg(fn.min('start_time').alias('start_time'), fn.max('end_time').alias('end_time')) df1.show() +---------+---+-------------------+-------------------+ |server_id| g| start_time| end_time| +---------+---+-------------------+-------------------+ | 1115| 0|2019-08-05 18:20:00|2019-08-05 18:40:00| | 1115| 1|2019-08-06 18:20:00|2019-08-06 18:40:00| | 484| 0|2019-08-02 18:30:00|2019-08-03 18:40:00| | 484| 1|2019-08-04 18:30:00|2019-08-09 18:50:00| +---------+---+-------------------+-------------------+
在我们有df1之后,我们可以使用两个选择拆分数据,然后合并结果集:
df_new = df1.selectExpr('server_id', 'start_time as event_dt', '1 as is_start').union( df1.selectExpr('server_id', 'end_time as event_dt', '0 as is_start') ) df_new.orderBy('server_id', 'event_dt').show() +---------+-------------------+--------+ |server_id| event_dt|is_start| +---------+-------------------+--------+ | 484|2019-08-02 18:30:00| 1| | 484|2019-08-03 18:40:00| 0| | 484|2019-08-04 18:30:00| 1| | 484|2019-08-09 18:50:00| 0| | 1115|2019-08-05 18:20:00| 1| | 1115|2019-08-05 18:40:00| 0| | 1115|2019-08-06 18:20:00| 1| | 1115|2019-08-06 18:40:00| 0| +---------+-------------------+--------+