在 pyspark 数据框中每 60 行应用一个函数
Apply a function every 60 rows in a pyspark dataframe
我的数据框叫做 df,有 123729 行,看起来像这样:
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0| 1|
|110| 127.0| 2|
|111| 127.0| 3|
|111| 127.0| 4|
|111| 126.0| 5|
|111| 127.0| 6|
|109| 126.0| 7|
|111| 126.0| 8|
我需要每 60 行或秒聚合一次到多个值。对于每一分钟,我都想知道最小心率、平均心率、最大心率,以及 maxABP 在这些秒中是否低于 85。所需的输出类似于下面的 table,其中如果 maxABP < 85,则警报为 1,否则为 0。
Min_HR
Max_HR
Avg_HR
Alarm
Minute
70
100
80
1
1
60
90
75
0
2
我想知道是否可以使用 mapreduce 将每 60 行聚合为这些单个值。我知道有很多错误,但也许是这样的:
def max_HR(df, i):
x = i
y = i+60
return reduce(lambda x, y: max(df[x:y]))
df_maxHR = map(lambda i: max_HR(i))
其中 i
应该是 df 的一部分。
示例 DF:
df = spark.createDataFrame(
[
(110, 128.0, 1),(110, 127.0, 2),(111, 127.0, 3),(111, 127.0, 4)
,(111, 126.0, 5),(111, 127.0, 6),(109, 126.0, 7),(111, 126.0, 1001)
,(114, 126.0, 1003),(115, 83.0, 1064),(116, 127.0, 1066)
], ['HR', 'maxABP', 'Second']
)
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0| 1|
|110| 127.0| 2|
|111| 127.0| 3|
|111| 127.0| 4|
|111| 126.0| 5|
|111| 127.0| 6|
|109| 126.0| 7|
|111| 126.0| 1001|
|114| 126.0| 1003|
|115| 83.0| 1064|
|116| 127.0| 1066|
然后使用window函数:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
w1 = (Window.partitionBy(F.col('Minute')))
df\
.withColumn('Minute', F.round(F.col('Second')/60,0)+1)\
.withColumn('Min_HR', F.min('HR').over(w1))\
.withColumn('Max_HR', F.max('HR').over(w1))\
.withColumn('Avg_HR', F.round(F.avg('HR').over(w1),0))\
.withColumn('Min_ABP', F.round(F.min('maxABP').over(w1),0))\
.select('Min_HR','Max_HR','Min_ABP','Avg_HR','Minute')\
.dropDuplicates()\
.withColumn('Alarm', F.when(F.col('Min_ABP')<85, 1).otherwise(F.lit('0')))\
.select('Min_HR','Max_HR','Avg_HR','Alarm','Minute')\
.orderBy('Minute')\
.show()
+------+------+------+-----+------+
|Min_HR|Max_HR|Avg_HR|Alarm|Minute|
+------+------+------+-----+------+
| 109| 111| 110.0| 0| 1.0|
| 111| 114| 113.0| 0| 18.0|
| 115| 116| 116.0| 1| 19.0|
我认为 groupBy
足以获得所需的结果。
df.show()
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0| 10|
|110| 127.0| 20|
|111| 127.0| 30|
|111| 127.0| 40|
|111| 126.0| 50|
|111| 127.0| 60|
|109| 126.0| 70|
|111| 126.0| 80|
+---+------+------+
df.withColumn('Minute', f.expr('cast(Second / 60 as int)')) \
.groupBy('Minute').agg( \
f.round(f.min('HR'), 2).alias('Min_HR'), \
f.round(f.max('HR'), 2).alias('Max_HR'), \
f.round(f.avg('HR'), 2).alias('Avg_HR'), \
f.max('maxABP').alias('maxABP')) \
.withColumn('Alarm', f.expr('if(maxABP < 85, 1, 0)')) \
.show()
+------+------+------+------+------+-----+
|Minute|Min_HR|Max_HR|Avg_HR|maxABP|Alarm|
+------+------+------+------+------+-----+
| 1| 109| 111|110.33| 127.0| 0|
| 0| 110| 111| 110.6| 128.0| 0|
+------+------+------+------+------+-----+
我的数据框叫做 df,有 123729 行,看起来像这样:
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0| 1|
|110| 127.0| 2|
|111| 127.0| 3|
|111| 127.0| 4|
|111| 126.0| 5|
|111| 127.0| 6|
|109| 126.0| 7|
|111| 126.0| 8|
我需要每 60 行或秒聚合一次到多个值。对于每一分钟,我都想知道最小心率、平均心率、最大心率,以及 maxABP 在这些秒中是否低于 85。所需的输出类似于下面的 table,其中如果 maxABP < 85,则警报为 1,否则为 0。
Min_HR | Max_HR | Avg_HR | Alarm | Minute |
---|---|---|---|---|
70 | 100 | 80 | 1 | 1 |
60 | 90 | 75 | 0 | 2 |
我想知道是否可以使用 mapreduce 将每 60 行聚合为这些单个值。我知道有很多错误,但也许是这样的:
def max_HR(df, i):
x = i
y = i+60
return reduce(lambda x, y: max(df[x:y]))
df_maxHR = map(lambda i: max_HR(i))
其中 i
应该是 df 的一部分。
示例 DF:
df = spark.createDataFrame(
[
(110, 128.0, 1),(110, 127.0, 2),(111, 127.0, 3),(111, 127.0, 4)
,(111, 126.0, 5),(111, 127.0, 6),(109, 126.0, 7),(111, 126.0, 1001)
,(114, 126.0, 1003),(115, 83.0, 1064),(116, 127.0, 1066)
], ['HR', 'maxABP', 'Second']
)
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0| 1|
|110| 127.0| 2|
|111| 127.0| 3|
|111| 127.0| 4|
|111| 126.0| 5|
|111| 127.0| 6|
|109| 126.0| 7|
|111| 126.0| 1001|
|114| 126.0| 1003|
|115| 83.0| 1064|
|116| 127.0| 1066|
然后使用window函数:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
w1 = (Window.partitionBy(F.col('Minute')))
df\
.withColumn('Minute', F.round(F.col('Second')/60,0)+1)\
.withColumn('Min_HR', F.min('HR').over(w1))\
.withColumn('Max_HR', F.max('HR').over(w1))\
.withColumn('Avg_HR', F.round(F.avg('HR').over(w1),0))\
.withColumn('Min_ABP', F.round(F.min('maxABP').over(w1),0))\
.select('Min_HR','Max_HR','Min_ABP','Avg_HR','Minute')\
.dropDuplicates()\
.withColumn('Alarm', F.when(F.col('Min_ABP')<85, 1).otherwise(F.lit('0')))\
.select('Min_HR','Max_HR','Avg_HR','Alarm','Minute')\
.orderBy('Minute')\
.show()
+------+------+------+-----+------+
|Min_HR|Max_HR|Avg_HR|Alarm|Minute|
+------+------+------+-----+------+
| 109| 111| 110.0| 0| 1.0|
| 111| 114| 113.0| 0| 18.0|
| 115| 116| 116.0| 1| 19.0|
我认为 groupBy
足以获得所需的结果。
df.show()
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0| 10|
|110| 127.0| 20|
|111| 127.0| 30|
|111| 127.0| 40|
|111| 126.0| 50|
|111| 127.0| 60|
|109| 126.0| 70|
|111| 126.0| 80|
+---+------+------+
df.withColumn('Minute', f.expr('cast(Second / 60 as int)')) \
.groupBy('Minute').agg( \
f.round(f.min('HR'), 2).alias('Min_HR'), \
f.round(f.max('HR'), 2).alias('Max_HR'), \
f.round(f.avg('HR'), 2).alias('Avg_HR'), \
f.max('maxABP').alias('maxABP')) \
.withColumn('Alarm', f.expr('if(maxABP < 85, 1, 0)')) \
.show()
+------+------+------+------+------+-----+
|Minute|Min_HR|Max_HR|Avg_HR|maxABP|Alarm|
+------+------+------+------+------+-----+
| 1| 109| 111|110.33| 127.0| 0|
| 0| 110| 111| 110.6| 128.0| 0|
+------+------+------+------+------+-----+