使用 Quantlib 函数的 Pyspark UDF

Pyspark UDF using Quantlib function

我一直在试验 Quantlib 和 Spark,试图在 Pyspark 中传递 Quantlib 函数,请参见下面的示例:

from QuantLib import *
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf


df = sc.parallelize([("2016-10-01",),
                     ("2016-11-01",),
                     ("2016-12-01",)]).toDF(['someDate'])

testudf = udf(lambda x: str(DateParser.parseFormatted(x,'%Y-%m-%d')), StringType())

df.withColumn('new', testudf('someDate')).show()

到目前为止我还没有成功,想知道是否有人有更好的运气。

这是我得到的错误:

typeError: in method 'DateParser_parseFormatted', argument 1 of type 'std::string const &'
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon.<init>(PythonRDD.scala:234)

从 C++ 导出,DateParser.parseFormatted 方法对类型有一定的讲究,不能接受 udf 机制传递给 lambda 的 Spark 字符串 x。您必须将 x 转换回 lambda 中的 Python 字符串。我不熟悉 Spark 及其类型,但可能 str(x),如

lambda x: str(DateParser.parseFormatted(str(x), '%Y-%m-%d'))

可以做这份工作吗?

作为旁注,我不确定您的 lambda 中外部 str 的意义何在。您正在获取一个字符串,通过 DateParser 将其转换为 Date 对象,然后再次将结果转换为字符串...