pyspark 每 3 个月的平均销售额

Average of sales every 3 months in pyspark

我想要 pyspark 中每 3 个月的平均销售额。

Input 输入:

Product Date    Sales
A   01/04/2020  50
A   02/04/2020  60
A   01/05/2020  70
A   05/05/2020  80
A   10/06/2020  100
A   13/06/2020  150
A   25/07/2020  160

输出:output

Product Date    Sales   3month Avg sales
A   01/04/2020   50     36.67
A   02/04/2020   60     36.67
A   01/05/2020   70    86.67
A   05/05/2020   80    86.67
A   10/06/2020  100    170
A   13/06/2020  150    170
A   25/07/2020  160    186.67

七月的平均销售额是(五月+六月+七月)/3=560/3=186.67

您可以在月份列上使用 dense_rank() 来计算移动平均线。投射日期并从中提取月份。 dense_rank() 当月排名为您连续排名。

对于移动平均线,您可以使用 rangeBetween(-2, 0) 从当前月份回顾 2 个月。与 sales 相加并除以 3 得到输出。

你的 df:

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import *
from pyspark.sql.window import Window

row = Row("Product", "Date", "Sales")
df = sc.parallelize([row("A", "01/04/2020", 50),row("A", "02/04/2020", 60),row("A", "01/05/2020", 70),row("A", "05/05/2020", 80),row("A", "10/06/2020", 100),row("A", "13/06/2020", 150),row("A", "25/07/2020", 160)]).toDF()
df = df.withColumn('date_cast', from_unixtime(unix_timestamp('Date', 'dd/MM/yyyy')).cast(DateType()))
df = df.withColumn('month', month("date_cast"))

w=Window().partitionBy("Product").orderBy("month")
df = df.withColumn('rank', F.dense_rank().over(w))

w2 = (Window().partitionBy(col("Product")).orderBy("rank").rangeBetween(-2, 0))

df.select(col("*"), ((F.sum("Sales").over(w2))/3).alias("mean"))\
        .drop("date_cast", "month", "rank").show()

输出:

+-------+----------+-----+------------------+
|Product|      Date|Sales|              mean|
+-------+----------+-----+------------------+
|      A|01/04/2020|   50|36.666666666666664|
|      A|02/04/2020|   60|36.666666666666664|
|      A|01/05/2020|   70| 86.66666666666667|
|      A|05/05/2020|   80| 86.66666666666667|
|      A|10/06/2020|  100|             170.0|
|      A|13/06/2020|  150|             170.0|
|      A|25/07/2020|  160|186.66666666666666|
+-------+----------+-----+------------------+

有时,dense_rank 非常昂贵,所以我已经计算了自定义索引和与@Cena 类似的步骤。

from pyspark.sql import Window
from pyspark.sql.functions import *

w = Window.partitionBy('Product').orderBy('index').rangeBetween(-2, 0)

df.withColumn('Date', to_date('Date', 'dd/MM/yyyy')) \
  .withColumn('index', (year('Date') - 2020) * 12 + month('Date')) \
  .withColumn('avg', sum('Sales').over(w) / 3) \
  .show()

+-------+----------+-----+-----+------------------+
|Product|      Date|Sales|index|               avg|
+-------+----------+-----+-----+------------------+
|      A|2020-04-01|   50|    4|36.666666666666664|
|      A|2020-04-02|   60|    4|36.666666666666664|
|      A|2020-05-01|   70|    5| 86.66666666666667|
|      A|2020-05-05|   80|    5| 86.66666666666667|
|      A|2020-06-10|  100|    6|             170.0|
|      A|2020-06-13|  150|    6|             170.0|
|      A|2020-07-25|  160|    7|186.66666666666666|
+-------+----------+-----+-----+------------------+