PySpark: iteratively and conditionally compute the median and fill NAs

I have a PySpark DataFrame:

values = [('Lacoste', 'Red', 6, 30), ('Gap', 'Orange', 8, None), ('Lacoste', 'Green', 5, 200),
         ('Gap', 'Red', 3, None), ('Gap', 'Orange', 5, None), ('Lacoste', 'Green', 3, 150),
         ('Lacoste', 'Orange', 9, 40), ('Lacoste', 'Red', 4, 70), ('Gap', 'Green', None, 15),
         ('Lacoste', 'Red', None, 50), ('Gap', 'Orange', 5, 17), ('Lacoste', 'Green', None, 40),
         ('Banana Republic', 'Orange', None, None)]
ratings = spark.createDataFrame(values, ['Brand', 'Color', 'Rating', 'Price'])
ratings.show()

#+---------------+------+------+-----+
#|          Brand| Color|Rating|Price|
#+---------------+------+------+-----+
#|        Lacoste|   Red|     6|   30|
#|            Gap|Orange|     8| null|
#|        Lacoste| Green|     5|  200|
#|            Gap|   Red|     3| null|
#|            Gap|Orange|     5| null|
#|        Lacoste| Green|     3|  150|
#|        Lacoste|Orange|     9|   40|
#|        Lacoste|   Red|     4|   70|
#|            Gap| Green|  null|   15|
#|        Lacoste|   Red|  null|   50|
#|            Gap|Orange|     5|   17|
#|        Lacoste| Green|  null|   40|
#|Banana Republic|Orange|  null| null|
#+---------------+------+------+-----+

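Edited: I want to fill all NAs with the median computed per brand and color, and then fall back to the median computed per brand only. The only row that should still contain nulls afterwards is the Banana Republic row, because there are no other Banana Republic rows to take a brand or brand/color median from. The first answer almost got me there, but as you can see, I had mistakenly hard-coded a column name. I want it to loop over a list of column names.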

# Assign median based on the brand and color combination
median_columns = ['Rating','Price']
for item in median_columns:
    brand_window = Window.partitionBy('Brand')
    brand_color_window = Window.partitionBy('Brand','Color')
    brand_color_median = f.expr("percentile_approx('item', 0.5)")
    ratings = ratings.withColumn(item, 
                      f.coalesce(item,
                                 brand_color_median.over(brand_color_window),
                                 brand_color_median.over(brand_window)))

ratings.show()

#+---------------+------+------+-----+
#|          Brand| Color|Rating|Price|
#+---------------+------+------+-----+
#|            Gap| Green|  null| 15.0|
#|            Gap|Orange|   5.0| null|
#|            Gap|Orange|   5.0| 17.0|
#|            Gap|Orange|   8.0| null|
#|            Gap|   Red|   3.0| null|
#|        Lacoste| Green|   5.0|200.0|
#|        Lacoste| Green|  null| 40.0|
#|        Lacoste| Green|   3.0|150.0|
#|        Lacoste|Orange|   9.0| 40.0|
#|        Lacoste|   Red|  null| 50.0|
#|        Lacoste|   Red|   6.0| 30.0|
#|        Lacoste|   Red|   4.0| 70.0|
#|Banana Republic|Orange|  null| null|
#+---------------+------+------+-----+

The nulls are not being overwritten. What am I missing?

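You need to coalesce the original column with the median columns, so that the original value is kept whenever it is not null. In the loop, the column name also has to be interpolated into the SQL expression (for example with an f-string) rather than written as the quoted literal 'item'.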
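To see why the quoted version behaves differently, it can help to look at the two expression strings side by side. This is only a plain-Python sketch of the string handling (no Spark involved); the variable names `literal_sql` and `interpolated_sql` are made up for illustration. My understanding is that Spark treats `'item'` as a string literal rather than a column, so the median appears to come back null, which matches the output in the question.

```python
item = "Rating"  # one of the column names from median_columns

# Plain string: the text 'item' stays in the SQL as a quoted string literal,
# so no column is referenced at all.
literal_sql = "percentile_approx('item', 0.5)"

# f-string: the value of the loop variable is substituted, which yields a
# reference to the Rating column.
interpolated_sql = f"percentile_approx({item}, 0.5)"

print(literal_sql)
print(interpolated_sql)
```

If a column name contained spaces or special characters, it would presumably need backticks inside the expression; the names in this example do not.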

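from pyspark.sql import functions as F
from pyspark.sql import Window

median_columns = ['Rating', 'Price']
brand_window = Window.partitionBy('Brand')
brand_color_window = Window.partitionBy('Brand', 'Color')

for item in median_columns:
    # The f-string inserts the column name; quoting 'item' would pass a string literal
    median_expr = F.expr(f'percentile_approx({item}, 0.5)')
    ratings = ratings.withColumn(
        item,
        # Keep the original value, else the brand/color median, else the brand median
        F.coalesce(item,
                   median_expr.over(brand_color_window),
                   median_expr.over(brand_window)))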

ratings.show()
+---------------+------+------+-----+
|          Brand| Color|Rating|Price|
+---------------+------+------+-----+
|            Gap| Green|     5|   15|
|            Gap|Orange|     8|   17|
|            Gap|Orange|     5|   17|
|            Gap|Orange|     5|   17|
|            Gap|   Red|     3|   15|
|        Lacoste| Green|     5|  200|
|        Lacoste| Green|     3|  150|
|        Lacoste| Green|     3|   40|
|        Lacoste|Orange|     9|   40|
|        Lacoste|   Red|     6|   30|
|        Lacoste|   Red|     4|   70|
|        Lacoste|   Red|     4|   50|
|Banana Republic|Orange|  null| null|
+---------------+------+------+-----+