在 PySpark 数据集中加入每个月的上个月数据
Joining on the previous month data for each month in the PySpark dataset
我有一个按月计算的数据集,每个月都有 N 个帐户。有些月份会有新账户,有些账户会在特定月份后消失(这是随机完成的)。
我需要获取一个账户的当月余额并将其从上个月的余额中扣除(如果该账户在上个月存在),否则将其作为当月余额。
有人建议我每个月加入一次。即加入 month1 到 month2,month2 到 month3,等等。但我不确定那会怎样......
这是一个示例数据集:
|date |account |balance |
----------------------------------
|01.01.2019|1 |40 |
|01.01.2019|2 |33 |
|01.01.2019|3 |31 |
|01.02.2019|1 |32 |
|01.02.2019|2 |56 |
|01.02.2019|4 |89 |
|01.03.2019|2 |12 |
|01.03.2019|4 |35 |
|01.03.2019|5 |76 |
|01.03.2019|6 |47 |
----------------------------------
每个已离开、当前和新加入的帐户的帐户 ID 都是唯一的。
我一开始用的是f.lag,但是现在有账户消失又新来,每个月的账户数不是固定的,所以我不能落后。正如我所说,有人建议我使用 join。 IE。将 1 月加入 2 月,将 2 月加入 3 月,依此类推
但我真的不确定那会怎样。有人有什么想法吗?
P.S。我创建了这个table,其中包含一个保留帐户的示例、一个新帐户和一个从以后几个月中删除的帐户。
最终目标是:
|date |account |balance | balance_diff_with_previous_month |
--------------------------------------------------------------------|
|01.01.2019|1 |40 |na |
|01.01.2019|2 |33 |na |
|01.01.2019|3 |31 |na |
|01.02.2019|1 |32 |-8 |
|01.02.2019|2 |56 |23 |
|01.02.2019|4 |89 |89 |
|01.03.2019|2 |12 |-44 |
|01.03.2019|4 |35 |-54 |
|01.03.2019|5 |76 |76 |
|01.03.2019|6 |47 |47 |
--------------------------------------------------------------------|
正如我所说,f.lag 不能使用,因为每个月的帐户数不是恒定的,我不控制帐户数,因此不能 f.lag 恒定的行数。
有人知道如何加入帐户 and/or 日期(当前月份)和日期 1(上个月)吗?
感谢阅读和帮助:)
如果按 account
和 进行分区,F.lag
可以完美满足您的需求
partition = Window.partitionBy("account") \
.orderBy(F.col("date").cast("timestamp").cast("long"))
previousAmount = data.withColumn("balance_diff_with_previous_month", F.lag("balance").over(partition))
.show(10, False)
使用联接的替代解决方案....
df = spark.createDataFrame([
("01.01.2019", 1, 40),("01.01.2019", 2, 33),("01.01.2019", 3, 31),
("01.02.2019", 1, 32), ("01.02.2019", 2, 56),("01.02.2019", 4, 89),
("01.03.2019", 2, 12),("01.03.2019", 4, 35),("01.03.2019", 5, 76),("01.03.2019", 6, 47)],
["date","account","balance"])
df.alias("current").join(
df.alias("previous"),
[F.to_date(F.col("previous.date"), "dd.MM.yyyy") == F.to_date(F.add_months(F.to_date(F.col("current.date"), "dd.MM.yyyy"),-1),"dd.MM.yyyy"), F.col("previous.account") == F.col("current.account")],
"left"
).select(
F.col("current.date").alias("date"),
F.coalesce("current.account", "previous.account").alias("account"),
F.col("current.balance").alias("balance"),
(F.col("current.balance") - F.coalesce(F.col("previous.balance"), F.lit(0))).alias("balance_diff_with_previous_month")
).orderBy("date","account").show()
结果
+----------+-------+-------+--------------------------------+
| date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|01.01.2019| 1| 40| 40|
|01.01.2019| 2| 33| 33|
|01.01.2019| 3| 31| 31|
|01.02.2019| 1| 32| -8|
|01.02.2019| 2| 56| 23|
|01.02.2019| 4| 89| 89|
|01.03.2019| 2| 12| -44|
|01.03.2019| 4| 35| -54|
|01.03.2019| 5| 76| 76|
|01.03.2019| 6| 47| 47|
+----------+-------+-------+--------------------------------+
>>> from pyspark.sql.functions import *
>>> from pyspark.sql import Window
>>> df.show()
+----------+-------+-------+
| date|account|balance|
+----------+-------+-------+
|01.01.2019| 1| 40|
|01.01.2019| 2| 33|
|01.01.2019| 3| 31|
|01.02.2019| 1| 32|
|01.02.2019| 2| 56|
|01.02.2019| 4| 89|
|01.03.2019| 2| 12|
|01.03.2019| 4| 35|
|01.03.2019| 5| 76|
|01.03.2019| 6| 47|
+----------+-------+-------+
>>> df1 = df.withColumn("date", expr("to_date(date, 'dd.MM.yyyy')"))
>>> W = Window.partitionBy("account").orderBy("date")
>>> df1.withColumn("balance_diff_with_previous_month", col("balance") - lag(col("balance"),1,0).over(W)).show()
+----------+-------+-------+--------------------------------+
| date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|2019-01-01| 1| 40| 40.0|
|2019-01-01| 2| 33| 33.0|
|2019-01-01| 3| 31| 31.0|
|2019-02-01| 1| 32| -8.0|
|2019-02-01| 2| 56| 23.0|
|2019-02-01| 4| 89| 89.0|
|2019-03-01| 2| 12| -44.0|
|2019-03-01| 4| 35| -54.0|
|2019-03-01| 5| 76| 76.0|
|2019-03-01| 6| 47| 47.0|
+----------+-------+-------+--------------------------------+
我有一个按月计算的数据集,每个月都有 N 个帐户。有些月份会有新账户,有些账户会在特定月份后消失(这是随机完成的)。
我需要获取一个账户的当月余额并将其从上个月的余额中扣除(如果该账户在上个月存在),否则将其作为当月余额。
有人建议我每个月加入一次。即加入 month1 到 month2,month2 到 month3,等等。但我不确定那会怎样......
这是一个示例数据集:
|date |account |balance |
----------------------------------
|01.01.2019|1 |40 |
|01.01.2019|2 |33 |
|01.01.2019|3 |31 |
|01.02.2019|1 |32 |
|01.02.2019|2 |56 |
|01.02.2019|4 |89 |
|01.03.2019|2 |12 |
|01.03.2019|4 |35 |
|01.03.2019|5 |76 |
|01.03.2019|6 |47 |
----------------------------------
每个已离开、当前和新加入的帐户的帐户 ID 都是唯一的。
我一开始用的是f.lag,但是现在有账户消失又新来,每个月的账户数不是固定的,所以我不能落后。正如我所说,有人建议我使用 join。 IE。将 1 月加入 2 月,将 2 月加入 3 月,依此类推
但我真的不确定那会怎样。有人有什么想法吗?
P.S。我创建了这个table,其中包含一个保留帐户的示例、一个新帐户和一个从以后几个月中删除的帐户。
最终目标是:
|date |account |balance | balance_diff_with_previous_month |
--------------------------------------------------------------------|
|01.01.2019|1 |40 |na |
|01.01.2019|2 |33 |na |
|01.01.2019|3 |31 |na |
|01.02.2019|1 |32 |-8 |
|01.02.2019|2 |56 |23 |
|01.02.2019|4 |89 |89 |
|01.03.2019|2 |12 |-44 |
|01.03.2019|4 |35 |-54 |
|01.03.2019|5 |76 |76 |
|01.03.2019|6 |47 |47 |
--------------------------------------------------------------------|
正如我所说,f.lag 不能使用,因为每个月的帐户数不是恒定的,我不控制帐户数,因此不能 f.lag 恒定的行数。
有人知道如何加入帐户 and/or 日期(当前月份)和日期 1(上个月)吗?
感谢阅读和帮助:)
account
和 进行分区,F.lag
可以完美满足您的需求
partition = Window.partitionBy("account") \
.orderBy(F.col("date").cast("timestamp").cast("long"))
previousAmount = data.withColumn("balance_diff_with_previous_month", F.lag("balance").over(partition))
.show(10, False)
使用联接的替代解决方案....
df = spark.createDataFrame([
("01.01.2019", 1, 40),("01.01.2019", 2, 33),("01.01.2019", 3, 31),
("01.02.2019", 1, 32), ("01.02.2019", 2, 56),("01.02.2019", 4, 89),
("01.03.2019", 2, 12),("01.03.2019", 4, 35),("01.03.2019", 5, 76),("01.03.2019", 6, 47)],
["date","account","balance"])
df.alias("current").join(
df.alias("previous"),
[F.to_date(F.col("previous.date"), "dd.MM.yyyy") == F.to_date(F.add_months(F.to_date(F.col("current.date"), "dd.MM.yyyy"),-1),"dd.MM.yyyy"), F.col("previous.account") == F.col("current.account")],
"left"
).select(
F.col("current.date").alias("date"),
F.coalesce("current.account", "previous.account").alias("account"),
F.col("current.balance").alias("balance"),
(F.col("current.balance") - F.coalesce(F.col("previous.balance"), F.lit(0))).alias("balance_diff_with_previous_month")
).orderBy("date","account").show()
结果
+----------+-------+-------+--------------------------------+
| date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|01.01.2019| 1| 40| 40|
|01.01.2019| 2| 33| 33|
|01.01.2019| 3| 31| 31|
|01.02.2019| 1| 32| -8|
|01.02.2019| 2| 56| 23|
|01.02.2019| 4| 89| 89|
|01.03.2019| 2| 12| -44|
|01.03.2019| 4| 35| -54|
|01.03.2019| 5| 76| 76|
|01.03.2019| 6| 47| 47|
+----------+-------+-------+--------------------------------+
>>> from pyspark.sql.functions import *
>>> from pyspark.sql import Window
>>> df.show()
+----------+-------+-------+
| date|account|balance|
+----------+-------+-------+
|01.01.2019| 1| 40|
|01.01.2019| 2| 33|
|01.01.2019| 3| 31|
|01.02.2019| 1| 32|
|01.02.2019| 2| 56|
|01.02.2019| 4| 89|
|01.03.2019| 2| 12|
|01.03.2019| 4| 35|
|01.03.2019| 5| 76|
|01.03.2019| 6| 47|
+----------+-------+-------+
>>> df1 = df.withColumn("date", expr("to_date(date, 'dd.MM.yyyy')"))
>>> W = Window.partitionBy("account").orderBy("date")
>>> df1.withColumn("balance_diff_with_previous_month", col("balance") - lag(col("balance"),1,0).over(W)).show()
+----------+-------+-------+--------------------------------+
| date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|2019-01-01| 1| 40| 40.0|
|2019-01-01| 2| 33| 33.0|
|2019-01-01| 3| 31| 31.0|
|2019-02-01| 1| 32| -8.0|
|2019-02-01| 2| 56| 23.0|
|2019-02-01| 4| 89| 89.0|
|2019-03-01| 2| 12| -44.0|
|2019-03-01| 4| 35| -54.0|
|2019-03-01| 5| 76| 76.0|
|2019-03-01| 6| 47| 47.0|
+----------+-------+-------+--------------------------------+