如何使用滚动 window 函数计算 Pyspark Dataframe 中等于某个值的相邻值的数量?
How to count the number of adjacent values in a Pyspark Dataframe equal to a certain value using a rolling window function?
可以使用以下方法创建示例数据框:
from pyspark.sql.functions import col
from pyspark.sql.window import Window
df = sc.parallelize([['2019-08-29 01:00:00',0],
['2019-08-29 02:00:00',0],
['2019-08-29 03:00:00',0],
['2019-08-29 04:00:00',1],
['2019-08-29 05:00:00',2],
['2019-08-29 06:00:00',3],
['2019-08-29 07:00:00',0],
['2019-08-29 08:00:00',2],
['2019-08-29 09:00:00',0],
['2019-08-29 10:00:00',1]]).toDF(['DATETIME','VAL']).withColumn('DATETIME',col('DATETIME').cast('timestamp'))
我想生成一个列,其计数等于 3 小时内(当前时间 +/- 1 小时,包括当前 Val)内 0 值的出现次数。 window 可以使用以下方法创建:
w1 = (Window()
.orderBy(col('DATETIME').cast('long'))
.rangeBetween(-(60*60), 60*60))
期望的结果:
+-------------------+---+---+
| DATETIME|VAL|NUM|
+-------------------+---+---+
|2019-08-29 01:00:00| 0| 2|
|2019-08-29 02:00:00| 0| 3|
|2019-08-29 03:00:00| 0| 2|
|2019-08-29 04:00:00| 1| 1|
|2019-08-29 05:00:00| 2| 0|
|2019-08-29 06:00:00| 3| 1|
|2019-08-29 07:00:00| 0| 1|
|2019-08-29 08:00:00| 2| 2|
|2019-08-29 09:00:00| 0| 1|
|2019-08-29 10:00:00| 1| 1|
+-------------------+---+---+
如果每个 DATETIME 只有 1 个条目,您可以使用 lead 和 lag函数获取上一个和下一个值,然后你可以计算零。
from pyspark.sql.functions import udf, array, col
from pyspark.sql.types import IntegerType
count_zeros_udf = udf(lambda arr: arr.count(0), IntegerType())
df.withColumn('lag1', f.lag(col('VAL'), 1, -1).over(Window.orderBy("DATETIME"))) # Get the previous value
.withColumn('lag2', f.lead(col('VAL'), 1, -1).over(Window.orderBy("DATETIME"))) # Get the next value
.withColumn('NUM', count_zeros_udf(array('VAL', 'lag1', 'lag2'))) # Count zeros using the udf
.drop('lag1', 'lag2') # Drop the extra columns
.show()
+-------------------+---+---+
| DATETIME|VAL|NUM|
+-------------------+---+---+
|2019-08-29 01:00:00| 0| 2|
|2019-08-29 02:00:00| 0| 3|
|2019-08-29 03:00:00| 0| 2|
|2019-08-29 04:00:00| 1| 1|
|2019-08-29 05:00:00| 2| 0|
|2019-08-29 06:00:00| 3| 1|
|2019-08-29 07:00:00| 0| 1|
|2019-08-29 08:00:00| 2| 2|
|2019-08-29 09:00:00| 0| 1|
|2019-08-29 10:00:00| 1| 1|
+-------------------+---+---+
使用 pyspark >= 2.4,你可以使用 UDF 和 pandas UDF 在 window 上解释为这里
。不幸的是,我没有 pyspark 2.4 或更高版本,因此我无法测试它。
可以使用以下方法创建示例数据框:
from pyspark.sql.functions import col
from pyspark.sql.window import Window
df = sc.parallelize([['2019-08-29 01:00:00',0],
['2019-08-29 02:00:00',0],
['2019-08-29 03:00:00',0],
['2019-08-29 04:00:00',1],
['2019-08-29 05:00:00',2],
['2019-08-29 06:00:00',3],
['2019-08-29 07:00:00',0],
['2019-08-29 08:00:00',2],
['2019-08-29 09:00:00',0],
['2019-08-29 10:00:00',1]]).toDF(['DATETIME','VAL']).withColumn('DATETIME',col('DATETIME').cast('timestamp'))
我想生成一个列,其计数等于 3 小时内(当前时间 +/- 1 小时,包括当前 Val)内 0 值的出现次数。 window 可以使用以下方法创建:
w1 = (Window()
.orderBy(col('DATETIME').cast('long'))
.rangeBetween(-(60*60), 60*60))
期望的结果:
+-------------------+---+---+
| DATETIME|VAL|NUM|
+-------------------+---+---+
|2019-08-29 01:00:00| 0| 2|
|2019-08-29 02:00:00| 0| 3|
|2019-08-29 03:00:00| 0| 2|
|2019-08-29 04:00:00| 1| 1|
|2019-08-29 05:00:00| 2| 0|
|2019-08-29 06:00:00| 3| 1|
|2019-08-29 07:00:00| 0| 1|
|2019-08-29 08:00:00| 2| 2|
|2019-08-29 09:00:00| 0| 1|
|2019-08-29 10:00:00| 1| 1|
+-------------------+---+---+
如果每个 DATETIME 只有 1 个条目,您可以使用 lead 和 lag函数获取上一个和下一个值,然后你可以计算零。
from pyspark.sql.functions import udf, array, col
from pyspark.sql.types import IntegerType
count_zeros_udf = udf(lambda arr: arr.count(0), IntegerType())
df.withColumn('lag1', f.lag(col('VAL'), 1, -1).over(Window.orderBy("DATETIME"))) # Get the previous value
.withColumn('lag2', f.lead(col('VAL'), 1, -1).over(Window.orderBy("DATETIME"))) # Get the next value
.withColumn('NUM', count_zeros_udf(array('VAL', 'lag1', 'lag2'))) # Count zeros using the udf
.drop('lag1', 'lag2') # Drop the extra columns
.show()
+-------------------+---+---+
| DATETIME|VAL|NUM|
+-------------------+---+---+
|2019-08-29 01:00:00| 0| 2|
|2019-08-29 02:00:00| 0| 3|
|2019-08-29 03:00:00| 0| 2|
|2019-08-29 04:00:00| 1| 1|
|2019-08-29 05:00:00| 2| 0|
|2019-08-29 06:00:00| 3| 1|
|2019-08-29 07:00:00| 0| 1|
|2019-08-29 08:00:00| 2| 2|
|2019-08-29 09:00:00| 0| 1|
|2019-08-29 10:00:00| 1| 1|
+-------------------+---+---+
使用 pyspark >= 2.4,你可以使用 UDF 和 pandas UDF 在 window 上解释为这里