使用 pyspark 使用单独的训练集和数据训练 NaiveBayes 模型

NaiveBayes model training with separate training set and data using pyspark

所以,我正在尝试训练朴素贝叶斯分类器。在预处理数据时遇到了很多麻烦,我现在已经生成了两个 RDD:

  1. 训练集:由一组稀疏向量组成;
  2. 标签:每个向量的相应标签列表(0,1)。

我需要运行这样的东西:

# Train a naive Bayes model.
model = NaiveBayes.train(training, 1.0)

但 "training" 是从 运行ning 派生的数据集:

def parseLine(line):
    parts = line.split(',')
    label = float(parts[0])
    features = Vectors.dense([float(x) for x in parts[1].split(' ')])
    return LabeledPoint(label, features)

data = sc.textFile('data/mllib/sample_naive_bayes_data.txt').map(parseLine)

基于 python here 的文档。我的问题是,鉴于我不想从 txt 文件加载数据,并且我已经以映射到稀疏向量 (RDD) 和相应标记列表的记录形式创建了训练集,我如何才能运行 朴素贝叶斯?

这是我的部分代码:

# Function
def featurize(tokens_kv, dictionary):
    """
    :param tokens_kv: list of tuples of the form (word, tf-idf score)
    :param dictionary: list of n words
    :return: sparse_vector of size n
    """

    # MUST sort tokens_kv by key
    tokens_kv = collections.OrderedDict(sorted(tokens_kv.items()))

    vector_size = len(dictionary)
    non_zero_indexes = []
    index_tfidf_values = []

    for key, value in tokens_kv.iteritems():
        index = 0
        for word in dictionary:
            if key == word:
                non_zero_indexes.append(index)
                index_tfidf_values.append(value)
            index += 1

    print non_zero_indexes
    print index_tfidf_values

    return SparseVector(vector_size, non_zero_indexes, index_tfidf_values)

# Feature Extraction
Training_Set_Vectors = (TFsIDFs_Vector_Weights_RDDs
                        .map(lambda (tokens): featurize(tokens, Dictionary_BV.value))
                        .cache())

... 而标签只是 1 和 0 的列表。我知道我可能需要以某种方式以某种方式使用 labeledpoint 但我对如何...感到困惑...... RDD 不是列表而标签是列表我希望像想出一种创建 labeledpoint objets [i] 的方法一样简单结合稀疏向量[i]、对应标签[i]各自的值...有什么想法吗?

我能够通过首先收集 SparseVectors RDD 来解决这个问题——有效地将它们转换为列表。然后,我 运行 构造了一个列表的函数 标记点对象:

def final_form_4_training(SVs, labels):
    """
    :param SVs: List of Sparse vectors.
    :param labels: List of labels
    :return: list of labeledpoint objects
    """

    to_train = []
    for i in range(len(labels)):
        to_train.append(LabeledPoint(labels[i], SVs[i]))
    return to_train

# Feature Extraction
Training_Set_Vectors = (TFsIDFs_Vector_Weights_RDDs
                        .map(lambda (tokens): featurize(tokens, Dictionary_BV.value))
                        .collect())

raw_input("Generate the LabeledPoint parameter... ")
labelled_training_set = sc.parallelize(final_form_4_training(Training_Set_Vectors, training_labels))

raw_input("Train the model... ")
model = NaiveBayes.train(labelled_training_set, 1.0)

但是,这假设 RDD 在整个过程管道中保持它们的顺序(我没有搞乱)。我也讨厌我必须收集 master 上的所有内容的部分。有更好的想法吗?