使用 Quantlib 函数的 Pyspark UDF
Pyspark UDF using Quantlib function
我一直在试验 Quantlib 和 Spark,试图在 Pyspark 中传递 Quantlib 函数,请参见下面的示例:
from QuantLib import *
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
df = sc.parallelize([("2016-10-01",),
("2016-11-01",),
("2016-12-01",)]).toDF(['someDate'])
testudf = udf(lambda x: str(DateParser.parseFormatted(x,'%Y-%m-%d')), StringType())
df.withColumn('new', testudf('someDate')).show()
到目前为止我还没有成功,想知道是否有人有更好的运气。
这是我得到的错误:
typeError: in method 'DateParser_parseFormatted', argument 1 of type 'std::string const &'
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon.<init>(PythonRDD.scala:234)
从 C++ 导出,DateParser.parseFormatted
方法对类型有一定的讲究,不能接受 udf 机制传递给 lambda 的 Spark 字符串 x
。您必须将 x
转换回 lambda 中的 Python 字符串。我不熟悉 Spark 及其类型,但可能 str(x)
,如
lambda x: str(DateParser.parseFormatted(str(x), '%Y-%m-%d'))
可以做这份工作吗?
作为旁注,我不确定您的 lambda 中外部 str
的意义何在。您正在获取一个字符串,通过 DateParser
将其转换为 Date
对象,然后再次将结果转换为字符串...
我一直在试验 Quantlib 和 Spark,试图在 Pyspark 中传递 Quantlib 函数,请参见下面的示例:
from QuantLib import *
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
df = sc.parallelize([("2016-10-01",),
("2016-11-01",),
("2016-12-01",)]).toDF(['someDate'])
testudf = udf(lambda x: str(DateParser.parseFormatted(x,'%Y-%m-%d')), StringType())
df.withColumn('new', testudf('someDate')).show()
到目前为止我还没有成功,想知道是否有人有更好的运气。
这是我得到的错误:
typeError: in method 'DateParser_parseFormatted', argument 1 of type 'std::string const &'
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon.<init>(PythonRDD.scala:234)
从 C++ 导出,DateParser.parseFormatted
方法对类型有一定的讲究,不能接受 udf 机制传递给 lambda 的 Spark 字符串 x
。您必须将 x
转换回 lambda 中的 Python 字符串。我不熟悉 Spark 及其类型,但可能 str(x)
,如
lambda x: str(DateParser.parseFormatted(str(x), '%Y-%m-%d'))
可以做这份工作吗?
作为旁注,我不确定您的 lambda 中外部 str
的意义何在。您正在获取一个字符串,通过 DateParser
将其转换为 Date
对象,然后再次将结果转换为字符串...