在 pyspark 中,是否可以使用 1 个 groupBy 进行 2 次聚合?
in pyspark, is it possible to do 2 aggregations using 1 groupBy?
我想知道以下是否允许使用 pyspark:
假设以下 df:
|model | year | price | mileage |
+++++++++++++++++++++++++++++++++++++++++
|Galaxy | 2017 | 27841 |17529 |
|Galaxy | 2017 | 29395 |11892 |
|Novato | 2018 | 35644 |22876 |
|Novato | 2018 | 8765 |54817 |
df.groupBy('model', 'year')\
.agg({'price':'sum'})\
.agg({'mileage':'sum'})\
.withColumnRenamed('sum(price)', 'total_prices')\
.withColumnRenamed('sum(mileage)', 'total_miles')
希望结果是
|model | year | price | mileage | total_prices| total_miles|
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|Galaxy | 2017 | 27841 |17529 | 57236 | 29421 |
|Galaxy | 2017 | 29395 |11892 | 57236 | 29421 |
|Novato | 2018 | 35644 |22876 | 44409 | 77693 |
|Novato | 2018 | 8765 |54817 | 44409 | 77693 |
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
使用 pandas UDF,你可以在一次 groupBy 中完成任意数量的聚合。
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType,StructType,StructField,StringType
import pandas as pd

# Output schema of the grouped-map pandas UDF: the four original columns
# plus the two per-group aggregate columns appended by agg().
# NOTE(review): sums are declared IntegerType — fine for this toy data,
# but large sums would need LongType; confirm against real data ranges.
agg_schema = StructType(
    [StructField("model", StringType(), True),
     StructField("year", IntegerType(), True),
     StructField("price", IntegerType(), True),
     StructField("mileage", IntegerType(), True),
     StructField("total_prices", IntegerType(), True),
     StructField("total_miles", IntegerType(), True)]
)

@F.pandas_udf(agg_schema, F.PandasUDFType.GROUPED_MAP)
def agg(pdf):
    """Receive one group as a pandas DataFrame and return it with the
    group-level sums broadcast onto every row of the group."""
    total_prices = pdf['price'].sum()
    total_miles = pdf['mileage'].sum()
    pdf['total_prices'] = total_prices
    pdf['total_miles'] = total_miles
    return pdf

# NOTE(review): assumes an existing SparkSession named `spark`.
df = spark.createDataFrame(
    [('Galaxy', 2017, 27841, 17529),
     ('Galaxy', 2017, 29395, 11892),
     ('Novato', 2018, 35644, 22876),
     ('Novato', 2018, 8765, 54817)],
    ['model','year','price','mileage']
)

# apply() runs agg() once per (model, year) group; rows are preserved.
df.groupBy('model','year').apply(agg).show()
结果是
+------+----+-----+-------+------------+-----------+
| model|year|price|mileage|total_prices|total_miles|
+------+----+-----+-------+------------+-----------+
|Galaxy|2017|27841| 17529| 57236| 29421|
|Galaxy|2017|29395| 11892| 57236| 29421|
|Novato|2018|35644| 22876| 44409| 77693|
|Novato|2018| 8765| 54817| 44409| 77693|
+------+----+-----+-------+------------+-----------+
您实际上并不是在寻找 groupBy,而是在寻找窗口函数(window function)或连接(join),因为您想用聚合值来扩展(而不是折叠)您的行。
Window:
# Window-function approach: append the group-level sums to every row
# without collapsing the rows (unlike groupBy().agg()).
from pyspark.sql import functions as F
from pyspark.sql import Window
# NOTE(review): assumes an existing SparkSession named `spark`.
df = spark.createDataFrame(
[('Galaxy', 2017, 27841, 17529),
('Galaxy', 2017, 29395, 11892),
('Novato', 2018, 35644, 22876),
('Novato', 2018, 8765, 54817)],
['model','year','price','mileage']
)
# Partition by the grouping keys; with no ordering/frame each aggregate
# is computed over the whole partition.
w = Window.partitionBy('model', 'year')
df = df.withColumn('total_prices', F.sum('price').over(w))
df = df.withColumn('total_miles', F.sum('mileage').over(w))
df.show()
加入:
# Join approach: aggregate into a small per-group DataFrame, then join
# it back onto the original rows on the grouping keys.
from pyspark.sql import functions as F
# NOTE(review): assumes an existing SparkSession named `spark`.
df = spark.createDataFrame(
    [('Galaxy', 2017, 27841, 17529),
     ('Galaxy', 2017, 29395, 11892),
     ('Novato', 2018, 35644, 22876),
     ('Novato', 2018, 8765, 54817)],
    ['model','year','price','mileage']
)
# Fix: the alias was 'total_price', which did not match the
# 'total_prices' column shown in the expected output below.
totals = df.groupby('model', 'year').agg(
    F.sum('price').alias('total_prices'),
    F.sum('mileage').alias('total_miles'),
)
df = df.join(totals, ['model', 'year'])
df.show()
输出:
+------+----+-----+-------+------------+-----------+
| model|year|price|mileage|total_prices|total_miles|
+------+----+-----+-------+------------+-----------+
|Galaxy|2017|27841| 17529| 57236| 29421|
|Galaxy|2017|29395| 11892| 57236| 29421|
|Novato|2018|35644| 22876| 44409| 77693|
|Novato|2018| 8765| 54817| 44409| 77693|
+------+----+-----+-------+------------+-----------+
我想知道以下是否允许使用 pyspark: 假设以下 df:
|model | year | price | mileage |
+++++++++++++++++++++++++++++++++++++++++
|Galaxy | 2017 | 27841 |17529 |
|Galaxy | 2017 | 29395 |11892 |
|Novato | 2018 | 35644 |22876 |
|Novato | 2018 | 8765 |54817 |
df.groupBy('model', 'year')\
.agg({'price':'sum'})\
.agg({'mileage':'sum'})\
.withColumnRenamed('sum(price)', 'total_prices')\
.withColumnRenamed('sum(mileage)', 'total_miles')
希望结果是
|model | year | price | mileage | total_prices| total_miles|
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|Galaxy | 2017 | 27841 |17529 | 57236 | 29421 |
|Galaxy | 2017 | 29395 |11892 | 57236 | 29421 |
|Novato | 2018 | 35644 |22876 | 44409 | 77693 |
|Novato | 2018 | 8765 |54817 | 44409 | 77693 |
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
使用 pandas UDF,你可以在一次 groupBy 中完成任意数量的聚合。
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType,StructType,StructField,StringType
import pandas as pd

