Creating a new column by looping through the values of a SparkSQL DataFrame
I have a SparkSQL DataFrame that contains a unique code, a monthdate and a turnover value. I want to loop over each monthdate to get the sum of the turnover over the last 12 months as a turnover_per_year column. For example, if the monthdate is January 2022, the sum runs from January 2021 to January 2022.
For example, if I have data for 2021 and 2022: turnover_per_year on January 1st 2022 = turnover Jan 2021 + turnover Feb 2021 + turnover Mar 2021 + ... + turnover Dec 2021 + turnover Jan 2022.
However, if I want turnover_per_year for January 2021, it will be null because I have no data for 2020.
Here is a sample of the dataframe:
+------+------------+----------+
| code | monthdate  | turnover |
+------+------------+----------+
| AA1  | 2021-01-01 |       10 |
| AA1  | 2021-02-01 |       20 |
| AA1  | 2021-03-01 |       30 |
| AA1  | 2021-04-01 |       40 |
| AA1  | 2021-05-01 |       50 |
| AA1  | 2021-06-01 |       60 |
| AA1  | 2021-07-01 |       70 |
| AA1  | 2021-08-01 |       80 |
| AA1  | 2021-09-01 |       90 |
| AA1  | 2021-10-01 |      100 |
| AA1  | 2021-11-01 |      101 |
| AA1  | 2021-12-01 |      102 |
| AA1  | 2022-01-01 |      103 |
| AA1  | 2022-02-01 |      104 |
| AA1  | 2022-03-01 |      105 |
| AA1  | 2022-04-01 |      106 |
| AA1  | 2022-05-01 |      107 |
| AA1  | 2022-06-01 |      108 |
| AA1  | 2022-07-01 |      109 |
| AA1  | 2022-08-01 |      110 |
| AA1  | 2022-09-01 |      111 |
| AA1  | 2022-10-01 |      112 |
| AA1  | 2022-11-01 |      113 |
| AA1  | 2022-12-01 |      114 |
+------+------------+----------+
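For example, with this sample, turnover_per_year for 2022-01-01 should be 10 + 20 + 30 + ... + 102 + 103 = 856 (the 13 months from January 2021 through January 2022), while turnover_per_year for 2021-01-01 should be null because I have no 2020 data.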
I am very new to Spark and Scala, and solving this in the Spark/Scala way confuses me. I have worked out the logic but have difficulty translating it into Spark Scala code. I am running in cluster mode. Here is my logic (pseudocode):
val listkey = df.select("code").distinct.map(r => r(0)).collect()
listkey.foreach(key =>
  df.select("*").filter(s"code == '${key}'").orderBy("monthdate").foreach(row => {
    var monthdate = row.monthdate
    var turnover = row.turnover
    var sum = 0
    sum = sum + turnover
    var n = 1
    var i = 1
    while (n < 12) {
      var monthdate_temp = monthdate - i   // the month i steps back from monthdate
      var turnover_temp =
        df.select("turnover").filter(s"monthdate = '${monthdate_temp}' and code = '${key}'").collect()
      sum = sum + turnover_temp
      n = n + 1
      i = i + 1
    }
    row = row.withColumn("turnover_per_year", sum)
  })
)
Any help would be appreciated. Thanks in advance.
Each row of the original dataframe can be expanded into 13 rows (the current month plus the 12 preceding months) with the "explode" function, joined back to the original dataframe, and grouped:
import org.apache.spark.sql.functions._
import spark.implicits._   // assumes a SparkSession named `spark`, as in spark-shell

val df = Seq(
  ("AA1", "2021-01-01", 25),
  ("AA1", "2022-01-01", 103)
)
  .toDF("code", "monthdate", "turnover")
  .withColumn("monthdate", to_date($"monthdate", "yyyy-MM-dd"))

// 0, -1, ..., -12: the current month and the 12 months before it
val oneYearBackMonths = (0 to 12).map(n => lit(-n))

val explodedWithBackMonths = df
  .withColumn("shift", explode(array(oneYearBackMonths: _*)))
  .withColumn("rangeMonth", expr("add_months(monthdate, shift)"))

val joinCondition = $"exploded.code" === $"original.code" &&
  $"exploded.rangeMonth" === $"original.monthdate"

explodedWithBackMonths.alias("exploded")
  .join(df.alias("original"), joinCondition)
  .groupBy($"exploded.code", $"exploded.monthdate")
  .agg(sum($"original.turnover").alias("oneYearTurnover"))
Result:
+----+----------+---------------+
|code|monthdate |oneYearTurnover|
+----+----------+---------------+
|AA1 |2021-01-01|25 |
|AA1 |2022-01-01|128 |
+----+----------+---------------+
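If you also need the null behaviour described in the question (no value until a full 13-month history exists), a possible refinement of the same join, not part of the original answer, is to count how many of the shifted months actually matched and keep the sum only when all 13 are present. This is a sketch on top of the code above; `monthsFound` and `yearlyTurnover` are illustrative names:

// Sketch: null out the yearly sum when fewer than 13 months matched,
// i.e. the code does not yet have a full year of history.
val yearlyTurnover = explodedWithBackMonths.alias("exploded")
  .join(df.alias("original"), joinCondition)
  .groupBy($"exploded.code", $"exploded.monthdate")
  .agg(
    sum($"original.turnover").alias("oneYearTurnover"),
    count($"original.turnover").alias("monthsFound")
  )
  // `when` without `otherwise` yields null when the condition is false
  .withColumn("oneYearTurnover", when($"monthsFound" === 13, $"oneYearTurnover"))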
You can use Spark's Window functions:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val raw = Seq(
  ("AA1", "2019-01-01", 25),
  ("AA1", "2021-01-01", 25),
  ("AA1", "2021-08-01", 80),
  ("AA1", "2021-09-01", 90),
  ("AA1", "2022-01-01", 103),
  ("AA2", "2022-01-01", 10)
).toDF("code", "monthdate", "turnover")

// "MM" is the month pattern ("mm" would be minutes)
val df = raw.withColumn("monthdate", to_timestamp($"monthdate", "yyyy-MM-dd"))

// range frame covering the last 365 days (in seconds), ending at the current row
val pw = Window.partitionBy($"code")
  .orderBy($"monthdate".cast("long"))
  .rangeBetween(-(86400 * 365), 0)

df.withColumn("sum", sum($"turnover").over(pw)).show()
+----+-------------------+--------+---+
|code|          monthdate|turnover|sum|
+----+-------------------+--------+---+
| AA1|2019-01-01 00:00:00|      25| 25|
| AA1|2021-01-01 00:00:00|      25| 25|
| AA1|2021-08-01 00:00:00|      80|105|
| AA1|2021-09-01 00:00:00|      90|195|
| AA1|2022-01-01 00:00:00|     103|298|
| AA2|2022-01-01 00:00:00|      10| 10|
+----+-------------------+--------+---+
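One caveat of the frame above is that -(86400 * 365) is a fixed 365-day range in seconds rather than exactly 12 calendar months back. If the frame should be calendar-exact, an alternative sketch (an assumption, not part of the answer above) is to order by a month index such as year * 12 + month and use rangeBetween(-12, 0):

// Assumed alternative: a month index makes the frame exactly 13 calendar months
// (the current month plus the 12 preceding ones), independent of leap years.
val withMonthIndex = df.withColumn("monthIndex", year($"monthdate") * 12 + month($"monthdate"))

val monthWindow = Window.partitionBy($"code")
  .orderBy($"monthIndex")
  .rangeBetween(-12, 0)

withMonthIndex
  .withColumn("turnover_per_year", sum($"turnover").over(monthWindow))
  .show()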
I created 2 window functions for testing. Could you please check this and comment on whether it is OK?
val w = Window.partitionBy($"code")
  .orderBy($"rownum")
  .rowsBetween(-11, Window.currentRow)   // the current row plus the 11 preceding rows

val w1 = Window.partitionBy($"code")
  .orderBy($"monthdate")

val newDf = initDf.withColumn("rownum", row_number().over(w1))
  .withColumn("csum", sum("turnover").over(w))
We may need to first group by month and year and take the sum of the turnover for that month, then sort by month for that code.
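A minimal sketch of that pre-aggregation step, assuming the question's code/monthdate/turnover schema and that initDf may contain more than one row per code and month, could look like this (monthlyDf is an illustrative name):

// Collapse to one row per code and calendar month before applying the
// rolling window, so each window row really represents one month.
val monthlyDf = initDf
  .withColumn("month", trunc($"monthdate", "month"))   // first day of the month
  .groupBy($"code", $"month")
  .agg(sum($"turnover").alias("turnover"))

The two windows above could then be applied to monthlyDf, ordering by month instead of monthdate.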