PySpark: Fill NAs with mode of column based on aggregation of other columns
**Please note the edit at the bottom, with an adapted solution script from Anna K. (Thank you!)**
I have a dataframe with 4 columns:
# Compute the mode to fill NAs for Item
values = [(None, 'Red', 3, 10), (None, 'Red', 6, 20), ('A', 'Green', 5, 30),
('A', 'Red', 3, 10), (None, 'Green', 6, 10), ('B', 'Green', 5, 10),
('B', 'Orange', 6, 20), ('A', 'Red', 3, 20), ('B', 'Green', 5, 30),
(None, 'Red', 3, 10)]
items = spark.createDataFrame(values, ['Item', 'Color', 'Weight', 'Price'])
items.show()
#+----+------+------+-----+
#|Item| Color|Weight|Price|
#+----+------+------+-----+
#|null| Red| 3| 10|
#|null| Red| 6| 20|
#| A| Green| 5| 30|
#| A| Red| 3| 10|
#|null| Green| 6| 10|
#| B| Green| 5| 10|
#| B|Orange| 6| 20|
#| A| Red| 3| 20|
#| B| Green| 5| 30|
#|null| Red| 3| 10|
#+----+------+------+-----+
When grouping by 'Weight' and 'Color', I would like to fill the null 'Item' values with the mode of the 'Item' column:
from pyspark.sql import Window
from pyspark.sql.functions import col, desc, lit, row_number, when

grouped = items.where(items['Item'].isNotNull()).groupBy('Item', 'Weight', 'Color').count()

window = Window.partitionBy('Weight').orderBy(desc("count"))
grouped = grouped\
    .withColumn('order', row_number().over(window))\
    .where(col('order') == 1)
grouped.show()
#+----+------+------+-----+-----+
#|Item|Weight| Color|count|order|
#+----+------+------+-----+-----+
#| B| 6|Orange| 1| 1|
#| B| 5| Green| 2| 1|
#| A| 3| Red| 2| 1|
#+----+------+------+-----+-----+
In this case, every null 'Item' value whose 'Weight' and 'Color' combination is 6 and Orange would be assigned 'B'.
All null 'Item' values with 'Weight' = 5 and 'Color' = Green would also be assigned 'B'.
Null 'Item' records with 'Weight' = 3 and 'Color' = Red would be assigned Item = 'A'.
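To make the expected result concrete, here is a hard-coded sketch for this toy data only (it simply spells out the three assignments above and is not the general, mode-based solution I am after):
from pyspark.sql import functions as F

# Hard-coded sketch of the expected fill for this toy data only:
# it spells out the three (Weight, Color) -> Item assignments above.
expected = items.withColumn(
    'Item',
    F.coalesce(
        F.col('Item'),
        F.when((F.col('Weight') == 6) & (F.col('Color') == 'Orange'), F.lit('B'))
         .when((F.col('Weight') == 5) & (F.col('Color') == 'Green'), F.lit('B'))
         .when((F.col('Weight') == 3) & (F.col('Color') == 'Red'), F.lit('A'))
    )
)
expected.show()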
My first thought was to perform some kind of join of this new grouped df with my original df, but it is failing, and it is also terribly inelegant. Is there a more streamlined way to do this?
# Not elegant, and not working...
# Add new empty column to fill NAs
items = items.withColumn('item_weight_impute', lit(None))
# Select columns to include in the join based on weight
items.join(grouped.select('Item','Weight','Color'), ['Item','Weight','Color'], 'left_outer') \
.withColumn('item_weight_impute', when((col('Item').isNull()), grouped.Item).otherwise(items.Item)) \
.select('Item','Color','Weight', 'Price','item_weight_impute') \
.show()
#+----+------+------+-----+------------------+
#|Item| Color|Weight|Price|item_weight_impute|
#+----+------+------+-----+------------------+
#| B|Orange| 6| 20| B|
#| A| Green| 5| 30| A|
#|null| Red| 3| 10| null|
#|null| Red| 3| 10| null|
#| B| Green| 5| 30| B|
#| B| Green| 5| 10| B|
#|null| Green| 6| 10| null|
#| A| Red| 3| 20| A|
#| A| Red| 3| 10| A|
#|null| Red| 6| 20| null|
#+----+------+------+-----+------------------+
EDIT! Courtesy of Anna K.
Here is an adaptation of the answer: perhaps we want to programmatically impute more than one categorical column. Here we impute Item and Color based on the combination of Weight and Price:
# Practice- compute hierarchical modes to fill NAs for Item and Color
values = [(None, 'Red', 3, 10), (None, 'Red', 6, 20), ('A', 'Green', 5, 30),
('A', None, 3, 10), (None, None, 6, 10), ('B', 'Green', 5, 10),
('B', 'Orange', 6, 20), ('A', 'Red', 3, 20), ('B', None, 5, 30),
(None, 'Red', 3, 10)]
items = spark.createDataFrame(values, ['Item', 'Color', 'Weight', 'Price'])
items.show()
#+----+------+------+-----+
#|Item| Color|Weight|Price|
#+----+------+------+-----+
#|null| Red| 3| 10|
#|null| Red| 6| 20|
#| A| Green| 5| 30|
#| A| null| 3| 10|
#|null| null| 6| 10|
#| B| Green| 5| 10|
#| B|Orange| 6| 20|
#| A| Red| 3| 20|
#| B| null| 5| 30|
#|null| Red| 3| 10|
#+----+------+------+-----+
from pyspark.sql import functions as f

mode_columns = ['Item', 'Color']

# Impute each column from its mode within Weight + Price groups
for item in mode_columns:
    # Count all occurrences of {item} + Weight + Price combos
    df1 = (items
           .dropna(subset=[item])
           .groupBy(item, "Weight", "Price")
           .agg(f.count("Price").alias("count")))

    # Reduce df1 to only the most frequent {item} per Weight + Price combo
    df2 = (df1
           .groupBy("Weight", "Price")
           .agg(f.max("count").alias("count")))

    # Join with the df of counts to get the {item} mode
    grouped = (df1
               .join(df2, ["Weight", "Price", "count"])
               .withColumnRenamed(item, f'{item}_fill_value')
               .drop("count"))

    # Join with the original df
    items = items.join(grouped, ["Weight", "Price"], "left")

    # Coalesce the original and imputed {item} columns
    items = items.withColumn(f'{item}_weight_price_impute', f.coalesce(item, f'{item}_fill_value'))
    items = items.drop(item, f'{item}_fill_value')
    items = items.withColumnRenamed(f'{item}_weight_price_impute', item)
items.show()
#+------+-----+----+------+
#|Weight|Price|Item| Color|
#+------+-----+----+------+
#| 5| 30| B| Green|
#| 5| 30| B| Green|
#| 5| 30| A| Green|
#| 5| 30| A| Green|
#| 3| 20| A| Red|
#| 6| 10|null| null|
#| 5| 10| B| Green|
#| 3| 10| A| Red|
#| 3| 10| A| Red|
#| 3| 10| A| Red|
#| 6| 20| B| Red|
#| 6| 20| B| Red|
#| 6| 20| B|Orange|
#| 6| 20| B|Orange|
#+------+-----+----+------+
You can use a join (left/right, whichever you prefer); here is a suggestion that I hope helps:
!pwd
!export SPARK_HOME="/usr/spark-2.4.4/python/pyspark/"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local") \
.appName("null-fill") \
.getOrCreate()
values = [(None, 'Red', 3, 10), (None, 'Red', 6, 20), ('A', 'Green', 5, 30),
('A', 'Red', 3, 10), (None, 'Green', 6, 10), ('B', 'Green', 5, 10),
('B', 'Orange', 6, 20), ('A', 'Red', 3, 20), ('B', 'Green', 5, 30),
(None, 'Red', 3, 10)]
items = spark.createDataFrame(values, ['Item', 'Color', 'Weight', 'Price'])
items.show()
/usr/spark-2.4.4/work_dir/notebooks
+----+------+------+-----+
|Item| Color|Weight|Price|
+----+------+------+-----+
|null| Red| 3| 10|
|null| Red| 6| 20|
| A| Green| 5| 30|
| A| Red| 3| 10|
|null| Green| 6| 10|
| B| Green| 5| 10|
| B|Orange| 6| 20|
| A| Red| 3| 20|
| B| Green| 5| 30|
|null| Red| 3| 10|
+----+------+------+-----+
from pyspark.sql import Window
from pyspark.sql.functions import *
grouped = items.where(items['Item'].isNotNull()).groupBy('Item', 'Weight', 'Color').count()
window = Window.partitionBy('Weight').orderBy(desc("count"))
grouped = grouped\
.withColumn('order', row_number().over(window))\
.where(col('order') == 1)
grouped.show()
+----+------+------+-----+-----+
|Item|Weight| Color|count|order|
+----+------+------+-----+-----+
| B| 6|Orange| 1| 1|
| B| 5| Green| 2| 1|
| A| 3| Red| 2| 1|
+----+------+------+-----+-----+
Then we apply the join on the condition 'Weight':
left_join = grouped.alias('tr').join(items, (grouped.Weight == items.Weight),how='left')
df_edited = left_join.select("tr.Item","tr.Weight","tr.Color","tr.count","tr.order","Price")
df_edited.show()
+----+------+------+-----+-----+-----+
|Item|Weight| Color|count|order|Price|
+----+------+------+-----+-----+-----+
| B| 6|Orange| 1| 1| 20|
| B| 6|Orange| 1| 1| 10|
| B| 6|Orange| 1| 1| 20|
| B| 5| Green| 2| 1| 30|
| B| 5| Green| 2| 1| 10|
| B| 5| Green| 2| 1| 30|
| A| 3| Red| 2| 1| 10|
| A| 3| Red| 2| 1| 10|
| A| 3| Red| 2| 1| 20|
| A| 3| Red| 2| 1| 10|
+----+------+------+-----+-----+-----+
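Note that this join only attaches the per-Weight mode row to each original row; to actually fill the missing Item values you would still need a coalesce step, as in the other answers. A minimal sketch reusing the grouped df and the wildcard import above (it matches on Weight alone, like the join above, so for example the null Item in the (6, Green) row would also receive the weight-6 mode):
# Sketch: coalesce the original Item with the per-Weight mode
imputed = items.alias('it') \
    .join(grouped.alias('tr'), ['Weight'], 'left') \
    .select('it.*', coalesce('it.Item', 'tr.Item').alias('Item_filled'))
imputed.show()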
You were almost there with what you tried. Here is a working version (the last step, the join, is fixed):
from pyspark.sql import Window
from pyspark.sql import functions as F
w = Window.partitionBy('Weight', 'Color')
grouped = items.filter("Item is not null") \
.withColumn('count', F.count("*").over(w)) \
.withColumn('rn', F.row_number().over(w.orderBy(F.desc("count")))) \
.where("rn = 1") \
.select('Item', 'Weight', 'Color')
items.alias("item").join(grouped.alias("mode"), ['Weight', 'Color'], "left") \
.select("item.*", F.coalesce("item.Item", "mode.Item").alias("item_weight_impute")) \
.show()
#+------+------+----+-----+------------------+
#|Weight| Color|Item|Price|item_weight_impute|
#+------+------+----+-----+------------------+
#| 6|Orange| B| 20| B|
#| 3| Red|null| 10| A|
#| 3| Red| A| 10| A|
#| 3| Red| A| 20| A|
#| 3| Red|null| 10| A|
#| 6| Green|null| 10| null|
#| 5| Green| A| 30| A|
#| 5| Green| B| 10| B|
#| 5| Green| B| 30| B|
#| 6| Red|null| 20| null|
#+------+------+----+-----+------------------+
Once you have grouped, you can do a left join of items with grouped and then use coalesce to fill the null values in the Item column. The coalesce function returns the first column that is not null.
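For instance, a tiny illustration of that behaviour (a sketch, assuming a running spark session and functions imported as F, as in the steps below):
from pyspark.sql import functions as F

# coalesce picks, per row, the first non-null value among its arguments
spark.createDataFrame([('A', 'X'), (None, 'Y')], ['a', 'b']) \
    .select(F.coalesce('a', 'b').alias('first_non_null')) \
    .show()
# +--------------+
# |first_non_null|
# +--------------+
# |             A|
# |             Y|
# +--------------+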
Step 1. The grouped df:
from pyspark.sql import functions as F

df1 = (items
       .dropna()
       .groupBy("Item", "Weight", "Color")
       .agg(F.count("Price").alias("count")))

df2 = (df1
       .groupBy("Weight", "Color")
       .agg(F.max("count").alias("count")))

grouped = (df1
           .join(df2, ["Weight", "Color", "count"])
           .withColumnRenamed("Item", "fill_value")
           .drop("count"))
Step 2. Left join items with grouped:
df = items.join(grouped, ["Weight", "Color"], "left" )
Step 3. Apply coalesce:
df_filled = df.withColumn("item_weight_impute", F.coalesce("Item", "fill_value"))
df_filled now contains the original columns plus fill_value and the imputed item_weight_impute column.
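If you want to end up with a single Item column again, an optional cleanup could look like this (a sketch; it mirrors the drop/rename step in the loop from the question's edit):
# Keep the imputed values under the original column name
df_filled = (df_filled
             .drop('Item', 'fill_value')
             .withColumnRenamed('item_weight_impute', 'Item'))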