User-defined function to return a Series
I am trying to convert a Python function into a UDF for use on a Spark DataFrame. The function prefixes each value with its column name, then joins the resulting strings row-wise. For example:
import pandas as pd
import pyspark.sql.functions as F

dd = pd.DataFrame({'AV': ['dfgt', 'dsfafgs'], 'AC': ['dhghd', 'erytsh']})

def get_vector(dataframe):
    # prefix each value with its column name, then join each row with '/'
    d2 = dataframe.astype(str).radd(dataframe.columns + ':')
    res = pd.Series(d2.fillna('').values.tolist()).str.join('/')
    return res

get_vector(dd)
# 0    AC:dhghd/AV:dfgt
# 1    AC:erytsh/AV:dsfafgs
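The subtle step here is `radd`: `df.radd(other)` computes `other + df` element-wise, so the `'name:'` prefixes built from `df.columns + ':'` are broadcast across the matching columns. A minimal sketch of just that step, with hypothetical one-row data:

```python
import pandas as pd

df = pd.DataFrame({'AC': ['x'], 'AV': ['y']})
# df.columns + ':' is the Index ['AC:', 'AV:']; radd prepends it column-wise
prefixed = df.astype(str).radd(df.columns + ':')
print(prefixed.values.tolist())
# [['AC:x', 'AV:y']]
```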
To convert it into a UDF, I tried:
dd_spark = spark.createDataFrame(dd)

@F.pandas_udf(dd_spark.schema, F.PandasUDFType.SCALAR)
def get_vector(dataframe):
    d2 = dataframe.astype(str).radd(dataframe.columns + ':')
    res = pd.Series(d2.fillna('').values.tolist()).str.join('/')
    return res
This raises the following error:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 812, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 765, in run
    self.__target(*self.__args, **self.__kwargs)
  File "<stdin>", line 37, in get_data
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/pyspark/sql/udf.py", line 64, in _create_udf
    return udf_obj._wrapped()
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/pyspark/sql/udf.py", line 186, in _wrapped
    wrapper.returnType = self.returnType
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/pyspark/sql/udf.py", line 119, in returnType
    "not supported" % str(self._returnType_placeholder))
NotImplementedError: Invalid returnType with scalar Pandas UDFs: StructType(List(StructField(AC,StringType,true),StructField(AV,StringType,true),StructField(UI,StringType,true),StructField(C,StringType,true),StructField(I,StringType,true),StructField(A,StringType,true))) is not supported
The first argument to pandas_udf should be the return type, not the schema of the input data. The function returns a single string column, so replacing dd_spark.schema with T.StringType() solves the problem:
from pyspark.sql import functions as F
from pyspark.sql import types as T

@F.pandas_udf(T.StringType(), F.PandasUDFType.SCALAR)
def get_vector(dataframe):
    d2 = dataframe.astype(str).radd(dataframe.columns + ':')
    res = pd.Series(d2.fillna('').values.tolist()).str.join('/')
    return res

dd_spark.withColumn("vector", get_vector(
    F.struct([dd_spark[x] for x in dd_spark.columns]))).show(truncate=False)
which prints:
+-------+------+--------------------+
|AV |AC |vector |
+-------+------+--------------------+
|dfgt |dhghd |AV:dfgt/AC:dhghd |
|dsfafgs|erytsh|AV:dsfafgs/AC:erytsh|
+-------+------+--------------------+
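As a side note, the same string can often be built without a Python UDF at all, using Spark's built-in concat_ws/concat SQL functions, which avoids serializing every batch to Python. A hedged sketch that generates an equivalent Spark SQL expression string (the helper `vector_expr` is hypothetical, not from the original post):

```python
def vector_expr(columns):
    # Build a Spark SQL expression equivalent to the UDF, e.g.
    # concat_ws('/', concat('AV:', AV), concat('AC:', AC))
    parts = ", ".join("concat('{0}:', {0})".format(c) for c in columns)
    return "concat_ws('/', {})".format(parts)

print(vector_expr(['AV', 'AC']))
# concat_ws('/', concat('AV:', AV), concat('AC:', AC))
```

which could then be applied with something like dd_spark.withColumn("vector", F.expr(vector_expr(dd_spark.columns))).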