PySpark: Fill NAs with mode of column based on aggregation of other columns

**Please note the edit at the bottom, and the adapted solution script from Anna K. (Thank you!)**

I have a dataframe with 4 columns:

# Compute the mode to fill NAs for Item
values = [(None, 'Red', 3, 10), (None, 'Red', 6, 20), ('A', 'Green', 5, 30),
         ('A', 'Red', 3, 10), (None, 'Green', 6, 10), ('B', 'Green', 5, 10),
         ('B', 'Orange', 6, 20), ('A', 'Red', 3, 20), ('B', 'Green', 5, 30),
         (None, 'Red', 3, 10)]
items = spark.createDataFrame(values, ['Item', 'Color', 'Weight', 'Price'])
items.show()

#+----+------+------+-----+
#|Item| Color|Weight|Price|
#+----+------+------+-----+
#|null|   Red|     3|   10|
#|null|   Red|     6|   20|
#|   A| Green|     5|   30|
#|   A|   Red|     3|   10|
#|null| Green|     6|   10|
#|   B| Green|     5|   10|
#|   B|Orange|     6|   20|
#|   A|   Red|     3|   20|
#|   B| Green|     5|   30|
#|null|   Red|     3|   10|
#+----+------+------+-----+

I want to fill the null 'Item' values with the mode of the 'Item' column when grouping by 'Weight' and 'Color':

from pyspark.sql import Window
from pyspark.sql.functions import col, desc, lit, row_number, when

grouped = items.where(items['Item'].isNotNull()).groupBy('Item', 'Weight', 'Color').count()
window = Window.partitionBy('Weight').orderBy(desc("count"))
grouped = grouped\
    .withColumn('order', row_number().over(window))\
    .where(col('order') == 1)

grouped.show()

#+----+------+------+-----+-----+
#|Item|Weight| Color|count|order|
#+----+------+------+-----+-----+
#|   B|     6|Orange|    1|    1|
#|   B|     5| Green|    2|    1|
#|   A|     3|   Red|    2|    1|
#+----+------+------+-----+-----+

In this case, all null 'Item' values whose 'Weight' and 'Color' combination is 6 and Orange would be assigned 'B'.

All null 'Item' values with 'Weight' = 5 and 'Color' = Green would also be assigned 'B'.

Null 'Item' records with 'Weight' = 3 and 'Color' = Red would be assigned Item = 'A'.

My first thought was to do some kind of join between this new grouped df and my original df, but it fails, and it is also very inelegant. Is there a more streamlined way to do this?

# Not elegant, and not working...

# Add new empty column to fill NAs
items = items.withColumn('item_weight_impute', lit(None))

# Select columns to include in the join based on weight
items.join(grouped.select('Item','Weight','Color'), ['Item','Weight','Color'], 'left_outer') \
    .withColumn('item_weight_impute', when((col('Item').isNull()), grouped.Item).otherwise(items.Item)) \
    .select('Item','Color','Weight', 'Price','item_weight_impute') \
    .show()

#+----+------+------+-----+------------------+
#|Item| Color|Weight|Price|item_weight_impute|
#+----+------+------+-----+------------------+
#|   B|Orange|     6|   20|                 B|
#|   A| Green|     5|   30|                 A|
#|null|   Red|     3|   10|              null|
#|null|   Red|     3|   10|              null|
#|   B| Green|     5|   30|                 B|
#|   B| Green|     5|   10|                 B|
#|null| Green|     6|   10|              null|
#|   A|   Red|     3|   20|                 A|
#|   A|   Red|     3|   10|                 A|
#|null|   Red|     6|   20|              null|
#+----+------+------+-----+------------------+
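In hindsight, the reason this fails is that 'Item' is one of the join keys: a null key never matches anything in a SQL join, so exactly the rows that need imputing come back unmatched. A minimal sketch of the repair is to join on the grouping keys only and coalesce (mode_item is just an illustrative alias, not from the original attempt):

from pyspark.sql.functions import coalesce

# Join on the grouping keys only, so rows with a null Item still find their group's mode
items.join(grouped.select('Weight', 'Color', grouped['Item'].alias('mode_item')),
           ['Weight', 'Color'], 'left') \
    .withColumn('item_weight_impute', coalesce('Item', 'mode_item')) \
    .drop('mode_item') \
    .show()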

EDIT! Courtesy of Anna K. Here is an adaptation of the answer, for when you want to programmatically impute more than one categorical column. Here we impute both Item and Color based on the combination of Weight and Price:

# Practice- compute hierarchical modes to fill NAs for Item and Color
values = [(None, 'Red', 3, 10), (None, 'Red', 6, 20), ('A', 'Green', 5, 30),
         ('A', None, 3, 10), (None, None, 6, 10), ('B', 'Green', 5, 10),
         ('B', 'Orange', 6, 20), ('A', 'Red', 3, 20), ('B', None, 5, 30),
         (None, 'Red', 3, 10)]
items = spark.createDataFrame(values, ['Item', 'Color', 'Weight', 'Price'])
items.show()
#+----+------+------+-----+
#|Item| Color|Weight|Price|
#+----+------+------+-----+
#|null|   Red|     3|   10|
#|null|   Red|     6|   20|
#|   A| Green|     5|   30|
#|   A|  null|     3|   10|
#|null|  null|     6|   10|
#|   B| Green|     5|   10|
#|   B|Orange|     6|   20|
#|   A|   Red|     3|   20|
#|   B|  null|     5|   30|
#|null|   Red|     3|   10|
#+----+------+------+-----+

from pyspark.sql import functions as f

mode_columns = ['Item', 'Color']

# Weight + Price
for item in mode_columns:
    # Count all occurrences of Weight + Price combos
    df1 = (items
       .dropna(subset=[f'{item}'])
       .groupBy(f'{item}', "Weight", "Price")
       .agg(f.count("Price").alias("count")))

    # Reduce df1 to only include those most frequent Weight + Price combos
    df2 = (df1
       .groupBy("Weight", "Price")
       .agg(f.max("count").alias("count")))

    # Join with df of counts to get the {item} mode
    grouped = (df1
           .join(df2, ["Weight", "Price", "count"])
           .withColumnRenamed(f'{item}', f'{item}_fill_value')
           .drop("count"))
    
    #Join with original df
    items = items.join(grouped, ["Weight", "Price"], "left" )
    
    # Coalesce the original and imputed {item} columns
    items = items.withColumn(f'{item}_weight_price_impute', f.coalesce(f'{item}', f'{item}_fill_value'))
    items = items.drop(f'{item}', f'{item}_fill_value')
    items = items.withColumnRenamed(f'{item}_weight_price_impute', f'{item}')
    items.show()
#+------+-----+----+------+
#|Weight|Price|Item| Color|
#+------+-----+----+------+
#|     5|   30|   B| Green|
#|     5|   30|   B| Green|
#|     5|   30|   A| Green|
#|     5|   30|   A| Green|
#|     3|   20|   A|   Red|
#|     6|   10|null|  null|
#|     5|   10|   B| Green|
#|     3|   10|   A|   Red|
#|     3|   10|   A|   Red|
#|     3|   10|   A|   Red|
#|     6|   20|   B|   Red|
#|     6|   20|   B|   Red|
#|     6|   20|   B|Orange|
#|     6|   20|   B|Orange|
#+------+-----+----+------+
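A note on the output above: it has 14 rows instead of the original 10. When two values tie for the most frequent (e.g. Item A and B both appear once for Weight = 5, Price = 30), the join multiplies the tied rows. If you want this as a reusable helper that also guards against that, here is a sketch built from the same df1/df2/join/coalesce steps (impute_modes is a hypothetical name of my own; ties are broken arbitrarily):

from pyspark.sql import functions as f

def impute_modes(df, group_cols, target_cols):
    for item in target_cols:
        # Count occurrences of each target value per group
        counts = (df.dropna(subset=[item])
                    .groupBy(item, *group_cols)
                    .agg(f.count("*").alias("count")))
        # Highest count per group
        maxes = counts.groupBy(*group_cols).agg(f.max("count").alias("count"))
        # Rows achieving the max are the mode(s); keep one per group so the
        # later join cannot multiply tied rows
        modes = (counts.join(maxes, [*group_cols, "count"])
                       .withColumnRenamed(item, item + "_fill_value")
                       .drop("count")
                       .dropDuplicates(group_cols))
        # Left join back and coalesce, exactly as in the loop above
        df = (df.join(modes, group_cols, "left")
                .withColumn(item, f.coalesce(item, item + "_fill_value"))
                .drop(item + "_fill_value"))
    return df

items = impute_modes(items, ["Weight", "Price"], ["Item", "Color"])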


You can use a join (left or right, whichever you prefer). Here is a suggestion that I hope helps:

!pwd
!export SPARK_HOME="/usr/spark-2.4.4/python/pyspark/"

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("null-fill") \
    .getOrCreate()

values = [(None, 'Red', 3, 10), (None, 'Red', 6, 20), ('A', 'Green', 5, 30),
         ('A', 'Red', 3, 10), (None, 'Green', 6, 10), ('B', 'Green', 5, 10),
         ('B', 'Orange', 6, 20), ('A', 'Red', 3, 20), ('B', 'Green', 5, 30),
         (None, 'Red', 3, 10)]
items = spark.createDataFrame(values, ['Item', 'Color', 'Weight', 'Price'])
items.show()

/usr/spark-2.4.4/work_dir/notebooks
+----+------+------+-----+
|Item| Color|Weight|Price|
+----+------+------+-----+
|null|   Red|     3|   10|
|null|   Red|     6|   20|
|   A| Green|     5|   30|
|   A|   Red|     3|   10|
|null| Green|     6|   10|
|   B| Green|     5|   10|
|   B|Orange|     6|   20|
|   A|   Red|     3|   20|
|   B| Green|     5|   30|
|null|   Red|     3|   10|
+----+------+------+-----+
from  pyspark.sql import Window
from pyspark.sql.functions import *
grouped = items.where(items['Item'].isNotNull()).groupBy('Item', 'Weight', 'Color').count()
window = Window.partitionBy('Weight').orderBy(desc("count"))
grouped = grouped\
    .withColumn('order', row_number().over(window))\
    .where(col('order') == 1)

grouped.show()

+----+------+------+-----+-----+
|Item|Weight| Color|count|order|
+----+------+------+-----+-----+
|   B|     6|Orange|    1|    1|
|   B|     5| Green|    2|    1|
|   A|     3|   Red|    2|    1|
+----+------+------+-----+-----+

Then we apply the join on the 'Weight' condition:

left_join = grouped.alias('tr').join(items, (grouped.Weight == items.Weight),how='left')
df_edited = left_join.select("tr.Item","tr.Weight","tr.Color","tr.count","tr.order","Price") 
df_edited.show()

+----+------+------+-----+-----+-----+
|Item|Weight| Color|count|order|Price|
+----+------+------+-----+-----+-----+
|   B|     6|Orange|    1|    1|   20|
|   B|     6|Orange|    1|    1|   10|
|   B|     6|Orange|    1|    1|   20|
|   B|     5| Green|    2|    1|   30|
|   B|     5| Green|    2|    1|   10|
|   B|     5| Green|    2|    1|   30|
|   A|     3|   Red|    2|    1|   10|
|   A|     3|   Red|    2|    1|   10|
|   A|     3|   Red|    2|    1|   20|
|   A|     3|   Red|    2|    1|   10|
+----+------+------+-----+-----+-----+
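Note that this join keeps only the mode rows; to actually fill the null 'Item' values you would still left-join items against the modes and coalesce. A sketch reusing the grouped df from above (mode_item is just an illustrative alias):

from pyspark.sql.functions import coalesce, col

filled = items.join(
        grouped.select(col('Item').alias('mode_item'), 'Weight'),
        on='Weight', how='left') \
    .withColumn('Item', coalesce('Item', 'mode_item')) \
    .drop('mode_item')
filled.show()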

You were almost there with what you tried. Here is a working version (the last join step is fixed):

from pyspark.sql import Window
from pyspark.sql import functions as F


# Count each Item within its (Weight, Color) group, then keep the most
# frequent one (the mode) per group
w = Window.partitionBy('Weight', 'Color')
w_item = Window.partitionBy('Weight', 'Color', 'Item')

grouped = items.filter("Item is not null") \
    .withColumn('count', F.count("*").over(w_item)) \
    .withColumn('rn', F.row_number().over(w.orderBy(F.desc("count")))) \
    .where("rn = 1") \
    .select('Item', 'Weight', 'Color')

items.alias("item").join(grouped.alias("mode"), ['Weight', 'Color'], "left") \
    .select("item.*", F.coalesce("item.Item", "mode.Item").alias("item_weight_impute")) \
    .show()

#+------+------+----+-----+------------------+
#|Weight| Color|Item|Price|item_weight_impute|
#+------+------+----+-----+------------------+
#|     6|Orange|   B|   20|                 B|
#|     3|   Red|null|   10|                 A|
#|     3|   Red|   A|   10|                 A|
#|     3|   Red|   A|   20|                 A|
#|     3|   Red|null|   10|                 A|
#|     6| Green|null|   10|              null|
#|     5| Green|   A|   30|                 A|
#|     5| Green|   B|   10|                 B|
#|     5| Green|   B|   30|                 B|
#|     6|   Red|null|   20|              null|
#+------+------+----+-----+------------------+
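One caveat: row_number breaks ties between equally frequent items nondeterministically. If you need a stable result, add a secondary sort key to the window. An optional variation of the snippet above (reusing w, w_item and F), not part of the original answer:

grouped = items.filter("Item is not null") \
    .withColumn('count', F.count("*").over(w_item)) \
    .withColumn('rn', F.row_number().over(w.orderBy(F.desc("count"), "Item"))) \
    .where("rn = 1") \
    .select('Item', 'Weight', 'Color')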

Once you have grouped, you can do a left join of items with grouped and then use coalesce to fill the null values in the 'Item' column. The coalesce function returns the first column that is not null.
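As a quick illustration of those semantics (a throwaway example, not part of the solution):

from pyspark.sql import functions as F

# coalesce picks, per row, the first argument that is not null
demo = spark.createDataFrame([(None, 'fallback'), ('kept', 'fallback')], ['a', 'b'])
demo.select(F.coalesce('a', 'b').alias('result')).show()

#+--------+
#|  result|
#+--------+
#|fallback|
#|    kept|
#+--------+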

Step 1. The grouped df:

from pyspark.sql import functions as F

df1 = (items
   .dropna()
   .groupBy("Item", "Weight", "Color")
   .agg(F.count("Price").alias("count")))

df2 = (df1
   .groupBy("Weight", "Color")
   .agg(F.max("count").alias("count")))

grouped = (df1
       .join(df2, ["Weight", "Color", "count"])
       .withColumnRenamed("Item", "fill_value")
       .drop("count"))

Step 2. Left join items with grouped:

df = items.join(grouped, ["Weight", "Color"], "left")

Step 3. Apply coalesce:

df_filled = df.withColumn("item_weight_impute", F.coalesce("Item", "fill_value"))

df_filled now contains the original rows, with the null 'Item' values filled in the new item_weight_impute column.
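If you want the result back in the original shape, you can tidy up afterwards (a small optional sketch):

# Drop the join helper, promote the imputed column, and restore column order
df_filled = (df_filled
    .drop('fill_value', 'Item')
    .withColumnRenamed('item_weight_impute', 'Item')
    .select('Item', 'Color', 'Weight', 'Price'))
df_filled.show()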