在 PySpark 数据集中加入每个月的上个月数据

Joining on the previous month data for each month in the PySpark dataset

我有一个按月计算的数据集,每个月都有 N 个帐户。有些月份会有新账户,有些账户会在特定月份后消失(这是随机完成的)。

我需要获取一个账户的当月余额并将其从上个月的余额中扣除(如果该账户在上个月存在),否则将其作为当月余额。

有人建议我每个月加入一次。即加入 month1 到 month2,month2 到 month3,等等。但我不确定那会怎样......

这是一个示例数据集:

|date      |account   |balance   |
----------------------------------
|01.01.2019|1         |40        |
|01.01.2019|2         |33        |
|01.01.2019|3         |31        |
|01.02.2019|1         |32        |
|01.02.2019|2         |56        |
|01.02.2019|4         |89        |
|01.03.2019|2         |12        |
|01.03.2019|4         |35        |
|01.03.2019|5         |76        |
|01.03.2019|6         |47        |
----------------------------------

每个已离开、当前和新加入的帐户的帐户 ID 都是唯一的。

我一开始用的是f.lag,但是现在有账户消失又新来,每个月的账户数不是固定的,所以我不能落后。正如我所说,有人建议我使用 join。 IE。将 1 月加入 2 月,将 2 月加入 3 月,依此类推

但我真的不确定那会怎样。有人有什么想法吗?

P.S。我创建了这个table,其中包含一个保留帐户的示例、一个新帐户和一个从以后几个月中删除的帐户。

最终目标是:

|date      |account   |balance   | balance_diff_with_previous_month  |
--------------------------------------------------------------------|
|01.01.2019|1         |40        |na                                |
|01.01.2019|2         |33        |na                                |
|01.01.2019|3         |31        |na                                |
|01.02.2019|1         |32        |-8                                |
|01.02.2019|2         |56        |23                                |
|01.02.2019|4         |89        |89                                |
|01.03.2019|2         |12        |-44                               |
|01.03.2019|4         |35        |-54                               |
|01.03.2019|5         |76        |76                                |
|01.03.2019|6         |47        |47                                |
--------------------------------------------------------------------|

正如我所说,f.lag 不能使用,因为每个月的帐户数不是恒定的,我不控制帐户数,因此不能 f.lag 恒定的行数。

有人知道如何加入帐户 and/or 日期(当前月份)和日期 1(上个月)吗?

感谢阅读和帮助:)

如果按 account

进行分区,

F.lag 可以完美满足您的需求

partition = Window.partitionBy("account") \
                  .orderBy(F.col("date").cast("timestamp").cast("long"))

previousAmount = data.withColumn("balance_diff_with_previous_month", F.lag("balance").over(partition))
                     .show(10, False)

使用联接的替代解决方案....

df = spark.createDataFrame([
            ("01.01.2019", 1, 40),("01.01.2019", 2, 33),("01.01.2019", 3, 31),
            ("01.02.2019", 1, 32), ("01.02.2019", 2, 56),("01.02.2019", 4, 89),
            ("01.03.2019", 2, 12),("01.03.2019", 4, 35),("01.03.2019", 5, 76),("01.03.2019", 6, 47)],
            ["date","account","balance"])

df.alias("current").join(
    df.alias("previous"),
    [F.to_date(F.col("previous.date"), "dd.MM.yyyy") == F.to_date(F.add_months(F.to_date(F.col("current.date"), "dd.MM.yyyy"),-1),"dd.MM.yyyy"), F.col("previous.account") == F.col("current.account")],
    "left"
).select(
    F.col("current.date").alias("date"),
    F.coalesce("current.account", "previous.account").alias("account"),
    F.col("current.balance").alias("balance"),
    (F.col("current.balance") - F.coalesce(F.col("previous.balance"), F.lit(0))).alias("balance_diff_with_previous_month")
).orderBy("date","account").show()

结果

+----------+-------+-------+--------------------------------+
|      date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|01.01.2019|      1|     40|                              40|
|01.01.2019|      2|     33|                              33|
|01.01.2019|      3|     31|                              31|
|01.02.2019|      1|     32|                              -8|
|01.02.2019|      2|     56|                              23|
|01.02.2019|      4|     89|                              89|
|01.03.2019|      2|     12|                             -44|
|01.03.2019|      4|     35|                             -54|
|01.03.2019|      5|     76|                              76|
|01.03.2019|      6|     47|                              47|
+----------+-------+-------+--------------------------------+
>>> from pyspark.sql.functions import *
>>> from pyspark.sql import Window
>>> df.show()
+----------+-------+-------+
|      date|account|balance|
+----------+-------+-------+
|01.01.2019|      1|     40|
|01.01.2019|      2|     33|
|01.01.2019|      3|     31|
|01.02.2019|      1|     32|
|01.02.2019|      2|     56|
|01.02.2019|      4|     89|
|01.03.2019|      2|     12|
|01.03.2019|      4|     35|
|01.03.2019|      5|     76|
|01.03.2019|      6|     47|
+----------+-------+-------+

>>> df1 = df.withColumn("date", expr("to_date(date, 'dd.MM.yyyy')"))
>>> W = Window.partitionBy("account").orderBy("date")
>>> df1.withColumn("balance_diff_with_previous_month", col("balance") - lag(col("balance"),1,0).over(W)).show()
+----------+-------+-------+--------------------------------+
|      date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|2019-01-01|      1|     40|                            40.0|
|2019-01-01|      2|     33|                            33.0|
|2019-01-01|      3|     31|                            31.0|
|2019-02-01|      1|     32|                            -8.0|
|2019-02-01|      2|     56|                            23.0|
|2019-02-01|      4|     89|                            89.0|
|2019-03-01|      2|     12|                           -44.0|
|2019-03-01|      4|     35|                           -54.0|
|2019-03-01|      5|     76|                            76.0|
|2019-03-01|      6|     47|                            47.0|
+----------+-------+-------+--------------------------------+