Spark pipeline combining VectorAssembler and HashingTF transformers
Let's define a Spark pipeline that assembles several columns together and then applies feature hashing:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, VectorAssembler}

val df = sqlContext.createDataFrame(Seq((0.0, 1.0, 2.0), (3.0, 4.0, 5.0))).toDF("colx", "coly", "colz")
val va = new VectorAssembler().setInputCols(Array("colx", "coly", "colz")).setOutputCol("ft")
val hashIt = new HashingTF().setInputCol("ft").setOutputCol("ft2")
val pipeline = new Pipeline().setStages(Array(va, hashIt))
Fitting the pipeline with pipeline.fit(df) throws:
java.lang.IllegalArgumentException: requirement failed: The input column must be ArrayType, but got org.apache.spark.mllib.linalg.VectorUDT@f71b0bce
Is there a transformer that would let VectorAssembler and HashingTF work together?
Personally, I wouldn't even use the Pipeline API for this. HashingTF expects its input column to be an ArrayType (typically an array of tokens), while VectorAssembler produces a Vector, which is exactly what the error is complaining about. A plain array function is enough:
import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.sql.functions.array
import sqlContext.implicits._ // enables the 'colx symbol-to-Column syntax

val df = sqlContext.createDataFrame(Seq((0.0, 1.0, 2.0), (3.0, 4.0, 5.0)))
  .toDF("colx", "coly", "colz")
  .withColumn("ft", array('colx, 'coly, 'colz))
val hashIt = new HashingTF().setInputCol("ft").setOutputCol("ft2")
val res = hashIt.transform(df)
res.show(false)
# +----+----+----+---------------+------------------------------+
# |colx|coly|colz|ft |ft2 |
# +----+----+----+---------------+------------------------------+
# |0.0 |1.0 |2.0 |[0.0, 1.0, 2.0]|(262144,[0,1,2],[1.0,1.0,1.0])|
# |3.0 |4.0 |5.0 |[3.0, 4.0, 5.0]|(262144,[3,4,5],[1.0,1.0,1.0])|
# +----+----+----+---------------+------------------------------+
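That said, if you'd rather keep everything inside the Pipeline API, one option is to wrap the array call in a SQLTransformer stage. This is only a sketch, assuming Spark 1.6+ (where SQLTransformer is available); the stage and variable names here are arbitrary:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, SQLTransformer}

val raw = sqlContext.createDataFrame(Seq((0.0, 1.0, 2.0), (3.0, 4.0, 5.0)))
  .toDF("colx", "coly", "colz")
// __THIS__ is SQLTransformer's placeholder for the input DataFrame.
val arrayStage = new SQLTransformer()
  .setStatement("SELECT *, array(colx, coly, colz) AS ft FROM __THIS__")
val hashStage = new HashingTF().setInputCol("ft").setOutputCol("ft2")
val pipeline2 = new Pipeline().setStages(Array(arrayStage, hashStage))
pipeline2.fit(raw).transform(raw).show(false)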
As a follow-up to the question, to generalize the array approach when the number of columns is greater than three, the following collects every column of the DataFrame and concatenates them into a single array column:
val df2 = sqlContext.createDataFrame(Seq((0.0, 1.0, 2.0), (3.0, 4.0, 5.0)))
  .toDF("colx", "coly", "colz")
// Turn every column name into a Column and splat them into array()
val cols = df2.columns.map(df2(_))
df2.withColumn("ft", array(cols: _*)).show
# +----+----+----+---------------+
# |colx|coly|colz| ft|
# +----+----+----+---------------+
# | 0.0| 1.0| 2.0|[0.0, 1.0, 2.0]|
# | 3.0| 4.0| 5.0|[3.0, 4.0, 5.0]|
# +----+----+----+---------------+
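To tie the pieces together, here is a minimal end-to-end sketch (reusing the df2 defined above) that feeds the generalized array column into HashingTF, mirroring the three-column example:

import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.sql.functions.{array, col}

// Build the array column from every column of df2, then hash it.
val withFt = df2.withColumn("ft", array(df2.columns.map(col): _*))
val hashIt2 = new HashingTF().setInputCol("ft").setOutputCol("ft2")
hashIt2.transform(withFt).show(false)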