# Output schema of the grouped-map pandas UDF: the four original columns
# plus the two per-group aggregate columns appended by agg().
# NOTE(review): sums are declared IntegerType — fine for this toy data,
# but large sums would need LongType; confirm against real data ranges.
agg_schema = StructType(
    [StructField("model", StringType(), True),
     StructField("year", IntegerType(), True),
     StructField("price", IntegerType(), True),
     StructField("mileage", IntegerType(), True),
     StructField("total_prices", IntegerType(), True),
     StructField("total_miles", IntegerType(), True)]
)

@F.pandas_udf(agg_schema, F.PandasUDFType.GROUPED_MAP)
def agg(pdf):
    """Receive one group as a pandas DataFrame and return it with the
    group-level sums broadcast onto every row of the group."""
    total_prices = pdf['price'].sum()
    total_miles = pdf['mileage'].sum()
    pdf['total_prices'] = total_prices
    pdf['total_miles'] = total_miles
    return pdf

# NOTE(review): assumes an existing SparkSession named `spark`.
df = spark.createDataFrame(
    [('Galaxy', 2017, 27841, 17529),
     ('Galaxy', 2017, 29395, 11892),
     ('Novato', 2018, 35644, 22876),
     ('Novato', 2018, 8765, 54817)],
    ['model','year','price','mileage']
)

# apply() runs agg() once per (model, year) group; rows are preserved.
df.groupBy('model','year').apply(agg).show()
结果是
+------+----+-----+-------+------------+-----------+
| model|year|price|mileage|total_prices|total_miles|
+------+----+-----+-------+------------+-----------+
|Galaxy|2017|27841| 17529| 57236| 29421|
|Galaxy|2017|29395| 11892| 57236| 29421|
|Novato|2018|35644| 22876| 44409| 77693|
|Novato|2018| 8765| 54817| 44409| 77693|
+------+----+-----+-------+------------+-----------+
您实际上并不是在寻找 groupBy,而是在寻找窗口函数(window function)或连接(join),因为您想用聚合值来扩展(而不是折叠)您的行。
Window:
# Window-function approach: append the group-level sums to every row
# without collapsing the rows (unlike groupBy().agg()).
from pyspark.sql import functions as F
from pyspark.sql import Window
# NOTE(review): assumes an existing SparkSession named `spark`.
df = spark.createDataFrame(
[('Galaxy', 2017, 27841, 17529),
('Galaxy', 2017, 29395, 11892),
('Novato', 2018, 35644, 22876),
('Novato', 2018, 8765, 54817)],
['model','year','price','mileage']
)
# Partition by the grouping keys; with no ordering/frame each aggregate
# is computed over the whole partition.
w = Window.partitionBy('model', 'year')
df = df.withColumn('total_prices', F.sum('price').over(w))
df = df.withColumn('total_miles', F.sum('mileage').over(w))
df.show()
加入:
# Join approach: aggregate into a small per-group DataFrame, then join
# it back onto the original rows on the grouping keys.
from pyspark.sql import functions as F
# NOTE(review): assumes an existing SparkSession named `spark`.
df = spark.createDataFrame(
    [('Galaxy', 2017, 27841, 17529),
     ('Galaxy', 2017, 29395, 11892),
     ('Novato', 2018, 35644, 22876),
     ('Novato', 2018, 8765, 54817)],
    ['model','year','price','mileage']
)
# Fix: the alias was 'total_price', which did not match the
# 'total_prices' column shown in the expected output below.
totals = df.groupby('model', 'year').agg(
    F.sum('price').alias('total_prices'),
    F.sum('mileage').alias('total_miles'),
)
df = df.join(totals, ['model', 'year'])
df.show()
输出:
+------+----+-----+-------+------------+-----------+
| model|year|price|mileage|total_prices|total_miles|
+------+----+-----+-------+------------+-----------+
|Galaxy|2017|27841| 17529| 57236| 29421|
|Galaxy|2017|29395| 11892| 57236| 29421|
|Novato|2018|35644| 22876| 44409| 77693|
|Novato|2018| 8765| 54817| 44409| 77693|
+------+----+-----+-------+------------+-----------+