使用 Spark pandas_udf 创建具有动态输入列数的列

Create column using Spark pandas_udf, with dynamic number of input columns

我有这个 df:

df = spark.createDataFrame(
    [('row_a', 5.0, 0.0, 11.0),
     ('row_b', 3394.0, 0.0, 4543.0),
     ('row_c', 136111.0, 0.0, 219255.0),
     ('row_d', 0.0, 0.0, 0.0),
     ('row_e', 0.0, 0.0, 0.0),
     ('row_f', 42.0, 0.0, 54.0)],
    ['value', 'col_a', 'col_b', 'col_c']
)

我想使用 Pandas 中的 .quantile(0.25, axis=1) 来添加一列:

import pandas as pd
pdf = df.toPandas()
pdf['25%'] = pdf.quantile(0.25, axis=1)
print(pdf)
#    value     col_a  col_b     col_c      25%
# 0  row_a       5.0    0.0      11.0      2.5
# 1  row_b    3394.0    0.0    4543.0   1697.0
# 2  row_c  136111.0    0.0  219255.0  68055.5
# 3  row_d       0.0    0.0       0.0      0.0
# 4  row_e       0.0    0.0       0.0      0.0
# 5  row_f      42.0    0.0      54.0     21.0

性能对我来说很重要,所以我认为 pyspark.sql.functions 中的 pandas_udf 可以以更优化的方式做到这一点。但我很难做出一个高性能和有用的功能。这是我最好的尝试:

from pyspark.sql import functions as F
import pandas as pd
@F.pandas_udf('double')
def quartile1_on_axis1(a: pd.Series, b: pd.Series, c: pd.Series) -> pd.Series:
    pdf = pd.DataFrame({'a':a, 'b':b, 'c':c})
    return pdf.quantile(0.25, axis=1)

df = df.withColumn('25%', quartile1_on_axis1('col_a', 'col_b', 'col_c'))
  1. 我不喜欢我需要为每一列提供一个参数,然后在函数中分别处理这些参数以创建一个 df。所有这些列都有相同的目的,所以恕我直言,应该有一种方法可以将它们一起处理,就像在这个伪代码中一样:

    def quartile1_on_axis1(*cols) -> pd.Series:
        pdf = pd.DataFrame(cols)
    

    这样我就可以将此函数用于任意数量的列。

  2. 是否需要在UDF里面创建一个pd.Dataframe?对我来说,这似乎与没有 UDF (Spark df -> Pandas df -> Spark df) 相同,如上所示。如果没有 UDF,它甚至更短。我真的应该尝试让它在性能方面与 pandas_udf 一起工作吗?我认为 pandas_udf 就是专门为这种目的而设计的...

我会使用 GroupedData。因为这需要您传递 df 的模式,所以添加一个具有所需数据类型的列并获取模式。需要时传递该模式。下面的代码;

#Generate new schema by adding new column

sch =df.withColumn('quantile25',lit(110.5)).schema

#udf
def quartile1_on_axis1(pdf):
  
  pdf =pdf.assign(quantile25=pdf.quantile(0.25, axis=1))
 
  return pdf


 #apply udf 


df.groupby('value').applyInPandas(quartile1_on_axis1, schema=sch).show()


#outcome
+-----+--------+-----+--------+----------+
|value|   col_a|col_b|   col_c|quantile25|
+-----+--------+-----+--------+----------+
|row_a|     5.0|  0.0|    11.0|       2.5|
|row_b|  3394.0|  0.0|  4543.0|    1697.0|
|row_c|136111.0|  0.0|219255.0|   68055.5|
|row_d|     0.0|  0.0|     0.0|       0.0|
|row_e|     0.0|  0.0|     0.0|       0.0|
|row_f|    42.0|  0.0|    54.0|      21.0|
+-----+--------+-----+--------+----------+

您也可以在 udf 中使用 numpy 来完成此操作。如果您不想列出所有列,请按索引对它们(列)进行切片。

quartile1_on_axis1=udf(lambda x: float(np.quantile(x, 0.25)),FloatType())

df.withColumn("0.25%", quartile1_on_axis1(array(df.columns[1:]))).show(truncate=False)

+-----+--------+-----+--------+-------+
|value|col_a   |col_b|col_c   |0.25%  |
+-----+--------+-----+--------+-------+
|row_a|5.0     |0.0  |11.0    |2.5    |
|row_b|3394.0  |0.0  |4543.0  |1697.0 |
|row_c|136111.0|0.0  |219255.0|68055.5|
|row_d|0.0     |0.0  |0.0     |0.0    |
|row_e|0.0     |0.0  |0.0     |0.0    |
|row_f|42.0    |0.0  |54.0    |21.0   |
+-----+--------+-----+--------+-------+

