User-defined function to return a Series

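I'm trying to convert a Python function into a UDF so that I can use it on a Spark DataFrame. The function prefixes each value with its column name and then joins the resulting strings row by row. For example: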

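import pandas as pd
import pyspark.sql.functions as F

dd = pd.DataFrame({'AV':['dfgt', 'dsfafgs'], 'AC':['dhghd', 'erytsh']})

def get_vector(dataframe):
    # Prefix each value with "<column>:"; fill missing values first so they
    # become empty strings instead of the literal string "nan"
    d2 = dataframe.fillna('').astype(str).radd(dataframe.columns + ':')
    # Join the prefixed values of each row with "/"
    res = pd.Series(d2.values.tolist()).str.join('/')
    return res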

get_vector(dd)

#0        AC:dhghd/AV:dfgt
#1    AC:erytsh/AV:dsfafgs
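For readers less familiar with radd and str.join, the per-row logic can be sketched in plain Python. This assumes each row is given as a dict mapping column name to value; the row_vectors helper is hypothetical and not part of pandas or Spark.

```python
from typing import Dict, List, Optional


def row_vectors(rows: List[Dict[str, Optional[str]]], sep: str = "/") -> List[str]:
    """Hypothetical helper: build one "col:value" string per row.

    Pairs are joined with `sep`, in the dict's insertion order.
    Missing values (None) are treated as empty strings.
    """
    out = []
    for row in rows:
        pairs = [f"{col}:{'' if val is None else val}" for col, val in row.items()]
        out.append(sep.join(pairs))
    return out


rows = [{"AV": "dfgt", "AC": "dhghd"}, {"AV": "dsfafgs", "AC": "erytsh"}]
print(row_vectors(rows))  # ['AV:dfgt/AC:dhghd', 'AV:dsfafgs/AC:erytsh']
```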

To convert it into a UDF, I tried:

dd_spark = spark.createDataFrame(dd)

@F.pandas_udf(dd_spark.schema, F.PandasUDFType.SCALAR)
def get_vector(dataframe):
    d2 = dataframe.astype(str).radd(dataframe.columns + ':')
    res = pd.Series(d2.fillna('').values.tolist()).str.join('/')
    return res

This raises the following error:

Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 812, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 765, in run
    self.__target(*self.__args, **self.__kwargs)
  File "<stdin>", line 37, in get_data
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/pyspark/sql/udf.py", line 64, in _create_udf
    return udf_obj._wrapped()
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/pyspark/sql/udf.py", line 186, in _wrapped
    wrapper.returnType = self.returnType
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/pyspark/sql/udf.py", line 119, in returnType
    "not supported" % str(self._returnType_placeholder))
NotImplementedError: Invalid returnType with scalar Pandas UDFs: StructType(List(StructField(AC,StringType,true),StructField(AV,StringType,true),StructField(UI,StringType,true),StructField(C,StringType,true),StructField(I,StringType,true),StructField(A,StringType,true))) is not supported

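The first argument to pandas_udf is the return type of the UDF, not the type of the input data. Here the function produces one string per row, so the return type is a string; passing the input DataFrame's schema (a StructType) is what triggers the error above.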
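To see why the return type is a string: a scalar Pandas UDF receives a batch of rows and must return a single pandas Series with one value per row. The sketch below assumes only that pandas is installed. It runs the same body on a plain pandas DataFrame (the name get_vector_body is hypothetical) so the contract can be checked locally before wrapping the function in pandas_udf.

```python
import pandas as pd


def get_vector_body(df: pd.DataFrame) -> pd.Series:
    # Prefix every value with "<column>:"; missing values become "" first
    d2 = df.fillna("").astype(str).radd(df.columns + ":")
    # One list of prefixed strings per row, joined with "/"
    return pd.Series(d2.values.tolist(), index=df.index).str.join("/")


batch = pd.DataFrame({"AV": ["dfgt", "dsfafgs"], "AC": ["dhghd", "erytsh"]})
result = get_vector_body(batch)

# A scalar Pandas UDF must return one Series with one value per input row;
# each value here is a str, which matches a StringType() return type.
assert isinstance(result, pd.Series)
assert len(result) == len(batch)
print(result.tolist())  # ['AV:dfgt/AC:dhghd', 'AV:dsfafgs/AC:erytsh']
```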

T.StringType()替换dd_spark.schema解决了问题:

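import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import types as T

# The first argument is the UDF's return type: one string per row
@F.pandas_udf(T.StringType(), F.PandasUDFType.SCALAR)
def get_vector(dataframe):
    # Prefix each value with "<column>:"; missing values become empty strings
    d2 = dataframe.fillna('').astype(str).radd(dataframe.columns + ':')
    # Join the prefixed values of each row with "/"
    res = pd.Series(d2.values.tolist()).str.join('/')
    return res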

dd_spark.withColumn("vector", get_vector(
    F.struct([dd_spark[x] for x in dd_spark.columns]))).show(truncate=False)

This prints:

+-------+------+--------------------+                                           
|AV     |AC    |vector              |
+-------+------+--------------------+
|dfgt   |dhghd |AV:dfgt/AC:dhghd    |
|dsfafgs|erytsh|AV:dsfafgs/AC:erytsh|
+-------+------+--------------------+