从 pyspark 中的数组列中创建中位数和平均值列
create median and average column out of array column in pyspark
我有以下 spark 数据框:
import pandas as pd
foo = pd.DataFrame({'id': [1,2,3,4], 'col': [[1,2,None, None], [1,1,2,2], [1,1,1,2], [1,None,None,None]]})
foo_dfs = spark.createDataFrame(foo)
foo_dfs.show()
+---+------------+
| id| col|
+---+------------+
| 1| [1, 2,,]|
| 2|[1, 1, 2, 2]|
| 3|[1, 1, 1, 2]|
| 4| [1,,,]|
+---+------------+
我正在通过执行以下操作创建 min_col
和 max_col
:
from pyspark.sql import functions as f
foo_dfs = foo_dfs.withColumn('min_col', f.array_min('col'))
foo_dfs = foo_dfs.withColumn('max_col', f.array_max('col'))
输出:
+---+------------+-------+-------+
| id| col|min_col|max_col|
+---+------------+-------+-------+
| 1| [1, 2,,]| 1| 2|
| 2|[1, 1, 2, 2]| 1| 2|
| 3|[1, 1, 1, 2]| 1| 2|
| 4| [1,,,]| 1| 1|
+---+------------+-------+-------+
我还想在 max_col
旁边添加 mean_col
和 median_col
。我怎样才能做到这一点 ?据我所知,没有 f.array_mean
或 f.array_median
函数
输出数据框应如下所示:
+---+------------+-------+-------+--------+----------+
| id| col|min_col|max_col|mean_col|median_col|
+---+------------+-------+-------+--------+----------+
| 1| [1, 2,,]| 1| 2| 1.5| 1.5|
| 2|[1, 1, 2, 2]| 1| 2| 1.5| 1.5|
| 3|[1, 1, 1, 2]| 1| 2| 1.25| 1.0|
| 4| [1,,,]| 1| 1| 1.0| 1.0|
+---+------------+-------+-------+--------+----------+
所以mean_col
和median_col
计算应该忽略None
值
我试过这个:
import numpy as np
array_median = f.udf(lambda x: float(np.nanmedian(x)), FloatType())
array_mean = f.udf(lambda x: float(np.nanmean(x)), FloatType())
foo_dfs.withColumn('median_col', array_median('col'))
foo_dfs.withColumn('mean_col', array_mean('col'))
但是它没有用,我收到以下错误:
TypeError: ufunc 'isnan' not supported for the input types, and the
inputs could not be safely coerced to any supported types according to
the casting rule ''safe''
有什么想法吗?
对 mean_col
使用 aggregate
函数,when
与 array_sort
一起得到 median_col
。但首先,您需要使用 filter
函数从数组中过滤空值:
from pyspark.sql import functions as F
foo_dfs = (foo_dfs.withColumn('col', F.array_sort(F.expr('filter(col, x -> x is not null)')))
.withColumn('size', F.size('col'))
.withColumn('min_col', F.array_min('col'))
.withColumn('max_col', F.array_max('col'))
.withColumn('mean_col', F.expr('aggregate(col, 0L, (acc,x) -> acc+x, acc -> acc /size)'))
.withColumn('median_col', F.when(F.col('size') % 2 == 0,
(F.expr('col[int(size/2)]') + F.expr('col[int(size/2)-1]'))/2
).otherwise(F.expr('col[int(size/2)]'))
)
).drop("size")
foo_dfs.show()
#+---+------------+-------+-------+--------+----------+
#| id| col|min_col|max_col|mean_col|median_col|
#+---+------------+-------+-------+--------+----------+
#| 1| [1, 2]| 1| 2| 1.5| 1.5|
#| 2|[1, 1, 2, 2]| 1| 2| 1.5| 1.5|
#| 3|[1, 1, 1, 2]| 1| 2| 1.25| 1.0|
#| 4| [1]| 1| 1| 1.0| 1.0|
#+---+------------+-------+-------+--------+----------+
一些解释:
mean_col
:聚合函数对数组的所有元素求和,然后应用一个 finish lambda 函数,将所得总和除以数组的大小。
median_col
:对数组进行排序并检查其大小:如果 size%2 = 0
则将索引 size/2
和 size/2 -1
处的元素相加并除以 2。否则 (size%2 != 0
) 中位数对应索引size/2
. 的元素
我有以下 spark 数据框:
import pandas as pd
foo = pd.DataFrame({'id': [1,2,3,4], 'col': [[1,2,None, None], [1,1,2,2], [1,1,1,2], [1,None,None,None]]})
foo_dfs = spark.createDataFrame(foo)
foo_dfs.show()
+---+------------+
| id| col|
+---+------------+
| 1| [1, 2,,]|
| 2|[1, 1, 2, 2]|
| 3|[1, 1, 1, 2]|
| 4| [1,,,]|
+---+------------+
我正在通过执行以下操作创建 min_col
和 max_col
:
from pyspark.sql import functions as f
foo_dfs = foo_dfs.withColumn('min_col', f.array_min('col'))
foo_dfs = foo_dfs.withColumn('max_col', f.array_max('col'))
输出:
+---+------------+-------+-------+
| id| col|min_col|max_col|
+---+------------+-------+-------+
| 1| [1, 2,,]| 1| 2|
| 2|[1, 1, 2, 2]| 1| 2|
| 3|[1, 1, 1, 2]| 1| 2|
| 4| [1,,,]| 1| 1|
+---+------------+-------+-------+
我还想在 max_col
旁边添加 mean_col
和 median_col
。我怎样才能做到这一点 ?据我所知,没有 f.array_mean
或 f.array_median
函数
输出数据框应如下所示:
+---+------------+-------+-------+--------+----------+
| id| col|min_col|max_col|mean_col|median_col|
+---+------------+-------+-------+--------+----------+
| 1| [1, 2,,]| 1| 2| 1.5| 1.5|
| 2|[1, 1, 2, 2]| 1| 2| 1.5| 1.5|
| 3|[1, 1, 1, 2]| 1| 2| 1.25| 1.0|
| 4| [1,,,]| 1| 1| 1.0| 1.0|
+---+------------+-------+-------+--------+----------+
所以mean_col
和median_col
计算应该忽略None
值
我试过这个:
import numpy as np
array_median = f.udf(lambda x: float(np.nanmedian(x)), FloatType())
array_mean = f.udf(lambda x: float(np.nanmean(x)), FloatType())
foo_dfs.withColumn('median_col', array_median('col'))
foo_dfs.withColumn('mean_col', array_mean('col'))
但是它没有用,我收到以下错误:
TypeError: ufunc 'isnan' not supported for the input types, and the inputs could not be safely coerced to any supported types according to the casting rule ''safe''
有什么想法吗?
对 mean_col
使用 aggregate
函数,when
与 array_sort
一起得到 median_col
。但首先,您需要使用 filter
函数从数组中过滤空值:
from pyspark.sql import functions as F
foo_dfs = (foo_dfs.withColumn('col', F.array_sort(F.expr('filter(col, x -> x is not null)')))
.withColumn('size', F.size('col'))
.withColumn('min_col', F.array_min('col'))
.withColumn('max_col', F.array_max('col'))
.withColumn('mean_col', F.expr('aggregate(col, 0L, (acc,x) -> acc+x, acc -> acc /size)'))
.withColumn('median_col', F.when(F.col('size') % 2 == 0,
(F.expr('col[int(size/2)]') + F.expr('col[int(size/2)-1]'))/2
).otherwise(F.expr('col[int(size/2)]'))
)
).drop("size")
foo_dfs.show()
#+---+------------+-------+-------+--------+----------+
#| id| col|min_col|max_col|mean_col|median_col|
#+---+------------+-------+-------+--------+----------+
#| 1| [1, 2]| 1| 2| 1.5| 1.5|
#| 2|[1, 1, 2, 2]| 1| 2| 1.5| 1.5|
#| 3|[1, 1, 1, 2]| 1| 2| 1.25| 1.0|
#| 4| [1]| 1| 1| 1.0| 1.0|
#+---+------------+-------+-------+--------+----------+
一些解释:
mean_col
:聚合函数对数组的所有元素求和,然后应用一个 finish lambda 函数,将所得总和除以数组的大小。median_col
:对数组进行排序并检查其大小:如果size%2 = 0
则将索引size/2
和size/2 -1
处的元素相加并除以 2。否则 (size%2 != 0
) 中位数对应索引size/2
. 的元素