使用 Pyspark/Databricks 在大型数据集中基于日期和 ID 条件进行迭代和计算的有效方法

Effective way to iterate and calculate based on dates and ID conditions in large dataset using Pyspark/Databricks

我没有使用 Pyspark 的经验,所以如果有人可以帮助我解决以下问题:

我有以下 Spark 数据框:

| ID_user | date_contract | total_debt | 
| 2564541 | 2015-08-22    |64          | 
| 2564541 | 2019-05-16    |100         |
| 2564541 | 2020-08-15    |200         |
| 2564541 | 2021-09-17    |150         |
| 3000000 | 2014-08-22    |84          |
| 3000000 | 2015-08-23    |100         |
| 3000000 | 2016-08-24    |200         |

正如您在上面看到的,有些用户拥有不止一份合同,他们每天只能签订一份合同。因此,目标是为每个用户(ID_user)计算当前合约之前达成的所有合约的平均总债务。

所以用户25645412021-09-17签订了一份合同,因此根据该合同之前(2020/2019/2015年)为他商定的所有合同计算的平均债务为mean(200,100,64) = 121,33.

我认为需要某种迭代,因为我必须对每个 id_user 和每个合同日期都做同样的事情。

预期输出:

| ID_user | date_contract | total_debt | avg_debt_before |
| 2564541 | 2015-08-22    |64          | 0               |
| 2564541 | 2019-05-16    |100         | 64              |
| 2564541 | 2020-08-15    |200         | 82              |
| 2564541 | 2021-09-17    |150         | 121,3           |
| 3000000 | 2014-08-22    |84          | 0               |
| 3000000 | 2015-08-23    |30          | 84              |
| 3000000 | 2016-08-24    |50          | 57              |

我已经尝试使用 rdd.toLocalIterator(),但没有成功。我也花了将近整个星期的时间在这里寻找答案,所以任何帮助或提示都会很棒!

您正在寻找滚动平均值。您可以使用 avg 函数在按 ID_user 分区并按 date_contract:

排序的 Window 上计算它
from pyspark.sql import functions as F, Window

w = (Window.partitionBy("ID_user")
     .orderBy("date_contract")
     .rowsBetween(Window.unboundedPreceding, -1)
     )

df1 = df.withColumn(
    "avg_debt_before",
    F.round(F.coalesce(F.avg("total_debt").over(w), F.lit(0)), 2)
)

df1.show()
#+-------+-------------+----------+---------------+
#|ID_user|date_contract|total_debt|avg_debt_before|
#+-------+-------------+----------+---------------+
#|2564541|   2015-08-22|        64|            0.0|
#|2564541|   2019-05-16|       100|           64.0|
#|2564541|   2020-08-15|       200|           82.0|
#|2564541|   2021-09-17|       150|         121.33|
#|3000000|   2014-08-22|        84|            0.0|
#|3000000|   2015-08-23|       100|           84.0|
#|3000000|   2016-08-24|       200|           92.0|
#+-------+-------------+----------+---------------+