Pyspark - 重新分配百分比

Pyspark - redistribute percentages

我有一个 table 如下所示:

city | center | qty_out | qty_out %
----------------------------------------
 A   | 1      |  10     | .286
 A   | 2      |  2      | .057
 A   | 3      |  23     | .657
 B   | 1      |  40     | .8
 B   | 2      |  10     | .2

city-center 是 unique/the 主键。

如果城市中的任何中心的 qty_out % 小于 10% (.10),我想忽略它并在城市的其他中心重新分配它的百分比。所以上面的结果会变成

city | center | qty_out_%
----------------------------------------
 A   | 1      | .3145
 A   | 3      | .6855
 B   | 1      | .8
 B   | 2      | .2

我该怎么做?我在想一个 window 函数来分区,但想不出一个 window 函数与这个

一起使用
column_list = ["city","center"]
w = Window.partitionBy([col(x) for x in column_list]).orderBy('qty_out_%')

我不是统计学家,所以我不能对等式发表评论,但是,如果我按照你提到的字面意思写 Spark SQL,它会是这样的。

w = Window.partitionBy('city')
redist_cond = F.when(F.col('qty_out %') < 0.1, F.col('qty_out %'))
df = (df.withColumn('redist', F.sum(redist_cond).over(w) / (F.count('*').over(w) - F.count(redist_cond).over(w)))
      .fillna(0, subset=['redist'])
      .filter(F.col('qty_out %') >= 0.1)
      .withColumn('qty_out %', redist_cond.otherwise(F.col('qty_out %') + F.col('redist')))
      .drop('redist'))