Filling missing values with the mean by grouping on multiple columns

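Description: How can I fill the missing values in the price column with the mean, grouped by the condition and model columns, in PySpark? My Python (pandas) code looks like this: cars['price'] = np.ceil(cars['price'].fillna(cars.groupby(['condition', 'model'])['price'].transform('mean')))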

Error: I tried several variations in PySpark, but each attempt fails with a different error. For example, this code: cars_new = cars.fillna((cars.groupBy("condition", "model").agg(mean("price"))['avg(price)'])) raises:

ValueError: value should be a float, int, long, string, bool or dict


This can be done with window functions, like this:

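from pyspark.sql import Window
from pyspark.sql.functions import avg, col, when

# Do not fill with 0 first: the nulls must remain so they can be replaced by the group mean.
w = Window.partitionBy('condition', 'model')
cars = cars.withColumn('price', when(col('price').isNull(), avg(col('price')).over(w)).otherwise(col('price')))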

I'm not sure what your input data looks like, but let's assume we have a DataFrame like this:

+---------+-----+-----+
|condition|model|price|
+---------+-----+-----+
|A        |A    |1    |
|A        |B    |2    |
|A        |B    |2    |
|A        |A    |1    |
|A        |A    |null |
|B        |A    |3    |
|B        |A    |null |
|B        |B    |4    |
+---------+-----+-----+

We want to fill each null with the average price, computed over the rows that share the same condition and model.
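To make the goal concrete, here is a plain-Python sketch (no Spark involved) of the same computation on the sample table above. It is only an illustration of the arithmetic; the names rows, prices_by_group, group_means and filled are made up for this example.

```python
from collections import defaultdict

# The sample table above as (condition, model, price) tuples.
rows = [
    ("A", "A", 1), ("A", "B", 2), ("A", "B", 2), ("A", "A", 1), ("A", "A", None),
    ("B", "A", 3), ("B", "A", None), ("B", "B", 4),
]

# Collect the non-null prices of each (condition, model) group.
prices_by_group = defaultdict(list)
for condition, model, price in rows:
    if price is not None:
        prices_by_group[(condition, model)].append(price)

# Mean of the known prices in each group.
group_means = {group: sum(p) / len(p) for group, p in prices_by_group.items()}

# Replace each missing price with the mean of its group.
filled = [
    (c, m, float(price) if price is not None else group_means[(c, m)])
    for c, m, price in rows
]
```

The missing price in group (A, A) becomes 1.0 and the one in group (B, A) becomes 3.0.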

To do this, we can define a Window, compute the avg over it, and then replace the nulls.

Example:

from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("test").getOrCreate()
data = [
    {"condition": "A", "model": "A", "price": 1},
    {"condition": "A", "model": "B", "price": 2},
    {"condition": "A", "model": "B", "price": 2},
    {"condition": "A", "model": "A", "price": 1},
    {"condition": "A", "model": "A", "price": None},
    {"condition": "B", "model": "A", "price": 3},
    {"condition": "B", "model": "A", "price": None},
    {"condition": "B", "model": "B", "price": 4},
]

# No orderBy is needed: without it the frame spans the whole partition, so avg is the group mean.
window = Window.partitionBy("condition", "model")
df = spark.createDataFrame(data=data)
df = (
    df.withColumn("avg", F.avg("price").over(window))
    .withColumn(
        "price", F.when(F.col("price").isNull(), F.col("avg")).otherwise(F.col("price"))
    )
    .drop("avg")
)

This gives us:

+---------+-----+-----+
|condition|model|price|
+---------+-----+-----+
|A        |A    |1.0  |
|A        |A    |1.0  |
|A        |A    |1.0  |
|B        |B    |4.0  |
|B        |A    |3.0  |
|B        |A    |3.0  |
|A        |B    |2.0  |
|A        |B    |2.0  |
+---------+-----+-----+