使用 Pyspark/Databricks 在大型数据集中基于日期和 ID 条件进行迭代和计算的有效方法
Effective way to iterate and calculate based on dates and ID conditions in large dataset using Pyspark/Databricks
我没有使用 Pyspark 的经验,所以如果有人可以帮助我解决以下问题:
我有以下 Spark 数据框:
| ID_user | date_contract | total_debt |
| 2564541 | 2015-08-22 |64 |
| 2564541 | 2019-05-16 |100 |
| 2564541 | 2020-08-15 |200 |
| 2564541 | 2021-09-17 |150 |
| 3000000 | 2014-08-22 |84 |
| 3000000 | 2015-08-23 |100 |
| 3000000 | 2016-08-24 |200 |
正如您在上面看到的,有些用户拥有不止一份合同,他们每天只能签订一份合同。因此,目标是为每个用户(ID_user
)计算当前合约之前达成的所有合约的平均总债务。
所以用户2564541
在2021-09-17
签订了一份合同,因此根据该合同之前(2020/2019/2015年)为他商定的所有合同计算的平均债务为mean(200,100,64) = 121,33
.
我认为需要某种迭代,因为我必须对每个 id_user
和每个合同日期都做同样的事情。
预期输出:
| ID_user | date_contract | total_debt | avg_debt_before |
| 2564541 | 2015-08-22 |64 | 0 |
| 2564541 | 2019-05-16 |100 | 64 |
| 2564541 | 2020-08-15 |200 | 82 |
| 2564541 | 2021-09-17 |150 | 121,3 |
| 3000000 | 2014-08-22 |84 | 0 |
| 3000000 | 2015-08-23 |30 | 84 |
| 3000000 | 2016-08-24 |50 | 57 |
我已经尝试使用 rdd.toLocalIterator()
,但没有成功。我也花了将近整个星期的时间在这里寻找答案,所以任何帮助或提示都会很棒!
您正在寻找滚动平均值。您可以使用 avg
函数在按 ID_user
分区并按 date_contract
:
排序的 Window 上计算它
from pyspark.sql import functions as F, Window
w = (Window.partitionBy("ID_user")
.orderBy("date_contract")
.rowsBetween(Window.unboundedPreceding, -1)
)
df1 = df.withColumn(
"avg_debt_before",
F.round(F.coalesce(F.avg("total_debt").over(w), F.lit(0)), 2)
)
df1.show()
#+-------+-------------+----------+---------------+
#|ID_user|date_contract|total_debt|avg_debt_before|
#+-------+-------------+----------+---------------+
#|2564541| 2015-08-22| 64| 0.0|
#|2564541| 2019-05-16| 100| 64.0|
#|2564541| 2020-08-15| 200| 82.0|
#|2564541| 2021-09-17| 150| 121.33|
#|3000000| 2014-08-22| 84| 0.0|
#|3000000| 2015-08-23| 100| 84.0|
#|3000000| 2016-08-24| 200| 92.0|
#+-------+-------------+----------+---------------+
我没有使用 Pyspark 的经验,所以如果有人可以帮助我解决以下问题:
我有以下 Spark 数据框:
| ID_user | date_contract | total_debt |
| 2564541 | 2015-08-22 |64 |
| 2564541 | 2019-05-16 |100 |
| 2564541 | 2020-08-15 |200 |
| 2564541 | 2021-09-17 |150 |
| 3000000 | 2014-08-22 |84 |
| 3000000 | 2015-08-23 |100 |
| 3000000 | 2016-08-24 |200 |
正如您在上面看到的,有些用户拥有不止一份合同,他们每天只能签订一份合同。因此,目标是为每个用户(ID_user
)计算当前合约之前达成的所有合约的平均总债务。
所以用户2564541
在2021-09-17
签订了一份合同,因此根据该合同之前(2020/2019/2015年)为他商定的所有合同计算的平均债务为mean(200,100,64) = 121,33
.
我认为需要某种迭代,因为我必须对每个 id_user
和每个合同日期都做同样的事情。
预期输出:
| ID_user | date_contract | total_debt | avg_debt_before |
| 2564541 | 2015-08-22 |64 | 0 |
| 2564541 | 2019-05-16 |100 | 64 |
| 2564541 | 2020-08-15 |200 | 82 |
| 2564541 | 2021-09-17 |150 | 121,3 |
| 3000000 | 2014-08-22 |84 | 0 |
| 3000000 | 2015-08-23 |30 | 84 |
| 3000000 | 2016-08-24 |50 | 57 |
我已经尝试使用 rdd.toLocalIterator()
,但没有成功。我也花了将近整个星期的时间在这里寻找答案,所以任何帮助或提示都会很棒!
您正在寻找滚动平均值。您可以使用 avg
函数在按 ID_user
分区并按 date_contract
:
from pyspark.sql import functions as F, Window
w = (Window.partitionBy("ID_user")
.orderBy("date_contract")
.rowsBetween(Window.unboundedPreceding, -1)
)
df1 = df.withColumn(
"avg_debt_before",
F.round(F.coalesce(F.avg("total_debt").over(w), F.lit(0)), 2)
)
df1.show()
#+-------+-------------+----------+---------------+
#|ID_user|date_contract|total_debt|avg_debt_before|
#+-------+-------------+----------+---------------+
#|2564541| 2015-08-22| 64| 0.0|
#|2564541| 2019-05-16| 100| 64.0|
#|2564541| 2020-08-15| 200| 82.0|
#|2564541| 2021-09-17| 150| 121.33|
#|3000000| 2014-08-22| 84| 0.0|
#|3000000| 2015-08-23| 100| 84.0|
#|3000000| 2016-08-24| 200| 92.0|
#+-------+-------------+----------+---------------+