从 pyspark 中的数组列中创建中位数和平均值列

create median and average column out of array column in pyspark

我有以下 spark 数据框:

import pandas as pd
foo = pd.DataFrame({'id': [1,2,3,4], 'col': [[1,2,None, None], [1,1,2,2], [1,1,1,2], [1,None,None,None]]})

foo_dfs = spark.createDataFrame(foo)
foo_dfs.show()
+---+------------+
| id|         col|
+---+------------+
|  1|    [1, 2,,]|
|  2|[1, 1, 2, 2]|
|  3|[1, 1, 1, 2]|
|  4|      [1,,,]|
+---+------------+

我正在通过执行以下操作创建 min_colmax_col

from pyspark.sql import functions as f
foo_dfs = foo_dfs.withColumn('min_col', f.array_min('col'))
foo_dfs = foo_dfs.withColumn('max_col', f.array_max('col'))

输出:

+---+------------+-------+-------+
| id|         col|min_col|max_col|
+---+------------+-------+-------+
|  1|    [1, 2,,]|      1|      2|
|  2|[1, 1, 2, 2]|      1|      2|
|  3|[1, 1, 1, 2]|      1|      2|
|  4|      [1,,,]|      1|      1|
+---+------------+-------+-------+

我还想在 max_col 旁边添加 mean_colmedian_col。我怎样才能做到这一点 ?据我所知,没有 f.array_meanf.array_median 函数

输出数据框应如下所示:

+---+------------+-------+-------+--------+----------+
| id|         col|min_col|max_col|mean_col|median_col|
+---+------------+-------+-------+--------+----------+
|  1|    [1, 2,,]|      1|      2|     1.5|       1.5|
|  2|[1, 1, 2, 2]|      1|      2|     1.5|       1.5|
|  3|[1, 1, 1, 2]|      1|      2|    1.25|       1.0|
|  4|      [1,,,]|      1|      1|     1.0|       1.0|
+---+------------+-------+-------+--------+----------+

所以mean_colmedian_col计算应该忽略None

我试过这个:

 import numpy as np
 array_median = f.udf(lambda x: float(np.nanmedian(x)), FloatType())
 array_mean = f.udf(lambda x: float(np.nanmean(x)), FloatType())
 foo_dfs.withColumn('median_col', array_median('col'))
 foo_dfs.withColumn('mean_col', array_mean('col'))

但是它没有用,我收到以下错误:

TypeError: ufunc 'isnan' not supported for the input types, and the inputs could not be safely coerced to any supported types according to the casting rule ''safe''

有什么想法吗?

mean_col 使用 aggregate 函数,whenarray_sort 一起得到 median_col。但首先,您需要使用 filter 函数从数组中过滤空值:

from pyspark.sql import functions as F

foo_dfs = (foo_dfs.withColumn('col', F.array_sort(F.expr('filter(col, x -> x is not null)')))
           .withColumn('size', F.size('col'))
           .withColumn('min_col', F.array_min('col'))
           .withColumn('max_col', F.array_max('col'))
           .withColumn('mean_col', F.expr('aggregate(col, 0L, (acc,x) -> acc+x, acc -> acc /size)'))
           .withColumn('median_col', F.when(F.col('size') % 2 == 0,
                                            (F.expr('col[int(size/2)]') + F.expr('col[int(size/2)-1]'))/2
                                            ).otherwise(F.expr('col[int(size/2)]'))
                       )
           ).drop("size")

foo_dfs.show()
#+---+------------+-------+-------+--------+----------+
#| id|         col|min_col|max_col|mean_col|median_col|
#+---+------------+-------+-------+--------+----------+
#|  1|      [1, 2]|      1|      2|     1.5|       1.5|
#|  2|[1, 1, 2, 2]|      1|      2|     1.5|       1.5|
#|  3|[1, 1, 1, 2]|      1|      2|    1.25|       1.0|
#|  4|         [1]|      1|      1|     1.0|       1.0|
#+---+------------+-------+-------+--------+----------+

一些解释:

  • mean_col:聚合函数对数组的所有元素求和,然后应用一个 finish lambda 函数,将所得总和除以数组的大小。
  • median_col:对数组进行排序并检查其大小:如果 size%2 = 0 则将索引 size/2size/2 -1 处的元素相加并除以 2。否则 (size%2 != 0) 中位数对应索引size/2.
  • 的元素