如何将 Row 类型转换为 Vector 以提供给 KMeans
How to convert type Row into Vector to feed to the KMeans
当我尝试将 df2 提供给 kmeans 时,出现以下错误
clusters = KMeans.train(df2, 10, maxIterations=30,
runs=10, initializationMode="random")
我得到的错误:
Cannot convert type <class 'pyspark.sql.types.Row'> into Vector
df2 是按如下方式创建的数据框:
df = sqlContext.read.json("data/ALS3.json")
df2 = df.select('latitude','longitude')
df2.show()
latitude| longitude|
60.1643075| 24.9460844|
60.4686748| 22.2774728|
如何将这两列转换为 Vector 并将其提供给 KMeans?
毫升
问题是您错过了 documentation's example,很明显方法 train
需要 DataFrame
和 Vector
作为特征。
要修改当前数据的结构,您可以使用 VectorAssembler。在你的情况下,它可能是这样的:
from pyspark.sql.functions import *
vectorAssembler = VectorAssembler(inputCols=["latitude", "longitude"],
outputCol="features")
# For your special case that has string instead of doubles you should cast them first.
expr = [col(c).cast("Double").alias(c)
for c in vectorAssembler.getInputCols()]
df2 = df2.select(*expr)
df = vectorAssembler.transform(df2)
此外,您还应该使用 class MinMaxScaler 规范化您的 features
以获得更好的结果。
MLLib
为了使用 MLLib
实现此目的,您需要先使用映射函数,将所有 string
值转换为 Double
,然后将它们合并到一个 DenseVector.
rdd = df2.map(lambda data: Vectors.dense([float(c) for c in data]))
在这一点之后,您可以使用 rdd
变量训练您的 MLlib's KMeans model。
我得到 PySpark 2.3.1 来执行 KMeans on a DataFrame 如下:
- 列出要包含在聚类分析中的列:
feat_cols = ['latitude','longitude']`
- 您需要所有列都是数值:
expr = [col(c).cast("Double").alias(c) for c in feat_cols]
df2 = df2.select(*expr)
- 使用
mllib.linalg.Vectors
创建您的 特征 向量:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=feat_cols, outputCol="features")
df3 = assembler.transform(df2).select('features')
- 你应该规范化你的特征因为规范化并不总是需要的,但它很少有坏处(more about this here):
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(
inputCol="features",
outputCol="scaledFeatures",
withStd=True,
withMean=False)
scalerModel = scaler.fit(df3)
df4 = scalerModel.transform(df3).drop('features')\
.withColumnRenamed('scaledFeatures', 'features')
from pyspark.mllib.linalg import Vectors
data5 = df4.rdd.map(lambda row: Vectors.dense([x for x in row['features']]))
- 将得到的RDD对象作为KMeans训练的输入:
from pyspark.mllib.clustering import KMeans
model = KMeans.train(data5, k=3, maxIterations=10)
- 示例:对向量 space:
中的点 p 进行分类
prediction = model.predict(p)
当我尝试将 df2 提供给 kmeans 时,出现以下错误
clusters = KMeans.train(df2, 10, maxIterations=30,
runs=10, initializationMode="random")
我得到的错误:
Cannot convert type <class 'pyspark.sql.types.Row'> into Vector
df2 是按如下方式创建的数据框:
df = sqlContext.read.json("data/ALS3.json")
df2 = df.select('latitude','longitude')
df2.show()
latitude| longitude|
60.1643075| 24.9460844|
60.4686748| 22.2774728|
如何将这两列转换为 Vector 并将其提供给 KMeans?
毫升
问题是您错过了 documentation's example,很明显方法 train
需要 DataFrame
和 Vector
作为特征。
要修改当前数据的结构,您可以使用 VectorAssembler。在你的情况下,它可能是这样的:
from pyspark.sql.functions import *
vectorAssembler = VectorAssembler(inputCols=["latitude", "longitude"],
outputCol="features")
# For your special case that has string instead of doubles you should cast them first.
expr = [col(c).cast("Double").alias(c)
for c in vectorAssembler.getInputCols()]
df2 = df2.select(*expr)
df = vectorAssembler.transform(df2)
此外,您还应该使用 class MinMaxScaler 规范化您的 features
以获得更好的结果。
MLLib
为了使用 MLLib
实现此目的,您需要先使用映射函数,将所有 string
值转换为 Double
,然后将它们合并到一个 DenseVector.
rdd = df2.map(lambda data: Vectors.dense([float(c) for c in data]))
在这一点之后,您可以使用 rdd
变量训练您的 MLlib's KMeans model。
我得到 PySpark 2.3.1 来执行 KMeans on a DataFrame 如下:
- 列出要包含在聚类分析中的列:
feat_cols = ['latitude','longitude']`
- 您需要所有列都是数值:
expr = [col(c).cast("Double").alias(c) for c in feat_cols]
df2 = df2.select(*expr)
- 使用
mllib.linalg.Vectors
创建您的 特征 向量:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=feat_cols, outputCol="features")
df3 = assembler.transform(df2).select('features')
- 你应该规范化你的特征因为规范化并不总是需要的,但它很少有坏处(more about this here):
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(
inputCol="features",
outputCol="scaledFeatures",
withStd=True,
withMean=False)
scalerModel = scaler.fit(df3)
df4 = scalerModel.transform(df3).drop('features')\
.withColumnRenamed('scaledFeatures', 'features')
from pyspark.mllib.linalg import Vectors
data5 = df4.rdd.map(lambda row: Vectors.dense([x for x in row['features']]))
- 将得到的RDD对象作为KMeans训练的输入:
from pyspark.mllib.clustering import KMeans
model = KMeans.train(data5, k=3, maxIterations=10)
- 示例:对向量 space: 中的点 p 进行分类
prediction = model.predict(p)