PySpark:如何计算特定日期的平均值?
PySpark: how to calculate the average up to a certain date?
我正在使用带有 Pyspark >= 3.1 的 EMR 笔记本
我有 4 列:
- ID_CLIENT: 客户端唯一索引
- IDX_TRX:交易的唯一索引。实际上这是一个字母数字列,索引不表示任何顺序。本例中的数字索引是为了方便说明。
- dt: datetime, 交易日期
- AVERAGE_TRX:交易金额
我要计算以下列'AVERAGE_TRX':
+---------+--------+----------+-----+------------+
|ID_CLIENT| IDX_TRX| dt|AMOUNT|AVERAGE_TRX|
+---------+--------+----------+-----+------------+
| A| 01|2018-06-14| 10| NULL| # 1st trx there are no records
| B| 01|2018-06-14| 5| NULL| # 1st trx there are no records
| A| 02|2018-06-15| 20| 10| # 10 / 1
| A| 03|2018-06-15| 30| 15| # (10 + 20) / 2
| B| 02|2018-06-16| 10| 5| # 5 / 1
| A| 04|2018-06-16| 20| 20| # (10 + 20 + 30) / 3
| A| 05|2018-06-17| 5| 20| # (10 + 20 + 30 + 20) / 4
| B| 03|2018-06-17| 10| 7.5| # (5 + 10) / 2
| A| 06|2018-06-18| 15| 17| # (10 + 20 + 30 + 20 + 5) / 5
| B| 04|2018-06-18| 10| 8.3333334| # (5 + 10 + 10) / 3
+---------+--------+----------+-----+------------+
如何计算最后一列?
提前致谢。
试试这个 -
这会弄乱记录的顺序,但它应该具有所需的行值。
from pyspark.sql import Window
w = (Window.partitionBy('ID_CLIENT')
.orderBy('IDX_TRX')
.rowsBetween(Window.unboundedPreceding, -1)) # -1 is to do cumulative calculation up until the previous row.
df = df.withColumn('AVERAGE_TRX', avg('AMOUNT').over(w))
df.show()
+---------+--------+----------+-----+------------+
|ID_CLIENT| IDX_TRX| dt|AMOUNT|AVERAGE_TRX|
+---------+--------+----------+-----+------------+
| A| 01|2018-06-14| 10| null|
| A| 02|2018-06-15| 20| 10.0|
| A| 03|2018-06-15| 30| 15.0|
| A| 04|2018-06-16| 20| 20.0|
| A| 05|2018-06-17| 5| 20.0|
| A| 06|2018-06-18| 15| 17.0|
| B| 01|2018-06-14| 5| null|
| B| 02|2018-06-16| 10| 5.0|
| B| 03|2018-06-17| 10| 7.5|
| B| 04|2018-06-18| 10| 8.3333334|
+---------+--------+----------+-----+------------+
我正在使用带有 Pyspark >= 3.1 的 EMR 笔记本
我有 4 列:
- ID_CLIENT: 客户端唯一索引
- IDX_TRX:交易的唯一索引。实际上这是一个字母数字列,索引不表示任何顺序。本例中的数字索引是为了方便说明。
- dt: datetime, 交易日期
- AVERAGE_TRX:交易金额
我要计算以下列'AVERAGE_TRX':
+---------+--------+----------+-----+------------+
|ID_CLIENT| IDX_TRX| dt|AMOUNT|AVERAGE_TRX|
+---------+--------+----------+-----+------------+
| A| 01|2018-06-14| 10| NULL| # 1st trx there are no records
| B| 01|2018-06-14| 5| NULL| # 1st trx there are no records
| A| 02|2018-06-15| 20| 10| # 10 / 1
| A| 03|2018-06-15| 30| 15| # (10 + 20) / 2
| B| 02|2018-06-16| 10| 5| # 5 / 1
| A| 04|2018-06-16| 20| 20| # (10 + 20 + 30) / 3
| A| 05|2018-06-17| 5| 20| # (10 + 20 + 30 + 20) / 4
| B| 03|2018-06-17| 10| 7.5| # (5 + 10) / 2
| A| 06|2018-06-18| 15| 17| # (10 + 20 + 30 + 20 + 5) / 5
| B| 04|2018-06-18| 10| 8.3333334| # (5 + 10 + 10) / 3
+---------+--------+----------+-----+------------+
如何计算最后一列?
提前致谢。
试试这个 - 这会弄乱记录的顺序,但它应该具有所需的行值。
from pyspark.sql import Window
w = (Window.partitionBy('ID_CLIENT')
.orderBy('IDX_TRX')
.rowsBetween(Window.unboundedPreceding, -1)) # -1 is to do cumulative calculation up until the previous row.
df = df.withColumn('AVERAGE_TRX', avg('AMOUNT').over(w))
df.show()
+---------+--------+----------+-----+------------+
|ID_CLIENT| IDX_TRX| dt|AMOUNT|AVERAGE_TRX|
+---------+--------+----------+-----+------------+
| A| 01|2018-06-14| 10| null|
| A| 02|2018-06-15| 20| 10.0|
| A| 03|2018-06-15| 30| 15.0|
| A| 04|2018-06-16| 20| 20.0|
| A| 05|2018-06-17| 5| 20.0|
| A| 06|2018-06-18| 15| 17.0|
| B| 01|2018-06-14| 5| null|
| B| 02|2018-06-16| 10| 5.0|
| B| 03|2018-06-17| 10| 7.5|
| B| 04|2018-06-18| 10| 8.3333334|
+---------+--------+----------+-----+------------+