How do I convert an RDD with a SparseVector Column to a DataFrame with a column as Vector
I have an RDD with a tuple of values (String, SparseVector) and I want to create a DataFrame from it, so that I get a (label: string, features: vector) DataFrame, which is the schema required by most of the ml algorithm libraries.
I know it can be done, because the HashingTF ml library outputs a vector when given a features column of a DataFrame.
temp_df = sqlContext.createDataFrame(temp_rdd, StructType([
    StructField("label", DoubleType(), False),
    StructField("tokens", ArrayType(StringType()), False)
]))
# assuming there is an RDD of (double, array(strings))
hashingTF = HashingTF(numFeatures=COMBINATIONS, inputCol="tokens", outputCol="features")
ndf = hashingTF.transform(temp_df)
ndf.printSchema()
#outputs
#root
#|-- label: double (nullable = false)
#|-- tokens: array (nullable = false)
#| |-- element: string (containsNull = true)
#|-- features: vector (nullable = true)
So my question is, can I somehow convert an RDD of (String, SparseVector) to a DataFrame of (String, Vector)?
I tried with the usual sqlContext.createDataFrame, but there is no DataType that fits my needs.
df = sqlContext.createDataFrame(rdd, StructType([
    StructField("label", StringType(), True),
    StructField("features", ?Type(), True)
]))
You have to use VectorUDT here:
from pyspark.sql.types import StructType, StructField, DoubleType
# In Spark 1.x
# from pyspark.mllib.linalg import SparseVector, VectorUDT
from pyspark.ml.linalg import SparseVector, VectorUDT

temp_rdd = sc.parallelize([
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})),
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))])

schema = StructType([
    StructField("label", DoubleType(), True),
    StructField("features", VectorUDT(), True)
])

temp_rdd.toDF(schema).printSchema()
## root
## |-- label: double (nullable = true)
## |-- features: vector (nullable = true)
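With that schema in place, the resulting DataFrame should plug straight into the ml estimators that expect a (label, features) layout. A minimal sketch of such usage (my assumption, picking LogisticRegression purely as an example):

# Sketch (assumption): feed the converted DataFrame to an ml estimator
from pyspark.ml.classification import LogisticRegression

df = temp_rdd.toDF(schema)
lr = LogisticRegression(labelCol="label", featuresCol="features")
model = lr.fit(df)
print(model.coefficients)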
For completeness, the Scala equivalent:
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, StructType}
// In Spark 1.x
// import org.apache.spark.mllib.linalg.{Vectors, VectorUDT}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType

val schema = new StructType()
  .add("label", DoubleType)
  // In Spark 1.x
  // .add("features", new VectorUDT())
  .add("features", VectorType)

val temp_rdd: RDD[Row] = sc.parallelize(Seq(
  Row(0.0, Vectors.sparse(4, Seq((1, 1.0), (3, 5.5)))),
  Row(1.0, Vectors.sparse(4, Seq((0, -1.0), (2, 0.5))))
))

spark.createDataFrame(temp_rdd, schema).printSchema
// root
// |-- label: double (nullable = true)
// |-- features: vector (nullable = true)
Although the answer by @zero323 makes sense, and I wish it had worked for me - the RDD underlying the DataFrame, sqlContext.createDataFrame(temp_rdd, schema), still contained SparseVector types.
I had to do the following to convert to DenseVector types - if anyone has a shorter/better way, I would like to know it.
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, DoubleType
from pyspark.ml.linalg import SparseVector, DenseVector, VectorUDT

temp_rdd = sc.parallelize([
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})),
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))])

schema = StructType([
    StructField("label", DoubleType(), True),
    StructField("features", VectorUDT(), True)
])

temp_rdd.toDF(schema).printSchema()
df_w_ftr = temp_rdd.toDF(schema)
print('original conversion method: ', df_w_ftr.take(5))
print('\n')

temp_rdd_dense = temp_rdd.map(lambda x: Row(label=x[0], features=DenseVector(x[1].toArray())))
print(type(temp_rdd_dense), type(temp_rdd))
print('using map and toArray:', temp_rdd_dense.take(5))

temp_rdd_dense.toDF().show()
root
|-- label: double (nullable = true)
|-- features: vector (nullable = true)
original conversion method: [Row(label=0.0, features=SparseVector(4, {1: 1.0, 3: 5.5})), Row(label=1.0, features=SparseVector(4, {0: -1.0, 2: 0.5}))]
<class 'pyspark.rdd.PipelinedRDD'> <class 'pyspark.rdd.RDD'>
using map and toArray: [Row(features=DenseVector([0.0, 1.0, 0.0, 5.5]), label=0.0), Row(features=DenseVector([-1.0, 0.0, 0.5, 0.0]), label=1.0)]
+------------------+-----+
| features|label|
+------------------+-----+
| [0.0,1.0,0.0,5.5]| 0.0|
|[-1.0,0.0,0.5,0.0]| 1.0|
+------------------+-----+
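A possibly shorter route (a sketch on my part, assuming Spark 2.x and the pyspark.ml.linalg types used above) is to densify the column in place with a UDF instead of going back through the RDD:

# Sketch (assumption): convert the sparse features column to dense with a UDF
from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector, VectorUDT

to_dense = udf(lambda v: DenseVector(v.toArray()), VectorUDT())
df_dense = df_w_ftr.withColumn("features", to_dense("features"))
df_dense.show()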
Here is a Scala example for Spark 2.1:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.linalg.Vector

def featuresRDD2DataFrame(features: RDD[Vector]): DataFrame = {
  import sparkSession.implicits._
  val rdd: RDD[(Double, Vector)] = features.map(x => (0.0, x))
  val df = rdd.toDF("label", "features").select("features")
  df
}
toDF() on the rdd of features is not recognized by the compiler.