如何计算满足最后一个条件之间的天数?
How to calculate days between when last condition was met?
当前 df:
df = spark.createDataFrame([
("2020-01-12","d1",0),
("2020-01-12","d2",0),
("2020-01-13","d3",0),
("2020-01-14","d4",1),
("2020-01-15","d5",0),
("2020-01-15","d6",0),
("2020-01-16","d7",0),
("2020-01-17","d8",0),
("2020-01-18","d9",1),
("2020-01-19","d10",0),
("2020-01-20","d11",0),],
['date', 'device', 'condition'])
df.show()
+----------+------+---------+
| date|device|condition|
+----------+------+---------+
|2020-01-12| d1| 0|
|2020-01-12| d2| 0|
|2020-01-13| d3| 0|
|2020-01-14| d4| 1|
|2020-01-15| d5| 0|
|2020-01-15| d6| 0|
|2020-01-16| d7| 0|
|2020-01-17| d8| 0|
|2020-01-18| d9| 1|
|2020-01-19| d10| 0|
|2020-01-20| d11| 0|
+----------+------+---------+
期望的输出 df:
want_df = spark.createDataFrame([
("2020-01-12","d1",0,0),
("2020-01-12","d2",0,0),
("2020-01-13","d3",0,1),
("2020-01-14","d4",1,2),
("2020-01-15","d5",0,1),
("2020-01-15","d6",0,1),
("2020-01-16","d7",0,2),
("2020-01-17","d8",0,3),
("2020-01-18","d9",1,4),
("2020-01-19","d10",0,1),
("2020-01-20","d11",0,2),],
['date', 'device', 'condition', 'life'])
want_df.show()
+----------+------+---------+----+
| date|device|condition|life|
+----------+------+---------+----+
|2020-01-12| d1| 0| 0|
|2020-01-12| d2| 0| 0|
|2020-01-13| d3| 0| 1|
|2020-01-14| d4| 1| 2|
|2020-01-15| d5| 0| 1|
|2020-01-15| d6| 0| 1|
|2020-01-16| d7| 0| 2|
|2020-01-17| d8| 0| 3|
|2020-01-18| d9| 1| 4|
|2020-01-19| d10| 0| 1|
|2020-01-20| d11| 0| 2|
+----------+------+---------+----+
Objective 是计算到 condition=1
时的日期差异(天数),然后日期差异重置为从满足最后一个条件时开始的天数。 life
是试图计算的列。知道如何计算吗? Window
还是 lag
?
这是一类问题,可以通过添加一些临时行来简化(我们标记它们,然后稍后删除它们)
from pyspark.sql import Window
from pyspark.sql.functions import lit, lag, sum as fsum, first, datediff
(1) 首先,创建一个新的数据帧 df1,它复制条件 == 1 的所有行,但设置它们的条件 = 0 和标志 = 1,将生成的数据帧与原始数据帧合并(设置标志 = 0):
df1 = df.withColumn('flag', lit(0)).union(
df.where('condition = 1').withColumn('condition', lit(0)).withColumn('flag', lit(1))
)
(2) 然后,设置下面两个Window Specs,使用w1
帮助创建子组标签 g
将所有连续的行分组,直到条件切换从 1 到 0。将 flag
添加到 orderBy() 中,以便新添加的行位于条件 = 1 的相应行的后面,并分组到下一个组标签中。
w1 = Window.partitionBy(lit(0)).orderBy('date', 'flag')
w2 = Window.partitionBy(lit(0), 'g').orderBy('date', 'flag')
注意: 如果你有一个巨大的数据框,你可能想要将 lit(0)
更改为一些实际的或计算的列以避免 Spark 将所有行移动到一个单一的划分。 更新: 根据评论,数据帧是一个单一的时间序列,可以加载到一个分区上,因此使用 lit(0)
应该足够了。
(3) 在 w1
上使用 lag 和 sum 函数找到子组标签 'g' 然后计算同一组中的 first_date 使用 Window规格 w2
。此日期用于计算列 'life':
df2 = df1.withColumn('g', fsum((lag('condition').over(w1) == 1).astype('int')).over(w1)) \
.withColumn('first_date', first('date').over(w2)) \
.withColumn('life', datediff('date','first_date'))
df2.show()
+----------+------+---------+----+---+----------+----+
| date|device|condition|flag| g|first_date|life|
+----------+------+---------+----+---+----------+----+
|2020-01-12| d1| 0| 0| 0|2020-01-12| 0|
|2020-01-12| d2| 0| 0| 0|2020-01-12| 0|
|2020-01-13| d3| 0| 0| 0|2020-01-12| 1|
|2020-01-14| d4| 1| 0| 0|2020-01-12| 2|
|2020-01-14| d4| 0| 1| 1|2020-01-14| 0|
|2020-01-15| d5| 0| 0| 1|2020-01-14| 1|
|2020-01-15| d6| 0| 0| 1|2020-01-14| 1|
|2020-01-16| d7| 0| 0| 1|2020-01-14| 2|
|2020-01-17| d8| 0| 0| 1|2020-01-14| 3|
|2020-01-18| d9| 1| 0| 1|2020-01-14| 4|
|2020-01-18| d9| 0| 1| 2|2020-01-18| 0|
|2020-01-19| d10| 0| 0| 2|2020-01-18| 1|
|2020-01-20| d11| 0| 0| 2|2020-01-18| 2|
+----------+------+---------+----+---+----------+----+
(4) 删除临时行和列以获得最终数据帧:
df_new = df2.filter('flag = 0').drop('first_date', 'g', 'flag')
df_new.show()
+----------+------+---------+----+
| date|device|condition|life|
+----------+------+---------+----+
|2020-01-12| d1| 0| 0|
|2020-01-12| d2| 0| 0|
|2020-01-13| d3| 0| 1|
|2020-01-14| d4| 1| 2|
|2020-01-15| d5| 0| 1|
|2020-01-15| d6| 0| 1|
|2020-01-16| d7| 0| 2|
|2020-01-17| d8| 0| 3|
|2020-01-18| d9| 1| 4|
|2020-01-19| d10| 0| 1|
|2020-01-20| d11| 0| 2|
+----------+------+---------+----+
我尝试从不同的方式提供,更接近标准 sql 方言,但仍然使用 pyspark 语法并关注性能影响。
from pyspark.sql import Window
from pyspark.sql.functions import col, when, lit, lag, min, max, datediff
Select条件等于1的日期范围,
然后使用联合函数结合最大日期值。
w = Window.partitionBy('date')
dateRange = df.select(df.date).where(df.condition == 1)\
.union(df.select(max(df.date))).distinct()\
.orderBy('date')\
.withColumn('lastDate', lag(col('date').over(w))\
.select(when(col('lastDate').isNull(), lit('1970-01-01')).otherwise(col('lastDate')).alias('lastDate'), col('date').alias('toDate'))
Select 日期范围和第一个最小日期通过将 df 加入日期范围,
然后进行分组并计算最小日期值。
dateRange1st = df.join(dateRange, df.date > dateRange.lastDate & df.date <= dateRange.toDate, 'inner').groupBy(dateRange.lastDate, dateRange.toDate).agg(min(df.date).alias('frDate'))
Select 将日期范围 (1st) 加入 df 的结果,
寻求帮助日期过滤并找出不同之处。
result = df.join(dateRange1st, df.date.between(dateRange1st.frDate, dateRange1st.toDate), 'inner')\
.select(df.date, df.device, df.condition)\
.withColumn('life', datediff(df.date - dataRange1st.frDate))\
.orderBy(df.date)
result.show()
希望对您有所帮助!
当前 df:
df = spark.createDataFrame([
("2020-01-12","d1",0),
("2020-01-12","d2",0),
("2020-01-13","d3",0),
("2020-01-14","d4",1),
("2020-01-15","d5",0),
("2020-01-15","d6",0),
("2020-01-16","d7",0),
("2020-01-17","d8",0),
("2020-01-18","d9",1),
("2020-01-19","d10",0),
("2020-01-20","d11",0),],
['date', 'device', 'condition'])
df.show()
+----------+------+---------+
| date|device|condition|
+----------+------+---------+
|2020-01-12| d1| 0|
|2020-01-12| d2| 0|
|2020-01-13| d3| 0|
|2020-01-14| d4| 1|
|2020-01-15| d5| 0|
|2020-01-15| d6| 0|
|2020-01-16| d7| 0|
|2020-01-17| d8| 0|
|2020-01-18| d9| 1|
|2020-01-19| d10| 0|
|2020-01-20| d11| 0|
+----------+------+---------+
期望的输出 df:
want_df = spark.createDataFrame([
("2020-01-12","d1",0,0),
("2020-01-12","d2",0,0),
("2020-01-13","d3",0,1),
("2020-01-14","d4",1,2),
("2020-01-15","d5",0,1),
("2020-01-15","d6",0,1),
("2020-01-16","d7",0,2),
("2020-01-17","d8",0,3),
("2020-01-18","d9",1,4),
("2020-01-19","d10",0,1),
("2020-01-20","d11",0,2),],
['date', 'device', 'condition', 'life'])
want_df.show()
+----------+------+---------+----+
| date|device|condition|life|
+----------+------+---------+----+
|2020-01-12| d1| 0| 0|
|2020-01-12| d2| 0| 0|
|2020-01-13| d3| 0| 1|
|2020-01-14| d4| 1| 2|
|2020-01-15| d5| 0| 1|
|2020-01-15| d6| 0| 1|
|2020-01-16| d7| 0| 2|
|2020-01-17| d8| 0| 3|
|2020-01-18| d9| 1| 4|
|2020-01-19| d10| 0| 1|
|2020-01-20| d11| 0| 2|
+----------+------+---------+----+
Objective 是计算到 condition=1
时的日期差异(天数),然后日期差异重置为从满足最后一个条件时开始的天数。 life
是试图计算的列。知道如何计算吗? Window
还是 lag
?
这是一类问题,可以通过添加一些临时行来简化(我们标记它们,然后稍后删除它们)
from pyspark.sql import Window
from pyspark.sql.functions import lit, lag, sum as fsum, first, datediff
(1) 首先,创建一个新的数据帧 df1,它复制条件 == 1 的所有行,但设置它们的条件 = 0 和标志 = 1,将生成的数据帧与原始数据帧合并(设置标志 = 0):
df1 = df.withColumn('flag', lit(0)).union(
df.where('condition = 1').withColumn('condition', lit(0)).withColumn('flag', lit(1))
)
(2) 然后,设置下面两个Window Specs,使用w1
帮助创建子组标签 g
将所有连续的行分组,直到条件切换从 1 到 0。将 flag
添加到 orderBy() 中,以便新添加的行位于条件 = 1 的相应行的后面,并分组到下一个组标签中。
w1 = Window.partitionBy(lit(0)).orderBy('date', 'flag')
w2 = Window.partitionBy(lit(0), 'g').orderBy('date', 'flag')
注意: 如果你有一个巨大的数据框,你可能想要将 lit(0)
更改为一些实际的或计算的列以避免 Spark 将所有行移动到一个单一的划分。 更新: 根据评论,数据帧是一个单一的时间序列,可以加载到一个分区上,因此使用 lit(0)
应该足够了。
(3) 在 w1
上使用 lag 和 sum 函数找到子组标签 'g' 然后计算同一组中的 first_date 使用 Window规格 w2
。此日期用于计算列 'life':
df2 = df1.withColumn('g', fsum((lag('condition').over(w1) == 1).astype('int')).over(w1)) \
.withColumn('first_date', first('date').over(w2)) \
.withColumn('life', datediff('date','first_date'))
df2.show()
+----------+------+---------+----+---+----------+----+
| date|device|condition|flag| g|first_date|life|
+----------+------+---------+----+---+----------+----+
|2020-01-12| d1| 0| 0| 0|2020-01-12| 0|
|2020-01-12| d2| 0| 0| 0|2020-01-12| 0|
|2020-01-13| d3| 0| 0| 0|2020-01-12| 1|
|2020-01-14| d4| 1| 0| 0|2020-01-12| 2|
|2020-01-14| d4| 0| 1| 1|2020-01-14| 0|
|2020-01-15| d5| 0| 0| 1|2020-01-14| 1|
|2020-01-15| d6| 0| 0| 1|2020-01-14| 1|
|2020-01-16| d7| 0| 0| 1|2020-01-14| 2|
|2020-01-17| d8| 0| 0| 1|2020-01-14| 3|
|2020-01-18| d9| 1| 0| 1|2020-01-14| 4|
|2020-01-18| d9| 0| 1| 2|2020-01-18| 0|
|2020-01-19| d10| 0| 0| 2|2020-01-18| 1|
|2020-01-20| d11| 0| 0| 2|2020-01-18| 2|
+----------+------+---------+----+---+----------+----+
(4) 删除临时行和列以获得最终数据帧:
df_new = df2.filter('flag = 0').drop('first_date', 'g', 'flag')
df_new.show()
+----------+------+---------+----+
| date|device|condition|life|
+----------+------+---------+----+
|2020-01-12| d1| 0| 0|
|2020-01-12| d2| 0| 0|
|2020-01-13| d3| 0| 1|
|2020-01-14| d4| 1| 2|
|2020-01-15| d5| 0| 1|
|2020-01-15| d6| 0| 1|
|2020-01-16| d7| 0| 2|
|2020-01-17| d8| 0| 3|
|2020-01-18| d9| 1| 4|
|2020-01-19| d10| 0| 1|
|2020-01-20| d11| 0| 2|
+----------+------+---------+----+
我尝试从不同的方式提供,更接近标准 sql 方言,但仍然使用 pyspark 语法并关注性能影响。
from pyspark.sql import Window
from pyspark.sql.functions import col, when, lit, lag, min, max, datediff
Select条件等于1的日期范围, 然后使用联合函数结合最大日期值。
w = Window.partitionBy('date')
dateRange = df.select(df.date).where(df.condition == 1)\
.union(df.select(max(df.date))).distinct()\
.orderBy('date')\
.withColumn('lastDate', lag(col('date').over(w))\
.select(when(col('lastDate').isNull(), lit('1970-01-01')).otherwise(col('lastDate')).alias('lastDate'), col('date').alias('toDate'))
Select 日期范围和第一个最小日期通过将 df 加入日期范围, 然后进行分组并计算最小日期值。
dateRange1st = df.join(dateRange, df.date > dateRange.lastDate & df.date <= dateRange.toDate, 'inner').groupBy(dateRange.lastDate, dateRange.toDate).agg(min(df.date).alias('frDate'))
Select 将日期范围 (1st) 加入 df 的结果, 寻求帮助日期过滤并找出不同之处。
result = df.join(dateRange1st, df.date.between(dateRange1st.frDate, dateRange1st.toDate), 'inner')\
.select(df.date, df.device, df.condition)\
.withColumn('life', datediff(df.date - dataRange1st.frDate))\
.orderBy(df.date)
result.show()
希望对您有所帮助!