如何访问 Spark DataFrame 中 VectorUDT 列的元素?
How to access element of a VectorUDT column in a Spark DataFrame?
我有一个数据框 df
,其中有一个名为 features
的 VectorUDT
列。我如何获得列的元素,比如第一个元素?
我已尝试执行以下操作
from pyspark.sql.functions import udf
first_elem_udf = udf(lambda row: row.values[0])
df.select(first_elem_udf(df.features)).show()
但我收到 net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict(for numpy.dtype)
错误。如果我改为 first_elem_udf = first_elem_udf(lambda row: row.toArray()[0])
,则会出现同样的错误。
我也试过 explode()
但我收到一个错误,因为它需要数组或映射类型。
我想这应该是一个常见的操作。
将输出转换为 float
:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf
def ith_(v, i):
try:
return float(v[i])
except ValueError:
return None
ith = udf(ith_, DoubleType())
用法示例:
from pyspark.ml.linalg import Vectors
df = sc.parallelize([
(1, Vectors.dense([1, 2, 3])),
(2, Vectors.sparse(3, [1], [9]))
]).toDF(["id", "features"])
df.select(ith("features", lit(1))).show()
## +-----------------+
## |ith_(features, 1)|
## +-----------------+
## | 2.0|
## | 9.0|
## +-----------------+
解释:
必须将输出值重新序列化为等效的 Java 对象。如果你想访问 values
(注意 SparseVectors
),你应该使用 item
方法:
v.values.item(0)
哪个 return 标准 Python 标量。同样,如果你想访问所有值作为密集结构:
v.toArray().tolist()
如果您更喜欢使用 spark.sql,您可以使用以下自定义函数 'to_array' 将向量转换为数组。然后你可以把它当作一个数组来操作。
from pyspark.sql.types import ArrayType, DoubleType
def to_array_(v):
return v.toArray().tolist()
from pyspark.sql import SQLContext
sqlContext=SQLContext(spark.sparkContext, sparkSession=spark, jsqlContext=None)
sqlContext.udf.register("to_array",to_array_, ArrayType(DoubleType()))
例子
from pyspark.ml.linalg import Vectors
df = sc.parallelize([
(1, Vectors.dense([1, 2, 3])),
(2, Vectors.sparse(3, [1], [9]))
]).toDF(["id", "features"])
df.createOrReplaceTempView("tb")
spark.sql("""select * , to_array(features)[1] Second from tb """).toPandas()
输出
id features Second
0 1 [1.0, 2.0, 3.0] 2.0
1 2 (0.0, 9.0, 0.0) 9.0
我 运行 遇到同样的问题,无法使用 explode()。您可以做的一件事是使用 pyspark.ml.feature 库中的 VectorSlice。像这样:
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row
slicer = VectorSlicer(inputCol="features", outputCol="features_one", indices=[0])
output = slicer.transform(df)
output.select("features", "features_one").show()
对于任何试图将 PySpark ML 模型训练后生成的概率列拆分为可用列的人。这不使用 UDF 或 numpy。这只适用于二进制分类。这里 lr_pred 是具有逻辑回归模型预测的数据框。
prob_df1=lr_pred.withColumn("probability",lr_pred["probability"].cast("String"))
prob_df =prob_df1.withColumn('probabilityre',split(regexp_replace("probability", "^\[|\]", ""), ",")[1].cast(DoubleType()))
我有一个数据框 df
,其中有一个名为 features
的 VectorUDT
列。我如何获得列的元素,比如第一个元素?
我已尝试执行以下操作
from pyspark.sql.functions import udf
first_elem_udf = udf(lambda row: row.values[0])
df.select(first_elem_udf(df.features)).show()
但我收到 net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict(for numpy.dtype)
错误。如果我改为 first_elem_udf = first_elem_udf(lambda row: row.toArray()[0])
,则会出现同样的错误。
我也试过 explode()
但我收到一个错误,因为它需要数组或映射类型。
我想这应该是一个常见的操作。
将输出转换为 float
:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf
def ith_(v, i):
try:
return float(v[i])
except ValueError:
return None
ith = udf(ith_, DoubleType())
用法示例:
from pyspark.ml.linalg import Vectors
df = sc.parallelize([
(1, Vectors.dense([1, 2, 3])),
(2, Vectors.sparse(3, [1], [9]))
]).toDF(["id", "features"])
df.select(ith("features", lit(1))).show()
## +-----------------+
## |ith_(features, 1)|
## +-----------------+
## | 2.0|
## | 9.0|
## +-----------------+
解释:
必须将输出值重新序列化为等效的 Java 对象。如果你想访问 values
(注意 SparseVectors
),你应该使用 item
方法:
v.values.item(0)
哪个 return 标准 Python 标量。同样,如果你想访问所有值作为密集结构:
v.toArray().tolist()
如果您更喜欢使用 spark.sql,您可以使用以下自定义函数 'to_array' 将向量转换为数组。然后你可以把它当作一个数组来操作。
from pyspark.sql.types import ArrayType, DoubleType
def to_array_(v):
return v.toArray().tolist()
from pyspark.sql import SQLContext
sqlContext=SQLContext(spark.sparkContext, sparkSession=spark, jsqlContext=None)
sqlContext.udf.register("to_array",to_array_, ArrayType(DoubleType()))
例子
from pyspark.ml.linalg import Vectors
df = sc.parallelize([
(1, Vectors.dense([1, 2, 3])),
(2, Vectors.sparse(3, [1], [9]))
]).toDF(["id", "features"])
df.createOrReplaceTempView("tb")
spark.sql("""select * , to_array(features)[1] Second from tb """).toPandas()
输出
id features Second
0 1 [1.0, 2.0, 3.0] 2.0
1 2 (0.0, 9.0, 0.0) 9.0
我 运行 遇到同样的问题,无法使用 explode()。您可以做的一件事是使用 pyspark.ml.feature 库中的 VectorSlice。像这样:
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row
slicer = VectorSlicer(inputCol="features", outputCol="features_one", indices=[0])
output = slicer.transform(df)
output.select("features", "features_one").show()
对于任何试图将 PySpark ML 模型训练后生成的概率列拆分为可用列的人。这不使用 UDF 或 numpy。这只适用于二进制分类。这里 lr_pred 是具有逻辑回归模型预测的数据框。
prob_df1=lr_pred.withColumn("probability",lr_pred["probability"].cast("String"))
prob_df =prob_df1.withColumn('probabilityre',split(regexp_replace("probability", "^\[|\]", ""), ",")[1].cast(DoubleType()))