在 pyspark 数据框中每 60 行应用一个函数

Apply a function every 60 rows in a pyspark dataframe

我的数据框叫做 df,有 123729 行,看起来像这样:

+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0|     1|
|110| 127.0|     2|
|111| 127.0|     3|
|111| 127.0|     4|
|111| 126.0|     5|
|111| 127.0|     6|
|109| 126.0|     7|
|111| 126.0|     8|

我需要每 60 行或秒聚合一次到多个值。对于每一分钟,我都想知道最小心率、平均心率、最大心率,以及 maxABP 在这些秒中是否低于 85。所需的输出类似于下面的 table,其中如果 maxABP < 85,则警报为 1,否则为 0。

Min_HR Max_HR Avg_HR Alarm Minute
70 100 80 1 1
60 90 75 0 2

我想知道是否可以使用 mapreduce 将每 60 行聚合为这些单个值。我知道有很多错误,但也许是这样的:

def max_HR(df, i):
   x = i
   y = i+60
   return reduce(lambda x, y: max(df[x:y]))

df_maxHR = map(lambda i: max_HR(i))

其中 i 应该是 df 的一部分。

示例 DF:

df = spark.createDataFrame(
  [
     (110, 128.0, 1),(110, 127.0, 2),(111, 127.0, 3),(111, 127.0, 4)
    ,(111, 126.0, 5),(111, 127.0, 6),(109, 126.0, 7),(111, 126.0, 1001)
    ,(114, 126.0, 1003),(115, 83.0, 1064),(116, 127.0, 1066)
  ], ['HR', 'maxABP', 'Second']
)

+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0|     1|
|110| 127.0|     2|
|111| 127.0|     3|
|111| 127.0|     4|
|111| 126.0|     5|
|111| 127.0|     6|
|109| 126.0|     7|
|111| 126.0|  1001|
|114| 126.0|  1003|
|115|  83.0|  1064|
|116| 127.0|  1066|

然后使用window函数:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w1 = (Window.partitionBy(F.col('Minute')))

df\
  .withColumn('Minute', F.round(F.col('Second')/60,0)+1)\
  .withColumn('Min_HR', F.min('HR').over(w1))\
  .withColumn('Max_HR', F.max('HR').over(w1))\
  .withColumn('Avg_HR', F.round(F.avg('HR').over(w1),0))\
  .withColumn('Min_ABP', F.round(F.min('maxABP').over(w1),0))\
  .select('Min_HR','Max_HR','Min_ABP','Avg_HR','Minute')\
  .dropDuplicates()\
  .withColumn('Alarm', F.when(F.col('Min_ABP')<85, 1).otherwise(F.lit('0')))\
  .select('Min_HR','Max_HR','Avg_HR','Alarm','Minute')\
  .orderBy('Minute')\
  .show()

+------+------+------+-----+------+
|Min_HR|Max_HR|Avg_HR|Alarm|Minute|
+------+------+------+-----+------+
|   109|   111| 110.0|    0|   1.0|
|   111|   114| 113.0|    0|  18.0|
|   115|   116| 116.0|    1|  19.0|

我认为 groupBy 足以获得所需的结果。

df.show()
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0|    10|
|110| 127.0|    20|
|111| 127.0|    30|
|111| 127.0|    40|
|111| 126.0|    50|
|111| 127.0|    60|
|109| 126.0|    70|
|111| 126.0|    80|
+---+------+------+

df.withColumn('Minute', f.expr('cast(Second / 60 as int)')) \
  .groupBy('Minute').agg( \
    f.round(f.min('HR'), 2).alias('Min_HR'), \
    f.round(f.max('HR'), 2).alias('Max_HR'), \
    f.round(f.avg('HR'), 2).alias('Avg_HR'), \
    f.max('maxABP').alias('maxABP')) \
  .withColumn('Alarm', f.expr('if(maxABP < 85, 1, 0)')) \
  .show()

+------+------+------+------+------+-----+
|Minute|Min_HR|Max_HR|Avg_HR|maxABP|Alarm|
+------+------+------+------+------+-----+
|     1|   109|   111|110.33| 127.0|    0|
|     0|   110|   111| 110.6| 128.0|    0|
+------+------+------+------+------+-----+