以下内容似乎满足了要求,但它使用的是常规 udf 而不是 pandas_udf。如果我能以类似的方式使用 pandas_udf 就好了。

from pyspark.sql import functions as F
import numpy as np

@F.udf('double')
def lower_quart(*cols):
    return float(np.quantile(cols, 0.25))
df = df.withColumn('25%', lower_quart('col_a', 'col_b', 'col_c'))

df.show()
#+-----+--------+-----+--------+-------+
#|value|   col_a|col_b|   col_c|    25%|
#+-----+--------+-----+--------+-------+
#|row_a|     5.0|  0.0|    11.0|    2.5|
#|row_b|  3394.0|  0.0|  4543.0| 1697.0|
#|row_c|136111.0|  0.0|219255.0|68055.5|
#|row_d|     0.0|  0.0|     0.0|    0.0|
#|row_e|     0.0|  0.0|     0.0|    0.0|
#|row_f|    42.0|  0.0|    54.0|   21.0|
#+-----+--------+-----+--------+-------+

您可以传递单个结构列,而不是像这样使用多个列:

@F.pandas_udf('double')
def quartile1_on_axis1(s: pd.DataFrame) -> pd.Series:
    return s.quantile(0.25, axis=1)


cols = ['col_a', 'col_b', 'col_c']

df = df.withColumn('25%', quartile1_on_axis1(F.struct(*cols)))
df.show()

# +-----+--------+-----+--------+-------+
# |value|   col_a|col_b|   col_c|    25%|
# +-----+--------+-----+--------+-------+
# |row_a|     5.0|  0.0|    11.0|    2.5|
# |row_b|  3394.0|  0.0|  4543.0| 1697.0|
# |row_c|136111.0|  0.0|219255.0|68055.5|
# |row_d|     0.0|  0.0|     0.0|    0.0|
# |row_e|     0.0|  0.0|     0.0|    0.0|
# |row_f|    42.0|  0.0|    54.0|   21.0|
# +-----+--------+-----+--------+-------+

pyspark.sql.functions.pandas_udf

Note that the type hint should use pandas.Series in all cases but there is one variant that pandas.DataFrame should be used for its input or output type hint instead when the input or output column is of pyspark.sql.types.StructType.

udf 方法将为您提供所需的结果,而且绝对是最直接的。但是,如果性能确实是重中之重,您可以为 quantile 创建自己的原生 Spark 实现。基础知识可以很容易地编码,如果你想使用任何其他 pandas 参数,你需要自己调整它。

注意: 这是从 pandas API docs for interpolation='linear' 中提取的。如果您打算使用它,请测试性能并在大型数据集上自行验证结果。

import math
from pyspark.sql import functions as f

def quantile(q, cols):
    if q < 0 or q > 1:
        raise ValueError("Parameter q should be 0 <= q <= 1")

    if not cols:
        raise ValueError("List of columns should be provided")

    idx = (len(cols) - 1) * q
    i = math.floor(idx)
    j = math.ceil(idx)
    fraction = idx - i

    arr = f.array_sort(f.array(*cols))

    return arr.getItem(i) + (arr.getItem(j) - arr.getItem(i)) * fraction


columns = ['col_a', 'col_b', 'col_c']

df.withColumn('0.25%', quantile(0.25, columns)).show()

+-----+--------+-----+--------+-----+-------+
|value|   col_a|col_b|   col_c|col_d|  0.25%|
+-----+--------+-----+--------+-----+-------+
|row_a|     5.0|  0.0|    11.0|    1|    2.5|
|row_b|  3394.0|  0.0|  4543.0|    1| 1697.0|
|row_c|136111.0|  0.0|219255.0|    1|68055.5|
|row_d|     0.0|  0.0|     0.0|    1|    0.0|
|row_e|     0.0|  0.0|     0.0|    1|    0.0|
|row_f|    42.0|  0.0|    54.0|    1|   21.0|
+-----+--------+-----+--------+-----+-------+

作为旁注,还有 pandas API on spark,但是 axis=1 尚未(尚未)实施。将来可能会添加。

df.to_pandas_on_spark().quantile(0.25, axis=1)

NotImplementedError: axis should be either 0 or "index" currently.