如何将具有常量 DenseVector 的新列添加到 pyspark 数据框?
How to add a new column with a constant DenseVector to a pyspark dataframe?
我想向包含常量 DenseVector
的 pyspark 数据框添加一个新列。
以下是我的尝试但失败了:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [(1,2),(3,4),(5,6),(7,8)]
df = spark.createDataFrame(data=data)
@udf(returnType=VectorUDT())
def add_cons_dense_col(val):
return val
df.withColumn('ttt',add_cons_dense_col(DenseVector([1.,0.]))).show()
它失败了:
TypeError Traceback (most recent call last)
/tmp/ipykernel_3894138/803146743.py in <module>
----> 1 df.withColumn('ttt',add_cons_dense_col(DenseVector([1.,0.]))).show()
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/udf.py in wrapper(*args)
197 @functools.wraps(self.func, assigned=assignments)
198 def wrapper(*args):
--> 199 return self(*args)
200
201 wrapper.__name__ = self._name
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/udf.py in __call__(self, *cols)
177 judf = self._judf
178 sc = SparkContext._active_spark_context
--> 179 return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
180
181 # This function is for improving the online help system in the interactive interpreter.
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/column.py in _to_seq(sc, cols, converter)
59 """
60 if converter:
---> 61 cols = [converter(c) for c in cols]
62 return sc._jvm.PythonUtils.toSeq(cols)
63
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/column.py in <listcomp>(.0)
59 """
60 if converter:
---> 61 cols = [converter(c) for c in cols]
62 return sc._jvm.PythonUtils.toSeq(cols)
63
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/column.py in _to_java_column(col)
43 jcol = _create_column_from_name(col)
44 else:
---> 45 raise TypeError(
46 "Invalid argument, not a string or column: "
47 "{0} of type {1}. "
TypeError: Invalid argument, not a string or column: [1.0,0.0] of type <class 'pyspark.ml.linalg.DenseVector'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
你能帮我理解为什么会失败吗?
你可以试试
add_cons_dense_col = F.udf(lambda: DenseVector([1., 0.]), VectorUDT())
df = df.withColumn('ttt', add_cons_dense_col())
df.show(truncate=False)
调用 UDF 时,你需要传入 ArrayType 类型的列,而不能直接传入 DenseVector;
同时还需要把 add_cons_dense_col 函数的返回值改为 DenseVector:
import pyspark.sql.functions as F
@F.udf(returnType=VectorUDT())
def add_cons_dense_col(val):
return DenseVector(val)
df.withColumn('ttt', add_cons_dense_col(F.array(F.lit(1.), F.lit(0.)))).show()
#+---+---+---------+
#| _1| _2| ttt|
#+---+---+---------+
#| 1| 2|[1.0,0.0]|
#| 3| 4|[1.0,0.0]|
#| 5| 6|[1.0,0.0]|
#| 7| 8|[1.0,0.0]|
#+---+---+---------+
从 python 列表创建数组列:
F.array(*[F.lit(x) for x in [1., 0., 3., 5.]])
我想向包含常量 DenseVector
的 pyspark 数据框添加一个新列。
以下是我的尝试但失败了:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [(1,2),(3,4),(5,6),(7,8)]
df = spark.createDataFrame(data=data)
@udf(returnType=VectorUDT())
def add_cons_dense_col(val):
return val
df.withColumn('ttt',add_cons_dense_col(DenseVector([1.,0.]))).show()
它失败了:
TypeError Traceback (most recent call last)
/tmp/ipykernel_3894138/803146743.py in <module>
----> 1 df.withColumn('ttt',add_cons_dense_col(DenseVector([1.,0.]))).show()
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/udf.py in wrapper(*args)
197 @functools.wraps(self.func, assigned=assignments)
198 def wrapper(*args):
--> 199 return self(*args)
200
201 wrapper.__name__ = self._name
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/udf.py in __call__(self, *cols)
177 judf = self._judf
178 sc = SparkContext._active_spark_context
--> 179 return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
180
181 # This function is for improving the online help system in the interactive interpreter.
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/column.py in _to_seq(sc, cols, converter)
59 """
60 if converter:
---> 61 cols = [converter(c) for c in cols]
62 return sc._jvm.PythonUtils.toSeq(cols)
63
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/column.py in <listcomp>(.0)
59 """
60 if converter:
---> 61 cols = [converter(c) for c in cols]
62 return sc._jvm.PythonUtils.toSeq(cols)
63
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/column.py in _to_java_column(col)
43 jcol = _create_column_from_name(col)
44 else:
---> 45 raise TypeError(
46 "Invalid argument, not a string or column: "
47 "{0} of type {1}. "
TypeError: Invalid argument, not a string or column: [1.0,0.0] of type <class 'pyspark.ml.linalg.DenseVector'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
你能帮我理解为什么会失败吗?
你可以试试
add_cons_dense_col = F.udf(lambda: DenseVector([1., 0.]), VectorUDT())
df = df.withColumn('ttt', add_cons_dense_col())
df.show(truncate=False)
调用 UDF 时,你需要传入 ArrayType 类型的列,而不能直接传入 DenseVector;
同时还需要把 add_cons_dense_col 函数的返回值改为 DenseVector:
import pyspark.sql.functions as F
@F.udf(returnType=VectorUDT())
def add_cons_dense_col(val):
return DenseVector(val)
df.withColumn('ttt', add_cons_dense_col(F.array(F.lit(1.), F.lit(0.)))).show()
#+---+---+---------+
#| _1| _2| ttt|
#+---+---+---------+
#| 1| 2|[1.0,0.0]|
#| 3| 4|[1.0,0.0]|
#| 5| 6|[1.0,0.0]|
#| 7| 8|[1.0,0.0]|
#+---+---+---------+
从 python 列表创建数组列:
F.array(*[F.lit(x) for x in [1., 0., 3., 5.]])