在将数据框合并到特定大小后执行聚合
Performing aggregation after binning a dataframe to a specific size
我有一个像这样的 pyspark 数据框:(在这个例子中我有 20 条记录)
+-----------------------+---------+
|TIME_STAMP |RESULT |
+-----------------------+---------+
|2020-08-31 00:00:08.395|80.0 |
|2020-08-31 00:03:50.422|27939.368|
|2020-08-31 00:04:27.586|80.0 |
|2020-08-31 00:06:01.476|27956.04 |
|2020-08-31 00:06:12.883|27958.179|
|2020-08-31 00:06:14.082|27939.168|
|2020-08-31 00:08:46.169|80.0 |
|2020-08-31 00:11:18.627|27940.127|
|2020-08-31 00:13:04.91 |80.0 |
|2020-08-31 00:13:18.746|27954.786|
|2020-08-31 00:13:38.569|27958.417|
|2020-08-31 00:13:51.633|27939.395|
|2020-08-31 00:17:23.901|80.0 |
|2020-08-31 00:18:47.043|27940.273|
|2020-08-31 00:20:36.029|27956.06 |
|2020-08-31 00:21:03.403|27958.464|
|2020-08-31 00:21:19.796|27939.9 |
|2020-08-31 00:21:42.546|80.0 |
|2020-08-31 00:26:01.334|80.0 |
|2020-08-31 00:27:53.582|27955.768|
+-----------------------+---------+
我已按 TIME_STAMP
对它进行排序,并希望将数据框以 5 个为一组进行装箱,并在每个组的 RESULT
列上执行聚合 (mean
) .所以前 5 条记录将组成一个组,接下来的 5 条记录将导致 4 个组。
预期输出:
bin mean
5 16802.7174
10 16798.8162
15 22374.829
20 16802.8264
此处,bin
列来自记录 1-5
,mean
列是这 5 条记录的平均值,依此类推。
在我的研究中,我似乎必须使用 monotonically_increasing_id()
pyspark 函数,我试图避免使用它,因为我有非常大的数据集并且可能导致 OOM。
有没有一种方法可以实现这一点,而不必 collect
将整个数据集提供给驱动程序?
作为一个额外的问题,在上面的例子中,记录总数(20)可以被5整除。但是假设我有19条记录,有没有办法有3组5条记录和4条记录在最后一组?
- 首先使用 row_number() 为每一行分配一个行号
(按时间戳排序)。无需分区。
- 接下来,通过发言对行号进行分类((row_number - 1)/5).
- 最后变成平凡群
示例 SQL 您可以 运行 按原样轻松适应您的数据:
SELECT floor((id - 1)/5), avg(value)
FROM (SELECT row_number() OVER (ORDER BY value) AS id,
value
FROM (SELECT Explode(Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210)) AS value) a)
GROUP BY 1
我有一个像这样的 pyspark 数据框:(在这个例子中我有 20 条记录)
+-----------------------+---------+
|TIME_STAMP |RESULT |
+-----------------------+---------+
|2020-08-31 00:00:08.395|80.0 |
|2020-08-31 00:03:50.422|27939.368|
|2020-08-31 00:04:27.586|80.0 |
|2020-08-31 00:06:01.476|27956.04 |
|2020-08-31 00:06:12.883|27958.179|
|2020-08-31 00:06:14.082|27939.168|
|2020-08-31 00:08:46.169|80.0 |
|2020-08-31 00:11:18.627|27940.127|
|2020-08-31 00:13:04.91 |80.0 |
|2020-08-31 00:13:18.746|27954.786|
|2020-08-31 00:13:38.569|27958.417|
|2020-08-31 00:13:51.633|27939.395|
|2020-08-31 00:17:23.901|80.0 |
|2020-08-31 00:18:47.043|27940.273|
|2020-08-31 00:20:36.029|27956.06 |
|2020-08-31 00:21:03.403|27958.464|
|2020-08-31 00:21:19.796|27939.9 |
|2020-08-31 00:21:42.546|80.0 |
|2020-08-31 00:26:01.334|80.0 |
|2020-08-31 00:27:53.582|27955.768|
+-----------------------+---------+
我已按 TIME_STAMP
对它进行排序,并希望将数据框以 5 个为一组进行装箱,并在每个组的 RESULT
列上执行聚合 (mean
) .所以前 5 条记录将组成一个组,接下来的 5 条记录将导致 4 个组。
预期输出:
bin mean
5 16802.7174
10 16798.8162
15 22374.829
20 16802.8264
此处,bin
列来自记录 1-5
,mean
列是这 5 条记录的平均值,依此类推。
在我的研究中,我似乎必须使用 monotonically_increasing_id()
pyspark 函数,我试图避免使用它,因为我有非常大的数据集并且可能导致 OOM。
有没有一种方法可以实现这一点,而不必 collect
将整个数据集提供给驱动程序?
作为一个额外的问题,在上面的例子中,记录总数(20)可以被5整除。但是假设我有19条记录,有没有办法有3组5条记录和4条记录在最后一组?
- 首先使用 row_number() 为每一行分配一个行号 (按时间戳排序)。无需分区。
- 接下来,通过发言对行号进行分类((row_number - 1)/5).
- 最后变成平凡群
示例 SQL 您可以 运行 按原样轻松适应您的数据:
SELECT floor((id - 1)/5), avg(value)
FROM (SELECT row_number() OVER (ORDER BY value) AS id,
value
FROM (SELECT Explode(Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210)) AS value) a)
GROUP BY 1