pyspark 每 3 个月的平均销售额
Average of sales every 3 months in pyspark
我想要 pyspark 中每 3 个月的平均销售额。
Input
输入:
Product Date Sales
A 01/04/2020 50
A 02/04/2020 60
A 01/05/2020 70
A 05/05/2020 80
A 10/06/2020 100
A 13/06/2020 150
A 25/07/2020 160
输出:output
Product Date Sales 3month Avg sales
A 01/04/2020 50 36.67
A 02/04/2020 60 36.67
A 01/05/2020 70 86.67
A 05/05/2020 80 86.67
A 10/06/2020 100 170
A 13/06/2020 150 170
A 25/07/2020 160 186.67
七月的平均销售额是(五月+六月+七月)/3=560/3=186.67
您可以在月份列上使用 dense_rank() 来计算移动平均线。投射日期并从中提取月份。 dense_rank()
当月排名为您连续排名。
对于移动平均线,您可以使用 rangeBetween(-2, 0)
从当前月份回顾 2 个月。与 sales
相加并除以 3 得到输出。
你的 df:
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import *
from pyspark.sql.window import Window
row = Row("Product", "Date", "Sales")
df = sc.parallelize([row("A", "01/04/2020", 50),row("A", "02/04/2020", 60),row("A", "01/05/2020", 70),row("A", "05/05/2020", 80),row("A", "10/06/2020", 100),row("A", "13/06/2020", 150),row("A", "25/07/2020", 160)]).toDF()
df = df.withColumn('date_cast', from_unixtime(unix_timestamp('Date', 'dd/MM/yyyy')).cast(DateType()))
df = df.withColumn('month', month("date_cast"))
w=Window().partitionBy("Product").orderBy("month")
df = df.withColumn('rank', F.dense_rank().over(w))
w2 = (Window().partitionBy(col("Product")).orderBy("rank").rangeBetween(-2, 0))
df.select(col("*"), ((F.sum("Sales").over(w2))/3).alias("mean"))\
.drop("date_cast", "month", "rank").show()
输出:
+-------+----------+-----+------------------+
|Product| Date|Sales| mean|
+-------+----------+-----+------------------+
| A|01/04/2020| 50|36.666666666666664|
| A|02/04/2020| 60|36.666666666666664|
| A|01/05/2020| 70| 86.66666666666667|
| A|05/05/2020| 80| 86.66666666666667|
| A|10/06/2020| 100| 170.0|
| A|13/06/2020| 150| 170.0|
| A|25/07/2020| 160|186.66666666666666|
+-------+----------+-----+------------------+
有时,dense_rank 非常昂贵,所以我已经计算了自定义索引和与@Cena 类似的步骤。
from pyspark.sql import Window
from pyspark.sql.functions import *
w = Window.partitionBy('Product').orderBy('index').rangeBetween(-2, 0)
df.withColumn('Date', to_date('Date', 'dd/MM/yyyy')) \
.withColumn('index', (year('Date') - 2020) * 12 + month('Date')) \
.withColumn('avg', sum('Sales').over(w) / 3) \
.show()
+-------+----------+-----+-----+------------------+
|Product| Date|Sales|index| avg|
+-------+----------+-----+-----+------------------+
| A|2020-04-01| 50| 4|36.666666666666664|
| A|2020-04-02| 60| 4|36.666666666666664|
| A|2020-05-01| 70| 5| 86.66666666666667|
| A|2020-05-05| 80| 5| 86.66666666666667|
| A|2020-06-10| 100| 6| 170.0|
| A|2020-06-13| 150| 6| 170.0|
| A|2020-07-25| 160| 7|186.66666666666666|
+-------+----------+-----+-----+------------------+
我想要 pyspark 中每 3 个月的平均销售额。
Input 输入:
Product Date Sales
A 01/04/2020 50
A 02/04/2020 60
A 01/05/2020 70
A 05/05/2020 80
A 10/06/2020 100
A 13/06/2020 150
A 25/07/2020 160
输出:output
Product Date Sales 3month Avg sales
A 01/04/2020 50 36.67
A 02/04/2020 60 36.67
A 01/05/2020 70 86.67
A 05/05/2020 80 86.67
A 10/06/2020 100 170
A 13/06/2020 150 170
A 25/07/2020 160 186.67
七月的平均销售额是(五月+六月+七月)/3=560/3=186.67
您可以在月份列上使用 dense_rank() 来计算移动平均线。投射日期并从中提取月份。 dense_rank()
当月排名为您连续排名。
对于移动平均线,您可以使用 rangeBetween(-2, 0)
从当前月份回顾 2 个月。与 sales
相加并除以 3 得到输出。
你的 df:
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import *
from pyspark.sql.window import Window
row = Row("Product", "Date", "Sales")
df = sc.parallelize([row("A", "01/04/2020", 50),row("A", "02/04/2020", 60),row("A", "01/05/2020", 70),row("A", "05/05/2020", 80),row("A", "10/06/2020", 100),row("A", "13/06/2020", 150),row("A", "25/07/2020", 160)]).toDF()
df = df.withColumn('date_cast', from_unixtime(unix_timestamp('Date', 'dd/MM/yyyy')).cast(DateType()))
df = df.withColumn('month', month("date_cast"))
w=Window().partitionBy("Product").orderBy("month")
df = df.withColumn('rank', F.dense_rank().over(w))
w2 = (Window().partitionBy(col("Product")).orderBy("rank").rangeBetween(-2, 0))
df.select(col("*"), ((F.sum("Sales").over(w2))/3).alias("mean"))\
.drop("date_cast", "month", "rank").show()
输出:
+-------+----------+-----+------------------+
|Product| Date|Sales| mean|
+-------+----------+-----+------------------+
| A|01/04/2020| 50|36.666666666666664|
| A|02/04/2020| 60|36.666666666666664|
| A|01/05/2020| 70| 86.66666666666667|
| A|05/05/2020| 80| 86.66666666666667|
| A|10/06/2020| 100| 170.0|
| A|13/06/2020| 150| 170.0|
| A|25/07/2020| 160|186.66666666666666|
+-------+----------+-----+------------------+
有时,dense_rank 非常昂贵,所以我已经计算了自定义索引和与@Cena 类似的步骤。
from pyspark.sql import Window
from pyspark.sql.functions import *
w = Window.partitionBy('Product').orderBy('index').rangeBetween(-2, 0)
df.withColumn('Date', to_date('Date', 'dd/MM/yyyy')) \
.withColumn('index', (year('Date') - 2020) * 12 + month('Date')) \
.withColumn('avg', sum('Sales').over(w) / 3) \
.show()
+-------+----------+-----+-----+------------------+
|Product| Date|Sales|index| avg|
+-------+----------+-----+-----+------------------+
| A|2020-04-01| 50| 4|36.666666666666664|
| A|2020-04-02| 60| 4|36.666666666666664|
| A|2020-05-01| 70| 5| 86.66666666666667|
| A|2020-05-05| 80| 5| 86.66666666666667|
| A|2020-06-10| 100| 6| 170.0|
| A|2020-06-13| 150| 6| 170.0|
| A|2020-07-25| 160| 7|186.66666666666666|
+-------+----------+-----+-----+------------------+