Issue with UDF on a column of Vectors in PySpark DataFrame
I am having problems using a UDF on a column of Vectors in PySpark, which can be illustrated here:
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
from pyspark.mllib.linalg import Vectors

# sc is assumed to be an existing SparkContext (e.g. from the pyspark shell)
FeatureRow = Row('id', 'features')
data = sc.parallelize([(0, Vectors.dense([9.7, 1.0, -3.2])),
                       (1, Vectors.dense([2.25, -11.1, 123.2])),
                       (2, Vectors.dense([-7.2, 1.0, -3.2]))])
df = data.map(lambda r: FeatureRow(*r)).toDF()

vector_udf = udf(lambda vector: sum(vector), DoubleType())
df.withColumn('feature_sums', vector_udf(df.features)).first()
This fails with the following stack trace:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 31.0 failed 1 times, most recent failure: Lost task 5.0 in stage 31.0 (TID 95, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/colin/src/spark/python/pyspark/sql/functions.py", line 469, in <lambda>
    func = lambda _, it: map(lambda x: f(*x), it)
  File "/Users/colin/pokitdok/spark_mapper/spark_mapper/filters.py", line 143, in <lambda>
TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
Looking at what gets passed to the UDF, something seems off. The argument should be a Vector, but instead the UDF receives a Python tuple like this:
(1, None, None, [9.7, 1.0, -3.2])
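One way to see what the UDF actually receives is a small debugging sketch (an addition for illustration, not part of the original code) that returns the repr of the argument from a string-typed UDF on the same df:

from pyspark.sql.types import StringType
# hypothetical debugging UDF: return the repr of whatever PySpark passes to the lambda
debug_udf = udf(lambda v: repr(v), StringType())
df.withColumn('features_repr', debug_udf(df.features)).first()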
Is it not possible to use a UDF on a DataFrame column of Vectors?
EDIT
It was pointed out on the mailing list that this is a known issue. Accepting the answer from @hyim, since it does give a temporary workaround for dense vectors.
In spark-sql, vectors are treated as (type, size, indices, values) tuples.
You can use a UDF on a vector column with PySpark; just modify the code to work with the values field of the vector type:
# vector[3] is the values field of the (type, size, indices, values) tuple
vector_udf = udf(lambda vector: sum(vector[3]), DoubleType())
df.withColumn('feature_sums', vector_udf(df.features)).first()
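As a side note, on Spark releases where this known issue has been fixed, the UDF should receive the actual Vector object rather than the struct tuple; in that case a sketch along these lines (an assumption about the fixed behaviour, not part of the original workaround) sums the vector directly:

# assumes the UDF receives a real DenseVector (behaviour after the fix);
# toArray() yields a numpy array, float() converts its sum to a Python float for DoubleType
vector_udf = udf(lambda vector: float(vector.toArray().sum()), DoubleType())
df.withColumn('feature_sums', vector_udf(df.features)).first()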