Create new column based on current row with calculations involving other rows
Say I have a dataframe like this:
id,month,price
1,2021-04-31,9
1,2021-01-31,5
1,2021-02-31,6
1,2021-03-31,8
So, for each id, I want the sum of price over the current row's month - 1 and month - 2. For example, for the row 1,march,8 the new column should hold 5+6=11, since the two months before the current row's month (march) are jan and feb. There will be other ids in the main data as well.
Convert the month names to month numbers, then use that for ordering in a Window partitioned by id to get the running sum:
from pyspark.sql import functions as F, Window
df = spark.createDataFrame([
(1, "apr", 9), (1, "jan", 5),
(1, "feb", 6), (1, "march", 8)
], ["id", "month", "price"])
# handle both full and short textual representation of month names
month_number = F.when(F.length("month") == 3, F.month(F.to_date(F.col("month"), "MMM"))) \
.otherwise(F.month(F.to_date(F.col("month"), "MMMM")))
w = Window.partitionBy("id").orderBy(month_number).rangeBetween(-2, -1)
df.withColumn("price_sum", F.sum("price").over(w)).show()
#+---+-----+-----+---------+
#| id|month|price|price_sum|
#+---+-----+-----+---------+
#| 1| jan| 5| null|
#| 1| feb| 6| 5|
#| 1|march| 8| 11|
#| 1| apr| 9| 14|
#+---+-----+-----+---------+
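The MMM/MMMM branching above can also be sketched in plain Python (a hypothetical helper, not part of the answer; assumes an English locale for month names):

```python
from datetime import datetime

def month_number(name: str) -> int:
    # Try the abbreviated format ("jan") first, then the full one ("march"),
    # mirroring the MMM / MMMM branches in the Spark expression above.
    for fmt in ("%b", "%B"):
        try:
            return datetime.strptime(name.capitalize(), fmt).month
        except ValueError:
            pass
    raise ValueError(f"unrecognized month name: {name}")

print([month_number(m) for m in ["jan", "feb", "march", "apr"]])  # [1, 2, 3, 4]
```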
For the updated question, you can truncate the dates to month precision, then use a window whose range runs from interval 2 months preceding to interval 1 month preceding:
df = spark.createDataFrame([
(1, "2021-04-30", 9), (1, "2021-01-31", 5),
(1, "2021-02-28", 6), (1, "2021-03-31", 8)
], ["id", "month", "price"])
df.withColumn(
"date",
F.date_trunc("month", F.col("month"))
).withColumn(
"price_sum",
F.expr("""sum(price) over(partition by id order by date
range between interval 2 months preceding
and interval 1 months preceding)
""")
).drop("date").show()
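As a sanity check on the interval frame above, here is a minimal plain-Python sketch (the sample data and helper names are mine, not the answer's): indexing each row by months-since-year-0 turns "interval 2 months preceding" into index - 2.

```python
from datetime import date

# Sample rows: (id, month-end date, price)
rows = [(1, date(2021, 1, 31), 5), (1, date(2021, 2, 28), 6),
        (1, date(2021, 3, 31), 8), (1, date(2021, 4, 30), 9)]

def month_index(d: date) -> int:
    # Months since year 0, so consecutive calendar months differ by 1.
    return d.year * 12 + d.month

prices = {(rid, month_index(d)): p for rid, d, p in rows}
sums = {}
for rid, d, p in rows:
    # Frame: the two calendar months strictly before the current row's month.
    in_frame = [prices.get((rid, month_index(d) - k)) for k in (1, 2)]
    vals = [v for v in in_frame if v is not None]
    sums[(rid, d)] = sum(vals) if vals else None

print(sums[(1, date(2021, 3, 31))])  # 11
```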
Sum price with a window function, partitioning by id, ordering by the month number, and using a ROWS frame to take the two preceding rows.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.option("header",True).csv("path/to/file") # Assuming file is csv
df.createOrReplaceTempView('df')
df1 = spark.sql("""
SELECT id,month,price,
CASE
WHEN month = 'jan' THEN 1
WHEN month = 'feb' THEN 2
       -- ... WHEN branches for the remaining months ...
else 12
END AS month_num
FROM df
""")
df1.createOrReplaceTempView('df1')
spark.sql("""
SELECT id, month, price,
SUM(price) OVER (PARTITION BY id ORDER BY month_num ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING) AS price_sum
FROM df1
""").show()
Add WHERE month_num NOT IN (1, 2) to the second query if you want to exclude the January and February rows, whose price_sum windows cover fewer than two months.
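The ROWS frame logic (sum of the two rows before the current one, per id) can be checked with a small plain-Python sketch; the sample month numbers here are mine:

```python
from itertools import groupby
from operator import itemgetter

rows = [(1, 1, 5), (1, 2, 6), (1, 3, 8), (1, 4, 9)]  # (id, month_num, price)

result = []
for rid, grp in groupby(sorted(rows), key=itemgetter(0)):
    grp = list(grp)  # rows of one id, ordered by month_num
    for i, (_, m, p) in enumerate(grp):
        # Frame: 2 preceding .. 1 preceding (current row excluded).
        frame = grp[max(0, i - 2):i]
        s = sum(p2 for _, _, p2 in frame) if frame else None
        result.append((rid, m, p, s))

print(result)
# [(1, 1, 5, None), (1, 2, 6, 5), (1, 3, 8, 11), (1, 4, 9, 14)]
```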