Creating a new column by looping through the values of a SparkSQL DataFrame

I have a SparkSQL dataframe containing a unique code, a month date, and a turnover value. I want to loop through each month date to get the sum of the turnover over the trailing 12 months (turnover_per_year). For example, if the month date is January 2022, the sum runs from January 2021 through January 2022: turnover_per_year on January 1st 2022 = turnover Jan 2021 + turnover Feb 2021 + turnover Mar 2021 + ... + turnover Dec 2021 + turnover Jan 2022. I only have data for 2021 and 2022, so if I ask for turnover_per_year in January 2021 it should be null, because there is no data for 2020.

Here is a sample of the dataframe:

+------+------------+----------+
| code | monthdate  | turnover |
+------+------------+----------+
| AA1  | 2021-01-01 | 10       |
| AA1  | 2021-02-01 | 20       |
| AA1  | 2021-03-01 | 30       |
| AA1  | 2021-04-01 | 40       |
| AA1  | 2021-05-01 | 50       |
| AA1  | 2021-06-01 | 60       |
| AA1  | 2021-07-01 | 70       |
| AA1  | 2021-08-01 | 80       |
| AA1  | 2021-09-01 | 90       |
| AA1  | 2021-10-01 | 100      |
| AA1  | 2021-11-01 | 101      |
| AA1  | 2021-12-01 | 102      |
| AA1  | 2022-01-01 | 103      |
| AA1  | 2022-02-01 | 104      |
| AA1  | 2022-03-01 | 105      |
| AA1  | 2022-04-01 | 106      |
| AA1  | 2022-05-01 | 107      |
| AA1  | 2022-06-01 | 108      |
| AA1  | 2022-07-01 | 109      |
| AA1  | 2022-08-01 | 110      |
| AA1  | 2022-09-01 | 111      |
| AA1  | 2022-10-01 | 112      |
| AA1  | 2022-11-01 | 113      |
| AA1  | 2022-12-01 | 114      |
+------+------------+----------+
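
To make the expected output concrete: with the sample above, turnover_per_year for 2022-01-01 would be 10 + 20 + 30 + 40 + 50 + 60 + 70 + 80 + 90 + 100 + 101 + 102 + 103 = 856, while every month in 2021 would get null (its lookback reaches into 2020, where there is no data).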

I am very new to Spark and Scala, and solving this the Spark/Scala way confuses me. I have worked out the logic but have difficulty translating it into Spark Scala. I am working in cluster mode. Here is my logic:

// My logic, written row by row (I know this is probably not the Spark way,
// but it is the computation I want):
val listKey = df.select("code").distinct.map(r => r.getString(0)).collect()
listKey.foreach { key =>
  // collect() so the rows can be walked on the driver
  df.filter(s"code = '$key'").orderBy("monthdate").collect().foreach { row =>
    val monthdate = row.getAs[java.sql.Date]("monthdate").toLocalDate
    val turnover = row.getAs[Int]("turnover")
    var sum = turnover
    for (i <- 1 to 12) {
      // look up the turnover of the month i months back
      val monthdateTemp = monthdate.minusMonths(i)
      val turnoverTemp = df
        .filter(s"monthdate = '$monthdateTemp' and code = '$key'")
        .select("turnover")
        .collect()
      sum += turnoverTemp.headOption.map(_.getInt(0)).getOrElse(0)
    }
    // here I want to attach sum to the row as turnover_per_year,
    // but withColumn exists on a DataFrame, not on a single Row
  }
}

Any help would be greatly appreciated, thanks in advance.

Each row of the original dataframe can be expanded into 13 rows (shifts of 0 to 12 months back) with the "explode" function, joined with the original dataframe, and grouped:

import org.apache.spark.sql.functions._
import spark.implicits._ // for $ and toDF

val df = Seq(
  ("AA1", "2021-01-01", 25),
  ("AA1", "2022-01-01", 103)
)
  .toDF("code", "monthdate", "turnover")
  .withColumn("monthdate", to_date($"monthdate", "yyyy-MM-dd"))

// one shift per month in the window: 0 (the current month) back to -12
val oneYearBackMonths = (0 to 12).map(n => lit(-n))

val explodedWithBackMonths = df
  .withColumn("shift", explode(array(oneYearBackMonths: _*)))
  .withColumn("rangeMonth", expr("add_months(monthdate, shift)"))

val joinCondition = $"exploded.code" === $"original.code" &&
  $"exploded.rangeMonth" === $"original.monthdate"

explodedWithBackMonths.alias("exploded")
  .join(df.alias("original"), joinCondition)
  .groupBy($"exploded.code", $"exploded.monthdate")
  .agg(sum($"original.turnover").alias("oneYearTurnover"))

Result:

+----+----------+---------------+
|code|monthdate |oneYearTurnover|
+----+----------+---------------+
|AA1 |2021-01-01|25             |
|AA1 |2022-01-01|128            |
+----+----------+---------------+
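
Note that with the inner join, a month without a full year of history still gets a partial sum (2021-01-01 yields 25 rather than the null the question asks for). As a sketch of a possible extension (my addition, not part of the original answer), one could count the matched months and blank out incomplete windows:

import org.apache.spark.sql.functions._

val strict = explodedWithBackMonths.alias("exploded")
  .join(df.alias("original"), joinCondition)
  .groupBy($"exploded.code", $"exploded.monthdate")
  .agg(
    sum($"original.turnover").alias("oneYearTurnover"),
    // how many of the 13 window months actually matched a row
    count($"original.turnover").alias("monthsFound")
  )
  // when() without otherwise() yields null for incomplete windows
  .withColumn("oneYearTurnover",
    when($"monthsFound" === 13, $"oneYearTurnover"))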

You can use Spark's Window functions:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._ // for $ and toDF

val raw = Seq(
  ("AA1", "2019-01-01", 25),
  ("AA1", "2021-01-01", 25),
  ("AA1", "2021-08-01", 80),
  ("AA1", "2021-09-01", 90),
  ("AA1", "2022-01-01", 103),
  ("AA2", "2022-01-01", 10)
).toDF("code", "monthdate", "turnover")

// "MM" is the month pattern; lowercase "mm" would parse minutes
val df = raw.withColumn("monthdate", to_timestamp($"monthdate", "yyyy-MM-dd"))

// range over the preceding 365 days (in seconds), current row included
val pw = Window.partitionBy($"code")
  .orderBy($"monthdate".cast("long"))
  .rangeBetween(-(86400L * 365), 0)

df.withColumn("sum", sum($"turnover").over(pw)).show()

+----+-------------------+--------+---+
|code|          monthdate|turnover|sum|
+----+-------------------+--------+---+
| AA1|2019-01-01 00:00:00|      25| 25|
| AA1|2021-01-01 00:00:00|      25| 25|
| AA1|2021-08-01 00:00:00|      80|105|
| AA1|2021-09-01 00:00:00|      90|195|
| AA1|2022-01-01 00:00:00|     103|298|
| AA2|2022-01-01 00:00:00|      10| 10|
+----+-------------------+--------+---+
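
One caveat (my note, not part of this answer): a fixed 365-day range drifts over leap years. A sketch that instead ranges over a calendar-month index, assuming at most one row per code and month:

// Order by a month index (year*12 + month) so that rangeBetween counts
// calendar months: -12 to 0 is the current month plus the 12 before it.
val byMonth = Window.partitionBy($"code")
  .orderBy(year($"monthdate") * 12 + month($"monthdate"))
  .rangeBetween(-12, 0)

df.withColumn("sum", sum($"turnover").over(byMonth)).show()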

I created 2 window functions for testing. Could you please check it and comment whether this is correct?

// w1 numbers the rows per code in monthdate order; w then sums the
// current row plus the 11 preceding rows (12 rows in total)
val w = Window.partitionBy($"code")
  .orderBy($"rownum")
  .rowsBetween(-11, Window.currentRow)
val w1 = Window.partitionBy($"code")
  .orderBy($"monthdate")
val newDf = initDf
  .withColumn("rownum", row_number().over(w1))
  .withColumn("csum", sum("turnover").over(w))

We may need to first group by month and year and take the sum of the turnover for that month, then sort by month within each code; a sketch of that step follows below.
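
A minimal sketch of that pre-aggregation, assuming initDf carries the code/monthdate/turnover columns from the question:

import org.apache.spark.sql.functions._

// Collapse to one row per code and calendar month before applying the
// row-based window, so that rowsBetween(-11, 0) really spans 12 months.
val monthly = initDf
  .withColumn("month", trunc($"monthdate", "month"))
  .groupBy($"code", $"month")
  .agg(sum($"turnover").alias("turnover"))
  .orderBy($"code", $"month")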