在将数据框合并到特定大小后执行聚合

Performing aggregation after binning a dataframe to a specific size

我有一个像这样的 pyspark 数据框:(在这个例子中我有 20 条记录)

+-----------------------+---------+
|TIME_STAMP             |RESULT   |
+-----------------------+---------+
|2020-08-31 00:00:08.395|80.0     |
|2020-08-31 00:03:50.422|27939.368|
|2020-08-31 00:04:27.586|80.0     |
|2020-08-31 00:06:01.476|27956.04 |
|2020-08-31 00:06:12.883|27958.179|
|2020-08-31 00:06:14.082|27939.168|
|2020-08-31 00:08:46.169|80.0     |
|2020-08-31 00:11:18.627|27940.127|
|2020-08-31 00:13:04.91 |80.0     |
|2020-08-31 00:13:18.746|27954.786|
|2020-08-31 00:13:38.569|27958.417|
|2020-08-31 00:13:51.633|27939.395|
|2020-08-31 00:17:23.901|80.0     |
|2020-08-31 00:18:47.043|27940.273|
|2020-08-31 00:20:36.029|27956.06 |
|2020-08-31 00:21:03.403|27958.464|
|2020-08-31 00:21:19.796|27939.9  |
|2020-08-31 00:21:42.546|80.0     |
|2020-08-31 00:26:01.334|80.0     |
|2020-08-31 00:27:53.582|27955.768|
+-----------------------+---------+

我已按 TIME_STAMP 对它进行排序,并希望将数据框以 5 个为一组进行装箱,并在每个组的 RESULT 列上执行聚合 (mean) .所以前 5 条记录将组成一个组,接下来的 5 条记录将导致 4 个组。

预期输出:

bin     mean
5   16802.7174
10  16798.8162
15  22374.829
20  16802.8264

此处,bin 列来自记录 1-5mean 列是这 5 条记录的平均值,依此类推。

在我的研究中,我似乎必须使用 monotonically_increasing_id() pyspark 函数,我试图避免使用它,因为我有非常大的数据集并且可能导致 OOM。

有没有一种方法可以实现这一点,而不必 collect 将整个数据集提供给驱动程序?

作为一个额外的问题,在上面的例子中,记录总数(20)可以被5整除。但是假设我有19条记录,有没有办法有3组5条记录和4条记录在最后一组?

  1. 首先使用 row_number() 为每一行分配一个行号 (按时间戳排序)。无需分区。
  2. 接下来,通过发言对行号进行分类((row_number - 1)/5).
  3. 最后变成平凡群

示例 SQL 您可以 运行 按原样轻松适应您的数据:

SELECT floor((id - 1)/5), avg(value)
FROM   (SELECT row_number() OVER (ORDER BY value) AS id,
               value
        FROM   (SELECT Explode(Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210)) AS value) a)
GROUP  BY 1