在 pyspark 中训练 ML 算法

Training ML algorithm in pyspark

我是 Pyspark 的新手,正在尝试在 Pyspark 中创建 ML 模型 我的目标是创建一个 TFidf 向量化器并将这些特征传递到我的 SVM 模型。 我试过这个

import findspark
findspark.init()
from pyspark import SparkContext, SparkConf

conf = SparkConf().setMaster("local[2]").setAppName("Stream")
sc = SparkContext(conf=conf)
parallelized = sc.parallelize(Dataset.CleanText)
             #dataset is a pandas dataframe with CleanText as one of the column
from pyspark.mllib.feature import HashingTF, IDF
hashingTF = HashingTF()
tf = hashingTF.transform(parallelized)

# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
#tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

print ("vecs: ",tfidf.glom().collect())
         #This is printing all the TFidf vectors


import numpy as np
labels = np.array(Dataset['LabelNo'])

现在我应该如何将这些 Tfidf 和标签值传递给我的模型?

我关注了这个 http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html

并尝试将标记点创建为

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("SparkSessionZipsExample").getOrCreate()

dd = [(labels[i], Vectors.dense(tfidf[i])) for i in range(len(labels))]
df = spark.createDataFrame(sc.parallelize(dd),schema=["label", "features"])

print ("df: ",df.glom().collect())

但这给我的错误是:

---〉 15 dd = [(labels[i], Vectors.dense(tfidf[i])) for i in range(len(labels))] 16 df = spark.createDataFrame(sc.parallelize(dd),schema=["label", "features"]) 17

TypeError: 'RDD' object does not support indexing

错误清楚地解释了自己 RDD does not support indexing。您正在尝试通过使用 i 作为其索引(第 15 行中的 tfidf[i])来获取 tfidfith 行。 RDD 不像列表那样工作。 RDD 是分布式数据集。行随机分配给工人。

如果您希望代码正常工作,则必须将 tfidf 收集到单个节点,但这会破坏像 spark 这样的分布式框架的目的。

我建议您使用数据帧而不是 rdds,因为它们比 rdds 快得多,而且 ml 库支持 mllib 提供的大多数操作(HashingTF、IDF)。